# Files
# old-saburly-wagtail-web/env/lib/python3.10/site-packages/wagtail/tests/test_audit_log.py
# 2024-08-27 20:33:44 +02:00
#
# 659 lines
# 25 KiB
# Python

import datetime
import json
from django.conf import settings
from django.contrib.auth import get_user_model
from django.contrib.contenttypes.models import ContentType
from django.core.exceptions import ValidationError
from django.core.serializers.json import DjangoJSONEncoder
from django.test import TestCase
from django.utils import timezone
from freezegun import freeze_time
from wagtail.log_actions import LogActionRegistry
from wagtail.log_actions import registry as log_registry
from wagtail.models import (
Page,
PageLogEntry,
PageViewRestriction,
Task,
Workflow,
WorkflowTask,
)
from wagtail.models.audit_log import ModelLogEntry
from wagtail.test.testapp.models import FullFeaturedSnippet, SimplePage
from wagtail.test.utils import WagtailTestUtils
class TestAuditLogManager(WagtailTestUtils, TestCase):
    """Exercise the query helpers on the page and model log entry managers."""

    def setUp(self):
        """Create a superuser, a child page owned by that user, and two snippets."""
        self.user = self.create_superuser(
            username="administrator",
            email="administrator@email.com",
            password="password",
        )
        self.page = Page.objects.get(pk=1)
        self.simple_page = self.page.add_child(
            instance=SimplePage(
                title="Simple page", slug="simple", content="Hello", owner=self.user
            )
        )
        self.snippet_1 = FullFeaturedSnippet.objects.create(text="snippet 1")
        self.snippet_2 = FullFeaturedSnippet.objects.create(text="snippet 2")
        self.snippet_content_type = ContentType.objects.get_for_model(
            FullFeaturedSnippet
        )

    def test_log_action_for_page(self):
        """log_action() on a page records content type, user and timestamp."""
        now = timezone.now()

        # Freeze the clock so the stored timestamp can be compared exactly.
        with freeze_time(now):
            entry = PageLogEntry.objects.log_action(
                self.page, "wagtail.edit", user=self.user
            )

        self.assertEqual(entry.content_type, self.page.content_type)
        self.assertEqual(entry.user, self.user)
        self.assertEqual(entry.timestamp, now)

    def test_log_action_for_snippet(self):
        """log_action() on a snippet records content type, user and timestamp."""
        now = timezone.now()

        # Freeze the clock so the stored timestamp can be compared exactly.
        with freeze_time(now):
            entry = ModelLogEntry.objects.log_action(
                self.snippet_1, "wagtail.edit", user=self.user
            )

        self.assertEqual(entry.content_type, self.snippet_content_type)
        self.assertEqual(entry.user, self.user)
        self.assertEqual(entry.timestamp, now)

    def test_get_for_page_model(self):
        """get_for_model(SimplePage) returns only the SimplePage entries."""
        PageLogEntry.objects.log_action(self.page, "wagtail.edit")
        PageLogEntry.objects.log_action(self.simple_page, "wagtail.edit")

        entries = PageLogEntry.objects.get_for_model(SimplePage)

        # The edit logged against the root page must not be part of the result.
        self.assertEqual(entries.count(), 2)
        self.assertListEqual(
            list(entries), list(PageLogEntry.objects.filter(page=self.simple_page))
        )

    def test_get_for_snippet_model(self):
        """get_for_model(FullFeaturedSnippet) returns the entries of both snippets."""
        ModelLogEntry.objects.log_action(self.snippet_1, "wagtail.edit")
        ModelLogEntry.objects.log_action(self.snippet_2, "wagtail.edit")

        entries = ModelLogEntry.objects.get_for_model(FullFeaturedSnippet)

        self.assertEqual(entries.count(), 2)
        self.assertListEqual(
            list(entries),
            list(ModelLogEntry.objects.filter(content_type=self.snippet_content_type)),
        )

    def test_get_for_user(self):
        """get_for_user() returns the entries attributed to the given user."""
        self.assertEqual(
            PageLogEntry.objects.get_for_user(self.user).count(), 1
        )  # the create from setUp

    def test_get_for_page_instance(self):
        """Entries for one page are reachable via the manager and the registry."""
        PageLogEntry.objects.log_action(self.page, "wagtail.edit")
        PageLogEntry.objects.log_action(self.simple_page, "wagtail.edit")

        # A sibling of the same model whose entries must not leak into the result.
        other_simple_page = self.page.add_child(
            instance=SimplePage(
                title="Simple page 2", slug="simple2", content="Hello", owner=self.user
            )
        )
        PageLogEntry.objects.log_action(other_simple_page, "wagtail.edit")

        entries = PageLogEntry.objects.for_instance(self.simple_page)
        expected_entries = list(PageLogEntry.objects.filter(page=self.simple_page))

        self.assertEqual(entries.count(), 2)
        self.assertListEqual(list(entries), expected_entries)

        # should also be able to retrieve entries via the log registry, which
        # eliminates the need to know that PageLogEntry is the log entry model
        entries = log_registry.get_logs_for_instance(self.simple_page)
        self.assertEqual(entries.count(), 2)
        self.assertListEqual(list(entries), expected_entries)

    def test_get_for_snippet_instance(self):
        """Entries for one snippet are reachable via the manager and the registry."""
        ModelLogEntry.objects.log_action(self.snippet_1, "wagtail.edit")
        ModelLogEntry.objects.log_action(self.snippet_2, "wagtail.edit")

        entries = ModelLogEntry.objects.for_instance(self.snippet_1)
        expected_entries = list(
            ModelLogEntry.objects.filter(
                content_type=self.snippet_content_type, object_id=self.snippet_1.pk
            )
        )

        self.assertEqual(entries.count(), 1)
        self.assertListEqual(list(entries), expected_entries)

        # should also be able to retrieve entries via the log registry, which
        # eliminates the need to know that ModelLogEntry is the log entry model
        entries = log_registry.get_logs_for_instance(self.snippet_1)
        self.assertEqual(entries.count(), 1)
        self.assertListEqual(list(entries), expected_entries)
