Added graph and table to document

This commit is contained in:
2025-03-27 23:57:31 +01:00
parent 5a7a89d93c
commit fc29671331
16 changed files with 687 additions and 65 deletions

View File

@@ -37,16 +37,17 @@ def create_document_for_organization(confirmation_email):
selected_controls = get_controls_for_risk(risk ,organization=organization)
for control_id, weight in selected_controls:
for control_id, weight, likelihood in selected_controls:
control = Control.objects.filter(id=control_id).first()
if control:
DocumentRiskControl.objects.create(
document=document,
risk=risk,
control=control,
weight=weight
weight=weight,
likelihood=likelihood
)
controls_content += f" - Control: {control.name} (Impact Weight: {weight}/10)\n"
controls_content += f" - Control: {control.name} (Impact Weight: {weight}/10) (Likelihood: {likelihood}/10)\n"
controls_content += "\n"

View File

View File

@@ -0,0 +1,56 @@
from unittest.mock import patch
from django.test import TestCase
from backend.core.models import Organization, Document, Risk, Control, DocumentRiskControl
from backend.accounts.tasks import create_document_for_organization
class CeleryTaskTests(TestCase):
def setUp(self):
self.organization = Organization.objects.create(
id=1,
name="Test Organization",
email="test@example.com",
employee_headcount="100-500",
annual_revenue="$1M-$10M",
critical_applications="5-10",
compliance_frameworks=["Ab", "Ba"],
industry_sector="Technology",
it_dependency=8,
data_sensitivity="High",
network_infrastructure="Cloud-based",
remote_workforce_percentage="50%",
third_party_vendor_access="10-20",
internal_software_development="Moderate",
geographic_scope="Global",
customer_base="Enterprise",
customer_type="B2B",
product_portfolio="Diverse",
supplier_base="International",
it_infrastructure=["Cloud", "On-Premise"],
intellectual_property=["Patents", "Trademarks"],
sensitive_data=["PII", "Financial Data"],
integration_level="Highly Integrated"
)
self.risk = Risk.objects.create(risk_id="1", risk_name="Test Risk", category="Category1", primary_impact="High")
self.control = Control.objects.create(name="Test Control")
@patch("backend.accounts.tasks.get_top_risk")
@patch("backend.accounts.tasks.get_controls_for_risk")
@patch("backend.accounts.tasks.send_payment_email")
def test_create_document_for_organization(self, mock_send_payment_email, mock_get_controls_for_risk, mock_get_top_risk):
mock_get_top_risk.return_value = [self.risk.risk_id]
mock_get_controls_for_risk.return_value = [(self.control.id, 5, 7)]
create_document_for_organization(self.organization.email)
document = Document.objects.first()
self.assertIsNotNone(document)
document_risk_control = DocumentRiskControl.objects.first()
self.assertIsNotNone(document_risk_control)
self.assertEqual(document_risk_control.document, document)
self.assertEqual(document_risk_control.risk, self.risk)
self.assertEqual(document_risk_control.control, self.control)
self.assertEqual(document_risk_control.weight, 5)
self.assertEqual(document_risk_control.likelihood, 7)
mock_send_payment_email.assert_called_once_with(self.organization.email)

View File

@@ -0,0 +1,66 @@
from django.test import TestCase
from django.core import mail
from unittest.mock import patch, MagicMock
from backend.core.models import Organization, Document
from backend.accounts.models import EmailConfirmation
from backend.core.utils import generate_first_page_image
from backend.accounts.utils import send_confirmation_email, send_payment_email, send_document_email
import uuid
from django.utils.timezone import now
class EmailTests(TestCase):
def setUp(self):
self.email = "test@example.com"
self.organization = Organization.objects.create(
id=1,
name="Test Organization",
email="test@example.com",
employee_headcount="100-500",
annual_revenue="$1M-$10M",
critical_applications="5-10",
compliance_frameworks=["Ab", "Ba"],
industry_sector="Technology",
it_dependency=8,
data_sensitivity="High",
network_infrastructure="Cloud-based",
remote_workforce_percentage="50%",
third_party_vendor_access="10-20",
internal_software_development="Moderate",
geographic_scope="Global",
customer_base="Enterprise",
customer_type="B2B",
product_portfolio="Diverse",
supplier_base="International",
it_infrastructure=["Cloud", "On-Premise"],
intellectual_property=["Patents", "Trademarks"],
sensitive_data=["PII", "Financial Data"],
integration_level="Highly Integrated"
)
self.document = Document.objects.create(organization=self.organization)
@patch("backend.accounts.utils.send_mail")
def test_send_confirmation_email(self, mock_send_mail):
confirmation = EmailConfirmation.objects.create(email=self.email, uuid=uuid.uuid4(), created_at=now())
send_confirmation_email(self.email)
confirmation.refresh_from_db()
self.assertIsNotNone(confirmation.uuid)
self.assertEqual(mock_send_mail.call_count, 1)
@patch("backend.accounts.utils.send_mail")
def test_send_payment_email(self, mock_send_mail):
send_payment_email(self.email)
self.assertEqual(mock_send_mail.call_count, 1)
@patch("backend.accounts.utils.EmailMultiAlternatives.send")
@patch("backend.accounts.utils.generate_first_page_image")
def test_send_document_email(self, mock_generate_image, mock_send):
mock_image_io = MagicMock()
mock_image_io.getvalue.return_value = b"fake image data"
mock_generate_image.return_value = mock_image_io
document_link = "https://example.com/document.pdf"
send_document_email(self.email, document_link, self.document)
mock_generate_image.assert_called_once_with(self.document)
mock_send.assert_called_once()

