import datetime import json from django.conf import settings from django.contrib.auth import get_user_model from django.contrib.contenttypes.models import ContentType from django.core.exceptions import ValidationError from django.core.serializers.json import DjangoJSONEncoder from django.test import TestCase from django.utils import timezone from freezegun import freeze_time from wagtail.log_actions import LogActionRegistry from wagtail.log_actions import registry as log_registry from wagtail.models import ( Page, PageLogEntry, PageViewRestriction, Task, Workflow, WorkflowTask, ) from wagtail.models.audit_log import ModelLogEntry from wagtail.test.testapp.models import FullFeaturedSnippet, SimplePage from wagtail.test.utils import WagtailTestUtils class TestAuditLogManager(WagtailTestUtils, TestCase): def setUp(self): self.user = self.create_superuser( username="administrator", email="administrator@email.com", password="password", ) self.page = Page.objects.get(pk=1) self.simple_page = self.page.add_child( instance=SimplePage( title="Simple page", slug="simple", content="Hello", owner=self.user ) ) self.snippet_1 = FullFeaturedSnippet.objects.create(text="snippet 1") self.snippet_2 = FullFeaturedSnippet.objects.create(text="snippet 2") self.snippet_content_type = ContentType.objects.get_for_model( FullFeaturedSnippet ) def test_log_action_for_page(self): now = timezone.now() with freeze_time(now): entry = PageLogEntry.objects.log_action( self.page, "wagtail.edit", user=self.user ) self.assertEqual(entry.content_type, self.page.content_type) self.assertEqual(entry.user, self.user) self.assertEqual(entry.timestamp, now) def test_log_action_for_snippet(self): now = timezone.now() with freeze_time(now): entry = ModelLogEntry.objects.log_action( self.snippet_1, "wagtail.edit", user=self.user ) self.assertEqual(entry.content_type, self.snippet_content_type) self.assertEqual(entry.user, self.user) self.assertEqual(entry.timestamp, now) def test_get_for_page_model(self): PageLogEntry.objects.log_action(self.page, "wagtail.edit") PageLogEntry.objects.log_action(self.simple_page, "wagtail.edit") entries = PageLogEntry.objects.get_for_model(SimplePage) self.assertEqual(entries.count(), 2) self.assertListEqual( list(entries), list(PageLogEntry.objects.filter(page=self.simple_page)) ) def test_get_for_snippet_model(self): ModelLogEntry.objects.log_action(self.snippet_1, "wagtail.edit") ModelLogEntry.objects.log_action(self.snippet_2, "wagtail.edit") entries = ModelLogEntry.objects.get_for_model(FullFeaturedSnippet) self.assertEqual(entries.count(), 2) self.assertListEqual( list(entries), list(ModelLogEntry.objects.filter(content_type=self.snippet_content_type)), ) def test_get_for_user(self): self.assertEqual( PageLogEntry.objects.get_for_user(self.user).count(), 1 ) # the create from setUp def test_get_for_page_instance(self): PageLogEntry.objects.log_action(self.page, "wagtail.edit") PageLogEntry.objects.log_action(self.simple_page, "wagtail.edit") other_simple_page = self.page.add_child( instance=SimplePage( title="Simple page 2", slug="simple2", content="Hello", owner=self.user ) ) PageLogEntry.objects.log_action(other_simple_page, "wagtail.edit") entries = PageLogEntry.objects.for_instance(self.simple_page) expected_entries = list(PageLogEntry.objects.filter(page=self.simple_page)) self.assertEqual(entries.count(), 2) self.assertListEqual(list(entries), expected_entries) # should also be able to retrieve entries via the log registry, which # eliminates the need to know that PageLogEntry is the log entry model entries = log_registry.get_logs_for_instance(self.simple_page) self.assertEqual(entries.count(), 2) self.assertListEqual(list(entries), expected_entries) def test_get_for_snippet_instance(self): ModelLogEntry.objects.log_action(self.snippet_1, "wagtail.edit") ModelLogEntry.objects.log_action(self.snippet_2, "wagtail.edit") entries = ModelLogEntry.objects.for_instance(self.snippet_1) expected_entries = list( ModelLogEntry.objects.filter( content_type=self.snippet_content_type, object_id=self.snippet_1.pk ) ) self.assertEqual(entries.count(), 1) self.assertListEqual(list(entries), expected_entries) # should also be able to retrieve entries via the log registry, which # eliminates the need to know that ModelLogEntry is the log entry model entries = log_registry.get_logs_for_instance(self.snippet_1) self.assertEqual(entries.count(), 1) self.assertListEqual(list(entries), expected_entries) class TestAuditLog(TestCase): def setUp(self): self.root_page = Page.objects.get(id=1) self.home_page = self.root_page.add_child( instance=SimplePage(title="Homepage", slug="home2", content="hello") ) PageLogEntry.objects.all().delete() # clean up the log entries here. def test_page_create(self): self.assertEqual(PageLogEntry.objects.count(), 0) # homepage page = self.home_page.add_child( instance=SimplePage(title="Hello", slug="my-page", content="world") ) self.assertEqual(PageLogEntry.objects.count(), 1) log_entry = PageLogEntry.objects.order_by("pk").last() self.assertEqual(log_entry.action, "wagtail.create") self.assertEqual(log_entry.page_id, page.id) self.assertEqual(log_entry.content_type, page.content_type) self.assertEqual(log_entry.label, page.get_admin_display_title()) def test_alias_create_from_published_page_doesnt_log_publish_action(self): self.home_page.live = True self.home_page.save() alias = self.home_page.create_alias(update_slug="the-alias") self.assertTrue(alias.live) self.assertEqual( PageLogEntry.objects.filter(action="wagtail.publish").count(), 0 ) def test_page_edit(self): # Directly saving a revision should not yield a log entry self.home_page.save_revision() self.assertEqual(PageLogEntry.objects.count(), 0) # Explicitly ask to record the revision change self.home_page.save_revision(log_action=True) self.assertEqual(PageLogEntry.objects.count(), 1) self.assertEqual(PageLogEntry.objects.filter(action="wagtail.edit").count(), 1) # passing a string for the action should log this. self.home_page.save_revision(log_action="wagtail.revert") self.assertEqual( PageLogEntry.objects.filter(action="wagtail.revert").count(), 1 ) def test_page_publish(self): revision = self.home_page.save_revision() revision.publish() self.assertEqual(PageLogEntry.objects.count(), 1) self.assertEqual( PageLogEntry.objects.filter(action="wagtail.publish").count(), 1 ) def test_page_publish_doesnt_log_for_aliases(self): self.home_page.create_alias(update_slug="the-alias") revision = self.home_page.save_revision() revision.publish() self.assertEqual( PageLogEntry.objects.filter(action="wagtail.publish").count(), 1 ) def test_page_rename(self): # Should not log a name change when publishing the first revision revision = self.home_page.save_revision() self.home_page.title = "Old title" self.home_page.save() revision.publish() self.assertEqual( PageLogEntry.objects.filter(action="wagtail.publish").count(), 1 ) self.assertEqual( PageLogEntry.objects.filter(action="wagtail.rename").count(), 0 ) # Now, check the rename is logged revision = self.home_page.save_revision() self.home_page.title = "New title" self.home_page.save() revision.publish() self.assertEqual(PageLogEntry.objects.count(), 3) self.assertEqual( PageLogEntry.objects.filter(action="wagtail.publish").count(), 2 ) self.assertEqual( PageLogEntry.objects.filter(action="wagtail.rename").count(), 1 ) def test_page_unpublish(self): self.home_page.unpublish() self.assertEqual(PageLogEntry.objects.count(), 1) self.assertEqual( PageLogEntry.objects.filter(action="wagtail.unpublish").count(), 1 ) def test_page_unpublish_doesnt_log_for_aliases(self): self.home_page.create_alias(update_slug="the-alias") self.home_page.unpublish() self.assertEqual( PageLogEntry.objects.filter(action="wagtail.unpublish").count(), 1 ) def test_revision_revert(self): revision1 = self.home_page.save_revision() self.home_page.save_revision() self.home_page.save_revision(log_action=True, previous_revision=revision1) self.assertEqual( PageLogEntry.objects.filter(action="wagtail.revert").count(), 1 ) def test_revision_schedule_publish(self): go_live_at = datetime.datetime.now() + datetime.timedelta(days=1) if settings.USE_TZ: go_live_at = timezone.make_aware(go_live_at) expected_go_live_at = timezone.localtime(go_live_at, datetime.timezone.utc) else: expected_go_live_at = go_live_at self.home_page.go_live_at = go_live_at # with no live revision revision = self.home_page.save_revision() revision.publish() log_entries = PageLogEntry.objects.filter(action="wagtail.publish.schedule") self.assertEqual(log_entries.count(), 1) self.assertEqual(log_entries[0].data["revision"]["id"], revision.id) self.assertEqual( log_entries[0].data["revision"]["go_live_at"], # skip double quotes json.dumps(expected_go_live_at, cls=DjangoJSONEncoder)[1:-1], ) def test_revision_schedule_revert(self): revision1 = self.home_page.save_revision() revision2 = self.home_page.save_revision() if settings.USE_TZ: self.home_page.go_live_at = timezone.make_aware( datetime.datetime.now() + datetime.timedelta(days=1) ) else: self.home_page.go_live_at = datetime.datetime.now() + datetime.timedelta( days=1 ) schedule_revision = self.home_page.save_revision( log_action=True, previous_revision=revision2 ) schedule_revision.publish(previous_revision=revision1) self.assertListEqual( list(PageLogEntry.objects.values_list("action", flat=True)), [ "wagtail.publish.schedule", "wagtail.revert", ], # order_by -timestamp, by default ) def test_revision_cancel_schedule(self): go_live_at = datetime.datetime.now() + datetime.timedelta(days=1) if settings.USE_TZ: go_live_at = timezone.make_aware(go_live_at) expected_go_live_at = timezone.localtime(go_live_at, datetime.timezone.utc) else: expected_go_live_at = go_live_at self.home_page.go_live_at = go_live_at revision = self.home_page.save_revision() revision.publish() revision.approved_go_live_at = None revision.save(update_fields=["approved_go_live_at"]) log_entries = PageLogEntry.objects.filter(action="wagtail.schedule.cancel") self.assertEqual(log_entries.count(), 1) self.assertEqual(log_entries[0].data["revision"]["id"], revision.id) self.assertEqual( log_entries[0].data["revision"]["go_live_at"], # skip double quotes json.dumps(expected_go_live_at, cls=DjangoJSONEncoder)[1:-1], ) # The home_page was live already and we've only cancelled the publication of the above revision. self.assertTrue(log_entries[0].data["revision"]["has_live_version"]) def test_page_lock_unlock(self): self.home_page.save(log_action="wagtail.lock") self.home_page.save(log_action="wagtail.unlock") self.assertEqual( PageLogEntry.objects.filter( action__in=["wagtail.lock", "wagtail.unlock"] ).count(), 2, ) def test_page_copy(self): self.home_page.copy(update_attrs={"title": "About us", "slug": "about-us"}) self.assertListEqual( list(PageLogEntry.objects.values_list("action", flat=True)), ["wagtail.publish", "wagtail.copy", "wagtail.create"], ) def test_page_reorder(self): section_1 = self.root_page.add_child( instance=SimplePage(title="Child 1", slug="child-1", content="hello") ) self.root_page.add_child( instance=SimplePage(title="Child 2", slug="child-2", content="hello") ) user = get_user_model().objects.first() # Reorder section 1 to be the last page under root_page. # This should log as `wagtail.reorder` because the page was moved under the same parent page section_1.move(self.root_page, user=user, pos="last-child") self.assertEqual( PageLogEntry.objects.filter(action="wagtail.reorder", user=user).count(), 1 ) self.assertEqual( PageLogEntry.objects.filter(action="wagtail.move", user=user).count(), 0 ) def test_page_move(self): section = self.root_page.add_child( instance=SimplePage(title="About us", slug="about", content="hello") ) user = get_user_model().objects.first() # move() interprets `target` as an intended 'sibling' by default, so # we must use `pos` to indicate that `self.home_page` should be the # new 'parent' section.move(self.home_page, pos="last-child", user=user) self.assertEqual( PageLogEntry.objects.filter(action="wagtail.move", user=user).count(), 1 ) self.assertEqual( PageLogEntry.objects.filter(action="wagtail.reorder", user=user).count(), 0 ) def test_page_delete(self): self.home_page.add_child( instance=SimplePage(title="Child", slug="child-page", content="hello") ) child = self.home_page.add_child( instance=SimplePage( title="Another child", slug="child-page-2", content="hello" ) ) child.add_child( instance=SimplePage( title="Grandchild", slug="grandchild-page", content="hello" ) ) # check deleting a parent page logs descendent deletion self.home_page.delete() self.assertEqual( PageLogEntry.objects.filter(action="wagtail.delete").count(), 4 ) self.assertEqual( set( PageLogEntry.objects.filter(action="wagtail.delete").values_list( "label", flat=True ) ), { "Homepage (simple page)", "Grandchild (simple page)", "Child (simple page)", "Another child (simple page)", }, ) def test_workflow_actions(self): workflow = Workflow.objects.create(name="test_workflow") task_1 = Task.objects.create(name="test_task_1") task_2 = Task.objects.create(name="test_task_2") WorkflowTask.objects.create(workflow=workflow, task=task_1, sort_order=1) WorkflowTask.objects.create(workflow=workflow, task=task_2, sort_order=2) self.home_page.save_revision() user = get_user_model().objects.first() workflow_state = workflow.start(self.home_page, user) workflow_entry = PageLogEntry.objects.filter(action="wagtail.workflow.start") self.assertEqual(workflow_entry.count(), 1) self.assertEqual( workflow_entry[0].data, { "workflow": { "id": workflow.id, "title": workflow.name, "status": workflow_state.status, "task_state_id": workflow_state.current_task_state_id, "next": { "id": workflow_state.current_task_state.task.id, "title": workflow_state.current_task_state.task.name, }, } }, ) # Approve for action in ["approve", "reject"]: with self.subTest(action): task_state = workflow_state.current_task_state task_state.task.on_action( task_state, user=None, action_name=action, comment="This is my comment", ) workflow_state.refresh_from_db() entry = PageLogEntry.objects.filter(action=f"wagtail.workflow.{action}") self.assertEqual(entry.count(), 1) self.assertEqual( entry[0].data, { "workflow": { "id": workflow.id, "title": workflow.name, "status": task_state.status, "task_state_id": task_state.id, "task": { "id": task_state.task.id, "title": task_state.task.name, }, "next": { "id": workflow_state.current_task_state.task.id, "title": workflow_state.current_task_state.task.name, }, }, "comment": "This is my comment", }, ) self.assertEqual(entry[0].comment, "This is my comment") def test_snippet_workflow_actions(self): workflow = Workflow.objects.create(name="test_workflow") task_1 = Task.objects.create(name="test_task_1") task_2 = Task.objects.create(name="test_task_2") WorkflowTask.objects.create(workflow=workflow, task=task_1, sort_order=1) WorkflowTask.objects.create(workflow=workflow, task=task_2, sort_order=2) snippet = FullFeaturedSnippet.objects.create(text="Initial", live=False) snippet.save_revision() user = get_user_model().objects.first() workflow_state = workflow.start(snippet, user) workflow_entry = ModelLogEntry.objects.filter(action="wagtail.workflow.start") self.assertEqual(workflow_entry.count(), 1) self.assertEqual( workflow_entry[0].data, { "workflow": { "id": workflow.id, "title": workflow.name, "status": workflow_state.status, "task_state_id": workflow_state.current_task_state_id, "next": { "id": workflow_state.current_task_state.task.id, "title": workflow_state.current_task_state.task.name, }, } }, ) # Approve for action in ["approve", "reject"]: with self.subTest(action): task_state = workflow_state.current_task_state task_state.task.on_action( task_state, user=None, action_name=action, comment="This is my comment", ) workflow_state.refresh_from_db() entry = ModelLogEntry.objects.filter( action=f"wagtail.workflow.{action}" ) self.assertEqual(entry.count(), 1) self.assertEqual( entry[0].data, { "workflow": { "id": workflow.id, "title": workflow.name, "status": task_state.status, "task_state_id": task_state.id, "task": { "id": task_state.task.id, "title": task_state.task.name, }, "next": { "id": workflow_state.current_task_state.task.id, "title": workflow_state.current_task_state.task.name, }, }, "comment": "This is my comment", }, ) self.assertEqual(entry[0].comment, "This is my comment") def test_workflow_completions_logs_publishing_user(self): workflow = Workflow.objects.create(name="test_workflow") task_1 = Task.objects.create(name="test_task_1") WorkflowTask.objects.create(workflow=workflow, task=task_1, sort_order=1) self.assertFalse(PageLogEntry.objects.filter(action="wagtail.publish").exists()) self.home_page.save_revision() user = get_user_model().objects.first() workflow_state = workflow.start(self.home_page, user) publisher = get_user_model().objects.last() task_state = workflow_state.current_task_state task_state.task.on_action(task_state, user=None, action_name="approve") self.assertEqual( PageLogEntry.objects.get(action="wagtail.publish").user, publisher ) def test_snippet_workflow_completions_logs_publishing_user(self): workflow = Workflow.objects.create(name="test_workflow") task_1 = Task.objects.create(name="test_task_1") WorkflowTask.objects.create(workflow=workflow, task=task_1, sort_order=1) self.assertFalse( ModelLogEntry.objects.filter(action="wagtail.publish").exists() ) snippet = FullFeaturedSnippet.objects.create(text="Initial", live=False) snippet.save_revision() user = get_user_model().objects.first() workflow_state = workflow.start(snippet, user) publisher = get_user_model().objects.last() task_state = workflow_state.current_task_state task_state.task.on_action(task_state, user=None, action_name="approve") self.assertEqual( ModelLogEntry.objects.get(action="wagtail.publish").user, publisher ) def test_page_privacy(self): restriction = PageViewRestriction.objects.create(page=self.home_page) self.assertEqual( PageLogEntry.objects.filter( action="wagtail.view_restriction.create" ).count(), 1, ) restriction.restriction_type = PageViewRestriction.PASSWORD restriction.save() self.assertEqual( PageLogEntry.objects.filter(action="wagtail.view_restriction.edit").count(), 1, ) def test_hook(actions): return actions.register_action("test.custom_action", "Custom action", "Tested!") class TestAuditLogHooks(WagtailTestUtils, TestCase): def setUp(self): self.root_page = Page.objects.get(id=2) def test_register_log_actions_hook(self): log_actions = LogActionRegistry() self.assertTrue(log_actions.action_exists("wagtail.create")) def test_action_must_be_registered(self): # We check actions are registered to let developers know if they have forgotten to register # a new action or made a spelling mistake. It's not intended as a database-level constraint. with self.assertRaises(ValidationError) as e: PageLogEntry.objects.log_action(self.root_page, action="test.custom_action") self.assertEqual( e.exception.message_dict, { "action": [ "The log action 'test.custom_action' has not been registered." ] }, ) def test_action_format_message(self): # All new logs should pass our validation, but older logs or logs that were added in bulk # may be invalid. # Using LogEntry.objects.update, we can bypass the on save validation. log_entry = PageLogEntry.objects.log_action( self.root_page, action="wagtail.create" ) PageLogEntry.objects.update(action="test.custom_action") log_entry.refresh_from_db() log_actions = LogActionRegistry() self.assertEqual(log_entry.message, "Unknown test.custom_action") self.assertFalse(log_actions.action_exists("test.custom_action")) with self.register_hook("register_log_actions", test_hook): log_actions = LogActionRegistry() self.assertTrue(log_actions.action_exists("test.custom_action")) self.assertEqual( log_actions.get_formatter(log_entry).format_message(log_entry), "Tested!", ) self.assertEqual( log_actions.get_action_label("test.custom_action"), "Custom action" )