class TestAuditLog(TestCase):
    """Verify the log entries written by page and snippet lifecycle operations."""

    def setUp(self):
        """Add a home page under the root page and start from an empty page log."""
        self.root_page = Page.objects.get(id=1)
        self.home_page = self.root_page.add_child(
            instance=SimplePage(title="Homepage", slug="home2", content="hello")
        )
        # Drop whatever was logged while building the pages above so each test
        # can assert on absolute entry counts.
        PageLogEntry.objects.all().delete()

    # ------------------------------------------------------------------
    # Shared helpers
    # ------------------------------------------------------------------

    def _tomorrow(self):
        """Return ``(go_live_at, expected_go_live_at)`` for one day from now.

        ``go_live_at`` is the value to assign to a page. ``expected_go_live_at``
        is the same moment converted to UTC when ``USE_TZ`` is enabled, and the
        unchanged naive value otherwise.
        """
        go_live_at = datetime.datetime.now() + datetime.timedelta(days=1)
        if settings.USE_TZ:
            go_live_at = timezone.make_aware(go_live_at)
            expected_go_live_at = timezone.localtime(go_live_at, datetime.timezone.utc)
        else:
            expected_go_live_at = go_live_at
        return go_live_at, expected_go_live_at

    def _create_workflow(self, task_count):
        """Create "test_workflow" with ``task_count`` tasks ordered 1..task_count."""
        workflow = Workflow.objects.create(name="test_workflow")
        tasks = [
            Task.objects.create(name=f"test_task_{number}")
            for number in range(1, task_count + 1)
        ]
        for sort_order, task in enumerate(tasks, start=1):
            WorkflowTask.objects.create(
                workflow=workflow, task=task, sort_order=sort_order
            )
        return workflow

    def _assert_workflow_actions_logged(self, workflow, instance, log_model):
        """Start ``workflow`` on ``instance`` and check the start/approve/reject logs.

        ``log_model`` is the log entry model expected to receive the entries
        (``PageLogEntry`` for pages, ``ModelLogEntry`` for snippets).
        """
        instance.save_revision()
        user = get_user_model().objects.first()
        workflow_state = workflow.start(instance, user)

        start_entries = log_model.objects.filter(action="wagtail.workflow.start")
        self.assertEqual(start_entries.count(), 1)
        self.assertEqual(
            start_entries[0].data,
            {
                "workflow": {
                    "id": workflow.id,
                    "title": workflow.name,
                    "status": workflow_state.status,
                    "task_state_id": workflow_state.current_task_state_id,
                    "next": {
                        "id": workflow_state.current_task_state.task.id,
                        "title": workflow_state.current_task_state.task.name,
                    },
                }
            },
        )

        # Approve the current task, then reject the one that becomes current.
        for action in ["approve", "reject"]:
            with self.subTest(action):
                task_state = workflow_state.current_task_state
                task_state.task.on_action(
                    task_state,
                    user=None,
                    action_name=action,
                    comment="This is my comment",
                )
                # Reload so "next" below reflects the state after the action.
                workflow_state.refresh_from_db()

                entry = log_model.objects.filter(action=f"wagtail.workflow.{action}")
                self.assertEqual(entry.count(), 1)
                self.assertEqual(
                    entry[0].data,
                    {
                        "workflow": {
                            "id": workflow.id,
                            "title": workflow.name,
                            "status": task_state.status,
                            "task_state_id": task_state.id,
                            "task": {
                                "id": task_state.task.id,
                                "title": task_state.task.name,
                            },
                            "next": {
                                "id": workflow_state.current_task_state.task.id,
                                "title": workflow_state.current_task_state.task.name,
                            },
                        },
                        "comment": "This is my comment",
                    },
                )
                self.assertEqual(entry[0].comment, "This is my comment")

    def _assert_approval_logs_publisher(self, workflow, instance, log_model):
        """Approve the single task of ``workflow`` and check the publish log's user."""
        instance.save_revision()
        user = get_user_model().objects.first()
        workflow_state = workflow.start(instance, user)

        # NOTE(review): the approval is performed with user=None and compared
        # against objects.last(); setUp in this class creates no users, so both
        # may simply be None - confirm this exercises what the test name says.
        publisher = get_user_model().objects.last()
        task_state = workflow_state.current_task_state
        task_state.task.on_action(task_state, user=None, action_name="approve")

        self.assertEqual(
            log_model.objects.get(action="wagtail.publish").user, publisher
        )

    # ------------------------------------------------------------------
    # Create / edit / publish
    # ------------------------------------------------------------------

    def test_page_create(self):
        # setUp cleared the log, so the home page's own creation is not counted.
        self.assertEqual(PageLogEntry.objects.count(), 0)

        page = self.home_page.add_child(
            instance=SimplePage(title="Hello", slug="my-page", content="world")
        )

        self.assertEqual(PageLogEntry.objects.count(), 1)
        log_entry = PageLogEntry.objects.order_by("pk").last()
        self.assertEqual(log_entry.action, "wagtail.create")
        self.assertEqual(log_entry.page_id, page.id)
        self.assertEqual(log_entry.content_type, page.content_type)
        self.assertEqual(log_entry.label, page.get_admin_display_title())

    def test_alias_create_from_published_page_doesnt_log_publish_action(self):
        self.home_page.live = True
        self.home_page.save()

        alias = self.home_page.create_alias(update_slug="the-alias")

        self.assertTrue(alias.live)
        self.assertEqual(
            PageLogEntry.objects.filter(action="wagtail.publish").count(), 0
        )

    def test_page_edit(self):
        # Directly saving a revision should not yield a log entry
        self.home_page.save_revision()
        self.assertEqual(PageLogEntry.objects.count(), 0)

        # Explicitly ask to record the revision change
        self.home_page.save_revision(log_action=True)
        self.assertEqual(PageLogEntry.objects.count(), 1)
        self.assertEqual(PageLogEntry.objects.filter(action="wagtail.edit").count(), 1)

        # passing a string for the action should log this.
        self.home_page.save_revision(log_action="wagtail.revert")
        self.assertEqual(
            PageLogEntry.objects.filter(action="wagtail.revert").count(), 1
        )

    def test_page_publish(self):
        revision = self.home_page.save_revision()
        revision.publish()

        self.assertEqual(PageLogEntry.objects.count(), 1)
        self.assertEqual(
            PageLogEntry.objects.filter(action="wagtail.publish").count(), 1
        )

    def test_page_publish_doesnt_log_for_aliases(self):
        self.home_page.create_alias(update_slug="the-alias")

        revision = self.home_page.save_revision()
        revision.publish()

        # One publish entry is expected even though an alias of the page exists.
        self.assertEqual(
            PageLogEntry.objects.filter(action="wagtail.publish").count(), 1
        )

    def test_page_rename(self):
        # Should not log a name change when publishing the first revision
        revision = self.home_page.save_revision()
        self.home_page.title = "Old title"
        self.home_page.save()
        revision.publish()

        self.assertEqual(
            PageLogEntry.objects.filter(action="wagtail.publish").count(), 1
        )
        self.assertEqual(
            PageLogEntry.objects.filter(action="wagtail.rename").count(), 0
        )

        # Now, check the rename is logged
        revision = self.home_page.save_revision()
        self.home_page.title = "New title"
        self.home_page.save()
        revision.publish()

        self.assertEqual(PageLogEntry.objects.count(), 3)
        self.assertEqual(
            PageLogEntry.objects.filter(action="wagtail.publish").count(), 2
        )
        self.assertEqual(
            PageLogEntry.objects.filter(action="wagtail.rename").count(), 1
        )

    def test_page_unpublish(self):
        self.home_page.unpublish()

        self.assertEqual(PageLogEntry.objects.count(), 1)
        self.assertEqual(
            PageLogEntry.objects.filter(action="wagtail.unpublish").count(), 1
        )

    def test_page_unpublish_doesnt_log_for_aliases(self):
        self.home_page.create_alias(update_slug="the-alias")

        self.home_page.unpublish()

        # One unpublish entry is expected even though an alias of the page exists.
        self.assertEqual(
            PageLogEntry.objects.filter(action="wagtail.unpublish").count(), 1
        )

    # ------------------------------------------------------------------
    # Revisions and scheduling
    # ------------------------------------------------------------------

    def test_revision_revert(self):
        revision1 = self.home_page.save_revision()
        self.home_page.save_revision()

        # Saving with previous_revision set is what the assertion below expects
        # to be logged as a revert.
        self.home_page.save_revision(log_action=True, previous_revision=revision1)

        self.assertEqual(
            PageLogEntry.objects.filter(action="wagtail.revert").count(), 1
        )

    def test_revision_schedule_publish(self):
        go_live_at, expected_go_live_at = self._tomorrow()
        self.home_page.go_live_at = go_live_at

        # with no live revision
        revision = self.home_page.save_revision()
        revision.publish()

        log_entries = PageLogEntry.objects.filter(action="wagtail.publish.schedule")
        self.assertEqual(log_entries.count(), 1)
        self.assertEqual(log_entries[0].data["revision"]["id"], revision.id)
        self.assertEqual(
            log_entries[0].data["revision"]["go_live_at"],
            # skip double quotes
            json.dumps(expected_go_live_at, cls=DjangoJSONEncoder)[1:-1],
        )

    def test_revision_schedule_revert(self):
        revision1 = self.home_page.save_revision()
        revision2 = self.home_page.save_revision()

        go_live_at, _ = self._tomorrow()
        self.home_page.go_live_at = go_live_at

        schedule_revision = self.home_page.save_revision(
            log_action=True, previous_revision=revision2
        )
        schedule_revision.publish(previous_revision=revision1)

        self.assertListEqual(
            list(PageLogEntry.objects.values_list("action", flat=True)),
            [
                "wagtail.publish.schedule",
                "wagtail.revert",
            ],  # order_by -timestamp, by default
        )

    def test_revision_cancel_schedule(self):
        go_live_at, expected_go_live_at = self._tomorrow()
        self.home_page.go_live_at = go_live_at

        revision = self.home_page.save_revision()
        revision.publish()

        # Clearing the approved go-live time is what the assertions below expect
        # to be logged as a cancelled schedule.
        revision.approved_go_live_at = None
        revision.save(update_fields=["approved_go_live_at"])

        log_entries = PageLogEntry.objects.filter(action="wagtail.schedule.cancel")
        self.assertEqual(log_entries.count(), 1)
        self.assertEqual(log_entries[0].data["revision"]["id"], revision.id)
        self.assertEqual(
            log_entries[0].data["revision"]["go_live_at"],
            # skip double quotes
            json.dumps(expected_go_live_at, cls=DjangoJSONEncoder)[1:-1],
        )
        # The home_page was live already and we've only cancelled the publication of the above revision.
        self.assertTrue(log_entries[0].data["revision"]["has_live_version"])

    # ------------------------------------------------------------------
    # Lock / copy / move / delete
    # ------------------------------------------------------------------

    def test_page_lock_unlock(self):
        self.home_page.save(log_action="wagtail.lock")
        self.home_page.save(log_action="wagtail.unlock")

        self.assertEqual(
            PageLogEntry.objects.filter(
                action__in=["wagtail.lock", "wagtail.unlock"]
            ).count(),
            2,
        )

    def test_page_copy(self):
        self.home_page.copy(update_attrs={"title": "About us", "slug": "about-us"})

        self.assertListEqual(
            list(PageLogEntry.objects.values_list("action", flat=True)),
            ["wagtail.publish", "wagtail.copy", "wagtail.create"],
        )

    def test_page_reorder(self):
        section_1 = self.root_page.add_child(
            instance=SimplePage(title="Child 1", slug="child-1", content="hello")
        )
        self.root_page.add_child(
            instance=SimplePage(title="Child 2", slug="child-2", content="hello")
        )
        user = get_user_model().objects.first()

        # Reorder section 1 to be the last page under root_page.
        # This should log as `wagtail.reorder` because the page was moved under the same parent page
        section_1.move(self.root_page, user=user, pos="last-child")

        self.assertEqual(
            PageLogEntry.objects.filter(action="wagtail.reorder", user=user).count(), 1
        )
        self.assertEqual(
            PageLogEntry.objects.filter(action="wagtail.move", user=user).count(), 0
        )

    def test_page_move(self):
        section = self.root_page.add_child(
            instance=SimplePage(title="About us", slug="about", content="hello")
        )
        user = get_user_model().objects.first()

        # move() interprets `target` as an intended 'sibling' by default, so
        # we must use `pos` to indicate that `self.home_page` should be the
        # new 'parent'
        section.move(self.home_page, pos="last-child", user=user)

        self.assertEqual(
            PageLogEntry.objects.filter(action="wagtail.move", user=user).count(), 1
        )
        self.assertEqual(
            PageLogEntry.objects.filter(action="wagtail.reorder", user=user).count(), 0
        )

    def test_page_delete(self):
        self.home_page.add_child(
            instance=SimplePage(title="Child", slug="child-page", content="hello")
        )
        child = self.home_page.add_child(
            instance=SimplePage(
                title="Another child", slug="child-page-2", content="hello"
            )
        )
        child.add_child(
            instance=SimplePage(
                title="Grandchild", slug="grandchild-page", content="hello"
            )
        )

        # check deleting a parent page logs descendent deletion
        self.home_page.delete()

        delete_entries = PageLogEntry.objects.filter(action="wagtail.delete")
        self.assertEqual(delete_entries.count(), 4)
        self.assertEqual(
            set(delete_entries.values_list("label", flat=True)),
            {
                "Homepage (simple page)",
                "Grandchild (simple page)",
                "Child (simple page)",
                "Another child (simple page)",
            },
        )

    # ------------------------------------------------------------------
    # Workflows
    # ------------------------------------------------------------------

    def test_workflow_actions(self):
        workflow = self._create_workflow(task_count=2)
        self._assert_workflow_actions_logged(workflow, self.home_page, PageLogEntry)

    def test_snippet_workflow_actions(self):
        workflow = self._create_workflow(task_count=2)
        snippet = FullFeaturedSnippet.objects.create(text="Initial", live=False)
        self._assert_workflow_actions_logged(workflow, snippet, ModelLogEntry)

    def test_workflow_completions_logs_publishing_user(self):
        workflow = self._create_workflow(task_count=1)
        self.assertFalse(PageLogEntry.objects.filter(action="wagtail.publish").exists())

        self._assert_approval_logs_publisher(workflow, self.home_page, PageLogEntry)

    def test_snippet_workflow_completions_logs_publishing_user(self):
        workflow = self._create_workflow(task_count=1)
        self.assertFalse(
            ModelLogEntry.objects.filter(action="wagtail.publish").exists()
        )

        snippet = FullFeaturedSnippet.objects.create(text="Initial", live=False)
        self._assert_approval_logs_publisher(workflow, snippet, ModelLogEntry)

    # ------------------------------------------------------------------
    # Privacy
    # ------------------------------------------------------------------

    def test_page_privacy(self):
        restriction = PageViewRestriction.objects.create(page=self.home_page)
        self.assertEqual(
            PageLogEntry.objects.filter(
                action="wagtail.view_restriction.create"
            ).count(),
            1,
        )

        restriction.restriction_type = PageViewRestriction.PASSWORD
        restriction.save()
        self.assertEqual(
            PageLogEntry.objects.filter(action="wagtail.view_restriction.edit").count(),
            1,
        )
