'
'🧹 Warning: this will clear the current document (segments, mapped risks/controls, key findings, and recommendations) and regenerate everything as if the document was newly created.'
'
'
' '
''
)
regen_document_action.short_description = ''
def save_model(self, request, obj, form, change):
    """Save the object, then mirror the form's risk selection onto its organization.

    After the default save, the ``organization_risks`` entry of the form's
    cleaned data replaces the risk set of ``obj.organization``. Nothing is
    synced when that entry is missing (``None``) or when the object has no
    organization assigned.
    """
    super().save_model(request, obj, form, change)

    selected_risks = form.cleaned_data.get('organization_risks')
    if selected_risks is None or not obj.organization_id:
        return
    obj.organization.risks.set(selected_risks)
def _apply_post_org_risks(self, request, obj):
    """Apply the ``organization_risks`` values posted with the request.

    Reads the raw POST list, converts every non-empty entry to ``int`` and
    replaces the risks of ``obj.organization`` with the matching ``Risk``
    rows. Best effort: any failure is logged and swallowed.
    """
    try:
        if 'organization_risks' not in request.POST or not obj.organization_id:
            return
        posted_ids = []
        for raw_pk in request.POST.getlist('organization_risks'):
            if raw_pk:
                posted_ids.append(int(raw_pk))
        matching_risks = Risk.objects.filter(pk__in=posted_ids)
        obj.organization.risks.set(matching_risks)
    except Exception:
        logger.exception("Failed to apply posted organization_risks")
def _clear_segments(self, obj, startswith=None, exact=None):
    """Delete segments of ``obj`` selected by content.

    Args:
        obj: Object exposing a ``segments`` related manager.
        startswith: Optional iterable of prefixes; one delete is issued per
            prefix for segments whose content starts with it.
        exact: Optional collection of full content values; matching segments
            are removed with a single ``content__in`` delete.

    Best effort: any failure is logged and swallowed.
    """
    try:
        # A falsy ``startswith`` (None or empty) simply yields no iterations.
        for prefix in startswith or ():
            obj.segments.filter(content__startswith=prefix).delete()
        if exact:
            obj.segments.filter(content__in=exact).delete()
    except Exception:
        logger.exception("Failed to clear segments for document %s", getattr(obj, 'pk', None))
def _risk_content(self, risks):
    """Render ``risks`` as plain text, one labelled block per risk.

    Each block lists the risk id and name, category, the three impact
    levels, detection difficulty, recovery complexity and business impact
    severity. Blocks are separated by a blank line; an empty iterable gives
    an empty string.
    """
    def describe(risk):
        # Attribute names (including their spelling) are read exactly as the
        # risk objects expose them.
        return (
            f"Risk: {risk.risk_id} - {risk.risk_name} \n"
            f"Category: {risk.category}\n"
            f"Primary Impact: {risk.primary_impact} \n"
            f"Secondary Impact: {risk.secondary_impact}\n"
            f"Tertiary Impact: {risk.tretiary_impact} \n"
            f"Detection Difficulty: {risk.detection_difficulty} \n"
            f"Recovery Complexity: {risk.recovery_complexity} \n"
            f"Business Impact Severity: {risk.businnes_impact_severity}\n"
        )

    return "\n\n".join(map(describe, risks))
def _add_identified_risks(self, obj, risks):
    """Replace the document's risk heading and risk body segments.

    Does nothing when ``risks`` is empty. Otherwise the previous
    "Identified Risks" body and "Top 10 Risks Identified" heading are
    cleared and fresh ``h1`` and ``body`` segments are added.
    """
    if risks:
        heading = "Top 10 Risks Identified"
        self._clear_segments(obj, startswith=["Identified Risks"], exact=[heading])
        obj.add_segment('h1', heading)
        details = self._risk_content(risks)
        obj.add_segment('body', f"Identified Risks: \n\n{details}")