View File

@@ -0,0 +1,34 @@
import uuid
from django.test import TestCase
from django.urls import reverse
from unittest.mock import patch
from backend.accounts.models import EmailConfirmation
from backend.accounts.utils import send_confirmation_email
from django.utils.timezone import now, timedelta
class EmailConfirmationTests(TestCase):
def setUp(self):
"""Set up test data."""
self.valid_email = "test@example.com"
self.confirmation = EmailConfirmation.objects.create(
email=self.valid_email,
uuid=uuid.uuid4(),
created_at=now()
)
@patch("backend.accounts.views.create_document_for_organization.delay")
def test_confirm_email_valid(self, mock_task):
"""Test valid email confirmation."""
response = self.client.get(reverse("confirm_email", args=[self.confirmation.uuid]))
self.assertTemplateUsed(response, "accounts/confirmation_success.html")
self.assertContains(response, self.valid_email)
mock_task.assert_called_once_with(self.valid_email)
@patch("backend.accounts.views.send_confirmation_email")
def test_resend_confirmation(self, mock_send):
"""Test resending confirmation email."""
response = self.client.post(reverse("resend_confirmation", args=[self.valid_email]))
self.assertEqual(response.status_code, 200)
self.assertEqual(response.content.decode(), "Confirmation email resent")
mock_send.assert_called_once_with(self.valid_email)

View File

@@ -39,7 +39,7 @@ class ControlAdmin(admin.ModelAdmin):
list_display = ('id', 'name')
class DocumentRiskControlAdmin(admin.ModelAdmin):
list_display = ('document', 'risk', 'control', 'weight')
list_display = ('document', 'risk', 'control', 'weight','likelihood')
admin.site.register(Document, DocumentAdmin)

View File

