from django.contrib import admin from .models import Document, DocumentSegment, Organization, Risk, Control, DocumentTemplate, DocumentRiskControl, DemoCode from django.urls import reverse, path from django.utils.html import format_html from .utils import generate_demo_code, get_top_risk, get_controls_for_risk, generate_key_findings, generate_recommendations from .tables import get_risk_table from django.shortcuts import render, redirect from .forms import GenerateCodesForm, RiskExplanationWidget from django.conf import settings from backend.accounts.utils import send_document_email from django import forms from django.contrib.admin.widgets import FilteredSelectMultiple import logging from django.contrib import messages from django.db import transaction import re logger = logging.getLogger(__name__) class DocumentRiskControlInline(admin.TabularInline): model = DocumentRiskControl extra = 2 max_num = 10 can_delete = False fields = ('risk', 'control', 'weight', 'likelihood') def get_formset(self, request, obj=None, **kwargs): formset = super().get_formset(request, obj, **kwargs) try: if request.method == 'POST' and 'organization_risks' in request.POST: risk_ids = request.POST.getlist('organization_risks') formset.form.base_fields['risk'].queryset = Risk.objects.filter(pk__in=risk_ids) elif obj: formset.form.base_fields['risk'].queryset = obj.organization.risks.all() except Exception: logger.exception("Error building DocumentRiskControlInline formset") return formset class DocumentAdminForm(forms.ModelForm): organization_risks = forms.ModelMultipleChoiceField( queryset=Risk.objects.all(), required=False, widget=FilteredSelectMultiple(verbose_name="Risks", is_stacked=False), help_text="Edit the AI-selected risks for this organization." ) risk_explanations = forms.Field( required=False, widget=RiskExplanationWidget, help_text="Edit explanations for each risk.", label='' ) class Meta: model = Document fields = ['organization', 'risk_explanations', 'status', 'key_findings', 'recomendations'] class Media: css = { 'all': ('admin/css/widgets.css',) } js = ( 'admin/js/SelectBox.js', 'admin/js/SelectFilter2.js', ) def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) if self.instance and getattr(self.instance, 'organization_id', None): self.fields['organization_risks'].initial = self.instance.organization.risks.all() def clean_risk_explanations(self): data = self.data explanations = {} for key in data: if key.startswith('risk_explanations_risk_') and not key.startswith('risk_explanations_risk_new_'): risk_id = data[key] explanation_key = f"risk_explanations_explanation_{risk_id}" explanation = data.get(explanation_key, "") if risk_id: explanations[str(risk_id)] = explanation for i in range(10): risk_id = data.get(f"risk_explanations_risk_new_{i}", "") explanation = data.get(f"risk_explanations_explanation_new_{i}", "") if risk_id: explanations[str(risk_id)] = explanation return explanations class DocumentAdmin(admin.ModelAdmin): change_form_template = "admin/core/document/change_form.html" form = DocumentAdminForm inlines = [DocumentRiskControlInline] list_display = ('organization', 'status', 'created_at', 'modified_at', 'review_link') list_filter = ('status', 'created_at') search_fields = ['organization__name', 'organization__email'] readonly_fields = ( 'created_at', 'modified_at', 'regen_note_action', 'regen_document_action', 'regen_top_risks_action', 'regen_controls_action', 'regen_keyfindings_action', 'regen_recommendations_action', ) fieldsets = ( ('Organization & Risks', { 'fields': ('organization', 'regen_note_action', 'regen_document_action', 'organization_risks', 'risk_explanations', 'regen_top_risks_action') }), ('Key Findings', { 'fields': ('key_findings', 'regen_keyfindings_action') }), ('Recommendations', { 'fields': ('recomendations', 'regen_recommendations_action') }), ('Status', { 'fields': ('status',) }), ('Timestamps', { 'fields': ('created_at', 'modified_at') }), ) def regen_note_action(self, obj): return format_html( '
' '⏰ Tip: Regenerating document can take some time since we depend on AI models to generate content.' '
' '
' ) regen_note_action.short_description = '' def regen_top_risks_action(self,obj): return format_html( '
' '💡 Recommended: after regenerating the Top 10 risks, also update Controls, Key Findings, and Recommendations.' '
' '
' '' ) regen_top_risks_action.short_description = '' def regen_controls_action(self, obj): return format_html('') regen_controls_action.short_description = '' def regen_keyfindings_action(self, obj): return format_html('
') regen_keyfindings_action.short_description = '' def regen_recommendations_action(self, obj): return format_html('
') regen_recommendations_action.short_description = '' def regen_document_action(self, obj): return format_html( '
' '🧹 Warning: this will clear the current document (segments, mapped risks/controls, key findings, and recommendations) and regenerate everything as if the document was newly created.' '
' '
' '' ) regen_document_action.short_description = '' def save_model(self, request, obj, form, change): super().save_model(request, obj, form, change) org_risks = form.cleaned_data.get('organization_risks') explanations = form.cleaned_data.get('risk_explanations', {}) old_explanations = obj.risk_explanations or {} if org_risks is not None and obj.organization_id: obj.organization.risks.set(org_risks) new_explanations = {} for risk in org_risks: key = str(risk.risk_id) new_explanations[key] = explanations.get(key, old_explanations.get(key, "")) obj.risk_explanations = new_explanations obj.save(update_fields=['risk_explanations']) else: if explanations: obj.risk_explanations = explanations obj.save(update_fields=['risk_explanations']) def _apply_post_org_risks(self, request, obj): try: if 'organization_risks' in request.POST and obj.organization_id: risk_ids = [int(pk) for pk in request.POST.getlist('organization_risks') if pk] obj.organization.risks.set(Risk.objects.filter(pk__in=risk_ids)) except Exception: logger.exception("Failed to apply posted organization_risks") def _clear_segments(self, obj, startswith=None, exact=None): try: if startswith: for s in startswith: obj.segments.filter(content__startswith=s).delete() if exact: obj.segments.filter(content__in=exact).delete() except Exception: logger.exception("Failed to clear segments for document %s", getattr(obj, 'pk', None)) def _risk_content(self, risks, explanation_map): return "\n\n".join([ f"Risk: {risk.risk_id} - {risk.risk_name} \n" f"Category: {risk.category}\n" f"Primary Impact: {risk.primary_impact} \n" f"Secondary Impact: {risk.secondary_impact}\n" f"Tertiary Impact: {risk.tretiary_impact} \n" f"Detection Difficulty: {risk.detection_difficulty} \n" f"Recovery Complexity: {risk.recovery_complexity} \n" f"Business Impact Severity: {risk.businnes_impact_severity}\n" f"Explanation: {explanation_map.get(risk.risk_id, '')}\n" for risk in risks ]) def _add_identified_risks(self, obj, risks): if not risks: return self._clear_segments(obj, startswith=["Identified Risks"], exact=["Top 10 Risks Identified"]) obj.add_segment('h1', "Top 10 Risks Identified") explanation_map = obj.risk_explanations or {} obj.add_segment('body', f"Identified Risks: \n\n{self._risk_content(risks, explanation_map)}") def _clear_document_mappings(self, obj, clear_org_risks=True): try: obj.segments.all().delete() obj.documentriskcontrol_set.all().delete() obj.key_findings = '' obj.recomendations = '' obj.status = Document.STATUS_WAITING obj.save(update_fields=['key_findings', 'recomendations', 'status', 'modified_at']) if clear_org_risks and getattr(obj, 'organization', None): obj.organization.risks.clear() except Exception: logger.exception("Failed to clear document mappings for document %s", getattr(obj, 'pk', None)) def _regen_pipeline(self, obj): ok = True if not self._regen_top_risks(obj): ok = False else: try: self._regen_controls(obj) except Exception: logger.exception("_regen_controls failed") ok = False if not self._regen_key_findings(obj): ok = False if not self._regen_recommendations(obj): ok = False return ok def _regen_top_risks(self, obj): top_risks_with_explanation = get_top_risk(obj.organization) top_risk_ids = [r['risk_id'] for r in top_risks_with_explanation] top_risks = Risk.objects.filter(risk_id__in=top_risk_ids) explanation_map = {r['risk_id']: r['explanation'] for r in top_risks_with_explanation} obj.organization.risks.set(top_risks) obj.risk_explanations = explanation_map obj.save(update_fields=['risk_explanations', 'modified_at']) self._add_identified_risks(obj, top_risks) return True def _regen_controls(self, obj): self._clear_segments(obj, startswith=["Identified Risks", "Mitigation Controls"], exact=["Top 10 Risks Identified", "Regenerated Controls"]) obj.documentriskcontrol_set.all().delete() top_risks = list(obj.organization.risks.all()) self._add_identified_risks(obj, top_risks) controls_content = "Mitigation Controls:\n\n" for risk in top_risks: controls_content += f"Risk: {risk.risk_id} - {risk.risk_name}\n" selected_controls = get_controls_for_risk(risk, organization=obj.organization) for control_id, weight, likelihood in selected_controls: control = Control.objects.filter(id=control_id).first() if control: DocumentRiskControl.objects.create( document=obj, risk=risk, control=control, weight=weight, likelihood=likelihood ) label = f"{control.subcategory} - {control.function or ''}".rstrip(" -") controls_content += f" - Control: {label} (Impact Weight: {weight}/10) (Likelihood: {likelihood}/10)\n" controls_content += "\n" obj.add_segment('body', controls_content) def _regen_key_findings(self, obj): risks_top3 = get_risk_table(obj)[:3] key_findings = generate_key_findings(obj, risks_top3) if key_findings: obj.key_findings = key_findings obj.save(update_fields=['key_findings', 'modified_at']) return True return False def _regen_recommendations(self, obj): risks_top10 = get_risk_table(obj)[:10] recommendations = generate_recommendations(risks_top10, obj.organization) if recommendations: obj.recomendations = recommendations obj.save(update_fields=['recomendations', 'modified_at']) return True return False def changeform_view(self, request, object_id=None, form_url='', extra_context=None): if request.method == 'POST' and any(k in request.POST for k in ("_regen_controls", "_regen_key_findings", "_regen_recommendations", "_regen_top_risks", "_regen_document")): obj = self.get_object(request, object_id) if obj is None: return super().changeform_view(request, object_id, form_url, extra_context) try: self._apply_post_org_risks(request, obj) if "_regen_top_risks" in request.POST: if not obj.organization_id: self.message_user(request, "Please select an organization first.", level=messages.WARNING) else: if self._regen_top_risks(obj): self.message_user(request, "Top risks regenerated and risk segment updated.") else: self.message_user(request, "Top risks could not be generated.", level=messages.WARNING) elif "_regen_controls" in request.POST: self._regen_controls(obj) self.message_user(request, "Risks and controls regenerated successfully.") elif "_regen_key_findings" in request.POST: if self._regen_key_findings(obj): self.message_user(request, "Key Findings regenerated.") else: self.message_user(request, "Key Findings could not be generated.", level=messages.WARNING) elif "_regen_recommendations" in request.POST: if self._regen_recommendations(obj): self.message_user(request, "Recommendations regenerated.") else: self.message_user(request, "Recommendations could not be generated.", level=messages.WARNING) elif "_regen_document" in request.POST: if not obj.organization_id: self.message_user(request, "Please select an organization first.", level=messages.WARNING) else: try: with transaction.atomic(): self._clear_document_mappings(obj, clear_org_risks=True) regen_ok = self._regen_pipeline(obj) try: self.log_change(request, obj, "Full document regeneration triggered") except Exception: logger.exception("Failed to log admin change for full regen") if regen_ok: self.message_user(request, "Document fully regenerated.") else: self.message_user(request, "Document regeneration finished with warnings or missing outputs.", level=messages.WARNING) except Exception as e: logger.exception("Full regeneration failed") self.message_user(request, f"Full regeneration failed: {e}", level=messages.ERROR) except Exception as e: logger.exception("changeform_view action failed") self.message_user(request, f"Action failed: {e}", level=messages.ERROR) return redirect(reverse('admin:core_document_change', args=[obj.pk])) return super().changeform_view(request, object_id, form_url, extra_context) def review_link(self, obj): url = reverse('admin:core_document_change', args=[obj.pk]) label = 'Review / Edit' return format_html('{}', url, label) review_link.short_description = 'Action' def response_change(self, request, obj): if "_save_send" in request.POST: try: url = f"{settings.SITE_DOMAIN}/pdf/{obj.id}/" send_document_email(obj.organization.email, url, obj) obj.status = Document.STATUS_DONE obj.save(update_fields=['status', 'modified_at']) self.message_user(request, "Document sent and marked as done.") except Exception as e: logger.exception("Failed to send document email") self.message_user(request, f"Failed to send document: {e}", level=messages.ERROR) return redirect(reverse('admin:core_document_change', args=[obj.pk])) return super().response_change(request, obj) def _refresh_segments_from_current_mappings(self, obj): self._clear_segments(obj, startswith=["Identified Risks", "Mitigation Controls"], exact=["Top 10 Risks Identified", "Regenerated Controls"]) top_risks = list(obj.organization.risks.all()) if top_risks: self._add_identified_risks(obj, top_risks) from collections import defaultdict controls_by_risk = defaultdict(list) for drc in obj.documentriskcontrol_set.select_related('risk', 'control').all(): controls_by_risk[drc.risk_id].append(drc) controls_content = "Mitigation Controls:\n\n" for risk in top_risks: controls_content += f"Risk: {risk.risk_id} - {risk.risk_name}\n" rows = controls_by_risk.get(risk.pk, []) rows.sort(key=lambda x: (x.control.subcategory or '', x.control.function or '')) for drc in rows: control = drc.control if control: label = f"{control.subcategory} - {control.function or ''}".rstrip(" -") controls_content += f" - Control: {label} (Impact Weight: {drc.weight}/10) (Likelihood: {drc.likelihood}/10)\n" controls_content += "\n" obj.add_segment('body', controls_content) def save_related(self, request, form, formsets, change): super().save_related(request, form, formsets, change) obj = form.instance try: self._refresh_segments_from_current_mappings(obj) except Exception: logger.exception("Failed to refresh segments from current mappings for document %s", getattr(obj, 'pk', None)) class DocumentTemplateAdmin(admin.ModelAdmin): list_display = ['name', 'created_at', 'updated_at', 'preview_button'] def preview_button(self, obj): url = reverse('core:template_preview', args=[obj.name]) return format_html('Preview', url) preview_button.short_description = 'Preview' preview_button.allow_tags = True class OrganizationAdmin(admin.ModelAdmin): list_display = ('name', 'email', 'industry_sector') search_fields = ['name', 'email'] class RiskAdmin(admin.ModelAdmin): ordering = ['risk_id'] list_display = ['risk_id','risk_name','category'] class ControlAdmin(admin.ModelAdmin): list_display = ('id', 'subcategory', 'function', 'category') search_fields = ('subcategory', 'function', 'category') class DocumentRiskControlAdmin(admin.ModelAdmin): list_display = ('document', 'risk', 'control', 'weight','likelihood') class DemoCodeAdmin(admin.ModelAdmin): list_display = ('code', 'created_at', 'used', 'company', 'used_at') change_list_template = "admin/democode_changelist.html" def get_urls(self): urls = super().get_urls() custom_urls = [ path('generate-codes/', self.admin_site.admin_view(self.generate_codes_view), name='generate-codes'), ] return custom_urls + urls def generate_codes_view(self, request): if request.method == 'POST': form = GenerateCodesForm(request.POST) if form.is_valid(): count = form.cleaned_data['count'] created = 0 for _ in range(count): while True: code = generate_demo_code() if not DemoCode.objects.filter(code=code).exists(): DemoCode.objects.create(code=code) created += 1 break self.message_user(request, f"{created} codes generated.") return redirect('..') else: form = GenerateCodesForm() return render(request, 'admin/generate_codes.html', {'form': form}) admin.site.register(Document, DocumentAdmin) admin.site.register(Organization, OrganizationAdmin) admin.site.register(Risk ,RiskAdmin) admin.site.register(Control, ControlAdmin) admin.site.register(DocumentTemplate, DocumentTemplateAdmin) admin.site.register(DocumentRiskControl, DocumentRiskControlAdmin) admin.site.register(DemoCode, DemoCodeAdmin)