diff --git a/backend/accounts/tasks.py b/backend/accounts/tasks.py index 0b22d94..15ed444 100644 --- a/backend/accounts/tasks.py +++ b/backend/accounts/tasks.py @@ -37,16 +37,17 @@ def create_document_for_organization(confirmation_email): selected_controls = get_controls_for_risk(risk ,organization=organization) - for control_id, weight in selected_controls: + for control_id, weight, likelihood in selected_controls: control = Control.objects.filter(id=control_id).first() if control: DocumentRiskControl.objects.create( document=document, risk=risk, control=control, - weight=weight + weight=weight, + likelihood=likelihood ) - controls_content += f" - Control: {control.name} (Impact Weight: {weight}/10)\n" + controls_content += f" - Control: {control.name} (Impact Weight: {weight}/10) (Likelihood: {likelihood}/10)\n" controls_content += "\n" diff --git a/backend/accounts/tests/__init__.py b/backend/accounts/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/backend/accounts/tests/test_tasks.py b/backend/accounts/tests/test_tasks.py new file mode 100644 index 0000000..138e92d --- /dev/null +++ b/backend/accounts/tests/test_tasks.py @@ -0,0 +1,56 @@ +from unittest.mock import patch +from django.test import TestCase +from backend.core.models import Organization, Document, Risk, Control, DocumentRiskControl +from backend.accounts.tasks import create_document_for_organization + +class CeleryTaskTests(TestCase): + + def setUp(self): + self.organization = Organization.objects.create( + id=1, + name="Test Organization", + email="test@example.com", + employee_headcount="100-500", + annual_revenue="$1M-$10M", + critical_applications="5-10", + compliance_frameworks=["Ab", "Ba"], + industry_sector="Technology", + it_dependency=8, + data_sensitivity="High", + network_infrastructure="Cloud-based", + remote_workforce_percentage="50%", + third_party_vendor_access="10-20", + internal_software_development="Moderate", + geographic_scope="Global", + customer_base="Enterprise", + customer_type="B2B", + product_portfolio="Diverse", + supplier_base="International", + it_infrastructure=["Cloud", "On-Premise"], + intellectual_property=["Patents", "Trademarks"], + sensitive_data=["PII", "Financial Data"], + integration_level="Highly Integrated" + ) + self.risk = Risk.objects.create(risk_id="1", risk_name="Test Risk", category="Category1", primary_impact="High") + self.control = Control.objects.create(name="Test Control") + + @patch("backend.accounts.tasks.get_top_risk") + @patch("backend.accounts.tasks.get_controls_for_risk") + @patch("backend.accounts.tasks.send_payment_email") + def test_create_document_for_organization(self, mock_send_payment_email, mock_get_controls_for_risk, mock_get_top_risk): + mock_get_top_risk.return_value = [self.risk.risk_id] + mock_get_controls_for_risk.return_value = [(self.control.id, 5, 7)] + create_document_for_organization(self.organization.email) + + document = Document.objects.first() + self.assertIsNotNone(document) + + document_risk_control = DocumentRiskControl.objects.first() + self.assertIsNotNone(document_risk_control) + self.assertEqual(document_risk_control.document, document) + self.assertEqual(document_risk_control.risk, self.risk) + self.assertEqual(document_risk_control.control, self.control) + self.assertEqual(document_risk_control.weight, 5) + self.assertEqual(document_risk_control.likelihood, 7) + + mock_send_payment_email.assert_called_once_with(self.organization.email) \ No newline at end of file diff --git a/backend/accounts/tests/test_utils.py b/backend/accounts/tests/test_utils.py new file mode 100644 index 0000000..8cf284d --- /dev/null +++ b/backend/accounts/tests/test_utils.py @@ -0,0 +1,66 @@ +from django.test import TestCase +from django.core import mail +from unittest.mock import patch, MagicMock +from backend.core.models import Organization, Document +from backend.accounts.models import EmailConfirmation +from backend.core.utils import generate_first_page_image +from backend.accounts.utils import send_confirmation_email, send_payment_email, send_document_email +import uuid +from django.utils.timezone import now + +class EmailTests(TestCase): + def setUp(self): + self.email = "test@example.com" + self.organization = Organization.objects.create( + id=1, + name="Test Organization", + email="test@example.com", + employee_headcount="100-500", + annual_revenue="$1M-$10M", + critical_applications="5-10", + compliance_frameworks=["Ab", "Ba"], + industry_sector="Technology", + it_dependency=8, + data_sensitivity="High", + network_infrastructure="Cloud-based", + remote_workforce_percentage="50%", + third_party_vendor_access="10-20", + internal_software_development="Moderate", + geographic_scope="Global", + customer_base="Enterprise", + customer_type="B2B", + product_portfolio="Diverse", + supplier_base="International", + it_infrastructure=["Cloud", "On-Premise"], + intellectual_property=["Patents", "Trademarks"], + sensitive_data=["PII", "Financial Data"], + integration_level="Highly Integrated" + ) + self.document = Document.objects.create(organization=self.organization) + + @patch("backend.accounts.utils.send_mail") + def test_send_confirmation_email(self, mock_send_mail): + confirmation = EmailConfirmation.objects.create(email=self.email, uuid=uuid.uuid4(), created_at=now()) + send_confirmation_email(self.email) + + confirmation.refresh_from_db() + self.assertIsNotNone(confirmation.uuid) + self.assertEqual(mock_send_mail.call_count, 1) + + @patch("backend.accounts.utils.send_mail") + def test_send_payment_email(self, mock_send_mail): + send_payment_email(self.email) + self.assertEqual(mock_send_mail.call_count, 1) + + @patch("backend.accounts.utils.EmailMultiAlternatives.send") + @patch("backend.accounts.utils.generate_first_page_image") + def test_send_document_email(self, mock_generate_image, mock_send): + mock_image_io = MagicMock() + mock_image_io.getvalue.return_value = b"fake image data" + mock_generate_image.return_value = mock_image_io + + document_link = "https://example.com/document.pdf" + send_document_email(self.email, document_link, self.document) + + mock_generate_image.assert_called_once_with(self.document) + mock_send.assert_called_once() diff --git a/backend/accounts/tests/test_views.py b/backend/accounts/tests/test_views.py new file mode 100644 index 0000000..f0a62cc --- /dev/null +++ b/backend/accounts/tests/test_views.py @@ -0,0 +1,34 @@ +import uuid +from django.test import TestCase +from django.urls import reverse +from unittest.mock import patch +from backend.accounts.models import EmailConfirmation +from backend.accounts.utils import send_confirmation_email +from django.utils.timezone import now, timedelta + + +class EmailConfirmationTests(TestCase): + def setUp(self): + """Set up test data.""" + self.valid_email = "test@example.com" + self.confirmation = EmailConfirmation.objects.create( + email=self.valid_email, + uuid=uuid.uuid4(), + created_at=now() + ) + + @patch("backend.accounts.views.create_document_for_organization.delay") + def test_confirm_email_valid(self, mock_task): + """Test valid email confirmation.""" + response = self.client.get(reverse("confirm_email", args=[self.confirmation.uuid])) + self.assertTemplateUsed(response, "accounts/confirmation_success.html") + self.assertContains(response, self.valid_email) + mock_task.assert_called_once_with(self.valid_email) + + @patch("backend.accounts.views.send_confirmation_email") + def test_resend_confirmation(self, mock_send): + """Test resending confirmation email.""" + response = self.client.post(reverse("resend_confirmation", args=[self.valid_email])) + self.assertEqual(response.status_code, 200) + self.assertEqual(response.content.decode(), "Confirmation email resent") + mock_send.assert_called_once_with(self.valid_email) diff --git a/backend/core/admin.py b/backend/core/admin.py index 87beba0..8623efb 100644 --- a/backend/core/admin.py +++ b/backend/core/admin.py @@ -39,7 +39,7 @@ class ControlAdmin(admin.ModelAdmin): list_display = ('id', 'name') class DocumentRiskControlAdmin(admin.ModelAdmin): - list_display = ('document', 'risk', 'control', 'weight') + list_display = ('document', 'risk', 'control', 'weight','likelihood') admin.site.register(Document, DocumentAdmin) diff --git a/backend/core/migrations/0009_documentriskcontrol_likelihood_and_more.py b/backend/core/migrations/0009_documentriskcontrol_likelihood_and_more.py new file mode 100644 index 0000000..3aca2a2 --- /dev/null +++ b/backend/core/migrations/0009_documentriskcontrol_likelihood_and_more.py @@ -0,0 +1,128 @@ +# Generated by Django 5.1.3 on 2025-03-26 18:51 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('core', '0008_documentriskcontrol'), + ] + + operations = [ + migrations.AddField( + model_name='documentriskcontrol', + name='likelihood', + field=models.IntegerField(blank=True, null=True), + ), + migrations.AlterField( + model_name='organization', + name='annual_revenue', + field=models.CharField(help_text="What is your organization's annual revenue range?", max_length=20), + ), + migrations.AlterField( + model_name='organization', + name='compliance_frameworks', + field=models.JSONField(help_text='Which regulatory frameworks is your organization required to comply with?'), + ), + migrations.AlterField( + model_name='organization', + name='critical_applications', + field=models.CharField(help_text='How many critical business applications do your employees use daily?', max_length=20), + ), + migrations.AlterField( + model_name='organization', + name='customer_base', + field=models.CharField(blank=True, help_text='How would you characterize your customer base distribution?', max_length=20, null=True), + ), + migrations.AlterField( + model_name='organization', + name='customer_type', + field=models.CharField(blank=True, help_text='What is your primary customer type?', max_length=20, null=True), + ), + migrations.AlterField( + model_name='organization', + name='data_sensitivity', + field=models.CharField(help_text='What level of sensitive data does your organization process?', max_length=20), + ), + migrations.AlterField( + model_name='organization', + name='email', + field=models.EmailField(help_text='What is your email?', max_length=254, unique=True), + ), + migrations.AlterField( + model_name='organization', + name='employee_headcount', + field=models.CharField(help_text="What is your organization's current employee headcount?", max_length=20), + ), + migrations.AlterField( + model_name='organization', + name='geographic_scope', + field=models.CharField(blank=True, help_text="What is your organization's geographic operational scope?", max_length=20, null=True), + ), + migrations.AlterField( + model_name='organization', + name='industry_sector', + field=models.CharField(help_text='What is your primary industry sector?', max_length=255), + ), + migrations.AlterField( + model_name='organization', + name='integration_level', + field=models.CharField(blank=True, help_text='How integrated are your critical business systems?', max_length=20, null=True), + ), + migrations.AlterField( + model_name='organization', + name='intellectual_property', + field=models.JSONField(blank=True, help_text='How does your organization protect and manage intellectual property?', null=True), + ), + migrations.AlterField( + model_name='organization', + name='internal_software_development', + field=models.CharField(help_text='What is the extent of your internal software development activities?', max_length=20), + ), + migrations.AlterField( + model_name='organization', + name='it_dependency', + field=models.IntegerField(help_text='On a scale from 1-10, how dependent is your business operations on technology?'), + ), + migrations.AlterField( + model_name='organization', + name='it_infrastructure', + field=models.JSONField(blank=True, help_text='What is your primary IT infrastructure model?', null=True), + ), + migrations.AlterField( + model_name='organization', + name='name', + field=models.CharField(help_text='What is the name of your organization?', max_length=255, unique=True), + ), + migrations.AlterField( + model_name='organization', + name='network_infrastructure', + field=models.CharField(help_text="What best describes your organization's network infrastructure model?", max_length=20), + ), + migrations.AlterField( + model_name='organization', + name='product_portfolio', + field=models.CharField(blank=True, help_text='How diversified is your product/service portfolio?', max_length=20, null=True), + ), + migrations.AlterField( + model_name='organization', + name='remote_workforce_percentage', + field=models.CharField(help_text='What percentage of your workforce operates remotely?', max_length=20), + ), + migrations.AlterField( + model_name='organization', + name='sensitive_data', + field=models.JSONField(blank=True, help_text='What type of sensitive data does your organization handle?', null=True), + ), + migrations.AlterField( + model_name='organization', + name='supplier_base', + field=models.CharField(blank=True, help_text='What is your supplier base structure?', max_length=20, null=True), + ), + migrations.AlterField( + model_name='organization', + name='third_party_vendor_access', + field=models.CharField(help_text='How many third-party vendors have access to your systems?', max_length=20), + ), + ] diff --git a/backend/core/models.py b/backend/core/models.py index 6c42916..bfdd03f 100644 --- a/backend/core/models.py +++ b/backend/core/models.py @@ -164,6 +164,7 @@ class DocumentRiskControl(models.Model): risk = models.ForeignKey(Risk, on_delete=models.CASCADE) control = models.ForeignKey(Control, on_delete=models.CASCADE) weight = models.IntegerField() + likelihood = models.IntegerField(null=True, blank=True) class Meta: unique_together = ('document', 'risk', 'control') \ No newline at end of file diff --git a/backend/core/static/img/graph_matrix (3).png b/backend/core/static/img/graph_matrix (3).png new file mode 100644 index 0000000..94e3152 Binary files /dev/null and b/backend/core/static/img/graph_matrix (3).png differ diff --git a/backend/core/static/img/graph_matrix.png b/backend/core/static/img/graph_matrix.png new file mode 100644 index 0000000..1751129 Binary files /dev/null and b/backend/core/static/img/graph_matrix.png differ diff --git a/backend/core/templates/document.html b/backend/core/templates/document.html index b48bf35..a713923 100644 --- a/backend/core/templates/document.html +++ b/backend/core/templates/document.html @@ -1,3 +1,5 @@ +{% load static %} +
{% if error %}