@@ -0,0 +1,128 @@
# Generated by Django 5.1.3 on 2025-03-26 18:51
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('core', '0008_documentriskcontrol'),
]
operations = [
migrations.AddField(
model_name='documentriskcontrol',
name='likelihood',
field=models.IntegerField(blank=True, null=True),
),
migrations.AlterField(
model_name='organization',
name='annual_revenue',
field=models.CharField(help_text="What is your organization's annual revenue range?", max_length=20),
),
migrations.AlterField(
model_name='organization',
name='compliance_frameworks',
field=models.JSONField(help_text='Which regulatory frameworks is your organization required to comply with?'),
),
migrations.AlterField(
model_name='organization',
name='critical_applications',
field=models.CharField(help_text='How many critical business applications do your employees use daily?', max_length=20),
),
migrations.AlterField(
model_name='organization',
name='customer_base',
field=models.CharField(blank=True, help_text='How would you characterize your customer base distribution?', max_length=20, null=True),
),
migrations.AlterField(
model_name='organization',
name='customer_type',
field=models.CharField(blank=True, help_text='What is your primary customer type?', max_length=20, null=True),
),
migrations.AlterField(
model_name='organization',
name='data_sensitivity',
field=models.CharField(help_text='What level of sensitive data does your organization process?', max_length=20),
),
migrations.AlterField(
model_name='organization',
name='email',
field=models.EmailField(help_text='What is your email?', max_length=254, unique=True),
),
migrations.AlterField(
model_name='organization',
name='employee_headcount',
field=models.CharField(help_text="What is your organization's current employee headcount?", max_length=20),
),
migrations.AlterField(
model_name='organization',
name='geographic_scope',
field=models.CharField(blank=True, help_text="What is your organization's geographic operational scope?", max_length=20, null=True),
),
migrations.AlterField(
model_name='organization',
name='industry_sector',
field=models.CharField(help_text='What is your primary industry sector?', max_length=255),
),
migrations.AlterField(
model_name='organization',
name='integration_level',
field=models.CharField(blank=True, help_text='How integrated are your critical business systems?', max_length=20, null=True),
),
migrations.AlterField(
model_name='organization',
name='intellectual_property',
field=models.JSONField(blank=True, help_text='How does your organization protect and manage intellectual property?', null=True),
),
migrations.AlterField(
model_name='organization',
name='internal_software_development',
field=models.CharField(help_text='What is the extent of your internal software development activities?', max_length=20),
),
migrations.AlterField(
model_name='organization',
name='it_dependency',
field=models.IntegerField(help_text='On a scale from 1-10, how dependent is your business operations on technology?'),
),
migrations.AlterField(
model_name='organization',
name='it_infrastructure',
field=models.JSONField(blank=True, help_text='What is your primary IT infrastructure model?', null=True),
),
migrations.AlterField(
model_name='organization',
name='name',
field=models.CharField(help_text='What is the name of your organization?', max_length=255, unique=True),
),
migrations.AlterField(
model_name='organization',
name='network_infrastructure',
field=models.CharField(help_text="What best describes your organization's network infrastructure model?", max_length=20),
),
migrations.AlterField(
model_name='organization',
name='product_portfolio',
field=models.CharField(blank=True, help_text='How diversified is your product/service portfolio?', max_length=20, null=True),
),
migrations.AlterField(
model_name='organization',
name='remote_workforce_percentage',
field=models.CharField(help_text='What percentage of your workforce operates remotely?', max_length=20),
),
migrations.AlterField(
model_name='organization',
name='sensitive_data',
field=models.JSONField(blank=True, help_text='What type of sensitive data does your organization handle?', null=True),
),
migrations.AlterField(
model_name='organization',
name='supplier_base',
field=models.CharField(blank=True, help_text='What is your supplier base structure?', max_length=20, null=True),
),
migrations.AlterField(
model_name='organization',
name='third_party_vendor_access',
field=models.CharField(help_text='How many third-party vendors have access to your systems?', max_length=20),
),
]

View File

@@ -164,6 +164,7 @@ class DocumentRiskControl(models.Model):
risk = models.ForeignKey(Risk, on_delete=models.CASCADE)
control = models.ForeignKey(Control, on_delete=models.CASCADE)
weight = models.IntegerField()
likelihood = models.IntegerField(null=True, blank=True)
class Meta:
unique_together = ('document', 'risk', 'control')

Binary file not shown.

After

Width:  |  Height:  |  Size: 154 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 284 KiB

View File

@@ -1,3 +1,5 @@
{% load static %}
<div class="document-container">
{% if error %}
<p style="color: red;">{{ error }}</p>

View File

