Files
old-riskletpy/backend/core/utils.py

585 lines
24 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from openai import OpenAI
from django.conf import settings
from .models import Risk, Control
from weasyprint import HTML
from django.http import HttpResponse
from PIL import Image
import io
import base64
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
from django.contrib.staticfiles.finders import find
import matplotlib.image as mpimg
site_domain = settings.SITE_DOMAIN
import random
import re
def extract_organization_details(organization):
excluded_fields = {"name", "email"}
risk_data = {}
for field in organization._meta.get_fields():
if field.name not in excluded_fields and hasattr(organization, field.name):
value = getattr(organization, field.name)
if value:
help_text = getattr(field, 'help_text', '').strip()
key = help_text if help_text else field.name
risk_data[key] = value
return risk_data
def get_top_risk(organization):
client = OpenAI(api_key=settings.OPENAI_API_KEY)
all_risks = Risk.objects.all()
risk_list = []
for risk in all_risks:
risk_list.append(f"""
Risk ID: {risk.risk_id}
Category: {risk.category}
Name: {risk.risk_name}
Primary Impact: {risk.primary_impact}
Secondary Impact: {risk.secondary_impact}
Tertiary Impact: {risk.tretiary_impact}
Detection Difficulty: {risk.detection_difficulty}
Recovery Complexity: {risk.recovery_complexity}
Business Impact Severity: {risk.businnes_impact_severity}
""")
organization_details = extract_organization_details(organization)
prompt = f"""
You are an expert cybersecurity risk analyst. Your task is to identify
the top 10 most critical cybersecurity risks for a client based on
their specific company profile and a comprehensive risk catalog. Your
analysis must be logical, evidence-based, and directly tied to the
client's details.
Methodology:
Analyze the Company Profile: Carefully review all details provided
about the company, including its industry, size (revenue and
employees), IT dependency, regulatory requirements, and operational
characteristics (e.g., remote work, third-party vendors, internal
development).
Evaluate the Risk Catalog: Review the provided list of known risks.
Map Profile to Risks: Correlate specific details from the company
profile to the risks in the catalog. For example:
A company in the Financial sector subject to GDPR is highly
susceptible to "Privacy Regulation Violation" (Risk ID 61).
A company with significant "Internal Software Development" is more
vulnerable to "CI/CD Pipeline Compromise" (Risk ID 30) and "Source
Code Exposure" (Risk ID 9).
High dependency on a "Cloud Provider" increases the criticality of
"Cloud Provider Service Outage" (Risk ID 20).
Prioritize by Impact: Determine the most critical risks by assessing
the potential impact (financial, operational, reputational, and
regulatory) on this specific company. A risk is critical if it poses a
severe threat to the company's core operations, data, or compliance
standing.
Final Selection: Select the 10 risks with the highest criticality and
provide a clear, concise justification for each choice.
Company Details:
{organization_details}
List of Risks:
{risk_list}
Required Output Format:
Provide your response as a numbered list from 1 to 10. For each item,
include the Risk ID, the Risk Name, and a brief, one-sentence
justification that links a specific company detail to why that risk is
critical.
Example:
Risk ID 18 (Ransomware Infection): Critical due to the company's high
IT dependency and the severe operational and financial impact a
ransomware event would cause.
Risk ID 61 (Privacy Regulation Violation): Critical because the
company operates under GDPR, making any breach of personal data a
significant legal and financial liability.
"""
response = client.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "system", "content": prompt}]
)
content = response.choices[0].message.content.strip()
matches = re.findall(
r'Risk ID\s*(\d+)\s*\((.*?)\)\*\*:\s*(.+?)(?=\n\d+\.|\Z)', content, re.DOTALL
)
results = []
for risk_id, risk_name, explanation in matches:
results.append({
"risk_id": int(risk_id),
"risk_name": risk_name.strip(),
"explanation": explanation.strip()
})
return results
def get_controls_for_risk(risk, organization):
client = OpenAI(api_key=settings.OPENAI_API_KEY)
all_controls = Control.objects.all()
organization_details = extract_organization_details(organization)
control_list = [f"Control ID: {control.id}, Control Name: {control.subcategory} - {control.function or ''}".rstrip(" -") for control in all_controls]
valid_control_ids = {control.id for control in all_controls}
control_map = {control.id: (f"{control.subcategory} - {control.function or ''}").rstrip(" -") for control in all_controls}
def fetch_controls(prompt):
response = client.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "system", "content": prompt}]
)
return response.choices[0].message.content.strip()
prompt = f"""
You are a senior cybersecurity risk consultant. Your objective is to
analyze the risk "{risk.risk_name}" in the context of the
organization's profile and recommend the 10 most effective mitigating
controls from the provided list.
For each of the 10 selected controls, you must assign two scores from 1 to 5:
* **Weight (1-5):** This score represents the control's
**effectiveness in reducing the potential impact** of the risk.
* **1 (Low Impact Reduction):** A supplementary control with a
minor effect.
* **3 (Moderate Impact Reduction):** A standard control that
significantly reduces impact.
* **5 (High Impact Reduction):** A critical control that is
highly effective at minimizing the damage from this risk.
* **Likelihood (1-5):** This score represents the
control's **effectiveness in reducing the likelihood** that the risk
event will occur.
* **1 (Low Likelihood Reduction):** The control has a minimal
effect on preventing the event.
* **3 (Moderate Likelihood Reduction):** The control makes the
event considerably less likely.
* **5 (High Likelihood Reduction):** The control is a primary
defense that makes the event much less likely to happen.
**Rules:**
1. You must select **exactly 10 unique controls**. No duplicates.
2. Your output must **only** contain the control ID, Weight, and
Likelihood Reduction score.
3. Strictly adhere to the specified format. Do not add any
explanations or extra text.
---
**Risk to Analyze:** {risk.risk_name}
**Organization Details:** {organization_details}
**Available Controls:** {control_list}
**Expected Response Format (STRICTLY FOLLOW THIS FORMAT):**
<control_id> : <Weight> : <Likelihood>
**Example Correct Response:**
12 : 5 : 4
45 : 4 : 5
"""
selected_controls = []
control_ids_seen = set()
result = fetch_controls(prompt)
for line in result.split("\n"):
line = line.strip()
parts = line.split(":")
if len(parts) == 3:
control_id_str = parts[0].replace("ID:", "").replace("id:", "").replace("Id:", "").strip()
weight_str = parts[1].strip().replace("Weight:", "").replace("weight:", "").strip()
likelihood_str = parts[2].strip().replace("Likelihood:", "").replace("likelihood:", "").strip()
control_id_str = ''.join(filter(str.isdigit, control_id_str))
weight_str = ''.join(filter(str.isdigit, weight_str))
likelihood_str = ''.join(filter(str.isdigit, likelihood_str))
if control_id_str and weight_str and likelihood_str:
try:
control_id = int(control_id_str)
weight = int(weight_str)
likelihood = int(likelihood_str)
if control_id in valid_control_ids and 1 <= weight <= 5 and 1 <= likelihood <= 5 and control_id not in control_ids_seen:
selected_controls.append((control_id, weight, likelihood))
control_ids_seen.add(control_id)
except ValueError:
continue
if len(selected_controls) == 10:
return selected_controls
while len(selected_controls) < 10:
missing_count = 10 - len(selected_controls)
remaining_controls = valid_control_ids - control_ids_seen
remaining_controls_list = [f"Control ID: {cid}, Control Name: {control_map[cid]}" for cid in remaining_controls]
retry_prompt = f"""
You are a senior cybersecurity risk consultant. Your objective is to
analyze the risk "{risk.risk_name}" in the context of the
organization's profile and recommend the {missing_count} umost effective mitigating
controls from the provided list.
For each of the {missing_count} selected controls, you must assign two scores from 1 to 5:
* **Weight (1-5):** This score represents the control's
**effectiveness in reducing the potential impact** of the risk.
* **1 (Low Impact Reduction):** A supplementary control with a
minor effect.
* **3 (Moderate Impact Reduction):** A standard control that
significantly reduces impact.
* **5 (High Impact Reduction):** A critical control that is
highly effective at minimizing the damage from this risk.
* **Likelihood (1-5):** This score represents the
control's **effectiveness in reducing the likelihood** that the risk
event will occur.
* **1 (Low Likelihood Reduction):** The control has a minimal
effect on preventing the event.
* **3 (Moderate Likelihood Reduction):** The control makes the
event considerably less likely.
* **5 (High Likelihood Reduction):** The control is a primary
defense that makes the event much less likely to happen.
**Rules:**
1. You must select **exactly 10 unique controls**. No duplicates.
2. Your output must **only** contain the control ID, Weight, and
Likelihood score.
3. Strictly adhere to the specified format. Do not add any
explanations or extra text.
---
**Risk to Analyze:** {risk.risk_name}
**Organization Details:** {organization_details}
**Available Controls:** {remaining_controls_list}
**Expected Response Format (STRICTLY FOLLOW THIS FORMAT):**
<control_id> : <Weight> : <Likelihood>
**Example Correct Response:**
12 : 5 : 4
45 : 4 : 5
"""
result = fetch_controls(retry_prompt)
for line in result.split("\n"):
line = line.strip()
parts = line.split(":")
if len(parts) == 3:
control_id_str = parts[0].replace("ID:", "").replace("id:", "").replace("Id:", "").strip()
weight_str = parts[1].strip().replace("Weight:", "").replace("weight:", "").strip()
likelihood_str = parts[2].strip().replace("Likelihood:", "").replace("likelihood:", "").strip()
control_id_str = ''.join(filter(str.isdigit, control_id_str))
weight_str = ''.join(filter(str.isdigit, weight_str))
likelihood_str = ''.join(filter(str.isdigit, likelihood_str))
if control_id_str and weight_str and likelihood_str:
try:
control_id = int(control_id_str)
weight = int(weight_str)
likelihood = int(likelihood_str)
if control_id in valid_control_ids and 1 <= weight <= 5 and 1 <= likelihood <= 5 and control_id not in control_ids_seen:
selected_controls.append((control_id, weight, likelihood))
control_ids_seen.add(control_id)
except ValueError:
continue
if not remaining_controls:
break
return selected_controls if len(selected_controls) == 10 else []
def generate_recommendations(risks_with_controls, organization):
client = OpenAI(api_key=settings.OPENAI_API_KEY)
organization_details = extract_organization_details(organization)
prompt = f"""
You are an AI assistant tasked with generating the Recommendations section for a cybersecurity assessment report. Use the organizations context and the list of risks with their proposed controls to produce concise, actionable, and prioritized guidance.
Inputs:
- Organization details:
{organization_details}
- Risks with controls (Python-like list of dicts). Each item includes:
risk: id, name, category, risk_description (or similar)
r_impact (inherent impact 15), r_likelihood (inherent likelihood 15), risk_score
residual_impact, residual_likelihood, residual_risk_score (may be present)
controls: list of controls, each with control__subcategory and control__function, weight (15 effectiveness), likelihood (15 occurrence modifier)
Task:
1) Compute a priority score per control = weight × likelihood. Aggregate scores across all risks and cluster into 35 thematic areas that best match the actual controls and risk names (e.g., Access Control & MFA, Patch & Vulnerability Management, Vendor/Third-Party Risk Management, Network Security & Segmentation, Logging/Monitoring/Detection, Incident Response & BCDR, Ransomware Prevention & Recovery, Cryptography & Key Management). Do not invent themes without support in the inputs.
2) For each chosen theme, produce 35 concrete actions derived from the highest-priority controls. Tailor to the organization_details where appropriate. Prefer steps that reduce both likelihood and impact.
3) Each bullet should be 12 sentences: start with a clear, imperative recommendation, and (optionally) add a brief explanation or context. Still keep it concise and actionable.
4) Use only the control label (i.e., "subcategory - function") for reference—do NOT include or reference control IDs, years, or quarter references (Q1, Q2, Q3, Q4).
5) Do not introduce controls that are not represented in the provided controls list.
Output format (STRICT):
<23 sentence paragraph explaining that recommendations are prioritized by expected risk reduction based on the provided controls and aligned to the organizations context.>
<h3>Theme Title</h3>
- Bullet 1 (12 sentences, no IDs, years, or quarters)
- Bullet 2
- Bullet 3
- Bullet 4 (optional)
- Bullet 5 (optional)
Constraints:
- 35 themed subsections, each with 35 bullets.
- No preamble or postscript beyond the sections above.
- Do NOT reference or display control IDs, years, or quarters in any form.
Now produce the final Recommendations section using the actual inputs above.
Risks with controls:
{risks_with_controls}
"""
response = client.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "system", "content": prompt}]
)
recommendations = response.choices[0].message.content.strip()
return recommendations
def generate_key_findings(document, top_10_risks):
client = OpenAI(api_key=settings.OPENAI_API_KEY)
def extract_organization_details(organization):
excluded_fields = {"email"}
risk_data = {}
for field in organization._meta.get_fields():
if field.name not in excluded_fields and hasattr(organization, field.name):
value = getattr(organization, field.name)
if value:
help_text = getattr(field, 'help_text', '').strip()
key = help_text if help_text else field.name
risk_data[key] = value
return risk_data
organization_details = extract_organization_details(document.organization)
prompt = f"""
You are an AI assistant tasked with generating a "Key Findings" section for a cybersecurity assessment report. Your output must be structured precisely, extracting and presenting the top 3 risks.
From the following list of risks, select the 3 most critical for the organization and generate the as specified.
List of risks:
{top_10_risks}
Organization details:
{organization_details}
Introduction: The description field must begin with the following exact text:
"The assessment revealed several areas where { document.organization.name } faces heightened cybersecurity risks. These risks pose significant threats to operational continuity, sensitive data, and regulatory compliance. The top risks identified are:"
Risk Presentation:
Identify the top 3 risks from the list above.
For each of these top 3 risks, present it as a bulleted item within the description field, following this format:
"- [Risk Name]: [Concise, professionally phrased description of the risk's significance in context of the organization, likelihood, or impact.]"
Description Derivation:
The [Risk Name] part should be the actual name of the risk from the input data (e.g., {{ item.risk.name }}).
The [Concise, professionally phrased description] part must be synthesized from the provided risk_description field (e.g., {{ item.risk_description }}) associated with that risk. Aim to create a polished, impactful summary that clearly explains the risk's context, severity, or contributing factors.
Return it as plain text in the following format:
Output Format(STRICT):
Introduction
- <b> Risk 1 </b> : Brief description of Risk 1
- <b> Risk 2 </b> : Brief description of Risk 2
- <b> Risk 3 </b> : Brief description of Risk 3
"""
response = client.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "system", "content": prompt}]
)
key_findings = response.choices[0].message.content.strip()
return key_findings
def generate_pdf(document):
document_link = f"{site_domain}/document/{document.id}/"
pdf_content = HTML(url=document_link).write_pdf()
response = HttpResponse(pdf_content, content_type='application/pdf')
response['Content-Disposition'] = f'inline; filename=document_{document.id}.pdf'
return response
def generate_first_page_image(document):
document_link = f"{site_domain}/document/{document.id}/"
pdf_bytes = HTML(url=document_link).write_pdf()
from pdf2image import convert_from_bytes
images = convert_from_bytes(pdf_bytes, first_page=1, last_page=1)
img_io = io.BytesIO()
images[0].save(img_io, format="JPEG", quality=90)
img_io.seek(0)
return img_io
def calculate_aggregate_weight(controls):
total_weight = sum(control['weight']for control in controls)
return total_weight
def calculate_aggregate_likelihood(controls):
total_likelihood = sum(control['likelihood'] for control in controls)
return total_likelihood
def map_weight_to_impact_likelihood(total_weight, total_likelihood, max_weight):
impact = min(5.0, max(1.0, total_weight / 10.0))
likelihood = min(5.0, max(1.0, total_likelihood / 10.0))
return impact, likelihood
def _draw_risk_matrix_background(ax):
ax.set_xlim(0.5, 5.5)
ax.set_ylim(0.5, 5.5)
ax.set_aspect('equal')
def score_color(score: int) -> str:
if score <= 2:
return '#1abc9c'
if score <= 4:
return '#2ecc71'
if score <= 9:
return '#f1c40f'
if score <= 15:
return '#f39c12'
return '#e74c3c'
for y in range(1, 6):
for x in range(1, 6):
score = x * y
rect = plt.Rectangle(
(x - 0.5, y - 0.5), 1, 1,
facecolor=score_color(score), edgecolor='#dddddd', linewidth=1.0, zorder=0
)
ax.add_patch(rect)
text_color = '#000000' if 5 <= score <= 9 else '#ffffff'
font_weight = 'bold' if score >= 15 else 'normal'
ax.text(
x, y, str(score),
ha='center', va='center', fontsize=9,
color=text_color, alpha=0.95, zorder=1, fontweight=font_weight
)
ax.set_xlabel('Likelihood', labelpad=10)
ax.set_ylabel('Impact', labelpad=10)
ax.set_xticks([])
ax.set_yticks([])
ax.tick_params(length=0)
ax.grid(False)
ax.spines['top'].set_visible(False)
ax.spines['right'].set_visible(False)
def generate_risk_graph(risks_with_controls):
impacts = [risk['impact'] for risk in risks_with_controls]
likelihoods = [risk['likelihood'] for risk in risks_with_controls]
risk_ids = [risk['risk']['id'] for risk in risks_with_controls]
fig, ax = plt.subplots(figsize=(10, 8))
_draw_risk_matrix_background(ax)
scatter = ax.scatter(
likelihoods, impacts,
c="#1f6feb", edgecolors="white", linewidths=1.5, s=420, alpha=0.95, zorder=3
)
for i, risk_id in enumerate(risk_ids):
ax.annotate(
str(risk_id),
(likelihoods[i], impacts[i]),
color="white",
fontsize=12,
ha="center",
va="center",
weight="bold",
zorder=4,
)
buffer = io.BytesIO()
plt.savefig(buffer, format="png", transparent=True, bbox_inches='tight', pad_inches=0.1)
buffer.seek(0)
image_png = buffer.getvalue()
buffer.close()
plt.close()
return base64.b64encode(image_png).decode("utf-8")
def generate_residual_risk_graph(risks_with_controls):
impacts = [max(risk.get('impact', 0) - 1.0, 1.0) if risk.get('impact') else 2 for risk in risks_with_controls]
likelihoods = [max(risk.get('likelihood', 0) - 1.0, 1.0) if risk.get('likelihood') else 2 for risk in risks_with_controls]
risk_ids = [risk['risk']['id'] for risk in risks_with_controls]
fig, ax = plt.subplots(figsize=(10, 8))
_draw_risk_matrix_background(ax)
scatter = ax.scatter(
likelihoods, impacts,
c="#000000", edgecolors="white", linewidths=1.5, s=420, alpha=0.95, zorder=3
)
for i, risk_id in enumerate(risk_ids):
ax.annotate(
str(risk_id),
(likelihoods[i], impacts[i]),
color="white",
fontsize=12,
ha="center",
va="center",
weight="bold",
zorder=4,
)
buffer = io.BytesIO()
plt.savefig(buffer, format="png", transparent=True, bbox_inches='tight', pad_inches=0.1)
buffer.seek(0)
image_png = buffer.getvalue()
buffer.close()
plt.close()
return base64.b64encode(image_png).decode("utf-8")
def generate_demo_code(length=6):
chars = 'ABCDEFGHJKLMNPQRSTUVWXYZ23456789'
return ''.join(random.choices(chars, k=length))