Import and Export CSV
This commit is contained in:
38
backend/core/management/commands/export_risks.py
Normal file
38
backend/core/management/commands/export_risks.py
Normal file
@@ -0,0 +1,38 @@
|
||||
import csv
|
||||
from django.core.management.base import BaseCommand
|
||||
from backend.core.models import Risk
|
||||
|
||||
|
||||
class Command(BaseCommand):
|
||||
help = "Export risks to a CSV file"
|
||||
|
||||
def add_arguments(self, parser):
|
||||
parser.add_argument('csv_file', type=str, help="The CSV file to export risks to")
|
||||
|
||||
def handle(self, *args, **options):
|
||||
csv_file_path = options['csv_file']
|
||||
|
||||
with open(csv_file_path, mode="w", newline="") as csv_file:
|
||||
fieldnames = [
|
||||
'Risk ID','Category','Risk Name', 'Primary Impact', 'Secondary Impact',
|
||||
'Tertiary Impact', 'Detection Difficulty', 'Recovery Complexity',
|
||||
'Business Impact Severity'
|
||||
|
||||
]
|
||||
writer = csv.DictWriter(csv_file, fieldnames=fieldnames)
|
||||
|
||||
writer.writeheader()
|
||||
for risk in Risk.objects.all():
|
||||
writer.writerow({
|
||||
'Risk ID': risk.risk_id,
|
||||
'Category': risk.category,
|
||||
'Risk Name': risk.risk_name,
|
||||
'Primary Impact': risk.primary_impact,
|
||||
'Secondary Impact': risk.secondary_impact,
|
||||
'Tertiary Impact': risk.tretiary_impact,
|
||||
'Detection Difficulty': risk.detection_difficulty,
|
||||
'Recovery Complexity': risk.recovery_complexity,
|
||||
'Business Impact Severity': risk.businnes_impact_severity,
|
||||
|
||||
})
|
||||
self.stdout.write(self.style.SUCCESS('Risks exported successfully'))
|
||||
32
backend/core/management/commands/import_risks.py
Normal file
32
backend/core/management/commands/import_risks.py
Normal file
@@ -0,0 +1,32 @@
|
||||
import csv
|
||||
from django.core.management.base import BaseCommand , CommandError
|
||||
from backend.core.models import Risk
|
||||
|
||||
|
||||
|
||||
class Command(BaseCommand):
|
||||
help = "Import risks from CSV file"
|
||||
|
||||
def add_arguments(self, parser):
|
||||
parser.add_argument('csv_file',type=str, help="CSV file to import risks from")
|
||||
|
||||
def handle(self, *args, **options):
|
||||
csv_file_path = options['csv_file']
|
||||
|
||||
with open(csv_file_path, mode="r") as csv_file:
|
||||
reader = csv.DictReader(csv_file)
|
||||
for row in reader:
|
||||
Risk.objects.update_or_create(
|
||||
risk_id=row["Risk ID"],
|
||||
defaults = {
|
||||
'category':row['Category'],
|
||||
'risk_name': row['Risk Name'],
|
||||
'primary_impact': row['Primary Impact'],
|
||||
'secondary_impact': row['Secondary Impact'],
|
||||
'tretiary_impact':row['Tertiary Impact'],
|
||||
'detection_difficulty': row['Detection Difficulty'],
|
||||
'recovery_complexity':row['Recovery Complexity'],
|
||||
'businnes_impact_severity': row['Business Impact Severity']
|
||||
}
|
||||
)
|
||||
self.stdout.write(self.style.SUCCESS('Risks imported successfully'))
|
||||
0
backend/core/management/commands/tests/__init__.py
Normal file
0
backend/core/management/commands/tests/__init__.py
Normal file
61
backend/core/management/commands/tests/test_export_risks.py
Normal file
61
backend/core/management/commands/tests/test_export_risks.py
Normal file
@@ -0,0 +1,61 @@
|
||||
import os
|
||||
import csv
|
||||
from django.core.management import call_command
|
||||
from django.test import TestCase
|
||||
from django.conf import settings
|
||||
from backend.core.models import Risk
|
||||
|
||||
class ExportRisksCommandTest(TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.risk = Risk.objects.create(
|
||||
risk_id="1",
|
||||
category="Security",
|
||||
risk_name="Data Breach",
|
||||
primary_impact="High",
|
||||
secondary_impact="Medium",
|
||||
tretiary_impact="Low",
|
||||
detection_difficulty="High",
|
||||
recovery_complexity="Medium",
|
||||
businnes_impact_severity="High"
|
||||
)
|
||||
self.csv_file_path = os.path.join(settings.BASE_DIR, "test_risks.csv")
|
||||
|
||||
def tearDown(self):
|
||||
if os.path.exists(self.csv_file_path):
|
||||
os.remove(self.csv_file_path)
|
||||
|
||||
def test_export_risks_to_csv(self):
|
||||
call_command('export_risks', self.csv_file_path)
|
||||
|
||||
self.assertTrue(os.path.exists(self.csv_file_path))
|
||||
|
||||
with open(self.csv_file_path, mode='r') as csv_file:
|
||||
reader = csv.DictReader(csv_file)
|
||||
rows = list(reader)
|
||||
|
||||
self.assertEqual(len(rows), 1)
|
||||
|
||||
row = rows[0]
|
||||
self.assertEqual(row['Risk ID'], self.risk.risk_id)
|
||||
self.assertEqual(row['Category'], self.risk.category)
|
||||
self.assertEqual(row['Risk Name'], self.risk.risk_name)
|
||||
self.assertEqual(row['Primary Impact'], self.risk.primary_impact)
|
||||
self.assertEqual(row['Secondary Impact'], self.risk.secondary_impact)
|
||||
self.assertEqual(row['Tertiary Impact'], self.risk.tretiary_impact)
|
||||
self.assertEqual(row['Detection Difficulty'], self.risk.detection_difficulty)
|
||||
self.assertEqual(row['Recovery Complexity'], self.risk.recovery_complexity)
|
||||
self.assertEqual(row['Business Impact Severity'], self.risk.businnes_impact_severity)
|
||||
|
||||
def test_no_risks_in_db(self):
|
||||
Risk.objects.all().delete()
|
||||
|
||||
call_command('export_risks', self.csv_file_path)
|
||||
|
||||
self.assertTrue(os.path.exists(self.csv_file_path))
|
||||
|
||||
with open(self.csv_file_path, mode='r') as csv_file:
|
||||
reader = csv.DictReader(csv_file)
|
||||
rows = list(reader)
|
||||
|
||||
self.assertEqual(len(rows), 0)
|
||||
54
backend/core/management/commands/tests/test_import_risks.py
Normal file
54
backend/core/management/commands/tests/test_import_risks.py
Normal file
@@ -0,0 +1,54 @@
|
||||
import os
|
||||
import csv
|
||||
import tempfile
|
||||
from django.core.management import call_command
|
||||
from django.test import TestCase
|
||||
from backend.core.models import Risk
|
||||
|
||||
class ImportRisksCommandTest(TestCase):
|
||||
|
||||
def setUp(self):
|
||||
"""Create a temporary CSV file with test risk data"""
|
||||
self.temp_csv = tempfile.NamedTemporaryFile(mode="w", delete=False, newline='', encoding='utf-8')
|
||||
fieldnames = [
|
||||
"Risk ID", "Category", "Risk Name", "Primary Impact",
|
||||
"Secondary Impact", "Tertiary Impact", "Detection Difficulty",
|
||||
"Recovery Complexity", "Business Impact Severity"
|
||||
]
|
||||
|
||||
writer = csv.DictWriter(self.temp_csv, fieldnames=fieldnames)
|
||||
writer.writeheader()
|
||||
writer.writerow({
|
||||
"Risk ID": "1",
|
||||
"Category": "Operational",
|
||||
"Risk Name": "Data Loss",
|
||||
"Primary Impact": "High",
|
||||
"Secondary Impact": "Medium",
|
||||
"Tertiary Impact": "Low",
|
||||
"Detection Difficulty": "Hard",
|
||||
"Recovery Complexity": "Complex",
|
||||
"Business Impact Severity": "Severe"
|
||||
})
|
||||
self.temp_csv.close()
|
||||
|
||||
def test_import_risks_command(self):
|
||||
"""Test importing risks from a CSV file"""
|
||||
# Ensure no risks exist before import
|
||||
self.assertEqual(Risk.objects.count(), 0)
|
||||
|
||||
# Run the import command
|
||||
call_command('import_risks', self.temp_csv.name)
|
||||
|
||||
# Check if the risk has been imported
|
||||
self.assertEqual(Risk.objects.count(), 1)
|
||||
risk = Risk.objects.get(risk_id="1")
|
||||
self.assertEqual(risk.category, "Operational")
|
||||
self.assertEqual(risk.risk_name, "Data Loss")
|
||||
self.assertEqual(risk.primary_impact, "High")
|
||||
self.assertEqual(risk.secondary_impact, "Medium")
|
||||
self.assertEqual(risk.tretiary_impact, "Low") # Corrected spelling
|
||||
self.assertEqual(risk.businnes_impact_severity, "Severe") # Corrected spelling
|
||||
|
||||
def tearDown(self):
|
||||
"""Remove temporary CSV file after test"""
|
||||
os.remove(self.temp_csv.name)
|
||||
Reference in New Issue
Block a user