2025-02-10 13:49:15 +01:00
|
|
|
|
from openai import OpenAI
|
|
|
|
|
|
from django.conf import settings
|
2025-02-17 21:40:08 +01:00
|
|
|
|
from .models import Risk, Control
|
2025-02-24 10:19:27 +01:00
|
|
|
|
from weasyprint import HTML
|
|
|
|
|
|
from django.http import HttpResponse
|
|
|
|
|
|
from PIL import Image
|
|
|
|
|
|
import io
|
2025-03-27 23:57:31 +01:00
|
|
|
|
import base64
|
|
|
|
|
|
import matplotlib
|
|
|
|
|
|
matplotlib.use('Agg')
|
|
|
|
|
|
import matplotlib.pyplot as plt
|
|
|
|
|
|
from django.contrib.staticfiles.finders import find
|
|
|
|
|
|
import matplotlib.image as mpimg
|
2025-04-19 19:34:21 +02:00
|
|
|
|
site_domain = settings.SITE_DOMAIN
|
|
|
|
|
|
|
2025-03-27 23:57:31 +01:00
|
|
|
|
|
|
|
|
|
|
|
2025-02-10 13:49:15 +01:00
|
|
|
|
|
2025-02-18 11:40:26 +01:00
|
|
|
|
def extract_organization_details(organization):
|
|
|
|
|
|
excluded_fields = {"name", "email"}
|
2025-02-10 13:49:15 +01:00
|
|
|
|
risk_data = {}
|
|
|
|
|
|
|
|
|
|
|
|
for field in organization._meta.get_fields():
|
|
|
|
|
|
if field.name not in excluded_fields and hasattr(organization, field.name):
|
|
|
|
|
|
value = getattr(organization, field.name)
|
|
|
|
|
|
if value:
|
2025-02-18 11:40:26 +01:00
|
|
|
|
help_text = getattr(field, 'help_text', '').strip()
|
|
|
|
|
|
key = help_text if help_text else field.name
|
|
|
|
|
|
risk_data[key] = value
|
2025-02-10 13:49:15 +01:00
|
|
|
|
return risk_data
|
|
|
|
|
|
|
|
|
|
|
|
def get_top_risk(organization):
|
|
|
|
|
|
client = OpenAI(api_key=settings.OPENAI_API_KEY)
|
|
|
|
|
|
|
|
|
|
|
|
all_risks = Risk.objects.all()
|
|
|
|
|
|
|
|
|
|
|
|
risk_list = []
|
|
|
|
|
|
for risk in all_risks:
|
|
|
|
|
|
risk_list.append(f"""
|
|
|
|
|
|
Risk ID: {risk.risk_id}
|
|
|
|
|
|
Category: {risk.category}
|
|
|
|
|
|
Name: {risk.risk_name}
|
|
|
|
|
|
Primary Impact: {risk.primary_impact}
|
2025-02-17 21:40:08 +01:00
|
|
|
|
Secondary Impact: {risk.secondary_impact}
|
|
|
|
|
|
Tertiary Impact: {risk.tretiary_impact}
|
|
|
|
|
|
Detection Difficulty: {risk.detection_difficulty}
|
|
|
|
|
|
Recovery Complexity: {risk.recovery_complexity}
|
|
|
|
|
|
Business Impact Severity: {risk.businnes_impact_severity}
|
2025-02-10 13:49:15 +01:00
|
|
|
|
""")
|
|
|
|
|
|
|
2025-02-18 11:40:26 +01:00
|
|
|
|
organization_details = extract_organization_details(organization)
|
2025-02-10 13:49:15 +01:00
|
|
|
|
|
|
|
|
|
|
prompt = f"""
|
|
|
|
|
|
You are an AI risk assessor. Based on the following company details and list of known risks,
|
|
|
|
|
|
identify the 10 most critical risks for this company. Respond only with risk IDs.
|
|
|
|
|
|
|
|
|
|
|
|
Company Details:
|
2025-02-18 11:40:26 +01:00
|
|
|
|
{organization_details}
|
2025-02-10 13:49:15 +01:00
|
|
|
|
|
|
|
|
|
|
List of Risks:
|
|
|
|
|
|
{risk_list}
|
|
|
|
|
|
|
|
|
|
|
|
Provide only the 10 most critical risk IDs in a simple comma-separated format, e.g "1,3,7,12,..."
|
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
|
|
response = client.chat.completions.create(
|
2025-02-17 21:40:08 +01:00
|
|
|
|
model="gpt-4o-mini",
|
2025-02-10 13:49:15 +01:00
|
|
|
|
messages=[{"role": "system", "content": prompt}]
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
risk_ids = response.choices[0].message.content.strip().split(",")
|
|
|
|
|
|
|
|
|
|
|
|
return [int(risk_id) for risk_id in risk_ids if risk_id.isdigit()]
|
2025-02-14 17:52:51 +01:00
|
|
|
|
|
2025-02-17 21:40:08 +01:00
|
|
|
|
def get_controls_for_risk(risk, organization):
|
2025-02-14 17:52:51 +01:00
|
|
|
|
client = OpenAI(api_key=settings.OPENAI_API_KEY)
|
|
|
|
|
|
all_controls = Control.objects.all()
|
2025-02-18 11:40:26 +01:00
|
|
|
|
organization_details = extract_organization_details(organization)
|
2025-02-18 21:49:07 +01:00
|
|
|
|
control_list = [f"Control ID: {control.id}, Control Name: {control.name}" for control in all_controls]
|
2025-02-17 21:40:08 +01:00
|
|
|
|
valid_control_ids = {control.id for control in all_controls}
|
2025-02-18 13:54:30 +01:00
|
|
|
|
control_map = {control.id: control.name for control in all_controls}
|
2025-02-17 21:40:08 +01:00
|
|
|
|
|
2025-02-18 21:49:07 +01:00
|
|
|
|
def fetch_controls(prompt):
|
|
|
|
|
|
response = client.chat.completions.create(
|
|
|
|
|
|
model="gpt-4o-mini",
|
|
|
|
|
|
messages=[{"role": "system", "content": prompt}]
|
|
|
|
|
|
)
|
|
|
|
|
|
return response.choices[0].message.content.strip()
|
2025-02-14 17:52:51 +01:00
|
|
|
|
|
|
|
|
|
|
prompt = f"""
|
2025-02-18 11:49:49 +01:00
|
|
|
|
You are an expert in cybersecurity risk management. Given the risk "{risk.risk_name}" and its associated organization details "{organization_details}",
|
2025-03-27 23:57:31 +01:00
|
|
|
|
your task is to select **exactly 10 unique controls** from the provided list that best mitigate this risk. Each control should be assigned:
|
|
|
|
|
|
- A weight between **1 and 10** (1 = low impact, 10 = high impact).
|
|
|
|
|
|
- A likelihood score between **1 and 10** (1 = rare occurrence, 10 = highly likely).
|
|
|
|
|
|
|
2025-02-17 21:40:08 +01:00
|
|
|
|
### Rules:
|
|
|
|
|
|
1. **Each control ID must be unique** (no duplicates).
|
2025-03-27 23:57:31 +01:00
|
|
|
|
2. **Only return control IDs, weights, and likelihood scores** in the exact format below.
|
2025-02-17 21:40:08 +01:00
|
|
|
|
3. **Weights must be between 1 and 10** (1 = low impact, 10 = high impact).
|
2025-03-27 23:57:31 +01:00
|
|
|
|
4. **Likelihood scores must be between 1 and 10** (1 = rare occurrence, 10 = highly likely).
|
|
|
|
|
|
5. **Do NOT add explanations, descriptions, or extra text.**
|
|
|
|
|
|
6. **Ensure that control IDs are randomly distributed and diverse across different categories.**
|
2025-02-17 21:40:08 +01:00
|
|
|
|
### Available Controls:
|
2025-02-14 17:52:51 +01:00
|
|
|
|
{control_list}
|
|
|
|
|
|
|
2025-02-17 21:40:08 +01:00
|
|
|
|
### Expected Response Format (STRICTLY FOLLOW THIS FORMAT):
|
2025-03-27 23:57:31 +01:00
|
|
|
|
<control_id> : <weight> : <likelihood>
|
|
|
|
|
|
<control_id> : <weight> : <likelihood>
|
|
|
|
|
|
|
2025-02-17 21:40:08 +01:00
|
|
|
|
### Example Correct Response (NO DUPLICATES):
|
2025-03-27 23:57:31 +01:00
|
|
|
|
12 : 8 : 90
|
|
|
|
|
|
45 : 7 : 60
|
|
|
|
|
|
|
2025-02-17 21:40:08 +01:00
|
|
|
|
⚠️ **If you provide duplicate control IDs, your response will be rejected. Ensure all control IDs are unique.**
|
|
|
|
|
|
⚠️ **Follow the response format exactly. Any deviation will be considered invalid.**
|
|
|
|
|
|
"""
|
2025-02-14 17:52:51 +01:00
|
|
|
|
|
2025-02-18 21:49:07 +01:00
|
|
|
|
selected_controls = []
|
|
|
|
|
|
control_ids_seen = set()
|
|
|
|
|
|
|
|
|
|
|
|
result = fetch_controls(prompt)
|
|
|
|
|
|
|
|
|
|
|
|
for line in result.split("\n"):
|
|
|
|
|
|
line = line.strip()
|
|
|
|
|
|
parts = line.split(":")
|
2025-03-27 23:57:31 +01:00
|
|
|
|
if len(parts) == 3:
|
2025-02-18 21:49:07 +01:00
|
|
|
|
control_id_str = parts[0].replace("ID:", "").replace("id:", "").replace("Id:", "").strip()
|
|
|
|
|
|
weight_str = parts[1].strip().replace("Weight:", "").replace("weight:", "").strip()
|
2025-03-27 23:57:31 +01:00
|
|
|
|
likelihood_str = parts[2].strip().replace("Likelihood:", "").replace("likelihood:", "").strip()
|
2025-02-18 21:49:07 +01:00
|
|
|
|
|
|
|
|
|
|
control_id_str = ''.join(filter(str.isdigit, control_id_str))
|
|
|
|
|
|
weight_str = ''.join(filter(str.isdigit, weight_str))
|
2025-03-27 23:57:31 +01:00
|
|
|
|
likelihood_str = ''.join(filter(str.isdigit, likelihood_str))
|
2025-02-18 21:49:07 +01:00
|
|
|
|
|
2025-03-27 23:57:31 +01:00
|
|
|
|
if control_id_str and weight_str and likelihood_str:
|
2025-02-18 21:49:07 +01:00
|
|
|
|
try:
|
|
|
|
|
|
control_id = int(control_id_str)
|
|
|
|
|
|
weight = int(weight_str)
|
2025-03-27 23:57:31 +01:00
|
|
|
|
likelihood = int(likelihood_str)
|
2025-02-18 21:49:07 +01:00
|
|
|
|
|
2025-03-27 23:57:31 +01:00
|
|
|
|
if control_id in valid_control_ids and 1 <= weight <= 10 and 1 <= likelihood <= 10 and control_id not in control_ids_seen:
|
|
|
|
|
|
selected_controls.append((control_id, weight, likelihood))
|
2025-02-18 21:49:07 +01:00
|
|
|
|
control_ids_seen.add(control_id)
|
|
|
|
|
|
except ValueError:
|
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
|
|
if len(selected_controls) == 10:
|
|
|
|
|
|
return selected_controls
|
|
|
|
|
|
|
|
|
|
|
|
while len(selected_controls) < 10:
|
|
|
|
|
|
missing_count = 10 - len(selected_controls)
|
|
|
|
|
|
remaining_controls = valid_control_ids - control_ids_seen
|
|
|
|
|
|
remaining_controls_list = [f"Control ID: {cid}, Control Name: {control_map[cid]}" for cid in remaining_controls]
|
|
|
|
|
|
|
|
|
|
|
|
retry_prompt = f"""
|
2025-03-27 23:57:31 +01:00
|
|
|
|
You are an expert in cybersecurity risk management. Given the risk "{risk.risk_name}" and the organization's details "{organization_details}",
|
|
|
|
|
|
your task is to select **exactly {missing_count} unique controls** from the provided list that best mitigate this risk. Each control should be assigned:
|
|
|
|
|
|
- A **weight** between **1 and 10** based on its effectiveness in reducing the risk.
|
|
|
|
|
|
- A likelihood score between **1 and 10** (1 = rare occurrence, 10 = highly likely).
|
|
|
|
|
|
|
2025-02-18 21:49:07 +01:00
|
|
|
|
### Rules:
|
|
|
|
|
|
1. **Each control ID must be unique** (no duplicates).
|
2025-03-27 23:57:31 +01:00
|
|
|
|
2. **Only return control IDs, weights, and likelihood scores** in the exact format below.
|
2025-02-18 21:49:07 +01:00
|
|
|
|
3. **Weights must be between 1 and 10** (1 = low impact, 10 = high impact).
|
2025-03-27 23:57:31 +01:00
|
|
|
|
4. **Likelihood scores must be between 1 and 10** (1 = rare occurrence, 10 = highly likely).
|
|
|
|
|
|
5. **Do NOT add explanations, descriptions, or extra text.**
|
|
|
|
|
|
6. **Ensure that control IDs are diverse and well-distributed across different categories.**
|
|
|
|
|
|
|
2025-02-18 21:49:07 +01:00
|
|
|
|
### Available Controls:
|
|
|
|
|
|
{remaining_controls_list}
|
|
|
|
|
|
|
|
|
|
|
|
### Expected Response Format (STRICTLY FOLLOW THIS FORMAT):
|
2025-03-27 23:57:31 +01:00
|
|
|
|
<control_id> : <weight> : <likelihood>
|
|
|
|
|
|
<control_id> : <weight> : <likelihood>
|
|
|
|
|
|
|
2025-02-18 21:49:07 +01:00
|
|
|
|
### Example Correct Response (NO DUPLICATES):
|
2025-03-27 23:57:31 +01:00
|
|
|
|
12 : 8 : 85
|
|
|
|
|
|
45 : 7 : 60
|
|
|
|
|
|
|
2025-02-18 21:49:07 +01:00
|
|
|
|
⚠️ **If you provide duplicate control IDs, your response will be rejected. Ensure all control IDs are unique.**
|
|
|
|
|
|
⚠️ **Follow the response format exactly. Any deviation will be considered invalid.**
|
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
|
|
result = fetch_controls(retry_prompt)
|
2025-02-17 21:40:08 +01:00
|
|
|
|
for line in result.split("\n"):
|
|
|
|
|
|
line = line.strip()
|
|
|
|
|
|
parts = line.split(":")
|
2025-03-27 23:57:31 +01:00
|
|
|
|
if len(parts) == 3:
|
2025-02-17 21:40:08 +01:00
|
|
|
|
control_id_str = parts[0].replace("ID:", "").replace("id:", "").replace("Id:", "").strip()
|
|
|
|
|
|
weight_str = parts[1].strip().replace("Weight:", "").replace("weight:", "").strip()
|
2025-03-27 23:57:31 +01:00
|
|
|
|
likelihood_str = parts[2].strip().replace("Likelihood:", "").replace("likelihood:", "").strip()
|
|
|
|
|
|
|
|
|
|
|
|
|
2025-02-17 21:40:08 +01:00
|
|
|
|
control_id_str = ''.join(filter(str.isdigit, control_id_str))
|
|
|
|
|
|
weight_str = ''.join(filter(str.isdigit, weight_str))
|
2025-03-27 23:57:31 +01:00
|
|
|
|
likelihood_str = ''.join(filter(str.isdigit, likelihood_str))
|
2025-02-18 21:49:07 +01:00
|
|
|
|
|
2025-03-27 23:57:31 +01:00
|
|
|
|
if control_id_str and weight_str and likelihood_str:
|
2025-02-18 21:49:07 +01:00
|
|
|
|
try:
|
|
|
|
|
|
control_id = int(control_id_str)
|
|
|
|
|
|
weight = int(weight_str)
|
2025-03-27 23:57:31 +01:00
|
|
|
|
likelihood = int(likelihood_str)
|
|
|
|
|
|
|
|
|
|
|
|
if control_id in valid_control_ids and 1 <= weight <= 10 and 1 <= likelihood <= 10 and control_id not in control_ids_seen:
|
|
|
|
|
|
selected_controls.append((control_id, weight, likelihood))
|
2025-02-18 21:49:07 +01:00
|
|
|
|
control_ids_seen.add(control_id)
|
|
|
|
|
|
except ValueError:
|
2025-02-18 16:21:00 +01:00
|
|
|
|
continue
|
2025-02-17 21:40:08 +01:00
|
|
|
|
|
2025-02-18 21:49:07 +01:00
|
|
|
|
if not remaining_controls:
|
|
|
|
|
|
break
|
|
|
|
|
|
return selected_controls if len(selected_controls) == 10 else []
|
2025-02-24 10:19:27 +01:00
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def generate_pdf(document):
|
2025-04-19 19:34:21 +02:00
|
|
|
|
document_link = f"{site_domain}/document/{document.id}/"
|
2025-02-24 10:19:27 +01:00
|
|
|
|
pdf_content = HTML(url=document_link).write_pdf()
|
|
|
|
|
|
|
|
|
|
|
|
response = HttpResponse(pdf_content, content_type='application/pdf')
|
|
|
|
|
|
response['Content-Disposition'] = f'inline; filename=document_{document.id}.pdf'
|
|
|
|
|
|
return response
|
|
|
|
|
|
|
|
|
|
|
|
def generate_first_page_image(document):
|
2025-04-19 19:34:21 +02:00
|
|
|
|
document_link = f"{site_domain}/document/{document.id}/"
|
2025-02-24 10:19:27 +01:00
|
|
|
|
|
|
|
|
|
|
pdf_bytes = HTML(url=document_link).write_pdf()
|
|
|
|
|
|
|
|
|
|
|
|
from pdf2image import convert_from_bytes
|
|
|
|
|
|
images = convert_from_bytes(pdf_bytes, first_page=1, last_page=1)
|
|
|
|
|
|
|
|
|
|
|
|
img_io = io.BytesIO()
|
|
|
|
|
|
images[0].save(img_io, format="JPEG", quality=90)
|
|
|
|
|
|
img_io.seek(0)
|
|
|
|
|
|
|
2025-03-27 23:57:31 +01:00
|
|
|
|
return img_io
|
|
|
|
|
|
|
|
|
|
|
|
def calculate_aggregate_weight(controls):
|
|
|
|
|
|
total_weight = sum(control['weight']for control in controls)
|
|
|
|
|
|
return total_weight
|
|
|
|
|
|
|
|
|
|
|
|
def calculate_aggregate_likelihood(controls):
|
|
|
|
|
|
total_likelihood = sum(control['likelihood'] for control in controls)
|
|
|
|
|
|
return total_likelihood
|
|
|
|
|
|
|
|
|
|
|
|
def map_weight_to_impact_likelihood(total_weight, total_likelihood, max_weight):
|
|
|
|
|
|
normalized_weight = total_weight / max_weight
|
|
|
|
|
|
|
|
|
|
|
|
impact = min(10.0, max(1.0, normalized_weight * 10.0))
|
|
|
|
|
|
likelihood = min(10.0, max(1.0, total_likelihood / 10.0))
|
|
|
|
|
|
|
|
|
|
|
|
return impact, likelihood
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def generate_risk_graph(risks_with_controls):
|
|
|
|
|
|
impacts = [risk['impact'] for risk in risks_with_controls]
|
|
|
|
|
|
likelihoods = [risk['likelihood'] for risk in risks_with_controls]
|
|
|
|
|
|
risk_ids = [risk['risk']['id'] for risk in risks_with_controls]
|
|
|
|
|
|
|
|
|
|
|
|
bg_img_path = find('img/graph_matrix (3).png')
|
|
|
|
|
|
bg_img = mpimg.imread(bg_img_path)
|
|
|
|
|
|
|
|
|
|
|
|
fig, ax = plt.subplots(figsize=(10, 8))
|
|
|
|
|
|
|
|
|
|
|
|
ax.imshow(bg_img, extent=[0, 11.2, 0, 11.2], aspect='auto')
|
|
|
|
|
|
|
|
|
|
|
|
scatter = ax.scatter(
|
|
|
|
|
|
likelihoods, impacts,
|
|
|
|
|
|
c="blue", edgecolors="white", s=500, alpha=0.9
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
for i, risk_id in enumerate(risk_ids):
|
|
|
|
|
|
ax.annotate(
|
|
|
|
|
|
str(risk_id),
|
|
|
|
|
|
(likelihoods[i], impacts[i]),
|
|
|
|
|
|
color="white",
|
|
|
|
|
|
fontsize=12,
|
|
|
|
|
|
ha="center",
|
|
|
|
|
|
va="center",
|
|
|
|
|
|
weight="bold",
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
ax.set_xticks([])
|
|
|
|
|
|
ax.set_yticks([])
|
|
|
|
|
|
ax.set_xticklabels([])
|
|
|
|
|
|
ax.set_yticklabels([])
|
|
|
|
|
|
|
|
|
|
|
|
ax.spines['top'].set_visible(False)
|
|
|
|
|
|
ax.spines['right'].set_visible(False)
|
|
|
|
|
|
ax.spines['left'].set_visible(False)
|
|
|
|
|
|
ax.spines['bottom'].set_visible(False)
|
|
|
|
|
|
|
|
|
|
|
|
buffer = io.BytesIO()
|
|
|
|
|
|
plt.savefig(buffer, format="png", transparent=True, bbox_inches='tight', pad_inches=0)
|
|
|
|
|
|
buffer.seek(0)
|
|
|
|
|
|
image_png = buffer.getvalue()
|
|
|
|
|
|
buffer.close()
|
|
|
|
|
|
plt.close()
|
|
|
|
|
|
|
|
|
|
|
|
return base64.b64encode(image_png).decode("utf-8")
|