2024-12-29 03:44:52 +01:00
from django . contrib import admin
2025-07-14 11:29:11 +02:00
from . models import Document , DocumentSegment , Organization , Risk , Control , DocumentTemplate , DocumentRiskControl , DemoCode
2025-08-17 18:43:55 +02:00
from django . urls import reverse , path
2025-02-13 17:55:46 +01:00
from django . utils . html import format_html
2025-08-26 19:32:03 +02:00
from . utils import generate_demo_code , get_top_risk , get_controls_for_risk , generate_key_findings , generate_recommendations
2025-08-17 18:43:55 +02:00
from . tables import get_risk_table
2025-06-20 00:56:57 +02:00
from django . shortcuts import render , redirect
from . forms import GenerateCodesForm
2025-08-17 18:43:55 +02:00
from django . conf import settings
from backend . accounts . utils import send_document_email
from django import forms
from django . contrib . admin . widgets import FilteredSelectMultiple
2025-08-26 19:32:03 +02:00
import logging
from django . contrib import messages
from django . db import transaction
logger = logging . getLogger ( __name__ )
2025-08-17 18:43:55 +02:00
class DocumentRiskControlInline ( admin . TabularInline ) :
model = DocumentRiskControl
extra = 2
max_num = 10
can_delete = False
fields = ( ' risk ' , ' control ' , ' weight ' , ' likelihood ' )
def get_formset ( self , request , obj = None , * * kwargs ) :
formset = super ( ) . get_formset ( request , obj , * * kwargs )
try :
if request . method == ' POST ' and ' organization_risks ' in request . POST :
risk_ids = request . POST . getlist ( ' organization_risks ' )
formset . form . base_fields [ ' risk ' ] . queryset = Risk . objects . filter ( pk__in = risk_ids )
elif obj :
formset . form . base_fields [ ' risk ' ] . queryset = obj . organization . risks . all ( )
except Exception :
2025-08-26 19:32:03 +02:00
logger . exception ( " Error building DocumentRiskControlInline formset " )
2025-08-17 18:43:55 +02:00
return formset
class DocumentAdminForm ( forms . ModelForm ) :
organization_risks = forms . ModelMultipleChoiceField (
queryset = Risk . objects . all ( ) ,
required = False ,
widget = FilteredSelectMultiple ( verbose_name = " Risks " , is_stacked = False ) ,
help_text = " Edit the AI-selected risks for this organization. "
)
class Meta :
model = Document
fields = [ ' organization ' , ' status ' , ' key_findings ' , ' recomendations ' ]
class Media :
css = { ' all ' : ( ' admin/css/widgets.css ' , ) }
js = (
' admin/js/SelectBox.js ' ,
' admin/js/SelectFilter2.js ' ,
)
def __init__ ( self , * args , * * kwargs ) :
super ( ) . __init__ ( * args , * * kwargs )
if self . instance and getattr ( self . instance , ' organization_id ' , None ) :
self . fields [ ' organization_risks ' ] . initial = self . instance . organization . risks . all ( )
2024-12-29 03:44:52 +01:00
2025-02-06 11:24:19 +01:00
class DocumentAdmin ( admin . ModelAdmin ) :
2025-08-17 18:43:55 +02:00
change_form_template = " admin/core/document/change_form.html "
form = DocumentAdminForm
inlines = [ DocumentRiskControlInline ]
list_display = ( ' organization ' , ' status ' , ' created_at ' , ' modified_at ' , ' review_link ' )
list_filter = ( ' status ' , ' created_at ' )
search_fields = [ ' organization__name ' , ' organization__email ' ]
readonly_fields = (
2025-08-26 19:32:03 +02:00
' created_at ' , ' modified_at ' , ' regen_note_action ' ,
' regen_document_action ' , ' regen_top_risks_action ' , ' regen_controls_action ' , ' regen_keyfindings_action ' , ' regen_recommendations_action ' ,
2025-08-17 18:43:55 +02:00
)
fieldsets = (
( ' Organization & Risks ' , {
2025-08-26 19:32:03 +02:00
' fields ' : ( ' organization ' , ' regen_note_action ' , ' regen_document_action ' , ' organization_risks ' , ' regen_top_risks_action ' )
2025-08-17 18:43:55 +02:00
} ) ,
( ' Key Findings ' , {
' fields ' : ( ' key_findings ' , ' regen_keyfindings_action ' )
} ) ,
( ' Recommendations ' , {
' fields ' : ( ' recomendations ' , ' regen_recommendations_action ' )
} ) ,
( ' Status ' , {
' fields ' : ( ' status ' , )
} ) ,
( ' Timestamps ' , {
' fields ' : ( ' created_at ' , ' modified_at ' )
} ) ,
)
2025-08-26 19:32:03 +02:00
def regen_note_action ( self , obj ) :
return format_html (
' <div class= " ai-callout " > '
' ⏰ <strong>Tip:</strong> Regenerating document can take some time since we depend on AI models to generate content. '
' </div> '
' <br/> '
)
regen_note_action . short_description = ' '
def regen_top_risks_action ( self , obj ) :
return format_html (
' <div class= " ai-callout ai-callout-info " > '
' 💡 <strong>Recommended:</strong> after regenerating the Top 10 risks, also update Controls, Key Findings, and Recommendations. '
' </div> '
' <br/> '
' <button type= " submit " name= " _regen_top_risks " class= " button " >Regenerate Top 10 Risks using AI</button> '
)
regen_top_risks_action . short_description = ' '
2025-08-17 18:43:55 +02:00
def regen_controls_action ( self , obj ) :
return format_html ( ' <button type= " submit " name= " _regen_controls " class= " button " >Regenerate Controls using AI</button> ' )
regen_controls_action . short_description = ' '
def regen_keyfindings_action ( self , obj ) :
2025-08-26 19:32:03 +02:00
return format_html ( ' <br/><button type= " submit " name= " _regen_key_findings " class= " button " >Regenerate Key Findings using AI</button> ' )
2025-08-17 18:43:55 +02:00
regen_keyfindings_action . short_description = ' '
def regen_recommendations_action ( self , obj ) :
2025-08-26 19:32:03 +02:00
return format_html ( ' <br/><button type= " submit " name= " _regen_recommendations " class= " button " >Regenerate Recommendations using AI</button> ' )
2025-08-17 18:43:55 +02:00
regen_recommendations_action . short_description = ' '
2025-08-26 19:32:03 +02:00
def regen_document_action ( self , obj ) :
return format_html (
' <div class= " ai-callout " > '
' 🧹 <strong>Warning:</strong> this will clear the current document (segments, mapped risks/controls, key findings, and recommendations) and regenerate everything as if the document was newly created. '
' </div> '
' <br/> '
' <button type= " submit " name= " _regen_document " class= " button " >Regenerate Entire Document</button> '
)
regen_document_action . short_description = ' '
2025-08-17 18:43:55 +02:00
def save_model ( self , request , obj , form , change ) :
super ( ) . save_model ( request , obj , form , change )
org_risks = form . cleaned_data . get ( ' organization_risks ' )
if org_risks is not None and obj . organization_id :
obj . organization . risks . set ( org_risks )
def _apply_post_org_risks ( self , request , obj ) :
try :
if ' organization_risks ' in request . POST and obj . organization_id :
risk_ids = [ int ( pk ) for pk in request . POST . getlist ( ' organization_risks ' ) if pk ]
obj . organization . risks . set ( Risk . objects . filter ( pk__in = risk_ids ) )
except Exception :
2025-08-26 19:32:03 +02:00
logger . exception ( " Failed to apply posted organization_risks " )
def _clear_segments ( self , obj , startswith = None , exact = None ) :
try :
if startswith :
for s in startswith :
obj . segments . filter ( content__startswith = s ) . delete ( )
if exact :
obj . segments . filter ( content__in = exact ) . delete ( )
except Exception :
logger . exception ( " Failed to clear segments for document %s " , getattr ( obj , ' pk ' , None ) )
def _risk_content ( self , risks ) :
return " \n \n " . join ( [
f " Risk: { risk . risk_id } - { risk . risk_name } \n "
f " Category: { risk . category } \n "
f " Primary Impact: { risk . primary_impact } \n "
f " Secondary Impact: { risk . secondary_impact } \n "
f " Tertiary Impact: { risk . tretiary_impact } \n "
f " Detection Difficulty: { risk . detection_difficulty } \n "
f " Recovery Complexity: { risk . recovery_complexity } \n "
f " Business Impact Severity: { risk . businnes_impact_severity } \n "
for risk in risks
] )
def _add_identified_risks ( self , obj , risks ) :
if not risks :
return
self . _clear_segments ( obj , startswith = [ " Identified Risks " ] , exact = [ " Top 10 Risks Identified " ] )
obj . add_segment ( ' h1 ' , " Top 10 Risks Identified " )
obj . add_segment ( ' body ' , f " Identified Risks: \n \n { self . _risk_content ( risks ) } " )
def _clear_document_mappings ( self , obj , clear_org_risks = True ) :
try :
obj . segments . all ( ) . delete ( )
obj . documentriskcontrol_set . all ( ) . delete ( )
obj . key_findings = ' '
obj . recomendations = ' '
obj . status = Document . STATUS_WAITING
obj . save ( update_fields = [ ' key_findings ' , ' recomendations ' , ' status ' , ' modified_at ' ] )
if clear_org_risks and getattr ( obj , ' organization ' , None ) :
obj . organization . risks . clear ( )
except Exception :
logger . exception ( " Failed to clear document mappings for document %s " , getattr ( obj , ' pk ' , None ) )
def _regen_pipeline ( self , obj ) :
ok = True
if not self . _regen_top_risks ( obj ) :
ok = False
else :
try :
self . _regen_controls ( obj )
except Exception :
logger . exception ( " _regen_controls failed " )
ok = False
if not self . _regen_key_findings ( obj ) :
ok = False
if not self . _regen_recommendations ( obj ) :
ok = False
return ok
def _regen_top_risks ( self , obj ) :
top_risk_ids = get_top_risk ( obj . organization )
top_risks = Risk . objects . filter ( risk_id__in = top_risk_ids )
obj . organization . risks . set ( top_risks )
self . _add_identified_risks ( obj , top_risks )
return True
2025-08-17 18:43:55 +02:00
def _regen_controls ( self , obj ) :
2025-08-26 19:32:03 +02:00
self . _clear_segments ( obj , startswith = [ " Identified Risks " , " Mitigation Controls " ] , exact = [ " Top 10 Risks Identified " , " Regenerated Controls " ] )
2025-08-17 18:43:55 +02:00
obj . documentriskcontrol_set . all ( ) . delete ( )
top_risks = list ( obj . organization . risks . all ( ) )
2025-08-26 19:32:03 +02:00
self . _add_identified_risks ( obj , top_risks )
2025-08-17 18:43:55 +02:00
controls_content = " Mitigation Controls: \n \n "
for risk in top_risks :
controls_content + = f " Risk: { risk . risk_id } - { risk . risk_name } \n "
selected_controls = get_controls_for_risk ( risk , organization = obj . organization )
for control_id , weight , likelihood in selected_controls :
control = Control . objects . filter ( id = control_id ) . first ( )
if control :
DocumentRiskControl . objects . create (
document = obj ,
risk = risk ,
control = control ,
weight = weight ,
likelihood = likelihood
)
label = f " { control . subcategory } - { control . function or ' ' } " . rstrip ( " - " )
controls_content + = f " - Control: { label } (Impact Weight: { weight } /10) (Likelihood: { likelihood } /10) \n "
controls_content + = " \n "
obj . add_segment ( ' body ' , controls_content )
def _regen_key_findings ( self , obj ) :
risks_top3 = get_risk_table ( obj ) [ : 3 ]
key_findings = generate_key_findings ( obj , risks_top3 )
if key_findings :
obj . key_findings = key_findings
obj . save ( update_fields = [ ' key_findings ' , ' modified_at ' ] )
return True
return False
def _regen_recommendations ( self , obj ) :
risks_top10 = get_risk_table ( obj ) [ : 10 ]
recommendations = generate_recommendations ( risks_top10 , obj . organization )
if recommendations :
obj . recomendations = recommendations
obj . save ( update_fields = [ ' recomendations ' , ' modified_at ' ] )
return True
return False
def changeform_view ( self , request , object_id = None , form_url = ' ' , extra_context = None ) :
2025-08-26 19:32:03 +02:00
if request . method == ' POST ' and any ( k in request . POST for k in ( " _regen_controls " , " _regen_key_findings " , " _regen_recommendations " , " _regen_top_risks " , " _regen_document " ) ) :
2025-08-17 18:43:55 +02:00
obj = self . get_object ( request , object_id )
if obj is None :
return super ( ) . changeform_view ( request , object_id , form_url , extra_context )
try :
self . _apply_post_org_risks ( request , obj )
2025-08-26 19:32:03 +02:00
if " _regen_top_risks " in request . POST :
if not obj . organization_id :
self . message_user ( request , " Please select an organization first. " , level = messages . WARNING )
else :
if self . _regen_top_risks ( obj ) :
self . message_user ( request , " Top risks regenerated and risk segment updated. " )
else :
self . message_user ( request , " Top risks could not be generated. " , level = messages . WARNING )
elif " _regen_controls " in request . POST :
2025-08-17 18:43:55 +02:00
self . _regen_controls ( obj )
self . message_user ( request , " Risks and controls regenerated successfully. " )
elif " _regen_key_findings " in request . POST :
if self . _regen_key_findings ( obj ) :
self . message_user ( request , " Key Findings regenerated. " )
else :
2025-08-26 19:32:03 +02:00
self . message_user ( request , " Key Findings could not be generated. " , level = messages . WARNING )
2025-08-17 18:43:55 +02:00
elif " _regen_recommendations " in request . POST :
if self . _regen_recommendations ( obj ) :
self . message_user ( request , " Recommendations regenerated. " )
else :
2025-08-26 19:32:03 +02:00
self . message_user ( request , " Recommendations could not be generated. " , level = messages . WARNING )
elif " _regen_document " in request . POST :
if not obj . organization_id :
self . message_user ( request , " Please select an organization first. " , level = messages . WARNING )
else :
try :
with transaction . atomic ( ) :
self . _clear_document_mappings ( obj , clear_org_risks = True )
regen_ok = self . _regen_pipeline ( obj )
try :
self . log_change ( request , obj , " Full document regeneration triggered " )
except Exception :
logger . exception ( " Failed to log admin change for full regen " )
if regen_ok :
self . message_user ( request , " Document fully regenerated. " )
else :
self . message_user ( request , " Document regeneration finished with warnings or missing outputs. " , level = messages . WARNING )
except Exception as e :
logger . exception ( " Full regeneration failed " )
self . message_user ( request , f " Full regeneration failed: { e } " , level = messages . ERROR )
2025-08-17 18:43:55 +02:00
except Exception as e :
2025-08-26 19:32:03 +02:00
logger . exception ( " changeform_view action failed " )
self . message_user ( request , f " Action failed: { e } " , level = messages . ERROR )
2025-08-17 18:43:55 +02:00
return redirect ( reverse ( ' admin:core_document_change ' , args = [ obj . pk ] ) )
return super ( ) . changeform_view ( request , object_id , form_url , extra_context )
def review_link ( self , obj ) :
url = reverse ( ' admin:core_document_change ' , args = [ obj . pk ] )
label = ' Review / Edit '
return format_html ( ' <a class= " button " href= " {} " > {} </a> ' , url , label )
review_link . short_description = ' Action '
def response_change ( self , request , obj ) :
if " _save_send " in request . POST :
try :
url = f " { settings . SITE_DOMAIN } /pdf/ { obj . id } / "
send_document_email ( obj . organization . email , url , obj )
obj . status = Document . STATUS_DONE
obj . save ( update_fields = [ ' status ' , ' modified_at ' ] )
self . message_user ( request , " Document sent and marked as done. " )
except Exception as e :
2025-08-26 19:32:03 +02:00
logger . exception ( " Failed to send document email " )
self . message_user ( request , f " Failed to send document: { e } " , level = messages . ERROR )
2025-08-17 18:43:55 +02:00
return redirect ( reverse ( ' admin:core_document_change ' , args = [ obj . pk ] ) )
return super ( ) . response_change ( request , obj )
def _refresh_segments_from_current_mappings ( self , obj ) :
2025-08-26 19:32:03 +02:00
self . _clear_segments ( obj , startswith = [ " Identified Risks " , " Mitigation Controls " ] , exact = [ " Top 10 Risks Identified " , " Regenerated Controls " ] )
2025-08-17 18:43:55 +02:00
top_risks = list ( obj . organization . risks . all ( ) )
if top_risks :
2025-08-26 19:32:03 +02:00
self . _add_identified_risks ( obj , top_risks )
2025-08-17 18:43:55 +02:00
from collections import defaultdict
controls_by_risk = defaultdict ( list )
for drc in obj . documentriskcontrol_set . select_related ( ' risk ' , ' control ' ) . all ( ) :
controls_by_risk [ drc . risk_id ] . append ( drc )
controls_content = " Mitigation Controls: \n \n "
for risk in top_risks :
controls_content + = f " Risk: { risk . risk_id } - { risk . risk_name } \n "
rows = controls_by_risk . get ( risk . pk , [ ] )
rows . sort ( key = lambda x : ( x . control . subcategory or ' ' , x . control . function or ' ' ) )
for drc in rows :
control = drc . control
if control :
label = f " { control . subcategory } - { control . function or ' ' } " . rstrip ( " - " )
controls_content + = f " - Control: { label } (Impact Weight: { drc . weight } /10) (Likelihood: { drc . likelihood } /10) \n "
controls_content + = " \n "
obj . add_segment ( ' body ' , controls_content )
def save_related ( self , request , form , formsets , change ) :
super ( ) . save_related ( request , form , formsets , change )
obj = form . instance
try :
self . _refresh_segments_from_current_mappings ( obj )
except Exception :
2025-08-26 19:32:03 +02:00
logger . exception ( " Failed to refresh segments from current mappings for document %s " , getattr ( obj , ' pk ' , None ) )
2025-02-13 17:55:46 +01:00
class DocumentTemplateAdmin ( admin . ModelAdmin ) :
list_display = [ ' name ' , ' created_at ' , ' updated_at ' , ' preview_button ' ]
def preview_button ( self , obj ) :
url = reverse ( ' core:template_preview ' , args = [ obj . name ] )
return format_html ( ' <a href= " {} " target= " _blank " >Preview</a> ' , url )
preview_button . short_description = ' Preview '
preview_button . allow_tags = True
2025-02-06 11:24:19 +01:00
class OrganizationAdmin ( admin . ModelAdmin ) :
list_display = ( ' name ' , ' email ' , ' industry_sector ' )
search_fields = [ ' name ' , ' email ' ]
2025-02-10 09:25:39 +01:00
class RiskAdmin ( admin . ModelAdmin ) :
ordering = [ ' risk_id ' ]
list_display = [ ' risk_id ' , ' risk_name ' , ' category ' ]
2025-02-12 16:15:06 +01:00
class ControlAdmin ( admin . ModelAdmin ) :
2025-08-14 14:08:34 +02:00
list_display = ( ' id ' , ' subcategory ' , ' function ' , ' category ' )
search_fields = ( ' subcategory ' , ' function ' , ' category ' )
2025-02-13 18:29:54 +01:00
2025-02-14 17:52:51 +01:00
class DocumentRiskControlAdmin ( admin . ModelAdmin ) :
2025-03-27 23:57:31 +01:00
list_display = ( ' document ' , ' risk ' , ' control ' , ' weight ' , ' likelihood ' )
2025-02-12 16:15:06 +01:00
2025-07-14 11:29:11 +02:00
class DemoCodeAdmin ( admin . ModelAdmin ) :
2025-06-20 00:56:57 +02:00
list_display = ( ' code ' , ' created_at ' , ' used ' , ' company ' , ' used_at ' )
2025-07-14 11:29:11 +02:00
change_list_template = " admin/democode_changelist.html "
2025-06-20 00:56:57 +02:00
def get_urls ( self ) :
urls = super ( ) . get_urls ( )
custom_urls = [
path ( ' generate-codes/ ' , self . admin_site . admin_view ( self . generate_codes_view ) , name = ' generate-codes ' ) ,
]
return custom_urls + urls
def generate_codes_view ( self , request ) :
if request . method == ' POST ' :
form = GenerateCodesForm ( request . POST )
if form . is_valid ( ) :
count = form . cleaned_data [ ' count ' ]
created = 0
for _ in range ( count ) :
while True :
2025-07-14 11:29:11 +02:00
code = generate_demo_code ( )
if not DemoCode . objects . filter ( code = code ) . exists ( ) :
DemoCode . objects . create ( code = code )
2025-06-20 00:56:57 +02:00
created + = 1
break
self . message_user ( request , f " { created } codes generated. " )
return redirect ( ' .. ' )
else :
form = GenerateCodesForm ( )
return render ( request , ' admin/generate_codes.html ' , { ' form ' : form } )
2025-02-10 09:25:39 +01:00
2025-02-06 11:24:19 +01:00
admin . site . register ( Document , DocumentAdmin )
admin . site . register ( Organization , OrganizationAdmin )
2025-02-10 09:25:39 +01:00
admin . site . register ( Risk , RiskAdmin )
2025-02-12 16:15:06 +01:00
admin . site . register ( Control , ControlAdmin )
2025-02-13 17:55:46 +01:00
admin . site . register ( DocumentTemplate , DocumentTemplateAdmin )
2025-02-14 17:52:51 +01:00
admin . site . register ( DocumentRiskControl , DocumentRiskControlAdmin )
2025-07-14 11:29:11 +02:00
admin . site . register ( DemoCode , DemoCodeAdmin )