From d75590b943907175472cdf083d7c88039d2cb65f Mon Sep 17 00:00:00 2001
From: Michael Derynck
Date: Tue, 16 Apr 2024 08:39:00 -0600
Subject: [PATCH] Handle alert group deleted when task is already queued (#4230)

# What this PR does

- Since `send_alert_create_signal` is enqueued via `transaction.on_commit`, the alert is guaranteed to be committed before the task runs; if it does not exist when the task executes, it was intentionally deleted before the task could run, so the task can exit instead of retrying (see the sketch after the diff)
- Improve logging when `send_alert_create_signal` is called so the alert and its alert group appear on the same log line, reducing the need to search the logs
- Improve logging on the public API alert group delete so we know which channel and organization the alert group belonged to and which user was responsible
- Remove `distribute_alert` (we stopped using it a while back; the code should be safe to remove now, as no such tasks remain in the system)

## Which issue(s) this PR closes

Closes https://github.com/grafana/oncall-private/issues/2640

## Checklist

- [x] Unit, integration, and e2e (if applicable) tests updated
- [x] Documentation added (or `pr:no public docs` PR label added if not required)
- [x] Added the relevant release notes label (see labels prefixed w/ `release:`). These labels dictate how your PR will show up in the autogenerated release notes.
---
 engine/apps/alerts/models/alert.py           |  2 +-
 engine/apps/alerts/models/alert_group.py     |  1 +
 engine/apps/alerts/tasks/__init__.py         |  1 -
 engine/apps/alerts/tasks/distribute_alert.py | 45 ++--------
 engine/apps/alerts/tests/test_alert.py       | 89 +++++++++++---------
 engine/apps/webhooks/listeners.py            |  4 +-
 engine/settings/celery_task_routes.py        |  1 -
 7 files changed, 61 insertions(+), 82 deletions(-)

diff --git a/engine/apps/alerts/models/alert.py b/engine/apps/alerts/models/alert.py
index 4751f883..5c3d1358 100644
--- a/engine/apps/alerts/models/alert.py
+++ b/engine/apps/alerts/models/alert.py
@@ -136,7 +136,7 @@ class Alert(models.Model):
             is_the_first_alert_in_group=group_created,
         )
         alert.save()
-        logger.debug(f"alert {alert.pk} created")
+        logger.debug(f"alert {alert.pk} created for alert group {group.pk}")
 
         transaction.on_commit(partial(send_alert_create_signal.apply_async, (alert.pk,)))
 
