Handle alert group deleted when task is already queued (#4230)
# What this PR does

- Since `send_alert_create_signal` is enqueued inside `transaction.on_commit`, we can conclude that if the alert does not exist it was intentionally deleted before the task could run, so the task can exit instead of retrying (a short sketch of this pattern follows the checklist below).
- Improve logging when `send_alert_create_signal` is called so that the alert and its alert group appear on the same log line and you don't need to search the logs as much.
- Improve logging on the public API alert group delete so we know which channel/organization the alert group belonged to and which user deleted it.
- Remove `distribute_alert` (it stopped being used a while back; the code should be safe to remove now, and no such tasks are running in the system).

## Which issue(s) this PR closes

Closes https://github.com/grafana/oncall-private/issues/2640

## Checklist

- [x] Unit, integration, and e2e (if applicable) tests updated
- [x] Documentation added (or `pr:no public docs` PR label added if not required)
- [x] Added the relevant release notes label (see labels prefixed w/ `release:`). These labels dictate how your PR will show up in the autogenerated release notes.
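To make the first bullet concrete, here is a minimal, hypothetical sketch of the pattern (generic Celery/Django code; the `create_alert` helper and its kwargs are illustrative and not the actual OnCall API, the real change is in the `send_alert_create_signal` hunk below): because the task is only enqueued from a `transaction.on_commit` callback, the alert row must have existed when the task was scheduled, so a later `DoesNotExist` can only mean an intentional delete and the task may simply return instead of retrying.

```python
from functools import partial

from celery import shared_task
from django.db import transaction

from apps.alerts.models import Alert  # import path as used in the diff below


@shared_task(autoretry_for=(Exception,), retry_backoff=True)
def send_alert_create_signal(alert_id):
    try:
        alert = Alert.objects.get(pk=alert_id)
    except Alert.DoesNotExist:
        # The task was enqueued after the creating transaction committed, so the row
        # existed at that point; if it is gone now, the alert group was deleted on
        # purpose and there is nothing left to do, so exit instead of retrying.
        return
    # ... fire alert_create_signal for `alert` here ...


def create_alert(**kwargs):
    """Illustrative helper: create an alert and schedule the signal task safely."""
    alert = Alert.objects.create(**kwargs)
    # Only enqueue the task once the surrounding transaction has committed,
    # mirroring the transaction.on_commit(...) call in the Alert model hunk below.
    transaction.on_commit(partial(send_alert_create_signal.apply_async, (alert.pk,)))
    return alert
```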
Parent: a30f4d9527
Commit: d75590b943

7 changed files with 61 additions and 82 deletions
@@ -136,7 +136,7 @@ class Alert(models.Model):
             is_the_first_alert_in_group=group_created,
         )
         alert.save()
-        logger.debug(f"alert {alert.pk} created")
+        logger.debug(f"alert {alert.pk} created for alert group {group.pk}")

         transaction.on_commit(partial(send_alert_create_signal.apply_async, (alert.pk,)))

@@ -1285,6 +1285,7 @@ class AlertGroup(AlertGroupSlackRenderingMixin, EscalationSnapshotMixin, models.

         logger.debug(
             f"send alert_group_action_triggered_signal for alert_group {self.pk}, "
+            f"in channel {self.channel.pk}, in org {self.channel.organization.pk}, by user {user.pk}, "
             f"log record {log_record.pk} with type '{log_record.get_type_display()}', "
             f"action source: delete"
         )

@@ -8,7 +8,6 @@ from .custom_webhook_result import custom_webhook_result  # noqa: F401
 from .delete_alert_group import delete_alert_group  # noqa: F401
 from .delete_alert_group import finish_delete_alert_group  # noqa: F401
 from .delete_alert_group import send_alert_group_signal_for_delete  # noqa: F401
-from .distribute_alert import distribute_alert  # noqa: F401
 from .escalate_alert_group import escalate_alert_group  # noqa: F401
 from .invite_user_to_join_incident import invite_user_to_join_incident  # noqa: F401
 from .maintenance import disable_maintenance  # noqa: F401

@@ -1,48 +1,11 @@
 from django.conf import settings

-from apps.alerts.constants import TASK_DELAY_SECONDS
-from apps.alerts.signals import alert_create_signal, alert_group_escalation_snapshot_built
+from apps.alerts.signals import alert_create_signal
 from common.custom_celery_tasks import shared_dedicated_queue_retry_task

 from .task_logger import task_logger


-@shared_dedicated_queue_retry_task(
-    autoretry_for=(Exception,), retry_backoff=True, max_retries=1 if settings.DEBUG else None, default_retry_delay=60
-)
-def distribute_alert(alert_id):
-    """
-    We need this task to make task processing async and to make sure the task is delivered.
-    This task is not used anymore, but we keep it for the tasks in the queue to be processed.
-    TODO: remove this task after all the tasks in the queue are processed.
-    """
-    from apps.alerts.models import Alert
-
-    alert = Alert.objects.get(pk=alert_id)
-    task_logger.debug(f"Start distribute_alert for alert {alert_id} from alert_group {alert.group_id}")
-
-    send_alert_create_signal.apply_async((alert_id,))
-
-    # Launch escalation for the group if it's the first alert, or if the group is paused.
-    # "paused" means that the current escalation step is "Continue escalation if >X alerts per Y minutes" and there are
-    # not enough alerts to trigger the escalation further. Launching escalation for a paused group will re-evaluate
-    # the threshold and advance the escalation if needed, or go back to the same "paused" state if the threshold is
-    # still not reached.
-    if alert.is_the_first_alert_in_group or alert.group.pause_escalation:
-        alert.group.start_escalation_if_needed(countdown=TASK_DELAY_SECONDS)
-
-    if alert.is_the_first_alert_in_group:
-        alert_group_escalation_snapshot_built.send(sender=distribute_alert, alert_group=alert.group)
-
-    updated_rows = Alert.objects.filter(pk=alert_id, delivered=False).update(delivered=True)
-    if updated_rows != 1:
-        task_logger.critical(
-            f"Tried to mark alert {alert_id} as delivered but it's already marked as delivered. Possible concurrency issue."
-        )
-
-    task_logger.debug(f"Finish distribute_alert for alert {alert_id} from alert_group {alert.group_id}")
-
-
 @shared_dedicated_queue_retry_task(
     autoretry_for=(Exception,), retry_backoff=True, max_retries=1 if settings.DEBUG else None
 )

@@ -50,7 +13,11 @@ def send_alert_create_signal(alert_id):
     from apps.alerts.models import Alert, AlertReceiveChannel

     task_logger.debug(f"Started send_alert_create_signal task for alert {alert_id}")
-    alert = Alert.objects.get(pk=alert_id)
+    try:
+        alert = Alert.objects.get(pk=alert_id)
+    except Alert.DoesNotExist:
+        task_logger.info(f"Alert {alert_id} does not exist, likely parent alert group was deleted")
+        return

     if alert.group.channel.maintenance_mode != AlertReceiveChannel.MAINTENANCE:
         alert_create_signal.send(

@@ -4,7 +4,6 @@ import pytest
 from django.utils import timezone

 from apps.alerts.models import Alert, ChannelFilter, EscalationPolicy
-from apps.alerts.tasks import distribute_alert, escalate_alert_group
 from common.jinja_templater.apply_jinja_template import JinjaTemplateError, JinjaTemplateWarning

@@ -118,18 +117,19 @@ def test_alert_create_track_received_at_timestamp(make_organization, make_alert_
     assert alert_group.received_at == now


+@patch("apps.alerts.models.AlertGroup.start_escalation_if_needed", return_value=None)
 @pytest.mark.django_db
 def test_distribute_alert_escalate_alert_group(
+    mock_start_escalation,
     make_organization,
     make_alert_receive_channel,
     make_channel_filter,
     make_alert_group,
     make_alert,
     make_escalation_chain,
     make_escalation_policy,
 ):
     """
-    Check escalate_alert_group is called for the first alert in the group and not called for the second alert in the group.
+    Check start_escalation_if_needed is called for the first alert in the group and not called for the second alert in the group.
     """
     organization = make_organization()
     escalation_chain = make_escalation_chain(organization)

@@ -139,42 +139,49 @@ def test_distribute_alert_escalate_alert_group(
     )
     alert_receive_channel = make_alert_receive_channel(organization)
     channel_filter = make_channel_filter(alert_receive_channel, escalation_chain=escalation_chain)
     alert_group = make_alert_group(alert_receive_channel, channel_filter=channel_filter)

-    # Check escalate_alert_group is called for the first alert in the group
-    alert_1 = make_alert(
-        alert_group=alert_group,
-        is_the_first_alert_in_group=True,
+    # Check start_escalation_if_needed is called for the first alert in the group
+    Alert.create(
+        title="the title",
+        message="the message",
+        integration_unique_data={},
+        image_url=None,
+        link_to_upstream_details=None,
         raw_request_data=alert_receive_channel.config.example_payload,
+        alert_receive_channel=alert_receive_channel,
+        channel_filter=channel_filter,
     )
-    with patch.object(escalate_alert_group, "apply_async") as mock_escalate_alert_group_1:
-        distribute_alert(alert_1.pk)
-    mock_escalate_alert_group_1.assert_called_once()
+    mock_start_escalation.assert_called_once()
+    mock_start_escalation.reset_mock()

-    # Check escalate_alert_group is not called for the second alert in the group
-    alert_2 = make_alert(
-        alert_group=alert_group,
-        is_the_first_alert_in_group=False,
+    # Check start_escalation_if_needed is not called for the second alert in the group
+    Alert.create(
+        title="the title",
+        message="the message",
+        integration_unique_data={},
+        image_url=None,
+        link_to_upstream_details=None,
         raw_request_data=alert_receive_channel.config.example_payload,
+        alert_receive_channel=alert_receive_channel,
+        channel_filter=channel_filter,
     )
-    with patch.object(escalate_alert_group, "apply_async") as mock_escalate_alert_group_2:
-        distribute_alert(alert_2.pk)
-    mock_escalate_alert_group_2.assert_not_called()
+    mock_start_escalation.assert_not_called()


+@patch("apps.alerts.models.AlertGroup.start_escalation_if_needed", return_value=None)
 @pytest.mark.django_db
 def test_distribute_alert_escalate_alert_group_when_escalation_paused(
+    mock_start_escalation,
     make_organization,
     make_alert_receive_channel,
     make_channel_filter,
     make_alert_group,
     make_alert,
     make_escalation_chain,
     make_escalation_policy,
 ):
     """
-    Check escalate_alert_group is called for the first alert in the group and for the second alert in the group when
-    escalation is paused.
+    Check start_escalation_if_needed is called for the first alert in the group and for the second alert in the group
+    when escalation is paused.
     """
     organization = make_organization()
     escalation_chain = make_escalation_chain(organization)

@@ -184,31 +191,37 @@ def test_distribute_alert_escalate_alert_group_when_escalation_paused(
     )
     alert_receive_channel = make_alert_receive_channel(organization)
     channel_filter = make_channel_filter(alert_receive_channel, escalation_chain=escalation_chain)
     alert_group = make_alert_group(alert_receive_channel, channel_filter=channel_filter)

-    # Check escalate_alert_group is called for the first alert in the group
-    alert_1 = make_alert(
-        alert_group=alert_group,
-        is_the_first_alert_in_group=True,
+    # Check start_escalation_if_needed is called for the first alert in the group
+    Alert.create(
+        title="the title",
+        message="the message",
+        integration_unique_data={},
+        image_url=None,
+        link_to_upstream_details=None,
         raw_request_data=alert_receive_channel.config.example_payload,
+        alert_receive_channel=alert_receive_channel,
+        channel_filter=channel_filter,
     )
-    with patch.object(escalate_alert_group, "apply_async") as mock_escalate_alert_group_1:
-        distribute_alert(alert_1.pk)
-    mock_escalate_alert_group_1.assert_called_once()
+    mock_start_escalation.assert_called_once()
+    mock_start_escalation.reset_mock()

-    # Check escalate_alert_group is called for the second alert in the group when escalation is paused
-    alert_2 = make_alert(
-        alert_group=alert_group,
-        is_the_first_alert_in_group=False,
-        raw_request_data=alert_receive_channel.config.example_payload,
-    )
+    # Check start_escalation_if_needed is called for the second alert in the group when escalation is paused
     with patch(
         "apps.alerts.escalation_snapshot.escalation_snapshot_mixin.EscalationSnapshotMixin.pause_escalation",
         new_callable=PropertyMock(return_value=True),
     ):
-        with patch.object(escalate_alert_group, "apply_async") as mock_escalate_alert_group_2:
-            distribute_alert(alert_2.pk)
-        mock_escalate_alert_group_2.assert_called_once()
+        Alert.create(
+            title="the title",
+            message="the message",
+            integration_unique_data={},
+            image_url=None,
+            link_to_upstream_details=None,
+            raw_request_data=alert_receive_channel.config.example_payload,
+            alert_receive_channel=alert_receive_channel,
+            channel_filter=channel_filter,
+        )
+    mock_start_escalation.assert_called_once()


 @pytest.mark.django_db

@@ -24,9 +24,9 @@ def on_action_triggered(**kwargs):
     if not isinstance(log_record, AlertGroupLogRecord):
         try:
             log_record = AlertGroupLogRecord.objects.get(pk=log_record)
-        except AlertGroupLogRecord.DoesNotExist as e:
+        except AlertGroupLogRecord.DoesNotExist:
             logger.warning(f"Webhook action triggered: log record {log_record} never created or has been deleted")
-            raise e
+            return

     # keep track if this status change was triggered by a backsync event
     is_backsync = log_record.action_source == ActionSource.BACKSYNC

@@ -90,7 +90,6 @@ CELERY_TASK_ROUTES = {
     # CRITICAL
     "apps.alerts.tasks.acknowledge_reminder.acknowledge_reminder_task": {"queue": "critical"},
     "apps.alerts.tasks.acknowledge_reminder.unacknowledge_timeout_task": {"queue": "critical"},
-    "apps.alerts.tasks.distribute_alert.distribute_alert": {"queue": "critical"},
     "apps.alerts.tasks.distribute_alert.send_alert_create_signal": {"queue": "critical"},
     "apps.alerts.tasks.escalate_alert_group.escalate_alert_group": {"queue": "critical"},
     "apps.alerts.tasks.invite_user_to_join_incident.invite_user_to_join_incident": {"queue": "critical"},