@@ -0,0 +1,131 @@
from django.test import TestCase
from unittest.mock import patch, MagicMock
from django.contrib.staticfiles.testing import StaticLiveServerTestCase
from io import BytesIO
import base64
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
from backend.core.models import Organization, Risk, Control, Document
from backend.core.utils import *
class UtilsTests(TestCase):
def setUp(self):
self.organization = Organization.objects.create(
id=1,
name="Test Organization",
email="test@example.com",
employee_headcount="100-500",
annual_revenue="$1M-$10M",
critical_applications="5-10",
compliance_frameworks=["Ab", "Ba"],
industry_sector="Technology",
it_dependency=8,
data_sensitivity="High",
network_infrastructure="Cloud-based",
remote_workforce_percentage="50%",
third_party_vendor_access="10-20",
internal_software_development="Moderate",
geographic_scope="Global",
customer_base="Enterprise",
customer_type="B2B",
product_portfolio="Diverse",
supplier_base="International",
it_infrastructure=["Cloud", "On-Premise"],
intellectual_property=["Patents", "Trademarks"],
sensitive_data=["PII", "Financial Data"],
integration_level="Highly Integrated"
)
self.risk = Risk.objects.create(
risk_id=1,
risk_name="Test Risk",
category="Security",
primary_impact="Financial"
)
self.controls = [Control.objects.create(id=i, name=f"Control {i}") for i in range(1, 11)]
def test_extract_organization_details(self):
details = extract_organization_details(self.organization)
self.assertNotIn('name', details)
self.assertNotIn('email', details)
self.assertIn("What is your organization's current employee headcount?", details)
self.assertEqual(details["What is your organization's current employee headcount?"], "100-500")
@patch('backend.core.utils.OpenAI')
def test_get_top_risk(self, mock_openai):
mock_client = MagicMock()
mock_openai.return_value = mock_client
mock_response = MagicMock()
mock_response.choices[0].message.content = "1,2,3"
mock_client.chat.completions.create.return_value = mock_response
risks = get_top_risk(self.organization)
self.assertEqual(risks, [1,2,3])
@patch('backend.core.utils.OpenAI')
def test_get_controls_for_risk(self, mock_openai):
mock_client = MagicMock()
mock_openai.return_value = mock_client
mock_response = MagicMock()
control_lines = [f"{i} : 8 : 5" for i in range(1, 11)]
mock_response.choices[0].message.content = "\n".join(control_lines)
mock_client.chat.completions.create.return_value = mock_response
controls = get_controls_for_risk(self.risk, self.organization)
self.assertEqual(len(controls), 10)
self.assertEqual(controls[0][0], 1)
@patch('backend.core.utils.HTML')
def test_generate_pdf(self, mock_html):
mock_instance = MagicMock()
mock_instance.write_pdf.return_value = b'PDF_CONTENT'
mock_html.return_value = mock_instance
doc = Document.objects.create(organization=self.organization)
response = generate_pdf(doc)
self.assertEqual(response.status_code, 200)
self.assertEqual(response['Content-Type'], 'application/pdf')
def test_calculate_aggregate_weight(self):
controls = [{'weight': 5}, {'weight': 3}]
self.assertEqual(calculate_aggregate_weight(controls), 8)
def test_calculate_aggregate_likelihood(self):
controls = [{'likelihood': 2}, {'likelihood': 4}]
self.assertEqual(calculate_aggregate_likelihood(controls), 6)
def test_map_weight_to_impact_likelihood(self):
impact, likelihood = map_weight_to_impact_likelihood(50, 30, 100)
self.assertAlmostEqual(impact, 5.0)
self.assertAlmostEqual(likelihood, 3.0)
@patch('pdf2image.convert_from_bytes')
@patch('backend.core.utils.HTML')
def test_generate_first_page_image(self, mock_html, mock_convert):
mock_pdf_instance = MagicMock()
mock_pdf_instance.write_pdf.return_value = b'PDF_CONTENT'
mock_html.return_value = mock_pdf_instance
mock_image = MagicMock()
mock_convert.return_value = [mock_image]
doc = Document.objects.create(organization=self.organization)
img_io = generate_first_page_image(doc)
self.assertIsInstance(img_io, BytesIO)
mock_convert.assert_called_once_with(b'PDF_CONTENT', first_page=1, last_page=1)
def test_generate_risk_graph(self):
risks_with_controls = [
{'risk': {'id': 1}, 'impact': 5.0, 'likelihood': 3.0},
{'risk': {'id': 2}, 'impact': 7.0, 'likelihood': 4.0}
]
graph_data = generate_risk_graph(risks_with_controls)
self.assertIsInstance(graph_data, str)
self.assertTrue(len(graph_data) > 1000)

View File