def _clear_document_mappings(self, obj, clear_org_risks=True):
    """Reset a document to its freshly-created state.

    Deletes all segments and risk/control mappings, blanks the key findings
    and recommendations, sets the status to ``Document.STATUS_WAITING`` and
    saves those fields. When ``clear_org_risks`` is true and the document
    has an organization, that organization's risks are cleared as well.

    Best effort: any failure is logged and swallowed.
    """
    try:
        obj.segments.all().delete()
        obj.documentriskcontrol_set.all().delete()

        reset_values = {
            'key_findings': '',
            'recomendations': '',
            'status': Document.STATUS_WAITING,
        }
        for field_name, value in reset_values.items():
            setattr(obj, field_name, value)
        obj.save(update_fields=[*reset_values, 'modified_at'])

        if not clear_org_risks:
            return
        organization = getattr(obj, 'organization', None)
        if organization:
            organization.risks.clear()
    except Exception:
        logger.exception("Failed to clear document mappings for document %s", getattr(obj, 'pk', None))
def _regen_pipeline(self, obj):
    """Run every regeneration step in order and report overall success.

    Steps: top risks, controls (only when the top-risk step succeeded), key
    findings, recommendations. Key findings and recommendations always run,
    whatever happened earlier. An exception from the controls step is
    logged and counted as a failure.

    Returns:
        ``True`` only if every step that ran succeeded.
    """
    outcomes = []

    risks_ok = bool(self._regen_top_risks(obj))
    outcomes.append(risks_ok)
    if risks_ok:
        try:
            self._regen_controls(obj)
        except Exception:
            logger.exception("_regen_controls failed")
            outcomes.append(False)

    outcomes.append(bool(self._regen_key_findings(obj)))
    outcomes.append(bool(self._regen_recommendations(obj)))
    return all(outcomes)
def _regen_top_risks(self, obj):
    """Recompute the organization's top risks and rewrite the risk segment.

    Asks ``get_top_risk`` for the risk ids of ``obj.organization``, loads
    the matching ``Risk`` rows, stores them on the organization and
    regenerates the "Identified Risks" segments of the document.

    Returns:
        ``True`` when at least one risk was found and applied. ``False``
        when no risk could be determined; in that case the organization's
        current risks and the document segments are left untouched, so the
        callers' "could not be generated" handling is actually reachable
        and an empty result never wipes existing data.
    """
    top_risk_ids = get_top_risk(obj.organization)
    # Materialize once: the same rows are stored on the organization and
    # rendered into the segment.
    top_risks = list(Risk.objects.filter(risk_id__in=top_risk_ids)) if top_risk_ids else []
    if not top_risks:
        logger.warning("No top risks could be determined for document %s", getattr(obj, 'pk', None))
        return False

    obj.organization.risks.set(top_risks)
    self._add_identified_risks(obj, top_risks)
    return True
def _regen_controls(self, obj):
    """Rebuild risk/control mappings and their segments for a document.

    Clears the existing risk and control segments and all
    ``DocumentRiskControl`` rows, re-adds the identified-risks segments from
    the organization's current risks, then for every risk stores the
    controls returned by ``get_controls_for_risk`` and appends a
    "Mitigation Controls" body segment listing them. Control ids that do
    not resolve to a ``Control`` row are skipped.
    """
    self._clear_segments(
        obj,
        startswith=["Identified Risks", "Mitigation Controls"],
        exact=["Top 10 Risks Identified", "Regenerated Controls"],
    )
    obj.documentriskcontrol_set.all().delete()

    org_risks = list(obj.organization.risks.all())
    self._add_identified_risks(obj, org_risks)

    pieces = ["Mitigation Controls:\n\n"]
    for risk in org_risks:
        pieces.append(f"Risk: {risk.risk_id} - {risk.risk_name}\n")
        for control_id, weight, likelihood in get_controls_for_risk(risk, organization=obj.organization):
            control = Control.objects.filter(id=control_id).first()
            if not control:
                continue
            DocumentRiskControl.objects.create(
                document=obj,
                risk=risk,
                control=control,
                weight=weight,
                likelihood=likelihood
            )
            label = f"{control.subcategory} - {control.function or ''}".rstrip(" -")
            pieces.append(f" - Control: {label} (Impact Weight: {weight}/10) (Likelihood: {likelihood}/10)\n")
        # Blank line between consecutive risks.
        pieces.append("\n")
    obj.add_segment('body', "".join(pieces))
def _regen_key_findings(self, obj):
    """Regenerate and store the document's key findings.

    Uses the first three rows of ``get_risk_table(obj)`` as input for
    ``generate_key_findings``.

    Returns:
        ``True`` when text was produced and saved, ``False`` when the
        generator returned nothing (the document is then left unchanged).
    """
    leading_risks = get_risk_table(obj)[:3]
    findings = generate_key_findings(obj, leading_risks)
    if not findings:
        return False
    obj.key_findings = findings
    obj.save(update_fields=['key_findings', 'modified_at'])
    return True