diff --git a/engine/apps/alerts/models/alert_group.py b/engine/apps/alerts/models/alert_group.py
index 90488977..d93f4f72 100644
--- a/engine/apps/alerts/models/alert_group.py
+++ b/engine/apps/alerts/models/alert_group.py
@@ -1285,6 +1285,7 @@ class AlertGroup(AlertGroupSlackRenderingMixin, EscalationSnapshotMixin, models.
         logger.debug(
             f"send alert_group_action_triggered_signal for alert_group {self.pk}, "
+            f"in channel {self.channel.pk}, in org {self.channel.organization.pk}, by user {user.pk}, "
             f"log record {log_record.pk} with type '{log_record.get_type_display()}', "
             f"action source: delete"
         )
 
diff --git a/engine/apps/alerts/tasks/__init__.py b/engine/apps/alerts/tasks/__init__.py
index 9e6f026c..056140a3 100644
--- a/engine/apps/alerts/tasks/__init__.py
+++ b/engine/apps/alerts/tasks/__init__.py
@@ -8,7 +8,6 @@ from .custom_webhook_result import custom_webhook_result  # noqa: F401
 from .delete_alert_group import delete_alert_group  # noqa: F401
 from .delete_alert_group import finish_delete_alert_group  # noqa: F401
 from .delete_alert_group import send_alert_group_signal_for_delete  # noqa: F401
-from .distribute_alert import distribute_alert  # noqa: F401
 from .escalate_alert_group import escalate_alert_group  # noqa: F401
 from .invite_user_to_join_incident import invite_user_to_join_incident  # noqa: F401
 from .maintenance import disable_maintenance  # noqa: F401
diff --git a/engine/apps/alerts/tasks/distribute_alert.py b/engine/apps/alerts/tasks/distribute_alert.py
index f69f80fe..b3dc0293 100644
--- a/engine/apps/alerts/tasks/distribute_alert.py
+++ b/engine/apps/alerts/tasks/distribute_alert.py
@@ -1,48 +1,11 @@
 from django.conf import settings
 
-from apps.alerts.constants import TASK_DELAY_SECONDS
-from apps.alerts.signals import alert_create_signal, alert_group_escalation_snapshot_built
+from apps.alerts.signals import alert_create_signal
 from common.custom_celery_tasks import shared_dedicated_queue_retry_task
 
 from .task_logger import task_logger
 
 
-@shared_dedicated_queue_retry_task(
-    autoretry_for=(Exception,), retry_backoff=True, max_retries=1 if settings.DEBUG else None, default_retry_delay=60
-)
-def distribute_alert(alert_id):
-    """
-    We need this task to make task processing async and to make sure the task is delivered.
-    This task is not used anymore, but we keep it for the tasks in the queue to be processed.
-    TODO: remove this task after all the tasks in the queue are processed.
-    """
-    from apps.alerts.models import Alert
-
-    alert = Alert.objects.get(pk=alert_id)
-    task_logger.debug(f"Start distribute_alert for alert {alert_id} from alert_group {alert.group_id}")
-
-    send_alert_create_signal.apply_async((alert_id,))
-
-    # Launch escalation for the group if it's the first alert, or if the group is paused.
-    # "paused" means that the current escalation step is "Continue escalation if >X alerts per Y minutes" and there are
-    # not enough alerts to trigger the escalation further. Launching escalation for a paused group will re-evaluate
-    # the threshold and advance the escalation if needed, or go back to the same "paused" state if the threshold is
-    # still not reached.
-    if alert.is_the_first_alert_in_group or alert.group.pause_escalation:
-        alert.group.start_escalation_if_needed(countdown=TASK_DELAY_SECONDS)
-
-    if alert.is_the_first_alert_in_group:
-        alert_group_escalation_snapshot_built.send(sender=distribute_alert, alert_group=alert.group)
-
-    updated_rows = Alert.objects.filter(pk=alert_id, delivered=False).update(delivered=True)
-    if updated_rows != 1:
-        task_logger.critical(
-            f"Tried to mark alert {alert_id} as delivered but it's already marked as delivered. Possible concurrency issue."
-        )
-
-    task_logger.debug(f"Finish distribute_alert for alert {alert_id} from alert_group {alert.group_id}")
-
-
 @shared_dedicated_queue_retry_task(
     autoretry_for=(Exception,), retry_backoff=True, max_retries=1 if settings.DEBUG else None
 )
@@ -50,7 +13,11 @@ def send_alert_create_signal(alert_id):
     from apps.alerts.models import Alert, AlertReceiveChannel
 
     task_logger.debug(f"Started send_alert_create_signal task for alert {alert_id}")
-    alert = Alert.objects.get(pk=alert_id)
+    try:
+        alert = Alert.objects.get(pk=alert_id)
+    except Alert.DoesNotExist:
+        task_logger.info(f"Alert {alert_id} does not exist, likely parent alert group was deleted")
+        return
 
     if alert.group.channel.maintenance_mode != AlertReceiveChannel.MAINTENANCE:
         alert_create_signal.send(
diff --git a/engine/apps/alerts/tests/test_alert.py b/engine/apps/alerts/tests/test_alert.py
index 74c3cbd0..33cee023 100644
--- a/engine/apps/alerts/tests/test_alert.py
+++ b/engine/apps/alerts/tests/test_alert.py
@@ -4,7 +4,6 @@ import pytest
 from django.utils import timezone
 
 from apps.alerts.models import Alert, ChannelFilter, EscalationPolicy
-from apps.alerts.tasks import distribute_alert, escalate_alert_group
 from common.jinja_templater.apply_jinja_template import JinjaTemplateError, JinjaTemplateWarning
 
 
@@ -118,18 +117,19 @@ def test_alert_create_track_received_at_timestamp(make_organization, make_alert_
     assert alert_group.received_at == now
 
 
+@patch("apps.alerts.models.AlertGroup.start_escalation_if_needed", return_value=None)
 @pytest.mark.django_db
 def test_distribute_alert_escalate_alert_group(
+    mock_start_escalation,
     make_organization,
     make_alert_receive_channel,
     make_channel_filter,
-    make_alert_group,
     make_alert,
     make_escalation_chain,
     make_escalation_policy,
 ):
     """
-    Check escalate_alert_group is called for the first alert in the group and not called for the second alert in the group.
+    Check start_escalation_if_needed is called for the first alert in the group and not called for the second alert in the group.
""" organization = make_organization() escalation_chain = make_escalation_chain(organization) @@ -139,42 +139,49 @@ def test_distribute_alert_escalate_alert_group( ) alert_receive_channel = make_alert_receive_channel(organization) channel_filter = make_channel_filter(alert_receive_channel, escalation_chain=escalation_chain) - alert_group = make_alert_group(alert_receive_channel, channel_filter=channel_filter) - # Check escalate_alert_group is called for the first alert in the group - alert_1 = make_alert( - alert_group=alert_group, - is_the_first_alert_in_group=True, + # Check start_escalation_if_needed is called for the first alert in the group + Alert.create( + title="the title", + message="the message", + integration_unique_data={}, + image_url=None, + link_to_upstream_details=None, raw_request_data=alert_receive_channel.config.example_payload, + alert_receive_channel=alert_receive_channel, + channel_filter=channel_filter, ) - with patch.object(escalate_alert_group, "apply_async") as mock_escalate_alert_group_1: - distribute_alert(alert_1.pk) - mock_escalate_alert_group_1.assert_called_once() + mock_start_escalation.assert_called_once() + mock_start_escalation.reset_mock() - # Check escalate_alert_group is not called for the second alert in the group - alert_2 = make_alert( - alert_group=alert_group, - is_the_first_alert_in_group=False, + # Check start_escalation_if_needed is not called for the second alert in the group + Alert.create( + title="the title", + message="the message", + integration_unique_data={}, + image_url=None, + link_to_upstream_details=None, raw_request_data=alert_receive_channel.config.example_payload, + alert_receive_channel=alert_receive_channel, + channel_filter=channel_filter, ) - with patch.object(escalate_alert_group, "apply_async") as mock_escalate_alert_group_2: - distribute_alert(alert_2.pk) - mock_escalate_alert_group_2.assert_not_called() + mock_start_escalation.assert_not_called() +@patch("apps.alerts.models.AlertGroup.start_escalation_if_needed", return_value=None) @pytest.mark.django_db def test_distribute_alert_escalate_alert_group_when_escalation_paused( + mock_start_escalation, make_organization, make_alert_receive_channel, make_channel_filter, - make_alert_group, make_alert, make_escalation_chain, make_escalation_policy, ): """ - Check escalate_alert_group is called for the first alert in the group and for the second alert in the group when - escalation is paused. + Check start_escalation_if_needed is called for the first alert in the group and for the second alert in the group + when escalation is paused. 
""" organization = make_organization() escalation_chain = make_escalation_chain(organization) @@ -184,31 +191,37 @@ def test_distribute_alert_escalate_alert_group_when_escalation_paused( ) alert_receive_channel = make_alert_receive_channel(organization) channel_filter = make_channel_filter(alert_receive_channel, escalation_chain=escalation_chain) - alert_group = make_alert_group(alert_receive_channel, channel_filter=channel_filter) - # Check escalate_alert_group is called for the first alert in the group - alert_1 = make_alert( - alert_group=alert_group, - is_the_first_alert_in_group=True, + # Check start_escalation_if_needed is called for the first alert in the group + Alert.create( + title="the title", + message="the message", + integration_unique_data={}, + image_url=None, + link_to_upstream_details=None, raw_request_data=alert_receive_channel.config.example_payload, + alert_receive_channel=alert_receive_channel, + channel_filter=channel_filter, ) - with patch.object(escalate_alert_group, "apply_async") as mock_escalate_alert_group_1: - distribute_alert(alert_1.pk) - mock_escalate_alert_group_1.assert_called_once() + mock_start_escalation.assert_called_once() + mock_start_escalation.reset_mock() - # Check escalate_alert_group is called for the second alert in the group when escalation is paused - alert_2 = make_alert( - alert_group=alert_group, - is_the_first_alert_in_group=False, - raw_request_data=alert_receive_channel.config.example_payload, - ) + # Check start_escalation_if_needed is called for the second alert in the group when escalation is paused with patch( "apps.alerts.escalation_snapshot.escalation_snapshot_mixin.EscalationSnapshotMixin.pause_escalation", new_callable=PropertyMock(return_value=True), ): - with patch.object(escalate_alert_group, "apply_async") as mock_escalate_alert_group_2: - distribute_alert(alert_2.pk) - mock_escalate_alert_group_2.assert_called_once() + Alert.create( + title="the title", + message="the message", + integration_unique_data={}, + image_url=None, + link_to_upstream_details=None, + raw_request_data=alert_receive_channel.config.example_payload, + alert_receive_channel=alert_receive_channel, + channel_filter=channel_filter, + ) + mock_start_escalation.assert_called_once() @pytest.mark.django_db diff --git a/engine/apps/webhooks/listeners.py b/engine/apps/webhooks/listeners.py index b74ba258..f4e8385d 100644 --- a/engine/apps/webhooks/listeners.py +++ b/engine/apps/webhooks/listeners.py @@ -24,9 +24,9 @@ def on_action_triggered(**kwargs): if not isinstance(log_record, AlertGroupLogRecord): try: log_record = AlertGroupLogRecord.objects.get(pk=log_record) - except AlertGroupLogRecord.DoesNotExist as e: + except AlertGroupLogRecord.DoesNotExist: logger.warning(f"Webhook action triggered: log record {log_record} never created or has been deleted") - raise e + return # keep track if this status change was triggered by a backsync event is_backsync = log_record.action_source == ActionSource.BACKSYNC diff --git a/engine/settings/celery_task_routes.py b/engine/settings/celery_task_routes.py index ec28312c..7b154fea 100644 --- a/engine/settings/celery_task_routes.py +++ b/engine/settings/celery_task_routes.py @@ -90,7 +90,6 @@ CELERY_TASK_ROUTES = { # CRITICAL "apps.alerts.tasks.acknowledge_reminder.acknowledge_reminder_task": {"queue": "critical"}, "apps.alerts.tasks.acknowledge_reminder.unacknowledge_timeout_task": {"queue": "critical"}, - "apps.alerts.tasks.distribute_alert.distribute_alert": {"queue": "critical"}, 
"apps.alerts.tasks.distribute_alert.send_alert_create_signal": {"queue": "critical"}, "apps.alerts.tasks.escalate_alert_group.escalate_alert_group": {"queue": "critical"}, "apps.alerts.tasks.invite_user_to_join_incident.invite_user_to_join_incident": {"queue": "critical"},