Files
old-riskletpy/backend/core/admin.py
2025-10-20 19:52:11 +02:00

477 lines
22 KiB
Python

from django.contrib import admin
from .models import Document, DocumentSegment, Organization, Risk, Control, DocumentTemplate, DocumentRiskControl, DemoCode, PaymentRecord
from django.urls import reverse, path
from django.utils.html import format_html
from .utils import generate_demo_code, get_top_risk, get_controls_for_risk, generate_key_findings, generate_recommendations
from .tables import get_risk_table
from django.shortcuts import render, redirect
from .forms import GenerateCodesForm, RiskExplanationWidget
from django.conf import settings
from backend.accounts.utils import send_document_email
from django import forms
from django.contrib.admin.widgets import FilteredSelectMultiple
import logging
from django.contrib import messages
from django.db import transaction
import re
logger = logging.getLogger(__name__)
class DocumentRiskControlInline(admin.TabularInline):
model = DocumentRiskControl
extra = 2
max_num = 10
can_delete = False
fields = ('risk', 'control', 'weight', 'likelihood')
def get_formset(self, request, obj=None, **kwargs):
formset = super().get_formset(request, obj, **kwargs)
try:
if request.method == 'POST' and 'organization_risks' in request.POST:
risk_ids = request.POST.getlist('organization_risks')
formset.form.base_fields['risk'].queryset = Risk.objects.filter(pk__in=risk_ids)
elif obj:
formset.form.base_fields['risk'].queryset = obj.organization.risks.all()
except Exception:
logger.exception("Error building DocumentRiskControlInline formset")
return formset
class DocumentAdminForm(forms.ModelForm):
organization_risks = forms.ModelMultipleChoiceField(
queryset=Risk.objects.all(),
required=False,
widget=FilteredSelectMultiple(verbose_name="Risks", is_stacked=False),
help_text="Edit the AI-selected risks for this organization."
)
risk_explanations = forms.Field(
required=False,
widget=RiskExplanationWidget,
help_text="Edit explanations for each risk.",
label=''
)
class Meta:
model = Document
fields = ['organization', 'risk_explanations', 'status', 'key_findings', 'recomendations']
class Media:
css = { 'all': ('admin/css/widgets.css',) }
js = (
'admin/js/SelectBox.js',
'admin/js/SelectFilter2.js',
)
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
if self.instance and getattr(self.instance, 'organization_id', None):
self.fields['organization_risks'].initial = self.instance.organization.risks.all()
def clean_risk_explanations(self):
data = self.data
explanations = {}
for key in data:
if key.startswith('risk_explanations_risk_') and not key.startswith('risk_explanations_risk_new_'):
risk_id = data[key]
explanation_key = f"risk_explanations_explanation_{risk_id}"
explanation = data.get(explanation_key, "")
if risk_id:
explanations[str(risk_id)] = explanation
for i in range(10):
risk_id = data.get(f"risk_explanations_risk_new_{i}", "")
explanation = data.get(f"risk_explanations_explanation_new_{i}", "")
if risk_id:
explanations[str(risk_id)] = explanation
return explanations
class DocumentAdmin(admin.ModelAdmin):
change_form_template = "admin/core/document/change_form.html"
form = DocumentAdminForm
inlines = [DocumentRiskControlInline]
list_display = ('organization', 'status', 'created_at', 'modified_at', 'review_link')
list_filter = ('status', 'created_at')
search_fields = ['organization__name', 'organization__email']
readonly_fields = (
'created_at', 'modified_at', 'regen_note_action',
'regen_document_action', 'regen_top_risks_action', 'regen_controls_action', 'regen_keyfindings_action', 'regen_recommendations_action',
)
fieldsets = (
('Organization & Risks', {
'fields': ('organization', 'regen_note_action', 'regen_document_action', 'organization_risks', 'risk_explanations', 'regen_top_risks_action')
}),
('Key Findings', {
'fields': ('key_findings', 'regen_keyfindings_action')
}),
('Recommendations', {
'fields': ('recomendations', 'regen_recommendations_action')
}),
('Status', {
'fields': ('status',)
}),
('Timestamps', {
'fields': ('created_at', 'modified_at')
}),
)
def regen_note_action(self, obj):
return format_html(
'<div class="ai-callout">'
'⏰ <strong>Tip:</strong> Regenerating document can take some time since we depend on AI models to generate content.'
'</div>'
'<br/>'
)
regen_note_action.short_description = ''
def regen_top_risks_action(self,obj):
return format_html(
'<div class="ai-callout ai-callout-info">'
'💡 <strong>Recommended:</strong> after regenerating the Top 10 risks, also update Controls, Key Findings, and Recommendations.'
'</div>'
'<br/>'
'<button type="submit" name="_regen_top_risks" class="button">Regenerate Top 10 Risks using AI</button>'
)
regen_top_risks_action.short_description = ''
def regen_controls_action(self, obj):
return format_html('<button type="submit" name="_regen_controls" class="button">Regenerate Controls using AI</button>')
regen_controls_action.short_description = ''
def regen_keyfindings_action(self, obj):
return format_html('<br/><button type="submit" name="_regen_key_findings" class="button">Regenerate Key Findings using AI</button>')
regen_keyfindings_action.short_description = ''
def regen_recommendations_action(self, obj):
return format_html('<br/><button type="submit" name="_regen_recommendations" class="button">Regenerate Recommendations using AI</button>')
regen_recommendations_action.short_description = ''
def regen_document_action(self, obj):
return format_html(
'<div class="ai-callout">'
'🧹 <strong>Warning:</strong> this will clear the current document (segments, mapped risks/controls, key findings, and recommendations) and regenerate everything as if the document was newly created.'
'</div>'
'<br/>'
'<button type="submit" name="_regen_document" class="button">Regenerate Entire Document</button>'
)
regen_document_action.short_description = ''
def save_model(self, request, obj, form, change):
super().save_model(request, obj, form, change)
org_risks = form.cleaned_data.get('organization_risks')
explanations = form.cleaned_data.get('risk_explanations', {})
old_explanations = obj.risk_explanations or {}
if org_risks is not None and obj.organization_id:
obj.organization.risks.set(org_risks)
new_explanations = {}
for risk in org_risks:
key = str(risk.risk_id)
new_explanations[key] = explanations.get(key, old_explanations.get(key, ""))
obj.risk_explanations = new_explanations
obj.save(update_fields=['risk_explanations'])
else:
if explanations:
obj.risk_explanations = explanations
obj.save(update_fields=['risk_explanations'])
def _apply_post_org_risks(self, request, obj):
try:
if 'organization_risks' in request.POST and obj.organization_id:
risk_ids = [int(pk) for pk in request.POST.getlist('organization_risks') if pk]
obj.organization.risks.set(Risk.objects.filter(pk__in=risk_ids))
except Exception:
logger.exception("Failed to apply posted organization_risks")
def _clear_segments(self, obj, startswith=None, exact=None):
try:
if startswith:
for s in startswith:
obj.segments.filter(content__startswith=s).delete()
if exact:
obj.segments.filter(content__in=exact).delete()
except Exception:
logger.exception("Failed to clear segments for document %s", getattr(obj, 'pk', None))
def _risk_content(self, risks, explanation_map):
return "\n\n".join([
f"Risk: {risk.risk_id} - {risk.risk_name} \n"
f"Category: {risk.category}\n"
f"Primary Impact: {risk.primary_impact} \n"
f"Secondary Impact: {risk.secondary_impact}\n"
f"Tertiary Impact: {risk.tretiary_impact} \n"
f"Detection Difficulty: {risk.detection_difficulty} \n"
f"Recovery Complexity: {risk.recovery_complexity} \n"
f"Business Impact Severity: {risk.businnes_impact_severity}\n"
f"Explanation: {explanation_map.get(risk.risk_id, '')}\n"
for risk in risks
])
def _add_identified_risks(self, obj, risks):
if not risks:
return
self._clear_segments(obj, startswith=["Identified Risks"], exact=["Top 10 Risks Identified"])
obj.add_segment('h1', "Top 10 Risks Identified")
explanation_map = obj.risk_explanations or {}
obj.add_segment('body', f"Identified Risks: \n\n{self._risk_content(risks, explanation_map)}")
def _clear_document_mappings(self, obj, clear_org_risks=True):
try:
obj.segments.all().delete()
obj.documentriskcontrol_set.all().delete()
obj.key_findings = ''
obj.recomendations = ''
obj.status = Document.STATUS_WAITING
obj.save(update_fields=['key_findings', 'recomendations', 'status', 'modified_at'])
if clear_org_risks and getattr(obj, 'organization', None):
obj.organization.risks.clear()
except Exception:
logger.exception("Failed to clear document mappings for document %s", getattr(obj, 'pk', None))
def _regen_pipeline(self, obj):
ok = True
if not self._regen_top_risks(obj):
ok = False
else:
try:
self._regen_controls(obj)
except Exception:
logger.exception("_regen_controls failed")
ok = False
if not self._regen_key_findings(obj):
ok = False
if not self._regen_recommendations(obj):
ok = False
return ok
def _regen_top_risks(self, obj):
top_risks_with_explanation = get_top_risk(obj.organization)
top_risk_ids = [r['risk_id'] for r in top_risks_with_explanation]
top_risks = Risk.objects.filter(risk_id__in=top_risk_ids)
explanation_map = {r['risk_id']: r['explanation'] for r in top_risks_with_explanation}
obj.organization.risks.set(top_risks)
obj.risk_explanations = explanation_map
obj.save(update_fields=['risk_explanations', 'modified_at'])
self._add_identified_risks(obj, top_risks)
return True
def _regen_controls(self, obj):
self._clear_segments(obj, startswith=["Identified Risks", "Mitigation Controls"], exact=["Top 10 Risks Identified", "Regenerated Controls"])
obj.documentriskcontrol_set.all().delete()
top_risks = list(obj.organization.risks.all())
self._add_identified_risks(obj, top_risks)
controls_content = "Mitigation Controls:\n\n"
for risk in top_risks:
controls_content += f"Risk: {risk.risk_id} - {risk.risk_name}\n"
selected_controls = get_controls_for_risk(risk, organization=obj.organization)
for control_id, weight, likelihood in selected_controls:
control = Control.objects.filter(id=control_id).first()
if control:
DocumentRiskControl.objects.create(
document=obj,
risk=risk,
control=control,
weight=weight,
likelihood=likelihood
)
label = f"{control.subcategory} - {control.function or ''}".rstrip(" -")
controls_content += f" - Control: {label} (Impact Weight: {weight}/10) (Likelihood: {likelihood}/10)\n"
controls_content += "\n"
obj.add_segment('body', controls_content)
def _regen_key_findings(self, obj):
risks_top3 = get_risk_table(obj)[:3]
key_findings = generate_key_findings(obj, risks_top3)
if key_findings:
obj.key_findings = key_findings
obj.save(update_fields=['key_findings', 'modified_at'])
return True
return False
def _regen_recommendations(self, obj):
risks_top10 = get_risk_table(obj)[:10]
recommendations = generate_recommendations(risks_top10, obj.organization)
if recommendations:
obj.recomendations = recommendations
obj.save(update_fields=['recomendations', 'modified_at'])
return True
return False
def changeform_view(self, request, object_id=None, form_url='', extra_context=None):
if request.method == 'POST' and any(k in request.POST for k in ("_regen_controls", "_regen_key_findings", "_regen_recommendations", "_regen_top_risks", "_regen_document")):
obj = self.get_object(request, object_id)
if obj is None:
return super().changeform_view(request, object_id, form_url, extra_context)
try:
self._apply_post_org_risks(request, obj)
if "_regen_top_risks" in request.POST:
if not obj.organization_id:
self.message_user(request, "Please select an organization first.", level=messages.WARNING)
else:
if self._regen_top_risks(obj):
self.message_user(request, "Top risks regenerated and risk segment updated.")
else:
self.message_user(request, "Top risks could not be generated.", level=messages.WARNING)
elif "_regen_controls" in request.POST:
self._regen_controls(obj)
self.message_user(request, "Risks and controls regenerated successfully.")
elif "_regen_key_findings" in request.POST:
if self._regen_key_findings(obj):
self.message_user(request, "Key Findings regenerated.")
else:
self.message_user(request, "Key Findings could not be generated.", level=messages.WARNING)
elif "_regen_recommendations" in request.POST:
if self._regen_recommendations(obj):
self.message_user(request, "Recommendations regenerated.")
else:
self.message_user(request, "Recommendations could not be generated.", level=messages.WARNING)
elif "_regen_document" in request.POST:
if not obj.organization_id:
self.message_user(request, "Please select an organization first.", level=messages.WARNING)
else:
try:
with transaction.atomic():
self._clear_document_mappings(obj, clear_org_risks=True)
regen_ok = self._regen_pipeline(obj)
try:
self.log_change(request, obj, "Full document regeneration triggered")
except Exception:
logger.exception("Failed to log admin change for full regen")
if regen_ok:
self.message_user(request, "Document fully regenerated.")
else:
self.message_user(request, "Document regeneration finished with warnings or missing outputs.", level=messages.WARNING)
except Exception as e:
logger.exception("Full regeneration failed")
self.message_user(request, f"Full regeneration failed: {e}", level=messages.ERROR)
except Exception as e:
logger.exception("changeform_view action failed")
self.message_user(request, f"Action failed: {e}", level=messages.ERROR)
return redirect(reverse('admin:core_document_change', args=[obj.pk]))
return super().changeform_view(request, object_id, form_url, extra_context)
def review_link(self, obj):
url = reverse('admin:core_document_change', args=[obj.pk])
label = 'Review / Edit'
return format_html('<a class="button" href="{}">{}</a>', url, label)
review_link.short_description = 'Action'
def response_change(self, request, obj):
if "_save_send" in request.POST:
try:
url = f"{settings.SITE_DOMAIN}/pdf/{obj.id}/"
send_document_email(obj.organization.email, url, obj)
obj.status = Document.STATUS_DONE
obj.save(update_fields=['status', 'modified_at'])
self.message_user(request, "Document sent and marked as done.")
except Exception as e:
logger.exception("Failed to send document email")
self.message_user(request, f"Failed to send document: {e}", level=messages.ERROR)
return redirect(reverse('admin:core_document_change', args=[obj.pk]))
return super().response_change(request, obj)
def _refresh_segments_from_current_mappings(self, obj):
self._clear_segments(obj, startswith=["Identified Risks", "Mitigation Controls"], exact=["Top 10 Risks Identified", "Regenerated Controls"])
top_risks = list(obj.organization.risks.all())
if top_risks:
self._add_identified_risks(obj, top_risks)
from collections import defaultdict
controls_by_risk = defaultdict(list)
for drc in obj.documentriskcontrol_set.select_related('risk', 'control').all():
controls_by_risk[drc.risk_id].append(drc)
controls_content = "Mitigation Controls:\n\n"
for risk in top_risks:
controls_content += f"Risk: {risk.risk_id} - {risk.risk_name}\n"
rows = controls_by_risk.get(risk.pk, [])
rows.sort(key=lambda x: (x.control.subcategory or '', x.control.function or ''))
for drc in rows:
control = drc.control
if control:
label = f"{control.subcategory} - {control.function or ''}".rstrip(" -")
controls_content += f" - Control: {label} (Impact Weight: {drc.weight}/10) (Likelihood: {drc.likelihood}/10)\n"
controls_content += "\n"
obj.add_segment('body', controls_content)
def save_related(self, request, form, formsets, change):
super().save_related(request, form, formsets, change)
obj = form.instance
try:
self._refresh_segments_from_current_mappings(obj)
except Exception:
logger.exception("Failed to refresh segments from current mappings for document %s", getattr(obj, 'pk', None))
class DocumentTemplateAdmin(admin.ModelAdmin):
list_display = ['name', 'created_at', 'updated_at', 'preview_button']
def preview_button(self, obj):
url = reverse('core:template_preview', args=[obj.name])
return format_html('<a href="{}" target="_blank">Preview</a>', url)
preview_button.short_description = 'Preview'
preview_button.allow_tags = True
class OrganizationAdmin(admin.ModelAdmin):
list_display = ('name', 'email', 'industry_sector')
search_fields = ['name', 'email']
class RiskAdmin(admin.ModelAdmin):
ordering = ['risk_id']
list_display = ['risk_id','risk_name','category']
class ControlAdmin(admin.ModelAdmin):
list_display = ('id', 'subcategory', 'function', 'category')
search_fields = ('subcategory', 'function', 'category')
class DocumentRiskControlAdmin(admin.ModelAdmin):
list_display = ('document', 'risk', 'control', 'weight','likelihood')
class DemoCodeAdmin(admin.ModelAdmin):
list_display = ('code', 'created_at', 'used', 'company', 'used_at')
change_list_template = "admin/democode_changelist.html"
def get_urls(self):
urls = super().get_urls()
custom_urls = [
path('generate-codes/', self.admin_site.admin_view(self.generate_codes_view), name='generate-codes'),
]
return custom_urls + urls
def generate_codes_view(self, request):
if request.method == 'POST':
form = GenerateCodesForm(request.POST)
if form.is_valid():
count = form.cleaned_data['count']
created = 0
for _ in range(count):
while True:
code = generate_demo_code()
if not DemoCode.objects.filter(code=code).exists():
DemoCode.objects.create(code=code)
created += 1
break
self.message_user(request, f"{created} codes generated.")
return redirect('..')
else:
form = GenerateCodesForm()
return render(request, 'admin/generate_codes.html', {'form': form})
class PaymentRecordAdmin(admin.ModelAdmin):
list_display = ('company', 'amount', 'currency', 'payment_date', 'transaction_id')
search_fields = ('company__name', 'transaction_id')
list_filter = ('payment_date',)
admin.site.register(PaymentRecord, PaymentRecordAdmin)
admin.site.register(Document, DocumentAdmin)
admin.site.register(Organization, OrganizationAdmin)
admin.site.register(Risk ,RiskAdmin)
admin.site.register(Control, ControlAdmin)
admin.site.register(DocumentTemplate, DocumentTemplateAdmin)
admin.site.register(DocumentRiskControl, DocumentRiskControlAdmin)
admin.site.register(DemoCode, DemoCodeAdmin)