def _regen_recommendations(self, obj):
    """Regenerate and store the document's recommendations.

    Uses the first ten rows of ``get_risk_table(obj)`` together with the
    document's organization as input for ``generate_recommendations``.

    Returns:
        ``True`` when text was produced and saved, ``False`` when the
        generator returned nothing (the document is then left unchanged).
    """
    leading_risks = get_risk_table(obj)[:10]
    advice = generate_recommendations(leading_risks, obj.organization)
    if not advice:
        return False
    obj.recomendations = advice
    obj.save(update_fields=['recomendations', 'modified_at'])
    return True
def changeform_view(self, request, object_id=None, form_url='', extra_context=None):
    """Handle the custom "regenerate" buttons of the change form.

    A POST carrying one of the ``_regen_*`` keys is handled here instead of
    by the regular change form: posted organization risks are applied, the
    requested regeneration runs, a message is shown and the user is
    redirected back to the change page. Every other request goes to the
    default implementation.

    Only the first matching action runs, in this priority: top risks,
    controls, key findings, recommendations, full document.

    Top risks, controls and full regeneration need an organization. The
    controls action is guarded like the other two because
    ``_regen_controls`` deletes the existing segments and risk/control
    mappings before it touches ``obj.organization``; without the guard a
    document lacking an organization would lose that data and then fail.
    """
    regen_actions = (
        "_regen_controls",
        "_regen_key_findings",
        "_regen_recommendations",
        "_regen_top_risks",
        "_regen_document",
    )
    is_regen_post = request.method == 'POST' and any(name in request.POST for name in regen_actions)
    if not is_regen_post:
        return super().changeform_view(request, object_id, form_url, extra_context)

    obj = self.get_object(request, object_id)
    if obj is None:
        return super().changeform_view(request, object_id, form_url, extra_context)

    missing_org_message = "Please select an organization first."
    try:
        self._apply_post_org_risks(request, obj)
        has_organization = bool(obj.organization_id)

        if "_regen_top_risks" in request.POST:
            if not has_organization:
                self.message_user(request, missing_org_message, level=messages.WARNING)
            elif self._regen_top_risks(obj):
                self.message_user(request, "Top risks regenerated and risk segment updated.")
            else:
                self.message_user(request, "Top risks could not be generated.", level=messages.WARNING)
        elif "_regen_controls" in request.POST:
            if not has_organization:
                # Bail out before _regen_controls wipes segments and mappings.
                self.message_user(request, missing_org_message, level=messages.WARNING)
            else:
                self._regen_controls(obj)
                self.message_user(request, "Risks and controls regenerated successfully.")
        elif "_regen_key_findings" in request.POST:
            if self._regen_key_findings(obj):
                self.message_user(request, "Key Findings regenerated.")
            else:
                self.message_user(request, "Key Findings could not be generated.", level=messages.WARNING)
        elif "_regen_recommendations" in request.POST:
            if self._regen_recommendations(obj):
                self.message_user(request, "Recommendations regenerated.")
            else:
                self.message_user(request, "Recommendations could not be generated.", level=messages.WARNING)
        elif "_regen_document" in request.POST:
            if not has_organization:
                self.message_user(request, missing_org_message, level=messages.WARNING)
            else:
                try:
                    with transaction.atomic():
                        self._clear_document_mappings(obj, clear_org_risks=True)
                        regen_ok = self._regen_pipeline(obj)
                    # Audit logging is best effort and must not fail the action.
                    try:
                        self.log_change(request, obj, "Full document regeneration triggered")
                    except Exception:
                        logger.exception("Failed to log admin change for full regen")
                    if regen_ok:
                        self.message_user(request, "Document fully regenerated.")
                    else:
                        self.message_user(
                            request,
                            "Document regeneration finished with warnings or missing outputs.",
                            level=messages.WARNING,
                        )
                except Exception as e:
                    logger.exception("Full regeneration failed")
                    self.message_user(request, f"Full regeneration failed: {e}", level=messages.ERROR)
    except Exception as e:
        logger.exception("changeform_view action failed")
        self.message_user(request, f"Action failed: {e}", level=messages.ERROR)

    return redirect(reverse('admin:core_document_change', args=[obj.pk]))