def test_hook(actions):
    """Register the "test.custom_action" log action on ``actions``.

    Meant to be installed as a ``register_log_actions`` hook; ``actions`` is the
    registry object handed to the hook. Returns whatever ``register_action``
    returns.
    """
    return actions.register_action("test.custom_action", "Custom action", "Tested!")
class TestAuditLogHooks(WagtailTestUtils, TestCase):
    """Check log action registration and how unregistered actions are handled."""

    def setUp(self):
        self.root_page = Page.objects.get(id=2)

    def test_register_log_actions_hook(self):
        """A fresh registry already knows the "wagtail.create" action."""
        log_actions = LogActionRegistry()
        self.assertTrue(log_actions.action_exists("wagtail.create"))

    def test_action_must_be_registered(self):
        """Logging an unregistered action raises a ValidationError on "action"."""
        # We check actions are registered to let developers know if they have forgotten to register
        # a new action or made a spelling mistake. It's not intended as a database-level constraint.
        with self.assertRaises(ValidationError) as e:
            PageLogEntry.objects.log_action(self.root_page, action="test.custom_action")

        # The captured exception is only available once the context has exited.
        self.assertEqual(
            e.exception.message_dict,
            {
                "action": [
                    "The log action 'test.custom_action' has not been registered."
                ]
            },
        )

    def test_action_format_message(self):
        """An unknown action gets a fallback message until a hook registers it."""
        # All new logs should pass our validation, but older logs or logs that were added in bulk
        # may be invalid.
        # Using LogEntry.objects.update, we can bypass the on save validation.
        log_entry = PageLogEntry.objects.log_action(
            self.root_page, action="wagtail.create"
        )
        PageLogEntry.objects.update(action="test.custom_action")
        log_entry.refresh_from_db()

        log_actions = LogActionRegistry()
        self.assertEqual(log_entry.message, "Unknown test.custom_action")
        self.assertFalse(log_actions.action_exists("test.custom_action"))

        with self.register_hook("register_log_actions", test_hook):
            # Build a new registry while the hook is active; the one created
            # above was already checked not to know the custom action.
            log_actions = LogActionRegistry()
            self.assertTrue(log_actions.action_exists("test.custom_action"))
            self.assertEqual(
                log_actions.get_formatter(log_entry).format_message(log_entry),
                "Tested!",
            )
            self.assertEqual(
                log_actions.get_action_label("test.custom_action"), "Custom action"
            )