@@ -1,11 +1,18 @@
from django.test import TestCase, Client
from django.urls import reverse
from uuid import uuid4
from unittest.mock import patch
from backend.core.models import Organization, Document, Risk, Control, DocumentRiskControl, DocumentTemplate
from django.conf import settings
from django.contrib.auth.models import User
from django.http import HttpResponse
class DocumentViewTest(TestCase):
def setUp(self):
self.client = Client()
self.staff_user = User.objects.create_user(username='staff', password='password', is_staff=True)
self.client.login(username='staff', password='password')
self.organization = Organization.objects.create(
id=1,
@@ -40,19 +47,43 @@ class DocumentViewTest(TestCase):
self.control1 = Control.objects.create(id=1, name="Control A")
self.control2 = Control.objects.create(id=2, name="Control B")
DocumentRiskControl.objects.create(id=1, document=self.document, risk=self.risk1, control=self.control1, weight=5)
DocumentRiskControl.objects.create(id=2, document=self.document, risk=self.risk1, control=self.control2, weight=7)
DocumentRiskControl.objects.create(id=3, document=self.document, risk=self.risk2, control=self.control1, weight=8)
DocumentRiskControl.objects.create(id=1, document=self.document, risk=self.risk1, control=self.control1, weight=5, likelihood=3)
DocumentRiskControl.objects.create(id=2, document=self.document, risk=self.risk1, control=self.control2, weight=7, likelihood=4)
DocumentRiskControl.objects.create(id=3, document=self.document, risk=self.risk2, control=self.control1, weight=8, likelihood=2)
template_content = """
- segment_type: "h1"
content: "{{ document.organization.name }} - Risk Report"
- segment_type: "body"
content: "Document ID: {{ document.id }}"
- segment_type: "body"
- segment_type: "p"
content: "Created at: {{ document.created_at|date:'Y-m-d' }}"
- segment_type: "h2"
content: "Risks"
content: "Top 10 Risk Identified"
- segment_type: "table"
content: |
<table>
<tr>
<th>Risk ID</th>
<th>Risk Name</th>
<th>Inherent Impact</th>
<th>Inherent Likelihood</th>
<th>Inherent Risk Score</th>
<th>Description of Risk</th>
</tr>
{% for item in risks_with_controls %}
<tr>
<td>{{ item.risk.id }}</td>
<td>{{ item.risk.name }}</td>
<td> - </td>
<td> - </td>
<td> - </td>
<td> - </td>
</tr>
{% endfor %}
</table>
- segment_type: "image"
content: "data:image/png;base64,{{ graph }}"
- segment_type: "h2"
content: "Risks with Controls"
- segment_type: "body"
content: |
{% for item in risks_with_controls %}
@@ -73,12 +104,44 @@ class DocumentViewTest(TestCase):
self.template = DocumentTemplate.objects.create(id=1, name="Default Template", content=template_content)
def test_document_view(self):
url = reverse('core:document', kwargs={'document_id': self.document.id})
response = self.client.get(url)
response = self.client.get(reverse('core:document', kwargs={'document_id': self.document.id}))
self.assertEqual(response.status_code, 200)
self.assertContains(response, str(self.document.id))
self.assertContains(response, self.organization.name)
self.assertContains(response, self.document.created_at.strftime('%Y-%m-%d'))
self.assertContains(response, self.risk1.risk_name)
self.assertContains(response, self.control1.name)
self.assertContains(response, "Weight: 5")
self.assertTemplateUsed(response, 'document.html')
self.assertContains(response, self.organization.name)
def test_index_view(self):
response = self.client.get(reverse('core:index'))
self.assertEqual(response.status_code, 200)
self.assertTemplateUsed(response, 'index.html')
def test_signup_view_get(self):
response = self.client.get(reverse('core:signup'))
self.assertEqual(response.status_code, 200)
self.assertTemplateUsed(response, 'signup.html')
def test_thankyou_view(self):
response = self.client.get(reverse('core:thankyou'))
self.assertEqual(response.status_code, 200)
self.assertTemplateUsed(response, 'thankyou.html')
def test_payment_page_view_get(self):
response = self.client.get(reverse('core:payment_page') + '?email=test@example.com')
self.assertEqual(response.status_code, 200)
self.assertTemplateUsed(response, 'payment.html')
def test_template_preview_view(self):
response = self.client.get(reverse('core:template_preview', args=[self.template.name]))
self.assertEqual(response.status_code, 200)
self.assertTemplateUsed(response, 'template_preview.html')
def test_pdf_view(self):
with patch('backend.core.views.generate_pdf') as mock_generate_pdf:
mock_response = HttpResponse(b'%PDF-1.4...', content_type='application/pdf')
mock_generate_pdf.return_value = mock_response
response = self.client.get(reverse('core:pdf_view', args=[self.document.id]))
self.assertEqual(response.status_code, 200)
self.assertEqual(response['Content-Type'], 'application/pdf')
self.assertIn(b'%PDF', response.content[:10])
mock_generate_pdf.assert_called_once_with(self.document)

View File

@@ -5,6 +5,14 @@ from weasyprint import HTML
from django.http import HttpResponse
from PIL import Image
import io
import base64
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
from django.contrib.staticfiles.finders import find
import matplotlib.image as mpimg
def extract_organization_details(organization):
excluded_fields = {"name", "email"}
@@ -59,7 +67,6 @@ def get_top_risk(organization):
)
risk_ids = response.choices[0].message.content.strip().split(",")
print(f"Risks: {risk_ids}")
return [int(risk_id) for risk_id in risk_ids if risk_id.isdigit()]
@@ -80,22 +87,28 @@ def get_controls_for_risk(risk, organization):
prompt = f"""
You are an expert in cybersecurity risk management. Given the risk "{risk.risk_name}" and its associated organization details "{organization_details}",
your task is to select **exactly 10 unique controls** from the provided list that best mitigate this risk. Each control should be assigned a weight between **1 and 10** based on its effectiveness in reducing the risk.
your task is to select **exactly 10 unique controls** from the provided list that best mitigate this risk. Each control should be assigned:
- A weight between **1 and 10** (1 = low impact, 10 = high impact).
- A likelihood score between **1 and 10** (1 = rare occurrence, 10 = highly likely).
### Rules:
1. **Each control ID must be unique** (no duplicates).
2. **Only return control IDs and weights** in the exact format below.
2. **Only return control IDs, weights, and likelihood scores** in the exact format below.
3. **Weights must be between 1 and 10** (1 = low impact, 10 = high impact).
4. **Do NOT add explanations, descriptions, or extra text.**
5. **Ensure that control IDs are randomly distributed and diverse across different categories.**
4. **Likelihood scores must be between 1 and 10** (1 = rare occurrence, 10 = highly likely).
5. **Do NOT add explanations, descriptions, or extra text.**
6. **Ensure that control IDs are randomly distributed and diverse across different categories.**
### Available Controls:
{control_list}
### Expected Response Format (STRICTLY FOLLOW THIS FORMAT):
<control_id> : <weight>
<control_id> : <weight>
<control_id> : <weight> : <likelihood>
<control_id> : <weight> : <likelihood>
### Example Correct Response (NO DUPLICATES):
12 : 8
45 : 7
12 : 8 : 90
45 : 7 : 60
⚠️ **If you provide duplicate control IDs, your response will be rejected. Ensure all control IDs are unique.**
⚠️ **Follow the response format exactly. Any deviation will be considered invalid.**
"""
@@ -108,22 +121,23 @@ def get_controls_for_risk(risk, organization):
for line in result.split("\n"):
line = line.strip()
parts = line.split(":")
if len(parts) == 2:
if len(parts) == 3:
control_id_str = parts[0].replace("ID:", "").replace("id:", "").replace("Id:", "").strip()
weight_str = parts[1].strip().replace("Weight:", "").replace("weight:", "").strip()
print(f"Control:{control_id_str} Weight:{weight_str}")
print(f"ControlType: {type(control_id_str)} WeightType: {type(weight_str)}")
likelihood_str = parts[2].strip().replace("Likelihood:", "").replace("likelihood:", "").strip()
control_id_str = ''.join(filter(str.isdigit, control_id_str))
weight_str = ''.join(filter(str.isdigit, weight_str))
likelihood_str = ''.join(filter(str.isdigit, likelihood_str))
if control_id_str and weight_str:
if control_id_str and weight_str and likelihood_str:
try:
control_id = int(control_id_str)
weight = int(weight_str)
likelihood = int(likelihood_str)
if control_id in valid_control_ids and 1 <= weight <= 10 and control_id not in control_ids_seen:
selected_controls.append((control_id, weight))
if control_id in valid_control_ids and 1 <= weight <= 10 and 1 <= likelihood <= 10 and control_id not in control_ids_seen:
selected_controls.append((control_id, weight, likelihood))
control_ids_seen.add(control_id)
except ValueError:
continue
@@ -137,23 +151,30 @@ def get_controls_for_risk(risk, organization):
remaining_controls_list = [f"Control ID: {cid}, Control Name: {control_map[cid]}" for cid in remaining_controls]
retry_prompt = f"""
You are an expert in cybersecurity risk management. Given the risk "{risk.risk_name}" and its associated organization details "{organization_details}",
your task is to select **exactly {missing_count} unique controls** from the provided list that best mitigate this risk. Each control should be assigned a weight between **1 and 10** based on its effectiveness in reducing the risk.
You are an expert in cybersecurity risk management. Given the risk "{risk.risk_name}" and the organization's details "{organization_details}",
your task is to select **exactly {missing_count} unique controls** from the provided list that best mitigate this risk. Each control should be assigned:
- A **weight** between **1 and 10** based on its effectiveness in reducing the risk.
- A likelihood score between **1 and 10** (1 = rare occurrence, 10 = highly likely).
### Rules:
1. **Each control ID must be unique** (no duplicates).
2. **Only return control IDs and weights** in the exact format below.
2. **Only return control IDs, weights, and likelihood scores** in the exact format below.
3. **Weights must be between 1 and 10** (1 = low impact, 10 = high impact).
4. **Do NOT add explanations, descriptions, or extra text.**
5. **Ensure that control IDs are randomly distributed and diverse across different categories.**
4. **Likelihood scores must be between 1 and 10** (1 = rare occurrence, 10 = highly likely).
5. **Do NOT add explanations, descriptions, or extra text.**
6. **Ensure that control IDs are diverse and well-distributed across different categories.**
### Available Controls:
{remaining_controls_list}
### Expected Response Format (STRICTLY FOLLOW THIS FORMAT):
<control_id> : <weight>
<control_id> : <weight>
<control_id> : <weight> : <likelihood>
<control_id> : <weight> : <likelihood>
### Example Correct Response (NO DUPLICATES):
12 : 8
45 : 7
12 : 8 : 85
45 : 7 : 60
⚠️ **If you provide duplicate control IDs, your response will be rejected. Ensure all control IDs are unique.**
⚠️ **Follow the response format exactly. Any deviation will be considered invalid.**
"""
@@ -162,22 +183,24 @@ def get_controls_for_risk(risk, organization):
for line in result.split("\n"):
line = line.strip()
parts = line.split(":")
if len(parts) == 2:
if len(parts) == 3:
control_id_str = parts[0].replace("ID:", "").replace("id:", "").replace("Id:", "").strip()
weight_str = parts[1].strip().replace("Weight:", "").replace("weight:", "").strip()
print(f"Control:{control_id} Weight:{weight_str}")
print(f"ControlType: {type(control_id)} WeightType: {type(weight_str)}")
likelihood_str = parts[2].strip().replace("Likelihood:", "").replace("likelihood:", "").strip()
control_id_str = ''.join(filter(str.isdigit, control_id_str))
weight_str = ''.join(filter(str.isdigit, weight_str))
likelihood_str = ''.join(filter(str.isdigit, likelihood_str))
if control_id_str and weight_str:
if control_id_str and weight_str and likelihood_str:
try:
control_id = int(control_id_str)
weight = int(weight_str)
if control_id in valid_control_ids and 1 <= weight <= 10 and control_id not in control_ids_seen:
selected_controls.append((control_id, weight))
likelihood = int(likelihood_str)
if control_id in valid_control_ids and 1 <= weight <= 10 and 1 <= likelihood <= 10 and control_id not in control_ids_seen:
selected_controls.append((control_id, weight, likelihood))
control_ids_seen.add(control_id)
except ValueError:
continue
@@ -207,4 +230,68 @@ def generate_first_page_image(document):
images[0].save(img_io, format="JPEG", quality=90)
img_io.seek(0)
return img_io
return img_io
def calculate_aggregate_weight(controls):
total_weight = sum(control['weight']for control in controls)
return total_weight
def calculate_aggregate_likelihood(controls):
total_likelihood = sum(control['likelihood'] for control in controls)
return total_likelihood
def map_weight_to_impact_likelihood(total_weight, total_likelihood, max_weight):
normalized_weight = total_weight / max_weight
impact = min(10.0, max(1.0, normalized_weight * 10.0))
likelihood = min(10.0, max(1.0, total_likelihood / 10.0))
return impact, likelihood
def generate_risk_graph(risks_with_controls):
impacts = [risk['impact'] for risk in risks_with_controls]
likelihoods = [risk['likelihood'] for risk in risks_with_controls]
risk_ids = [risk['risk']['id'] for risk in risks_with_controls]
bg_img_path = find('img/graph_matrix (3).png')
bg_img = mpimg.imread(bg_img_path)
fig, ax = plt.subplots(figsize=(10, 8))
ax.imshow(bg_img, extent=[0, 11.2, 0, 11.2], aspect='auto')
scatter = ax.scatter(
likelihoods, impacts,
c="blue", edgecolors="white", s=500, alpha=0.9
)
for i, risk_id in enumerate(risk_ids):
ax.annotate(
str(risk_id),
(likelihoods[i], impacts[i]),
color="white",
fontsize=12,
ha="center",
va="center",
weight="bold",
)
ax.set_xticks([])
ax.set_yticks([])
ax.set_xticklabels([])
ax.set_yticklabels([])
ax.spines['top'].set_visible(False)
ax.spines['right'].set_visible(False)
ax.spines['left'].set_visible(False)
ax.spines['bottom'].set_visible(False)
buffer = io.BytesIO()
plt.savefig(buffer, format="png", transparent=True, bbox_inches='tight', pad_inches=0)
buffer.seek(0)
image_png = buffer.getvalue()
buffer.close()
plt.close()
return base64.b64encode(image_png).decode("utf-8")

View File

@@ -3,11 +3,11 @@ import yaml
from django.shortcuts import render, redirect , get_object_or_404
from .forms import OrganizationForm
from .models import Organization,Document, DocumentTemplate,DocumentRiskControl
from .models import Organization,Document, DocumentTemplate,DocumentRiskControl,Risk
from backend.accounts.utils import send_confirmation_email, send_document_email
from django.contrib.admin.views.decorators import staff_member_required
from django.template import Template, Context
from .utils import generate_pdf
from .utils import generate_pdf, map_weight_to_impact_likelihood, calculate_aggregate_weight, calculate_aggregate_likelihood, generate_risk_graph
from django.conf import settings
site_domain = settings.SITE_DOMAIN
@@ -67,15 +67,24 @@ def document(request, document_id):
controls = (
DocumentRiskControl.objects
.filter(document=document, risk_id=risk['id'])
.values('control', 'control__name', 'weight')
.values('control', 'control__name', 'weight', 'likelihood')
.distinct()
)
max_weight = 10*10
total_weight = calculate_aggregate_weight(controls)
total_likelihood = calculate_aggregate_likelihood(controls)
impact, likelihood = map_weight_to_impact_likelihood(total_weight, total_likelihood, max_weight)
risks_with_controls.append({
'risk': risk,
'controls': list(controls)
'controls': list(controls),
'total_weight': total_weight,
'impact': impact,
'likelihood': likelihood,
'risk_score': (round(impact) * round(likelihood))
})
graph_base64 = generate_risk_graph(risks_with_controls)
template_obj = get_object_or_404(DocumentTemplate, name="Default Template")
template_content = template_obj.content
@@ -85,14 +94,29 @@ def document(request, document_id):
return render(request, 'error.html', {'error_message': 'Error parsing template.'})
context = {
'document': document,
'risks_with_controls': risks_with_controls
'risks_with_controls': risks_with_controls,
'graph': graph_base64,
}
rendered_content = ""
for segment in template_segments:
content = segment['content']
django_template = Template(content)
rendered_content += django_template.render(Context(context))
content = segment.get('content', '')
segment_type = segment.get('segment_type', '')
django_template = Template(content)
processed_content = django_template.render(Context(context))
if segment_type == "h1":
rendered_content += f"<h1>{processed_content}</h1>\n"
elif segment_type == "h2":
rendered_content += f"<h2>{processed_content}</h2>\n"
elif segment_type == "h3":
rendered_content += f"<h3>{processed_content}</h3>\n"
elif segment_type == "p":
rendered_content += f"<p>{processed_content}</p>\n"
elif segment_type == "image":
rendered_content += f'<img src="{processed_content}" alt="Risk Graph" style="max-width:100%; height:auto;">\n'
else:
rendered_content += processed_content
return render(request, 'document.html', {'rendered_html': rendered_content})

View File

@@ -1,11 +1,40 @@
- segment_type: "h1"
content: "{{ document.organization.name }} - Risk Report"
content: "{{ document.organization.name }} - Risk Report "
- segment_type: "body"
- segment_type: "p"
content: "Created at: {{ document.created_at|date:'Y-m-d' }}"
- segment_type: "h2"
content: "Risks"
content: "Top 10 Risk Identified"
- segment_type: "table"
content: |
<table>
<tr>
<th>Risk ID</th>
<th>Risk Name</th>
<th>Inherent Impact </th>
<th>Interent Liklihood </th>
<th>Inherent Risk Score </th>
<th>Description of Risk </th>
</tr>
{% for item in risks_with_controls %}
<tr>
<td>{{ item.risk.id }}</td>
<td>{{ item.risk.name }}</td>
<td>{{ item.impact|floatformat:0 }}</td>
<td>{{ item.likelihood|floatformat:0 }}</td>
<td>{{ item.risk_score }}</td>
<td> - </td>
</tr>
{% endfor %}
</table>
- segment_type: "image"
content: "data:image/png;base64,{{ graph }}"
- segment_type: "h2"
content: "Risks with Controls"
- segment_type: "body"
content: |
@@ -22,4 +51,4 @@
{% endfor %}
</div>
</div>
{% endfor %}
{% endfor %}