def review_link(self, obj):
    """Return an HTML link to the document's admin change page.

    The link text is "Review / Edit". ``format_html`` escapes both the URL
    and the label before inserting them into the anchor markup.
    """
    url = reverse('admin:core_document_change', args=[obj.pk])
    label = 'Review / Edit'
    # The template needs one placeholder per argument; a bare '{}' rendered
    # only the raw URL as text and silently dropped the label.
    return format_html('<a href="{}">{}</a>', url, label)
review_link.short_description = 'Action'
def response_change(self, request, obj):
    """Handle the "save and send" button after a successful change.

    When ``_save_send`` is present in the POST data, the document's PDF URL
    is e-mailed to the organization's address, the document is marked as
    done and the user is redirected back to the change page. A failure is
    logged and reported as an error message; the redirect still happens.
    Any other submission uses the default response.
    """
    if "_save_send" not in request.POST:
        return super().response_change(request, obj)

    try:
        pdf_url = f"{settings.SITE_DOMAIN}/pdf/{obj.id}/"
        send_document_email(obj.organization.email, pdf_url, obj)
        obj.status = Document.STATUS_DONE
        obj.save(update_fields=['status', 'modified_at'])
        self.message_user(request, "Document sent and marked as done.")
    except Exception as exc:
        logger.exception("Failed to send document email")
        self.message_user(request, f"Failed to send document: {exc}", level=messages.ERROR)

    change_url = reverse('admin:core_document_change', args=[obj.pk])
    return redirect(change_url)
def _refresh_segments_from_current_mappings(self, obj):
    """Rewrite the risk and control segments from the stored mappings.

    Clears the existing risk/control segments, re-adds the identified-risks
    segments from the organization's current risks, and appends a
    "Mitigation Controls" body segment built from the document's existing
    ``DocumentRiskControl`` rows. No mapping rows are created or deleted.

    Within each risk, controls are ordered by subcategory and then
    function. Mappings without a control are skipped before sorting: the
    sort key reads attributes of the control, so a missing control would
    otherwise raise after the old segments had already been cleared.
    """
    from collections import defaultdict

    self._clear_segments(
        obj,
        startswith=["Identified Risks", "Mitigation Controls"],
        exact=["Top 10 Risks Identified", "Regenerated Controls"],
    )
    top_risks = list(obj.organization.risks.all())
    if top_risks:
        self._add_identified_risks(obj, top_risks)

    # Group the usable mappings by risk so each risk is rendered in one pass.
    controls_by_risk = defaultdict(list)
    for drc in obj.documentriskcontrol_set.select_related('risk', 'control').all():
        if drc.control:
            controls_by_risk[drc.risk_id].append(drc)

    pieces = ["Mitigation Controls:\n\n"]
    for risk in top_risks:
        pieces.append(f"Risk: {risk.risk_id} - {risk.risk_name}\n")
        rows = sorted(
            controls_by_risk.get(risk.pk, []),
            key=lambda row: (row.control.subcategory or '', row.control.function or ''),
        )
        for drc in rows:
            control = drc.control
            label = f"{control.subcategory} - {control.function or ''}".rstrip(" -")
            pieces.append(
                f" - Control: {label} (Impact Weight: {drc.weight}/10) (Likelihood: {drc.likelihood}/10)\n"
            )
        # Blank line between consecutive risks.
        pieces.append("\n")
    obj.add_segment('body', "".join(pieces))
def save_related(self, request, form, formsets, change):
    """Save related objects, then rebuild the risk/control segments.

    The refresh runs after the default related-object save so it sees the
    stored mappings. It is best effort: a failure is logged and does not
    interrupt the save.
    """
    super().save_related(request, form, formsets, change)
    document = form.instance
    try:
        self._refresh_segments_from_current_mappings(document)
    except Exception:
        logger.exception(
            "Failed to refresh segments from current mappings for document %s",
            getattr(document, 'pk', None),
        )
