updated get top risks prompt
This commit is contained in:
@@ -13,14 +13,17 @@ def create_document_for_organization(confirmation_email):
|
||||
is_incomplete = False
|
||||
|
||||
organization = get_object_or_404(Organization, email=confirmation_email)
|
||||
top_risk_ids = get_top_risk(organization)
|
||||
top_risks_with_explanation = get_top_risk(organization)
|
||||
top_risk_ids = [r['risk_id'] for r in top_risks_with_explanation]
|
||||
if len(top_risk_ids) != 10:
|
||||
is_incomplete = True
|
||||
top_risk_ids = get_top_risk(organization)
|
||||
top_risks_with_explanation = get_top_risk(organization)
|
||||
top_risk_ids = [r['risk_id'] for r in top_risks_with_explanation]
|
||||
top_risks = Risk.objects.filter(risk_id__in=top_risk_ids)
|
||||
organization.risks.set(top_risks)
|
||||
|
||||
explanation_map = {r['risk_id']: r['explanation'] for r in top_risks_with_explanation}
|
||||
document = Document.objects.create(organization=organization)
|
||||
document.risk_explanations = explanation_map
|
||||
document.add_segment('h1', "Top 10 Risks Identified")
|
||||
|
||||
risk_content = "\n\n".join([
|
||||
@@ -32,6 +35,7 @@ def create_document_for_organization(confirmation_email):
|
||||
f"Detection Difficulty: {risk.detection_difficulty} \n"
|
||||
f"Recovery Complexity: {risk.recovery_complexity} \n"
|
||||
f"Business Impact Severity: {risk.businnes_impact_severity}\n"
|
||||
f"Explanation: {explanation_map.get(risk.risk_id, '')}\n"
|
||||
for risk in top_risks
|
||||
])
|
||||
document.add_segment('body', f"Identified Risks: \n\n{risk_content}")
|
||||
|
||||
@@ -16,7 +16,6 @@ class CeleryTaskTests(TestCase):
|
||||
compliance_frameworks=["Ab", "Ba"],
|
||||
industry_sector="Technology",
|
||||
it_dependency=8,
|
||||
data_sensitivity="High",
|
||||
network_infrastructure="Cloud-based",
|
||||
remote_workforce_percentage="50%",
|
||||
third_party_vendor_access="10-20",
|
||||
@@ -36,7 +35,11 @@ class CeleryTaskTests(TestCase):
|
||||
@patch("backend.accounts.tasks.get_controls_for_risk")
|
||||
@patch("backend.accounts.tasks.send_payment_email")
|
||||
def test_create_document_for_organization(self, mock_send_payment_email, mock_get_controls_for_risk, mock_get_top_risk):
|
||||
mock_get_top_risk.return_value = [self.risk.risk_id]
|
||||
mock_get_top_risk.return_value = [{
|
||||
"risk_id": int(self.risk.risk_id),
|
||||
"risk_name": self.risk.risk_name,
|
||||
"explanation": "Test explanation"
|
||||
}]
|
||||
mock_get_controls_for_risk.return_value = [(self.control.id, 5, 7)]
|
||||
create_document_for_organization(self.organization.email)
|
||||
|
||||
|
||||
@@ -21,7 +21,6 @@ class EmailTests(TestCase):
|
||||
compliance_frameworks=["Ab", "Ba"],
|
||||
industry_sector="Technology",
|
||||
it_dependency=8,
|
||||
data_sensitivity="High",
|
||||
network_infrastructure="Cloud-based",
|
||||
remote_workforce_percentage="50%",
|
||||
third_party_vendor_access="10-20",
|
||||
|
||||
@@ -5,7 +5,7 @@ from django.utils.html import format_html
|
||||
from .utils import generate_demo_code, get_top_risk, get_controls_for_risk, generate_key_findings, generate_recommendations
|
||||
from .tables import get_risk_table
|
||||
from django.shortcuts import render, redirect
|
||||
from .forms import GenerateCodesForm
|
||||
from .forms import GenerateCodesForm, RiskExplanationWidget
|
||||
from django.conf import settings
|
||||
from backend.accounts.utils import send_document_email
|
||||
from django import forms
|
||||
@@ -13,7 +13,7 @@ from django.contrib.admin.widgets import FilteredSelectMultiple
|
||||
import logging
|
||||
from django.contrib import messages
|
||||
from django.db import transaction
|
||||
|
||||
import re
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -45,10 +45,16 @@ class DocumentAdminForm(forms.ModelForm):
|
||||
widget=FilteredSelectMultiple(verbose_name="Risks", is_stacked=False),
|
||||
help_text="Edit the AI-selected risks for this organization."
|
||||
)
|
||||
risk_explanations = forms.Field(
|
||||
required=False,
|
||||
widget=RiskExplanationWidget,
|
||||
help_text="Edit explanations for each risk.",
|
||||
label=''
|
||||
)
|
||||
|
||||
class Meta:
|
||||
model = Document
|
||||
fields = ['organization', 'status', 'key_findings', 'recomendations']
|
||||
fields = ['organization', 'risk_explanations', 'status', 'key_findings', 'recomendations']
|
||||
|
||||
class Media:
|
||||
css = { 'all': ('admin/css/widgets.css',) }
|
||||
@@ -62,6 +68,22 @@ class DocumentAdminForm(forms.ModelForm):
|
||||
if self.instance and getattr(self.instance, 'organization_id', None):
|
||||
self.fields['organization_risks'].initial = self.instance.organization.risks.all()
|
||||
|
||||
def clean_risk_explanations(self):
|
||||
data = self.data
|
||||
explanations = {}
|
||||
for key in data:
|
||||
if key.startswith('risk_explanations_risk_') and not key.startswith('risk_explanations_risk_new_'):
|
||||
risk_id = data[key]
|
||||
explanation_key = f"risk_explanations_explanation_{risk_id}"
|
||||
explanation = data.get(explanation_key, "")
|
||||
if risk_id:
|
||||
explanations[str(risk_id)] = explanation
|
||||
for i in range(10):
|
||||
risk_id = data.get(f"risk_explanations_risk_new_{i}", "")
|
||||
explanation = data.get(f"risk_explanations_explanation_new_{i}", "")
|
||||
if risk_id:
|
||||
explanations[str(risk_id)] = explanation
|
||||
return explanations
|
||||
|
||||
class DocumentAdmin(admin.ModelAdmin):
|
||||
change_form_template = "admin/core/document/change_form.html"
|
||||
@@ -78,7 +100,7 @@ class DocumentAdmin(admin.ModelAdmin):
|
||||
|
||||
fieldsets = (
|
||||
('Organization & Risks', {
|
||||
'fields': ('organization', 'regen_note_action', 'regen_document_action', 'organization_risks', 'regen_top_risks_action')
|
||||
'fields': ('organization', 'regen_note_action', 'regen_document_action', 'organization_risks', 'risk_explanations', 'regen_top_risks_action')
|
||||
}),
|
||||
('Key Findings', {
|
||||
'fields': ('key_findings', 'regen_keyfindings_action')
|
||||
@@ -138,9 +160,21 @@ class DocumentAdmin(admin.ModelAdmin):
|
||||
def save_model(self, request, obj, form, change):
|
||||
super().save_model(request, obj, form, change)
|
||||
org_risks = form.cleaned_data.get('organization_risks')
|
||||
explanations = form.cleaned_data.get('risk_explanations', {})
|
||||
old_explanations = obj.risk_explanations or {}
|
||||
if org_risks is not None and obj.organization_id:
|
||||
obj.organization.risks.set(org_risks)
|
||||
|
||||
new_explanations = {}
|
||||
for risk in org_risks:
|
||||
key = str(risk.risk_id)
|
||||
new_explanations[key] = explanations.get(key, old_explanations.get(key, ""))
|
||||
obj.risk_explanations = new_explanations
|
||||
obj.save(update_fields=['risk_explanations'])
|
||||
else:
|
||||
if explanations:
|
||||
obj.risk_explanations = explanations
|
||||
obj.save(update_fields=['risk_explanations'])
|
||||
|
||||
def _apply_post_org_risks(self, request, obj):
|
||||
try:
|
||||
if 'organization_risks' in request.POST and obj.organization_id:
|
||||
@@ -159,7 +193,7 @@ class DocumentAdmin(admin.ModelAdmin):
|
||||
except Exception:
|
||||
logger.exception("Failed to clear segments for document %s", getattr(obj, 'pk', None))
|
||||
|
||||
def _risk_content(self, risks):
|
||||
def _risk_content(self, risks, explanation_map):
|
||||
return "\n\n".join([
|
||||
f"Risk: {risk.risk_id} - {risk.risk_name} \n"
|
||||
f"Category: {risk.category}\n"
|
||||
@@ -169,6 +203,7 @@ class DocumentAdmin(admin.ModelAdmin):
|
||||
f"Detection Difficulty: {risk.detection_difficulty} \n"
|
||||
f"Recovery Complexity: {risk.recovery_complexity} \n"
|
||||
f"Business Impact Severity: {risk.businnes_impact_severity}\n"
|
||||
f"Explanation: {explanation_map.get(risk.risk_id, '')}\n"
|
||||
for risk in risks
|
||||
])
|
||||
|
||||
@@ -177,7 +212,8 @@ class DocumentAdmin(admin.ModelAdmin):
|
||||
return
|
||||
self._clear_segments(obj, startswith=["Identified Risks"], exact=["Top 10 Risks Identified"])
|
||||
obj.add_segment('h1', "Top 10 Risks Identified")
|
||||
obj.add_segment('body', f"Identified Risks: \n\n{self._risk_content(risks)}")
|
||||
explanation_map = obj.risk_explanations or {}
|
||||
obj.add_segment('body', f"Identified Risks: \n\n{self._risk_content(risks, explanation_map)}")
|
||||
|
||||
def _clear_document_mappings(self, obj, clear_org_risks=True):
|
||||
try:
|
||||
@@ -209,9 +245,13 @@ class DocumentAdmin(admin.ModelAdmin):
|
||||
return ok
|
||||
|
||||
def _regen_top_risks(self, obj):
|
||||
top_risk_ids = get_top_risk(obj.organization)
|
||||
top_risks_with_explanation = get_top_risk(obj.organization)
|
||||
top_risk_ids = [r['risk_id'] for r in top_risks_with_explanation]
|
||||
top_risks = Risk.objects.filter(risk_id__in=top_risk_ids)
|
||||
explanation_map = {r['risk_id']: r['explanation'] for r in top_risks_with_explanation}
|
||||
obj.organization.risks.set(top_risks)
|
||||
obj.risk_explanations = explanation_map
|
||||
obj.save(update_fields=['risk_explanations', 'modified_at'])
|
||||
self._add_identified_risks(obj, top_risks)
|
||||
return True
|
||||
|
||||
|
||||
@@ -1,5 +1,8 @@
|
||||
from django import forms
|
||||
from .models import Organization
|
||||
import json
|
||||
from django.utils.safestring import mark_safe
|
||||
|
||||
|
||||
class OrganizationForm(forms.ModelForm):
|
||||
class Meta:
|
||||
@@ -66,3 +69,36 @@ class ContactForm(forms.Form):
|
||||
name = forms.CharField(label="Name", max_length=100)
|
||||
email = forms.EmailField(label="Email")
|
||||
message = forms.CharField(label="Message", widget=forms.Textarea(attrs={"rows": 6}), max_length=5000)
|
||||
|
||||
|
||||
class RiskExplanationWidget(forms.Widget):
|
||||
def render(self, name, value, attrs=None, renderer=None):
|
||||
if isinstance(value, str):
|
||||
try:
|
||||
value = json.loads(value)
|
||||
except Exception:
|
||||
value = {}
|
||||
value = value or {}
|
||||
html = (
|
||||
'<table style="width:100%; margin:0; padding:0; border-collapse:collapse;">'
|
||||
'<tr>'
|
||||
'<th style="width:5%;">Risk ID</th>'
|
||||
'<th style="width:80%;">Explanation</th>'
|
||||
'</tr>'
|
||||
)
|
||||
for risk_id, explanation in value.items():
|
||||
html += (
|
||||
f'<tr>'
|
||||
f'<td style="width:5%;"><input type="number" name="{name}_risk_{risk_id}" value="{risk_id}" readonly style="width:100%;"></td>'
|
||||
f'<td style="width:80%;"><input type="text" name="{name}_explanation_{risk_id}" value="{explanation}" style="width:99%;"></td>'
|
||||
f'</tr>'
|
||||
)
|
||||
for i in range(10 - len(value)):
|
||||
html += (
|
||||
f'<tr>'
|
||||
f'<td style="width:5%;"><input type="number" name="{name}_risk_new_{i}" value="" style="width:100%;"></td>'
|
||||
f'<td style="width:80%;"><input type="text" name="{name}_explanation_new_{i}" value="" style="width:99%;"></td>'
|
||||
f'</tr>'
|
||||
)
|
||||
html += '</table>'
|
||||
return mark_safe(html)
|
||||
18
backend/core/migrations/0028_document_risk_explanations.py
Normal file
18
backend/core/migrations/0028_document_risk_explanations.py
Normal file
@@ -0,0 +1,18 @@
|
||||
# Generated by Django 5.1.3 on 2025-09-25 13:50
|
||||
|
||||
from django.db import migrations, models
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
dependencies = [
|
||||
('core', '0027_remove_organization_data_sensitivity_and_more'),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.AddField(
|
||||
model_name='document',
|
||||
name='risk_explanations',
|
||||
field=models.JSONField(blank=True, default=dict, help_text='Map of risk_id to explanation for each risk in this document.'),
|
||||
),
|
||||
]
|
||||
@@ -112,6 +112,7 @@ class Document(models.Model):
|
||||
(STATUS_INCOMPLETE, 'Incomplete'),
|
||||
)
|
||||
status = models.CharField(max_length=16, choices=STATUS_CHOICES, default=STATUS_WAITING)
|
||||
risk_explanations = models.JSONField(default=dict, blank=True, help_text="Map of risk_id to explanation for each risk in this document.")
|
||||
|
||||
key_findings = models.TextField(blank=True, null=True, help_text="Key findings")
|
||||
recomendations = models.TextField(blank=True, null=True, help_text="Recommendations")
|
||||
|
||||
@@ -27,7 +27,7 @@
|
||||
.form-row.field-regen_recommendations_action label { display: none; }
|
||||
.form-row.field-regen_top_risks_action label { display: none; }
|
||||
.form-row.field-regen_controls_action { display: none !important; }
|
||||
|
||||
.form-row label:empty { display: none; !important; }
|
||||
.ai-callout {
|
||||
margin: 8px 0 0;
|
||||
padding: 10px 12px;
|
||||
|
||||
0
backend/core/templatetags/__init__.py
Normal file
0
backend/core/templatetags/__init__.py
Normal file
6
backend/core/templatetags/risk_explanations.py
Normal file
6
backend/core/templatetags/risk_explanations.py
Normal file
@@ -0,0 +1,6 @@
|
||||
from django import template
|
||||
register = template.Library()
|
||||
|
||||
@register.filter
|
||||
def dict_get(d, key):
|
||||
return d.get(str(key), "")
|
||||
@@ -22,7 +22,6 @@ class UtilsTests(TestCase):
|
||||
compliance_frameworks=["Ab", "Ba"],
|
||||
industry_sector="Technology",
|
||||
it_dependency=8,
|
||||
data_sensitivity="High",
|
||||
network_infrastructure="Cloud-based",
|
||||
remote_workforce_percentage="50%",
|
||||
third_party_vendor_access="10-20",
|
||||
@@ -65,11 +64,16 @@ class UtilsTests(TestCase):
|
||||
mock_client = MagicMock()
|
||||
mock_openai.return_value = mock_client
|
||||
mock_response = MagicMock()
|
||||
mock_response.choices[0].message.content = "1,2,3"
|
||||
mock_response.choices[0].message.content = (
|
||||
"1. **Risk ID 1 (Privacy Regulation Violation)**: Critical because the company's operations are governed by NIS2 regulations, and any data breach could lead to severe financial penalties and reputational damage.\n"
|
||||
"2. **Risk ID 2 (Third Party Code Compromise)**: This risk is critical given the company's reliance on more than five third-party vendors, which increases the potential for system compromises and data breaches through external partnerships.\n"
|
||||
"3. **Risk ID 3 (Misconfigured Cloud Services)**: Critical due to the company's hybrid IT infrastructure, which may lead to increased data exposure if cloud services are not properly configured, impacting compliance and customer trust.\n"
|
||||
)
|
||||
mock_client.chat.completions.create.return_value = mock_response
|
||||
|
||||
risks = get_top_risk(self.organization)
|
||||
self.assertEqual(risks, [1,2,3])
|
||||
top_risk_ids = [r['risk_id'] for r in risks]
|
||||
self.assertEqual(top_risk_ids, [1, 2, 3])
|
||||
|
||||
@patch('backend.core.utils.OpenAI')
|
||||
def test_get_controls_for_risk(self, mock_openai):
|
||||
|
||||
@@ -24,7 +24,6 @@ class DocumentViewTest(TestCase):
|
||||
compliance_frameworks=["Ab", "Ba"],
|
||||
industry_sector="Technology",
|
||||
it_dependency=8,
|
||||
data_sensitivity="High",
|
||||
network_infrastructure="Cloud-based",
|
||||
remote_workforce_percentage="50%",
|
||||
third_party_vendor_access="10-20",
|
||||
|
||||
@@ -13,7 +13,7 @@ from django.contrib.staticfiles.finders import find
|
||||
import matplotlib.image as mpimg
|
||||
site_domain = settings.SITE_DOMAIN
|
||||
import random
|
||||
|
||||
import re
|
||||
|
||||
|
||||
def extract_organization_details(organization):
|
||||
@@ -51,26 +51,84 @@ def get_top_risk(organization):
|
||||
organization_details = extract_organization_details(organization)
|
||||
|
||||
prompt = f"""
|
||||
You are an AI risk assessor. Based on the following company details and list of known risks,
|
||||
identify the 10 most critical risks for this company. Respond only with risk IDs.
|
||||
You are an expert cybersecurity risk analyst. Your task is to identify
|
||||
the top 10 most critical cybersecurity risks for a client based on
|
||||
their specific company profile and a comprehensive risk catalog. Your
|
||||
analysis must be logical, evidence-based, and directly tied to the
|
||||
client's details.
|
||||
|
||||
Company Details:
|
||||
{organization_details}
|
||||
Methodology:
|
||||
|
||||
List of Risks:
|
||||
{risk_list}
|
||||
Analyze the Company Profile: Carefully review all details provided
|
||||
about the company, including its industry, size (revenue and
|
||||
employees), IT dependency, regulatory requirements, and operational
|
||||
characteristics (e.g., remote work, third-party vendors, internal
|
||||
development).
|
||||
|
||||
Provide only the 10 most critical risk IDs in a simple comma-separated format, e.g "1,3,7,12,..."
|
||||
Evaluate the Risk Catalog: Review the provided list of known risks.
|
||||
|
||||
Map Profile to Risks: Correlate specific details from the company
|
||||
profile to the risks in the catalog. For example:
|
||||
|
||||
A company in the Financial sector subject to GDPR is highly
|
||||
susceptible to "Privacy Regulation Violation" (Risk ID 61).
|
||||
|
||||
A company with significant "Internal Software Development" is more
|
||||
vulnerable to "CI/CD Pipeline Compromise" (Risk ID 30) and "Source
|
||||
Code Exposure" (Risk ID 9).
|
||||
|
||||
High dependency on a "Cloud Provider" increases the criticality of
|
||||
"Cloud Provider Service Outage" (Risk ID 20).
|
||||
|
||||
Prioritize by Impact: Determine the most critical risks by assessing
|
||||
the potential impact (financial, operational, reputational, and
|
||||
regulatory) on this specific company. A risk is critical if it poses a
|
||||
severe threat to the company's core operations, data, or compliance
|
||||
standing.
|
||||
|
||||
Final Selection: Select the 10 risks with the highest criticality and
|
||||
provide a clear, concise justification for each choice.
|
||||
|
||||
Company Details:
|
||||
{organization_details}
|
||||
|
||||
List of Risks:
|
||||
{risk_list}
|
||||
|
||||
Required Output Format:
|
||||
|
||||
Provide your response as a numbered list from 1 to 10. For each item,
|
||||
include the Risk ID, the Risk Name, and a brief, one-sentence
|
||||
justification that links a specific company detail to why that risk is
|
||||
critical.
|
||||
|
||||
Example:
|
||||
|
||||
Risk ID 18 (Ransomware Infection): Critical due to the company's high
|
||||
IT dependency and the severe operational and financial impact a
|
||||
ransomware event would cause.
|
||||
|
||||
Risk ID 61 (Privacy Regulation Violation): Critical because the
|
||||
company operates under GDPR, making any breach of personal data a
|
||||
significant legal and financial liability.
|
||||
"""
|
||||
|
||||
response = client.chat.completions.create(
|
||||
model="gpt-4o-mini",
|
||||
messages=[{"role": "system", "content": prompt}]
|
||||
)
|
||||
|
||||
risk_ids = response.choices[0].message.content.strip().split(",")
|
||||
|
||||
return [int(risk_id) for risk_id in risk_ids if risk_id.isdigit()]
|
||||
content = response.choices[0].message.content.strip()
|
||||
matches = re.findall(
|
||||
r'Risk ID\s*(\d+)\s*\((.*?)\)\*\*:\s*(.+?)(?=\n\d+\.|\Z)', content, re.DOTALL
|
||||
)
|
||||
results = []
|
||||
for risk_id, risk_name, explanation in matches:
|
||||
results.append({
|
||||
"risk_id": int(risk_id),
|
||||
"risk_name": risk_name.strip(),
|
||||
"explanation": explanation.strip()
|
||||
})
|
||||
return results
|
||||
|
||||
def get_controls_for_risk(risk, organization):
|
||||
client = OpenAI(api_key=settings.OPENAI_API_KEY)
|
||||
|
||||
Reference in New Issue
Block a user