{{ error }}

diff --git a/backend/core/tests/test_utils.py b/backend/core/tests/test_utils.py new file mode 100644 index 0000000..1048d64 --- /dev/null +++ b/backend/core/tests/test_utils.py @@ -0,0 +1,131 @@ +from django.test import TestCase +from unittest.mock import patch, MagicMock +from django.contrib.staticfiles.testing import StaticLiveServerTestCase +from io import BytesIO +import base64 +import matplotlib +matplotlib.use('Agg') +import matplotlib.pyplot as plt + +from backend.core.models import Organization, Risk, Control, Document +from backend.core.utils import * + +class UtilsTests(TestCase): + def setUp(self): + self.organization = Organization.objects.create( + id=1, + name="Test Organization", + email="test@example.com", + employee_headcount="100-500", + annual_revenue="$1M-$10M", + critical_applications="5-10", + compliance_frameworks=["Ab", "Ba"], + industry_sector="Technology", + it_dependency=8, + data_sensitivity="High", + network_infrastructure="Cloud-based", + remote_workforce_percentage="50%", + third_party_vendor_access="10-20", + internal_software_development="Moderate", + geographic_scope="Global", + customer_base="Enterprise", + customer_type="B2B", + product_portfolio="Diverse", + supplier_base="International", + it_infrastructure=["Cloud", "On-Premise"], + intellectual_property=["Patents", "Trademarks"], + sensitive_data=["PII", "Financial Data"], + integration_level="Highly Integrated" + ) + + self.risk = Risk.objects.create( + risk_id=1, + risk_name="Test Risk", + category="Security", + primary_impact="Financial" + ) + + self.controls = [Control.objects.create(id=i, name=f"Control {i}") for i in range(1, 11)] + + def test_extract_organization_details(self): + details = extract_organization_details(self.organization) + self.assertNotIn('name', details) + self.assertNotIn('email', details) + self.assertIn("What is your organization's current employee headcount?", details) + self.assertEqual(details["What is your organization's current employee headcount?"], "100-500") + + @patch('backend.core.utils.OpenAI') + def test_get_top_risk(self, mock_openai): + mock_client = MagicMock() + mock_openai.return_value = mock_client + mock_response = MagicMock() + mock_response.choices[0].message.content = "1,2,3" + mock_client.chat.completions.create.return_value = mock_response + + risks = get_top_risk(self.organization) + self.assertEqual(risks, [1,2,3]) + + @patch('backend.core.utils.OpenAI') + def test_get_controls_for_risk(self, mock_openai): + mock_client = MagicMock() + mock_openai.return_value = mock_client + mock_response = MagicMock() + + control_lines = [f"{i} : 8 : 5" for i in range(1, 11)] + mock_response.choices[0].message.content = "\n".join(control_lines) + mock_client.chat.completions.create.return_value = mock_response + + controls = get_controls_for_risk(self.risk, self.organization) + self.assertEqual(len(controls), 10) + self.assertEqual(controls[0][0], 1) + + @patch('backend.core.utils.HTML') + def test_generate_pdf(self, mock_html): + mock_instance = MagicMock() + mock_instance.write_pdf.return_value = b'PDF_CONTENT' + mock_html.return_value = mock_instance + + doc = Document.objects.create(organization=self.organization) + response = generate_pdf(doc) + + self.assertEqual(response.status_code, 200) + self.assertEqual(response['Content-Type'], 'application/pdf') + + def test_calculate_aggregate_weight(self): + controls = [{'weight': 5}, {'weight': 3}] + self.assertEqual(calculate_aggregate_weight(controls), 8) + + def test_calculate_aggregate_likelihood(self): + controls = [{'likelihood': 2}, {'likelihood': 4}] + self.assertEqual(calculate_aggregate_likelihood(controls), 6) + + def test_map_weight_to_impact_likelihood(self): + impact, likelihood = map_weight_to_impact_likelihood(50, 30, 100) + self.assertAlmostEqual(impact, 5.0) + self.assertAlmostEqual(likelihood, 3.0) + + @patch('pdf2image.convert_from_bytes') + @patch('backend.core.utils.HTML') + def test_generate_first_page_image(self, mock_html, mock_convert): + mock_pdf_instance = MagicMock() + mock_pdf_instance.write_pdf.return_value = b'PDF_CONTENT' + mock_html.return_value = mock_pdf_instance + + mock_image = MagicMock() + mock_convert.return_value = [mock_image] + + doc = Document.objects.create(organization=self.organization) + img_io = generate_first_page_image(doc) + + self.assertIsInstance(img_io, BytesIO) + mock_convert.assert_called_once_with(b'PDF_CONTENT', first_page=1, last_page=1) + + def test_generate_risk_graph(self): + risks_with_controls = [ + {'risk': {'id': 1}, 'impact': 5.0, 'likelihood': 3.0}, + {'risk': {'id': 2}, 'impact': 7.0, 'likelihood': 4.0} + ] + + graph_data = generate_risk_graph(risks_with_controls) + self.assertIsInstance(graph_data, str) + self.assertTrue(len(graph_data) > 1000) \ No newline at end of file diff --git a/backend/core/tests/test_views.py b/backend/core/tests/test_views.py index a755f4f..deb2536 100644 --- a/backend/core/tests/test_views.py +++ b/backend/core/tests/test_views.py @@ -1,11 +1,18 @@ from django.test import TestCase, Client from django.urls import reverse from uuid import uuid4 +from unittest.mock import patch from backend.core.models import Organization, Document, Risk, Control, DocumentRiskControl, DocumentTemplate +from django.conf import settings +from django.contrib.auth.models import User +from django.http import HttpResponse + class DocumentViewTest(TestCase): def setUp(self): self.client = Client() + self.staff_user = User.objects.create_user(username='staff', password='password', is_staff=True) + self.client.login(username='staff', password='password') self.organization = Organization.objects.create( id=1, @@ -40,19 +47,43 @@ class DocumentViewTest(TestCase): self.control1 = Control.objects.create(id=1, name="Control A") self.control2 = Control.objects.create(id=2, name="Control B") - DocumentRiskControl.objects.create(id=1, document=self.document, risk=self.risk1, control=self.control1, weight=5) - DocumentRiskControl.objects.create(id=2, document=self.document, risk=self.risk1, control=self.control2, weight=7) - DocumentRiskControl.objects.create(id=3, document=self.document, risk=self.risk2, control=self.control1, weight=8) + DocumentRiskControl.objects.create(id=1, document=self.document, risk=self.risk1, control=self.control1, weight=5, likelihood=3) + DocumentRiskControl.objects.create(id=2, document=self.document, risk=self.risk1, control=self.control2, weight=7, likelihood=4) + DocumentRiskControl.objects.create(id=3, document=self.document, risk=self.risk2, control=self.control1, weight=8, likelihood=2) template_content = """ - segment_type: "h1" content: "{{ document.organization.name }} - Risk Report" - - segment_type: "body" - content: "Document ID: {{ document.id }}" - - segment_type: "body" + - segment_type: "p" content: "Created at: {{ document.created_at|date:'Y-m-d' }}" - segment_type: "h2" - content: "Risks" + content: "Top 10 Risk Identified" + - segment_type: "table" + content: | + + + + + + + + + + {% for item in risks_with_controls %} + + + + + + + + + {% endfor %} +
Risk IDRisk NameInherent ImpactInherent LikelihoodInherent Risk ScoreDescription of Risk
{{ item.risk.id }}{{ item.risk.name }} - - - -
+ - segment_type: "image" + content: "data:image/png;base64,{{ graph }}" + - segment_type: "h2" + content: "Risks with Controls" - segment_type: "body" content: | {% for item in risks_with_controls %} @@ -73,12 +104,44 @@ class DocumentViewTest(TestCase): self.template = DocumentTemplate.objects.create(id=1, name="Default Template", content=template_content) def test_document_view(self): - url = reverse('core:document', kwargs={'document_id': self.document.id}) - response = self.client.get(url) + response = self.client.get(reverse('core:document', kwargs={'document_id': self.document.id})) self.assertEqual(response.status_code, 200) - self.assertContains(response, str(self.document.id)) - self.assertContains(response, self.organization.name) - self.assertContains(response, self.document.created_at.strftime('%Y-%m-%d')) - self.assertContains(response, self.risk1.risk_name) - self.assertContains(response, self.control1.name) - self.assertContains(response, "Weight: 5") \ No newline at end of file + self.assertTemplateUsed(response, 'document.html') + self.assertContains(response, self.organization.name) + + def test_index_view(self): + response = self.client.get(reverse('core:index')) + self.assertEqual(response.status_code, 200) + self.assertTemplateUsed(response, 'index.html') + + def test_signup_view_get(self): + response = self.client.get(reverse('core:signup')) + self.assertEqual(response.status_code, 200) + self.assertTemplateUsed(response, 'signup.html') + + def test_thankyou_view(self): + response = self.client.get(reverse('core:thankyou')) + self.assertEqual(response.status_code, 200) + self.assertTemplateUsed(response, 'thankyou.html') + + def test_payment_page_view_get(self): + response = self.client.get(reverse('core:payment_page') + '?email=test@example.com') + self.assertEqual(response.status_code, 200) + self.assertTemplateUsed(response, 'payment.html') + + def test_template_preview_view(self): + response = self.client.get(reverse('core:template_preview', args=[self.template.name])) + self.assertEqual(response.status_code, 200) + self.assertTemplateUsed(response, 'template_preview.html') + + def test_pdf_view(self): + with patch('backend.core.views.generate_pdf') as mock_generate_pdf: + mock_response = HttpResponse(b'%PDF-1.4...', content_type='application/pdf') + mock_generate_pdf.return_value = mock_response + + response = self.client.get(reverse('core:pdf_view', args=[self.document.id])) + + self.assertEqual(response.status_code, 200) + self.assertEqual(response['Content-Type'], 'application/pdf') + self.assertIn(b'%PDF', response.content[:10]) + mock_generate_pdf.assert_called_once_with(self.document) \ No newline at end of file diff --git a/backend/core/utils.py b/backend/core/utils.py index 308fd15..c696b23 100644 --- a/backend/core/utils.py +++ b/backend/core/utils.py @@ -5,6 +5,14 @@ from weasyprint import HTML from django.http import HttpResponse from PIL import Image import io +import base64 +import matplotlib +matplotlib.use('Agg') +import matplotlib.pyplot as plt +from django.contrib.staticfiles.finders import find +import matplotlib.image as mpimg + + def extract_organization_details(organization): excluded_fields = {"name", "email"} @@ -59,7 +67,6 @@ def get_top_risk(organization): ) risk_ids = response.choices[0].message.content.strip().split(",") - print(f"Risks: {risk_ids}") return [int(risk_id) for risk_id in risk_ids if risk_id.isdigit()] @@ -80,22 +87,28 @@ def get_controls_for_risk(risk, organization): prompt = f""" You are an expert in cybersecurity risk management. Given the risk "{risk.risk_name}" and its associated organization details "{organization_details}", - your task is to select **exactly 10 unique controls** from the provided list that best mitigate this risk. Each control should be assigned a weight between **1 and 10** based on its effectiveness in reducing the risk. + your task is to select **exactly 10 unique controls** from the provided list that best mitigate this risk. Each control should be assigned: + - A weight between **1 and 10** (1 = low impact, 10 = high impact). + - A likelihood score between **1 and 10** (1 = rare occurrence, 10 = highly likely). + ### Rules: 1. **Each control ID must be unique** (no duplicates). - 2. **Only return control IDs and weights** in the exact format below. + 2. **Only return control IDs, weights, and likelihood scores** in the exact format below. 3. **Weights must be between 1 and 10** (1 = low impact, 10 = high impact). - 4. **Do NOT add explanations, descriptions, or extra text.** - 5. **Ensure that control IDs are randomly distributed and diverse across different categories.** + 4. **Likelihood scores must be between 1 and 10** (1 = rare occurrence, 10 = highly likely). + 5. **Do NOT add explanations, descriptions, or extra text.** + 6. **Ensure that control IDs are randomly distributed and diverse across different categories.** ### Available Controls: {control_list} ### Expected Response Format (STRICTLY FOLLOW THIS FORMAT): - : - : + : : + : : + ### Example Correct Response (NO DUPLICATES): - 12 : 8 - 45 : 7 + 12 : 8 : 90 + 45 : 7 : 60 + ⚠️ **If you provide duplicate control IDs, your response will be rejected. Ensure all control IDs are unique.** ⚠️ **Follow the response format exactly. Any deviation will be considered invalid.** """ @@ -108,22 +121,23 @@ def get_controls_for_risk(risk, organization): for line in result.split("\n"): line = line.strip() parts = line.split(":") - if len(parts) == 2: + if len(parts) == 3: control_id_str = parts[0].replace("ID:", "").replace("id:", "").replace("Id:", "").strip() weight_str = parts[1].strip().replace("Weight:", "").replace("weight:", "").strip() - print(f"Control:{control_id_str} Weight:{weight_str}") - print(f"ControlType: {type(control_id_str)} WeightType: {type(weight_str)}") + likelihood_str = parts[2].strip().replace("Likelihood:", "").replace("likelihood:", "").strip() control_id_str = ''.join(filter(str.isdigit, control_id_str)) weight_str = ''.join(filter(str.isdigit, weight_str)) + likelihood_str = ''.join(filter(str.isdigit, likelihood_str)) - if control_id_str and weight_str: + if control_id_str and weight_str and likelihood_str: try: control_id = int(control_id_str) weight = int(weight_str) + likelihood = int(likelihood_str) - if control_id in valid_control_ids and 1 <= weight <= 10 and control_id not in control_ids_seen: - selected_controls.append((control_id, weight)) + if control_id in valid_control_ids and 1 <= weight <= 10 and 1 <= likelihood <= 10 and control_id not in control_ids_seen: + selected_controls.append((control_id, weight, likelihood)) control_ids_seen.add(control_id) except ValueError: continue @@ -137,23 +151,30 @@ def get_controls_for_risk(risk, organization): remaining_controls_list = [f"Control ID: {cid}, Control Name: {control_map[cid]}" for cid in remaining_controls] retry_prompt = f""" - You are an expert in cybersecurity risk management. Given the risk "{risk.risk_name}" and its associated organization details "{organization_details}", - your task is to select **exactly {missing_count} unique controls** from the provided list that best mitigate this risk. Each control should be assigned a weight between **1 and 10** based on its effectiveness in reducing the risk. + You are an expert in cybersecurity risk management. Given the risk "{risk.risk_name}" and the organization's details "{organization_details}", + your task is to select **exactly {missing_count} unique controls** from the provided list that best mitigate this risk. Each control should be assigned: + - A **weight** between **1 and 10** based on its effectiveness in reducing the risk. + - A likelihood score between **1 and 10** (1 = rare occurrence, 10 = highly likely). + ### Rules: 1. **Each control ID must be unique** (no duplicates). - 2. **Only return control IDs and weights** in the exact format below. + 2. **Only return control IDs, weights, and likelihood scores** in the exact format below. 3. **Weights must be between 1 and 10** (1 = low impact, 10 = high impact). - 4. **Do NOT add explanations, descriptions, or extra text.** - 5. **Ensure that control IDs are randomly distributed and diverse across different categories.** + 4. **Likelihood scores must be between 1 and 10** (1 = rare occurrence, 10 = highly likely). + 5. **Do NOT add explanations, descriptions, or extra text.** + 6. **Ensure that control IDs are diverse and well-distributed across different categories.** + ### Available Controls: {remaining_controls_list} ### Expected Response Format (STRICTLY FOLLOW THIS FORMAT): - : - : + : : + : : + ### Example Correct Response (NO DUPLICATES): - 12 : 8 - 45 : 7 + 12 : 8 : 85 + 45 : 7 : 60 + ⚠️ **If you provide duplicate control IDs, your response will be rejected. Ensure all control IDs are unique.** ⚠️ **Follow the response format exactly. Any deviation will be considered invalid.** """ @@ -162,22 +183,24 @@ def get_controls_for_risk(risk, organization): for line in result.split("\n"): line = line.strip() parts = line.split(":") - if len(parts) == 2: + if len(parts) == 3: control_id_str = parts[0].replace("ID:", "").replace("id:", "").replace("Id:", "").strip() weight_str = parts[1].strip().replace("Weight:", "").replace("weight:", "").strip() - print(f"Control:{control_id} Weight:{weight_str}") - print(f"ControlType: {type(control_id)} WeightType: {type(weight_str)}") - + likelihood_str = parts[2].strip().replace("Likelihood:", "").replace("likelihood:", "").strip() + + control_id_str = ''.join(filter(str.isdigit, control_id_str)) weight_str = ''.join(filter(str.isdigit, weight_str)) + likelihood_str = ''.join(filter(str.isdigit, likelihood_str)) - if control_id_str and weight_str: + if control_id_str and weight_str and likelihood_str: try: control_id = int(control_id_str) weight = int(weight_str) - - if control_id in valid_control_ids and 1 <= weight <= 10 and control_id not in control_ids_seen: - selected_controls.append((control_id, weight)) + likelihood = int(likelihood_str) + + if control_id in valid_control_ids and 1 <= weight <= 10 and 1 <= likelihood <= 10 and control_id not in control_ids_seen: + selected_controls.append((control_id, weight, likelihood)) control_ids_seen.add(control_id) except ValueError: continue @@ -207,4 +230,68 @@ def generate_first_page_image(document): images[0].save(img_io, format="JPEG", quality=90) img_io.seek(0) - return img_io \ No newline at end of file + return img_io + +def calculate_aggregate_weight(controls): + total_weight = sum(control['weight']for control in controls) + return total_weight + +def calculate_aggregate_likelihood(controls): + total_likelihood = sum(control['likelihood'] for control in controls) + return total_likelihood + +def map_weight_to_impact_likelihood(total_weight, total_likelihood, max_weight): + normalized_weight = total_weight / max_weight + + impact = min(10.0, max(1.0, normalized_weight * 10.0)) + likelihood = min(10.0, max(1.0, total_likelihood / 10.0)) + + return impact, likelihood + + +def generate_risk_graph(risks_with_controls): + impacts = [risk['impact'] for risk in risks_with_controls] + likelihoods = [risk['likelihood'] for risk in risks_with_controls] + risk_ids = [risk['risk']['id'] for risk in risks_with_controls] + + bg_img_path = find('img/graph_matrix (3).png') + bg_img = mpimg.imread(bg_img_path) + + fig, ax = plt.subplots(figsize=(10, 8)) + + ax.imshow(bg_img, extent=[0, 11.2, 0, 11.2], aspect='auto') + + scatter = ax.scatter( + likelihoods, impacts, + c="blue", edgecolors="white", s=500, alpha=0.9 + ) + + for i, risk_id in enumerate(risk_ids): + ax.annotate( + str(risk_id), + (likelihoods[i], impacts[i]), + color="white", + fontsize=12, + ha="center", + va="center", + weight="bold", + ) + + ax.set_xticks([]) + ax.set_yticks([]) + ax.set_xticklabels([]) + ax.set_yticklabels([]) + + ax.spines['top'].set_visible(False) + ax.spines['right'].set_visible(False) + ax.spines['left'].set_visible(False) + ax.spines['bottom'].set_visible(False) + + buffer = io.BytesIO() + plt.savefig(buffer, format="png", transparent=True, bbox_inches='tight', pad_inches=0) + buffer.seek(0) + image_png = buffer.getvalue() + buffer.close() + plt.close() + + return base64.b64encode(image_png).decode("utf-8") \ No newline at end of file diff --git a/backend/core/views.py b/backend/core/views.py index 94cf41e..39d5e61 100644 --- a/backend/core/views.py +++ b/backend/core/views.py @@ -3,11 +3,11 @@ import yaml from django.shortcuts import render, redirect , get_object_or_404 from .forms import OrganizationForm -from .models import Organization,Document, DocumentTemplate,DocumentRiskControl +from .models import Organization,Document, DocumentTemplate,DocumentRiskControl,Risk from backend.accounts.utils import send_confirmation_email, send_document_email from django.contrib.admin.views.decorators import staff_member_required from django.template import Template, Context -from .utils import generate_pdf +from .utils import generate_pdf, map_weight_to_impact_likelihood, calculate_aggregate_weight, calculate_aggregate_likelihood, generate_risk_graph from django.conf import settings site_domain = settings.SITE_DOMAIN @@ -67,15 +67,24 @@ def document(request, document_id): controls = ( DocumentRiskControl.objects .filter(document=document, risk_id=risk['id']) - .values('control', 'control__name', 'weight') + .values('control', 'control__name', 'weight', 'likelihood') .distinct() ) - + max_weight = 10*10 + total_weight = calculate_aggregate_weight(controls) + total_likelihood = calculate_aggregate_likelihood(controls) + impact, likelihood = map_weight_to_impact_likelihood(total_weight, total_likelihood, max_weight) risks_with_controls.append({ 'risk': risk, - 'controls': list(controls) + 'controls': list(controls), + 'total_weight': total_weight, + 'impact': impact, + 'likelihood': likelihood, + 'risk_score': (round(impact) * round(likelihood)) }) + graph_base64 = generate_risk_graph(risks_with_controls) + template_obj = get_object_or_404(DocumentTemplate, name="Default Template") template_content = template_obj.content @@ -85,14 +94,29 @@ def document(request, document_id): return render(request, 'error.html', {'error_message': 'Error parsing template.'}) context = { 'document': document, - 'risks_with_controls': risks_with_controls + 'risks_with_controls': risks_with_controls, + 'graph': graph_base64, } rendered_content = "" for segment in template_segments: - content = segment['content'] - django_template = Template(content) - rendered_content += django_template.render(Context(context)) + content = segment.get('content', '') + segment_type = segment.get('segment_type', '') + django_template = Template(content) + processed_content = django_template.render(Context(context)) + + if segment_type == "h1": + rendered_content += f"

{processed_content}

\n" + elif segment_type == "h2": + rendered_content += f"

{processed_content}

\n" + elif segment_type == "h3": + rendered_content += f"

{processed_content}

\n" + elif segment_type == "p": + rendered_content += f"

{processed_content}

\n" + elif segment_type == "image": + rendered_content += f'Risk Graph\n' + else: + rendered_content += processed_content return render(request, 'document.html', {'rendered_html': rendered_content}) diff --git a/document_template.yml b/document_template.yml index b2321ba..30f3979 100644 --- a/document_template.yml +++ b/document_template.yml @@ -1,11 +1,40 @@ - segment_type: "h1" - content: "{{ document.organization.name }} - Risk Report" + content: "{{ document.organization.name }} - Risk Report " -- segment_type: "body" +- segment_type: "p" content: "Created at: {{ document.created_at|date:'Y-m-d' }}" - segment_type: "h2" - content: "Risks" + content: "Top 10 Risk Identified" + +- segment_type: "table" + content: | + + + + + + + + + + {% for item in risks_with_controls %} + + + + + + + + + {% endfor %} +
Risk IDRisk NameInherent Impact Interent Liklihood Inherent Risk Score Description of Risk
{{ item.risk.id }}{{ item.risk.name }}{{ item.impact|floatformat:0 }}{{ item.likelihood|floatformat:0 }}{{ item.risk_score }} -
+ +- segment_type: "image" + content: "data:image/png;base64,{{ graph }}" + +- segment_type: "h2" + content: "Risks with Controls" - segment_type: "body" content: | @@ -22,4 +51,4 @@ {% endfor %}
- {% endfor %} \ No newline at end of file + {% endfor %}