class DocumentTemplateAdmin(admin.ModelAdmin):
    """Admin configuration for document templates, with a preview link column."""

    list_display = ['name', 'created_at', 'updated_at', 'preview_button']

    def preview_button(self, obj):
        """Return an HTML link to the preview page of this template.

        The URL is resolved from the ``core:template_preview`` route using
        the template's name and is escaped by ``format_html``.
        """
        url = reverse('core:template_preview', args=[obj.name])
        # The URL must actually be interpolated; a template without a
        # placeholder rendered plain "Preview" text with no link.
        return format_html('<a class="button" href="{}">Preview</a>', url)
    # ``allow_tags`` is intentionally not set: ``format_html`` already
    # returns markup that is marked safe.
    preview_button.short_description = 'Preview'
class OrganizationAdmin(admin.ModelAdmin):
    """Admin configuration for organizations."""

    # Columns of the change list.
    list_display = (
        'name',
        'email',
        'industry_sector',
    )
    # Fields covered by the admin search box.
    search_fields = [
        'name',
        'email',
    ]
class RiskAdmin(admin.ModelAdmin):
    """Admin configuration for risks, listed in ``risk_id`` order."""

    ordering = ['risk_id']
    # Columns of the change list.
    list_display = [
        'risk_id',
        'risk_name',
        'category',
    ]
class ControlAdmin(admin.ModelAdmin):
    """Admin configuration for controls."""

    # Columns of the change list.
    list_display = (
        'id',
        'subcategory',
        'function',
        'category',
    )
    # Fields covered by the admin search box.
    search_fields = (
        'subcategory',
        'function',
        'category',
    )
class DocumentRiskControlAdmin(admin.ModelAdmin):
    """Admin configuration for document risk/control mappings."""

    # Columns of the change list.
    list_display = (
        'document',
        'risk',
        'control',
        'weight',
        'likelihood',
    )
class DemoCodeAdmin(admin.ModelAdmin):
    """Admin configuration for demo codes, with a bulk "generate codes" page."""

    list_display = ('code', 'created_at', 'used', 'company', 'used_at')
    change_list_template = "admin/democode_changelist.html"

    # Collision retries allowed per requested code. Keeps a request from
    # spinning forever if the generator can no longer produce unused codes.
    max_attempts_per_code = 50

    def get_urls(self):
        """Return the admin URLs with the code-generation view placed first."""
        urls = super().get_urls()
        custom_urls = [
            path('generate-codes/', self.admin_site.admin_view(self.generate_codes_view), name='generate-codes'),
        ]
        return custom_urls + urls

    def generate_codes_view(self, request):
        """Show the generation form and create the requested number of codes.

        A valid POST creates the codes, reports how many were created and
        redirects to the parent URL. An invalid POST re-renders the bound
        form; a non-POST request renders an empty form.
        """
        if request.method == 'POST':
            form = GenerateCodesForm(request.POST)
            if form.is_valid():
                requested = form.cleaned_data['count']
                created = self._create_unique_codes(requested)
                if created < requested:
                    self.message_user(
                        request,
                        f"{created} codes generated; stopped early because no unused code "
                        f"could be found for the remaining {requested - created}.",
                        level=messages.WARNING,
                    )
                else:
                    self.message_user(request, f"{created} codes generated.")
                return redirect('..')
        else:
            form = GenerateCodesForm()
        return render(request, 'admin/generate_codes.html', {'form': form})

    def _create_unique_codes(self, count):
        """Create up to ``count`` new demo codes and return how many were created.

        ``get_or_create`` replaces the separate exists-then-create pair, so
        a code taken by a concurrent request is detected rather than
        inserted twice. Generation stops as soon as one code cannot be
        placed within ``max_attempts_per_code`` tries.
        """
        created = 0
        for _ in range(count):
            for _attempt in range(self.max_attempts_per_code):
                _, was_created = DemoCode.objects.get_or_create(code=generate_demo_code())
                if was_created:
                    created += 1
                    break
            else:
                # Every attempt collided; further codes would collide too.
                break
        return created
# Attach every model to its admin configuration on the default admin site,
# in the same order as before.
for _model, _model_admin in (
    (Document, DocumentAdmin),
    (Organization, OrganizationAdmin),
    (Risk, RiskAdmin),
    (Control, ControlAdmin),
    (DocumentTemplate, DocumentTemplateAdmin),
    (DocumentRiskControl, DocumentRiskControlAdmin),
    (DemoCode, DemoCodeAdmin),
):
    admin.site.register(_model, _model_admin)
# Do not leave the loop variables behind in the module namespace.
del _model, _model_admin