diff --git a/CHANGELOG.md b/CHANGELOG.md index 7b197c4e..b7ca9858 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Unreleased +### Added + +- Modified `check_escalation_finished_task` celery task to use read-only databases for its query, if one is defined + + make the validation logic stricter + ping a configurable heartbeat on successful completion of this task ([1266](https://github.com/grafana/oncall/pull/1266)) + ### Changed - Updated wording in some Slack messages to use 'Alert Group' instead of 'Incident' ([1565](https://github.com/grafana/oncall/pull/1565)) diff --git a/dev/.env.dev.example b/dev/.env.dev.example index 9350004d..07258b08 100644 --- a/dev/.env.dev.example +++ b/dev/.env.dev.example @@ -29,10 +29,11 @@ GRAFANA_INCIDENT_STATIC_API_KEY= GRAFANA_API_URL=http://localhost:3000 CELERY_WORKER_QUEUE="default,critical,long,slack,telegram,webhook,retry,celery" -CELERY_WORKER_CONCURRENCY=1 +CELERY_WORKER_CONCURRENCY=3 CELERY_WORKER_MAX_TASKS_PER_CHILD=100 CELERY_WORKER_SHUTDOWN_INTERVAL=65m CELERY_WORKER_BEAT_ENABLED=True +CELERY_WORKER_DEBUG_LOGS=False RABBITMQ_USERNAME=rabbitmq RABBITMQ_PASSWORD=rabbitmq diff --git a/docs/sources/open-source/_index.md b/docs/sources/open-source/_index.md index f13c3213..5d3f981d 100644 --- a/docs/sources/open-source/_index.md +++ b/docs/sources/open-source/_index.md @@ -243,7 +243,7 @@ The limit can be changed using env variables: ## Mobile application set up ->**Note**: This application is currently in beta +> **Note**: This application is currently in beta Grafana OnCall OSS users can use the mobile app to receive push notifications from OnCall. Grafana OnCall OSS relies on Grafana Cloud as on relay for push notifications. 
@@ -255,3 +255,29 @@ For Grafana OnCall OSS, the mobile app QR code includes an authentication token Your Grafana OnCall OSS instance should be reachable from the same network as your mobile device, preferably from the internet. For more information, see [Grafana OnCall mobile app]({{< relref "../mobile-app" >}}) + +## Alert Group Escalation Auditor + +Grafana OnCall has a periodic background task, which runs to check that all alert group escalations have finished +properly. This feature, if configured, can also ping an OnCall Webhook Integration's heartbeat URL, so that you can be +alerted, in the event that something goes wrong. + +Logs originating from the celery worker, for the `apps.alerts.tasks.check_escalation_finished.check_escalation_finished_task` +task, that reference an `AlertGroupEscalationPolicyExecutionAuditException` exception +indicate that the auditor periodic task is failing check(s) on one or more alert groups. Logs for this task which +mention `.. passed the audit checks` indicate that there were no issues with the escalation on the audited +alert groups. + +To configure this feature: + +1. Create a Webhook, or Formatted Webhook, Integration type. +1. Under the "Heartbeat" tab in the Integration modal, copy the unique heartbeat URL that is shown. +1. Set the heartbeat's expected time interval to 15 minutes (see note below regarding `ALERT_GROUP_ESCALATION_AUDITOR_CELERY_TASK_HEARTBEAT_INTERVAL`) +1. Configure the integration's escalation chain as necessary +1. Populate the following env variables: + +- `ALERT_GROUP_ESCALATION_AUDITOR_CELERY_TASK_HEARTBEAT_URL` - integration's unique heartbeat URL +- `ALERT_GROUP_ESCALATION_AUDITOR_CELERY_TASK_HEARTBEAT_INTERVAL` - how often the auditor task should run. By default the + task runs every 13 minutes so we therefore recommend setting the heartbeat's expected time interval to 15 minutes. 
If you + would like to modify this, we recommend configuring this env variable to 1 or 2 minutes less than the value set for the + integration's heartbeat expected time interval. diff --git a/engine/apps/alerts/escalation_snapshot/escalation_snapshot_mixin.py b/engine/apps/alerts/escalation_snapshot/escalation_snapshot_mixin.py index 1f9072ce..04c1cb8a 100644 --- a/engine/apps/alerts/escalation_snapshot/escalation_snapshot_mixin.py +++ b/engine/apps/alerts/escalation_snapshot/escalation_snapshot_mixin.py @@ -1,3 +1,4 @@ +import datetime import logging from typing import Optional @@ -5,19 +6,16 @@ import pytz from celery import uuid as celery_uuid from dateutil.parser import parse from django.apps import apps -from django.utils import timezone from django.utils.functional import cached_property from rest_framework.exceptions import ValidationError -from apps.alerts.constants import NEXT_ESCALATION_DELAY from apps.alerts.escalation_snapshot.snapshot_classes import ( ChannelFilterSnapshot, EscalationChainSnapshot, EscalationPolicySnapshot, EscalationSnapshot, ) -from apps.alerts.escalation_snapshot.utils import eta_for_escalation_step_notify_if_time -from apps.alerts.tasks import calculate_escalation_finish_time, escalate_alert_group +from apps.alerts.tasks import escalate_alert_group logger = logging.getLogger(__name__) @@ -90,8 +88,7 @@ class EscalationSnapshotMixin: 'next_step_eta': '2021-10-18T10:28:28.890369Z } """ - - escalation_snapshot = None + data = {} if self.escalation_chain_exists: channel_filter = self.channel_filter @@ -104,53 +101,7 @@ class EscalationSnapshotMixin: "escalation_policies_snapshots": escalation_policies, "slack_channel_id": self.slack_channel_id, } - escalation_snapshot = EscalationSnapshot.serializer(data).data - return escalation_snapshot - - def calculate_eta_for_finish_escalation(self, escalation_started=False, start_time=None): - if not self.escalation_snapshot: - return - EscalationPolicy = apps.get_model("alerts", 
"EscalationPolicy") - TOLERANCE_SECONDS = 1 - TOLERANCE_TIME = timezone.timedelta(seconds=NEXT_ESCALATION_DELAY + TOLERANCE_SECONDS) - start_time = start_time or timezone.now() # start time may be different for silenced incidents - wait_summ = timezone.timedelta() - # Get next_active_escalation_policy_order using flag `escalation_started` because this calculation can be - # started in parallel with escalation task where next_active_escalation_policy_order can be changed. - # That's why we are using `escalation_started` flag here, which means, that we want count eta from the first - # step. - next_escalation_policy_order = ( - self.escalation_snapshot.next_active_escalation_policy_order if escalation_started else 0 - ) - escalation_policies = self.escalation_snapshot.escalation_policies_snapshots[next_escalation_policy_order:] - for escalation_policy in escalation_policies: - if escalation_policy.step == EscalationPolicy.STEP_WAIT: - if escalation_policy.wait_delay is not None: - wait_summ += escalation_policy.wait_delay - else: - wait_summ += EscalationPolicy.DEFAULT_WAIT_DELAY # Default wait in case it's not selected yet - elif escalation_policy.step == EscalationPolicy.STEP_NOTIFY_IF_TIME: - if escalation_policy.from_time and escalation_policy.to_time: - estimate_start_time = start_time + wait_summ - STEP_TOLERANCE = timezone.timedelta(minutes=1) - next_step_estimate_start_time = eta_for_escalation_step_notify_if_time( - escalation_policy.from_time, - escalation_policy.to_time, - estimate_start_time + STEP_TOLERANCE, - ) - wait_summ += next_step_estimate_start_time - estimate_start_time - elif escalation_policy.step == EscalationPolicy.STEP_REPEAT_ESCALATION_N_TIMES: - # the part of escalation with repeat step will be passed six times: the first time plus five repeats - wait_summ *= EscalationPolicy.MAX_TIMES_REPEAT + 1 - elif escalation_policy.step == EscalationPolicy.STEP_NOTIFY_IF_NUM_ALERTS_IN_TIME_WINDOW: - # In this case we cannot calculate finish time, so 
we return None - return - elif escalation_policy.step == EscalationPolicy.STEP_FINAL_RESOLVE: - break - wait_summ += TOLERANCE_TIME - - escalation_finish_time = start_time + wait_summ - return escalation_finish_time + return EscalationSnapshot.serializer(data).data @property def channel_filter_with_respect_to_escalation_snapshot(self): @@ -166,38 +117,52 @@ class EscalationSnapshotMixin: @cached_property def channel_filter_snapshot(self) -> Optional[ChannelFilterSnapshot]: - # in some cases we need only channel filter and don't want to serialize whole escalation - channel_filter_snapshot_object = None + """ + in some cases we need only channel filter and don't want to serialize whole escalation + """ escalation_snapshot = self.raw_escalation_snapshot - if escalation_snapshot is not None: - channel_filter_snapshot = ChannelFilterSnapshot.serializer().to_internal_value( - escalation_snapshot["channel_filter_snapshot"] - ) - channel_filter_snapshot_object = ChannelFilterSnapshot(**channel_filter_snapshot) - return channel_filter_snapshot_object + if not escalation_snapshot: + return None + + channel_filter_snapshot = escalation_snapshot["channel_filter_snapshot"] + if not channel_filter_snapshot: + return None + + channel_filter_snapshot = ChannelFilterSnapshot.serializer().to_internal_value(channel_filter_snapshot) + return ChannelFilterSnapshot(**channel_filter_snapshot) @cached_property def escalation_chain_snapshot(self) -> Optional[EscalationChainSnapshot]: - # in some cases we need only escalation chain and don't want to serialize whole escalation + """ + in some cases we need only escalation chain and don't want to serialize whole escalation escalation_chain_snapshot_object = None + """ escalation_snapshot = self.raw_escalation_snapshot - if escalation_snapshot is not None: - escalation_chain_snapshot = EscalationChainSnapshot.serializer().to_internal_value( - escalation_snapshot["escalation_chain_snapshot"] - ) - escalation_chain_snapshot_object = 
EscalationChainSnapshot(**escalation_chain_snapshot) - return escalation_chain_snapshot_object + if not escalation_snapshot: + return None + + escalation_chain_snapshot = escalation_snapshot["escalation_chain_snapshot"] + if not escalation_chain_snapshot: + return None + + escalation_chain_snapshot = EscalationChainSnapshot.serializer().to_internal_value(escalation_chain_snapshot) + return EscalationChainSnapshot(**escalation_chain_snapshot) @cached_property def escalation_snapshot(self) -> Optional[EscalationSnapshot]: - escalation_snapshot_object = None raw_escalation_snapshot = self.raw_escalation_snapshot - if raw_escalation_snapshot is not None: + if raw_escalation_snapshot: try: - escalation_snapshot_object = self._deserialize_escalation_snapshot(raw_escalation_snapshot) + return self._deserialize_escalation_snapshot(raw_escalation_snapshot) except ValidationError as e: logger.error(f"Error trying to deserialize raw escalation snapshot: {e}") - return escalation_snapshot_object + return None + + @cached_property + def has_escalation_policies_snapshots(self) -> bool: + if not self.raw_escalation_snapshot: + return False + return len(self.raw_escalation_snapshot["escalation_policies_snapshots"]) > 0 def _deserialize_escalation_snapshot(self, raw_escalation_snapshot) -> EscalationSnapshot: """ @@ -225,20 +190,34 @@ class EscalationSnapshotMixin: return escalation_snapshot_object @property - def escalation_chain_exists(self): - return not self.pause_escalation and self.channel_filter and self.channel_filter.escalation_chain + def escalation_chain_exists(self) -> bool: + if self.pause_escalation: + return False + elif not self.channel_filter: + return False + return self.channel_filter.escalation_chain is not None @property - def pause_escalation(self): - # get pause_escalation field directly to avoid serialization overhead - return self.raw_escalation_snapshot is not None and self.raw_escalation_snapshot.get("pause_escalation", False) + def pause_escalation(self) 
-> bool: + """ + get pause_escalation field directly to avoid serialization overhead + """ + if not self.raw_escalation_snapshot: + return False + return self.raw_escalation_snapshot.get("pause_escalation", False) @property - def next_step_eta(self): - # get next_step_eta field directly to avoid serialization overhead - raw_next_step_eta = ( - self.raw_escalation_snapshot.get("next_step_eta") if self.raw_escalation_snapshot is not None else None - ) + def next_step_eta(self) -> Optional[datetime.datetime]: + """ + get next_step_eta field directly to avoid serialization overhead + """ + if not self.raw_escalation_snapshot: + return None + + raw_next_step_eta = self.raw_escalation_snapshot.get("next_step_eta") + if not raw_next_step_eta: + return None + if raw_next_step_eta: return parse(raw_next_step_eta).replace(tzinfo=pytz.UTC) @@ -272,13 +251,10 @@ class EscalationSnapshotMixin: is_escalation_finished=False, raw_escalation_snapshot=raw_escalation_snapshot, ) - if not self.pause_escalation: - calculate_escalation_finish_time.apply_async((self.pk,), immutable=True) escalate_alert_group.apply_async((self.pk,), countdown=countdown, immutable=True, eta=eta, task_id=task_id) def stop_escalation(self): self.is_escalation_finished = True - self.estimate_escalation_finish_time = None # change active_escalation_id to prevent alert escalation self.active_escalation_id = "intentionally_stopped" - self.save(update_fields=["is_escalation_finished", "estimate_escalation_finish_time", "active_escalation_id"]) + self.save(update_fields=["is_escalation_finished", "active_escalation_id"]) diff --git a/engine/apps/alerts/escalation_snapshot/serializers/escalation_snapshot.py b/engine/apps/alerts/escalation_snapshot/serializers/escalation_snapshot.py index fbe15be0..f2b5fb4d 100644 --- a/engine/apps/alerts/escalation_snapshot/serializers/escalation_snapshot.py +++ b/engine/apps/alerts/escalation_snapshot/serializers/escalation_snapshot.py @@ -8,11 +8,11 @@ from 
apps.alerts.escalation_snapshot.serializers import ( class EscalationSnapshotSerializer(serializers.Serializer): - channel_filter_snapshot = ChannelFilterSnapshotSerializer() - escalation_chain_snapshot = EscalationChainSnapshotSerializer() + channel_filter_snapshot = ChannelFilterSnapshotSerializer(allow_null=True, default=None) + escalation_chain_snapshot = EscalationChainSnapshotSerializer(allow_null=True, default=None) last_active_escalation_policy_order = serializers.IntegerField(allow_null=True, default=None) - escalation_policies_snapshots = EscalationPolicySnapshotSerializer(many=True) - slack_channel_id = serializers.CharField(allow_null=True) + escalation_policies_snapshots = EscalationPolicySnapshotSerializer(many=True, default=list) + slack_channel_id = serializers.CharField(allow_null=True, default=None) pause_escalation = serializers.BooleanField(allow_null=True, default=False) next_step_eta = serializers.DateTimeField(allow_null=True, default=None) diff --git a/engine/apps/alerts/escalation_snapshot/snapshot_classes/escalation_snapshot.py b/engine/apps/alerts/escalation_snapshot/snapshot_classes/escalation_snapshot.py index d4845c57..1997408d 100644 --- a/engine/apps/alerts/escalation_snapshot/snapshot_classes/escalation_snapshot.py +++ b/engine/apps/alerts/escalation_snapshot/snapshot_classes/escalation_snapshot.py @@ -1,15 +1,23 @@ import logging -from typing import Optional +import typing from celery.utils.log import get_task_logger +from django.utils import timezone from apps.alerts.escalation_snapshot.serializers import EscalationSnapshotSerializer -from apps.alerts.escalation_snapshot.snapshot_classes.escalation_policy_snapshot import EscalationPolicySnapshot from apps.alerts.models.alert_group_log_record import AlertGroupLogRecord logger = get_task_logger(__name__) logger.setLevel(logging.DEBUG) +if typing.TYPE_CHECKING: + from apps.alerts.escalation_snapshot.snapshot_classes import ( + ChannelFilterSnapshot, + EscalationChainSnapshot, + 
EscalationPolicySnapshot, + ) + from apps.alerts.models import AlertGroup + class EscalationSnapshot: __slots__ = ( @@ -28,34 +36,34 @@ class EscalationSnapshot: def __init__( self, - alert_group, - channel_filter_snapshot, - escalation_chain_snapshot, - last_active_escalation_policy_order, - escalation_policies_snapshots, - slack_channel_id, - pause_escalation, - next_step_eta, + alert_group: "AlertGroup", + channel_filter_snapshot: "ChannelFilterSnapshot", + escalation_chain_snapshot: "EscalationChainSnapshot", + last_active_escalation_policy_order: int, + escalation_policies_snapshots: typing.List["EscalationPolicySnapshot"], + slack_channel_id: str, + pause_escalation: bool, + next_step_eta: typing.Optional[str], ): self.alert_group = alert_group - self.channel_filter_snapshot = channel_filter_snapshot # ChannelFilterSnapshot object - self.escalation_chain_snapshot = escalation_chain_snapshot # EscalationChainSnapshot object + self.channel_filter_snapshot = channel_filter_snapshot + self.escalation_chain_snapshot = escalation_chain_snapshot self.last_active_escalation_policy_order = last_active_escalation_policy_order - self.escalation_policies_snapshots = escalation_policies_snapshots # list of EscalationPolicySnapshot objects + self.escalation_policies_snapshots = escalation_policies_snapshots self.slack_channel_id = slack_channel_id self.pause_escalation = pause_escalation self.next_step_eta = next_step_eta self.stop_escalation = False @property - def last_active_escalation_policy_snapshot(self) -> Optional[EscalationPolicySnapshot]: + def last_active_escalation_policy_snapshot(self) -> typing.Optional["EscalationPolicySnapshot"]: order = self.last_active_escalation_policy_order if order is None: return None return self.escalation_policies_snapshots[order] @property - def next_active_escalation_policy_snapshot(self) -> Optional[EscalationPolicySnapshot]: + def next_active_escalation_policy_snapshot(self) -> typing.Optional["EscalationPolicySnapshot"]: order 
= self.next_active_escalation_policy_order if len(self.escalation_policies_snapshots) < order + 1: next_link = None @@ -71,6 +79,31 @@ class EscalationSnapshot: next_order = self.last_active_escalation_policy_order + 1 return next_order + @property + def executed_escalation_policy_snapshots(self) -> typing.List["EscalationPolicySnapshot"]: + """ + Returns a list of escalation policy snapshots that have already been executed, according + to the value of last_active_escalation_policy_order + """ + if self.last_active_escalation_policy_order is None: + return [] + elif self.last_active_escalation_policy_order == 0: + return [self.escalation_policies_snapshots[0]] + return self.escalation_policies_snapshots[: self.last_active_escalation_policy_order] + + def next_step_eta_is_valid(self) -> typing.Union[None, bool]: + """ + `next_step_eta` should never be less than the current time (with a 5 minute buffer provided) + as this field should be updated as the escalation policy is executed over time. If it is, this means that + an escalation policy step has been missed, or is substantially delayed + + if `next_step_eta` is `None` then `None` is returned, otherwise a boolean is returned + representing the result of the time comparison + """ + if self.next_step_eta is None: + return None + return self.next_step_eta > (timezone.now() - timezone.timedelta(minutes=5)) + def save_to_alert_group(self) -> None: self.alert_group.raw_escalation_snapshot = self.convert_to_dict() self.alert_group.save(update_fields=["raw_escalation_snapshot"]) @@ -83,7 +116,6 @@ class EscalationSnapshot: Executes actual escalation step and saves result of execution like stop_escalation param and eta, that will be used for start next escalate_alert_group task. Also updates self.last_active_escalation_policy_order if escalation step was executed. 
- :return: None """ escalation_policy_snapshot = self.next_active_escalation_policy_snapshot if escalation_policy_snapshot is None: diff --git a/engine/apps/alerts/incident_log_builder/incident_log_builder.py b/engine/apps/alerts/incident_log_builder/incident_log_builder.py index f3f55d58..1b204bae 100644 --- a/engine/apps/alerts/incident_log_builder/incident_log_builder.py +++ b/engine/apps/alerts/incident_log_builder/incident_log_builder.py @@ -134,7 +134,7 @@ class IncidentLogBuilder: # check if escalation snapshot wasn't saved and channel filter was deleted. # We cannot generate escalation plan in this case escalation_snapshot = self.alert_group.escalation_snapshot - if escalation_snapshot is None: + if not self.alert_group.has_escalation_policies_snapshots: return escalation_plan_dict if self.alert_group.silenced_until: diff --git a/engine/apps/alerts/models/alert_group.py b/engine/apps/alerts/models/alert_group.py index bf58d3d3..7d4c4c95 100644 --- a/engine/apps/alerts/models/alert_group.py +++ b/engine/apps/alerts/models/alert_group.py @@ -13,6 +13,7 @@ from django.db import IntegrityError, models, transaction from django.db.models import JSONField, Q, QuerySet from django.utils import timezone from django.utils.functional import cached_property +from django_deprecate_fields import deprecate_field from apps.alerts.escalation_snapshot import EscalationSnapshotMixin from apps.alerts.incident_appearance.renderers.constants import DEFAULT_BACKUP_TITLE @@ -336,7 +337,9 @@ class AlertGroup(AlertGroupSlackRenderingMixin, EscalationSnapshotMixin, models. 
maintenance_uuid = models.CharField(max_length=100, unique=True, null=True, default=None) raw_escalation_snapshot = JSONField(null=True, default=None) - estimate_escalation_finish_time = models.DateTimeField(null=True, default=None) + + # THIS FIELD IS DEPRECATED AND SHOULD EVENTUALLY BE REMOVED + estimate_escalation_finish_time = deprecate_field(models.DateTimeField(null=True, default=None)) # This field is used for constraints so we can use get_or_create() in concurrent calls # https://docs.djangoproject.com/en/3.2/ref/models/querysets/#get-or-create @@ -1464,14 +1467,7 @@ class AlertGroup(AlertGroupSlackRenderingMixin, EscalationSnapshotMixin, models. def start_unsilence_task(self, countdown): task_id = celery_uuid() self.unsilence_task_uuid = task_id - - # recalculate finish escalation time - escalation_start_time = timezone.now() + timezone.timedelta(seconds=countdown) - self.estimate_escalation_finish_time = self.calculate_eta_for_finish_escalation( - start_time=escalation_start_time - ) - - self.save(update_fields=["unsilence_task_uuid", "estimate_escalation_finish_time"]) + self.save(update_fields=["unsilence_task_uuid"]) unsilence_task.apply_async((self.pk,), task_id=task_id, countdown=countdown) @property diff --git a/engine/apps/alerts/tasks/__init__.py b/engine/apps/alerts/tasks/__init__.py index 0ccc5b70..70146f9c 100644 --- a/engine/apps/alerts/tasks/__init__.py +++ b/engine/apps/alerts/tasks/__init__.py @@ -3,7 +3,6 @@ from .alert_group_web_title_cache import ( # noqa:F401 update_web_title_cache, update_web_title_cache_for_alert_receive_channel, ) -from .calculcate_escalation_finish_time import calculate_escalation_finish_time # noqa from .call_ack_url import call_ack_url # noqa: F401 from .check_escalation_finished import check_escalation_finished_task # noqa: F401 from .create_contact_points_for_datasource import create_contact_points_for_datasource # noqa: F401 diff --git a/engine/apps/alerts/tasks/calculcate_escalation_finish_time.py 
b/engine/apps/alerts/tasks/calculcate_escalation_finish_time.py deleted file mode 100644 index ef1483c1..00000000 --- a/engine/apps/alerts/tasks/calculcate_escalation_finish_time.py +++ /dev/null @@ -1,15 +0,0 @@ -from django.apps import apps -from django.conf import settings - -from common.custom_celery_tasks import shared_dedicated_queue_retry_task - - -@shared_dedicated_queue_retry_task( - autoretry_for=(Exception,), retry_backoff=True, max_retries=1 if settings.DEBUG else None -) -def calculate_escalation_finish_time(alert_group_pk): - AlertGroup = apps.get_model("alerts", "AlertGroup") - alert_group = AlertGroup.all_objects.filter(pk=alert_group_pk)[0] - if alert_group.escalation_snapshot: - alert_group.estimate_escalation_finish_time = alert_group.calculate_eta_for_finish_escalation() - alert_group.save(update_fields=["estimate_escalation_finish_time"]) diff --git a/engine/apps/alerts/tasks/check_escalation_finished.py b/engine/apps/alerts/tasks/check_escalation_finished.py index 64f44d3d..c382aa5f 100644 --- a/engine/apps/alerts/tasks/check_escalation_finished.py +++ b/engine/apps/alerts/tasks/check_escalation_finished.py @@ -1,48 +1,148 @@ +import datetime +import typing + +import requests +from celery import shared_task from django.apps import apps from django.conf import settings from django.db.models import Q from django.utils import timezone from apps.alerts.tasks.task_logger import task_logger -from common.custom_celery_tasks import shared_dedicated_queue_retry_task +from common.database import get_random_readonly_database_key_if_present_otherwise_default + +if typing.TYPE_CHECKING: + from apps.alerts.models.alert_group import AlertGroup -@shared_dedicated_queue_retry_task( - autoretry_for=(Exception,), retry_backoff=True, max_retries=1 if settings.DEBUG else None, default_retry_delay=60 -) +class AlertGroupEscalationPolicyExecutionAuditException(BaseException): + """This exception is raised when an alert group's escalation policy did not execute 
execute properly for some reason""" + + +def send_alert_group_escalation_auditor_task_heartbeat() -> None: + heartbeat_url = settings.ALERT_GROUP_ESCALATION_AUDITOR_CELERY_TASK_HEARTBEAT_URL + if heartbeat_url: + task_logger.info(f"Sending heartbeat to configured URL: {heartbeat_url}") + requests.get(heartbeat_url).raise_for_status() + task_logger.info(f"Heartbeat successfully sent to {heartbeat_url}") + else: + task_logger.info(f"Skipping sending heartbeat as no heartbeat URL is configured") + + +def audit_alert_group_escalation(alert_group: "AlertGroup") -> None: + escalation_snapshot = alert_group.escalation_snapshot + alert_group_id = alert_group.id + base_msg = f"Alert group {alert_group_id}" + + if not escalation_snapshot: + raise AlertGroupEscalationPolicyExecutionAuditException( + f"{base_msg} does not have an escalation snapshot associated with it, this should never occur" + ) + task_logger.info(f"{base_msg} has an escalation snapshot associated with it, auditing if it executed properly") + + escalation_policies_snapshots = escalation_snapshot.escalation_policies_snapshots + + if not escalation_policies_snapshots: + task_logger.info( + f"{base_msg}'s escalation snapshot has an empty escalation_policies_snapshots, skipping further validation" + ) + return + task_logger.info( + f"{base_msg}'s escalation snapshot has a populated escalation_policies_snapshots, continuing validation" + ) + + if escalation_snapshot.next_step_eta_is_valid() is False: + raise AlertGroupEscalationPolicyExecutionAuditException( + f"{base_msg}'s escalation snapshot does not have a valid next_step_eta: {escalation_snapshot.next_step_eta}" + ) + task_logger.info(f"{base_msg}'s escalation snapshot has a valid next_step_eta: {escalation_snapshot.next_step_eta}") + + executed_escalation_policy_snapshots = escalation_snapshot.executed_escalation_policy_snapshots + num_of_executed_escalation_policy_snapshots = len(executed_escalation_policy_snapshots) + + if 
num_of_executed_escalation_policy_snapshots == 0: + task_logger.info( + f"{base_msg}'s escalation snapshot does not have any executed escalation policies, skipping further validation" + ) + else: + task_logger.info( + f"{base_msg}'s escalation snapshot has {num_of_executed_escalation_policy_snapshots} executed escalation policies" + ) + + # TODO: consider adding the below checks later on. This is it a bit trickier to properly audit as the + # number of log records can vary if there are any STEP_NOTIFY_IF_NUM_ALERTS_IN_TIME_WINDOW or + # STEP_REPEAT_ESCALATION_N_TIMES escalation policy steps in the escalation chain + # see conversations in the original PR (https://github.com/grafana/oncall/pull/1266) for more context on this + # + # compare number of triggered/failed alert group log records to the number of executed + # escalation policy snapshot steps + # num_of_relevant_log_records = AlertGroupLogRecord.objects.filter( + # alert_group_id=alert_group_id, + # type__in=[AlertGroupLogRecord.TYPE_ESCALATION_TRIGGERED, AlertGroupLogRecord.TYPE_ESCALATION_FAILED], + # ).count() + + # if num_of_relevant_log_records < num_of_executed_escalation_policy_snapshots: + # raise AlertGroupEscalationPolicyExecutionAuditException( + # f"{base_msg}'s number of triggered/failed alert group log records ({num_of_relevant_log_records}) is less " + # f"than the number of executed escalation policy snapshot steps ({num_of_executed_escalation_policy_snapshots})" + # ) + + # task_logger.info( + # f"{base_msg}'s number of triggered/failed alert group log records ({num_of_relevant_log_records}) is greater " + # f"than or equal to the number of executed escalation policy snapshot steps ({num_of_executed_escalation_policy_snapshots})" + # ) + + task_logger.info(f"{base_msg} passed the audit checks") + + +def get_auditable_alert_groups_started_at_range() -> typing.Tuple[datetime.datetime, datetime.datetime]: + """ + NOTE: this started_at__range is a bit of a hack.. 
+ we wanted to avoid performing a migration on the alerts_alertgroup table to update + alert groups where raw_escalation_snapshot was None. raw_escalation_snapshot being None is a legitimate case, + where the alert group's integration does not have an escalation chain associated with it. + + However, we wanted a way to be able to differentiate between "actually None" and "there was an error writing to + raw_escalation_snapshot" (as this is performed async by a celery task). + + This field was updated, in the commit that added this comment, to no longer be set to None by default. + As part of this celery task we do a check that this field is in fact not None, so if we were to check older + alert groups, whose integration did not have an escalation chain at the time the alert group was created + we would raise errors + """ + return (datetime.datetime(2023, 3, 25), timezone.now() - timezone.timedelta(days=2)) + + +# don't retry this task as the AlertGroup DB query is rather expensive +@shared_task def check_escalation_finished_task(): - """ - This task periodically checks if there are no alert groups with not finished escalations. 
- TODO: QA this properly, check if new type of escalations had been added - """ AlertGroup = apps.get_model("alerts", "AlertGroup") AlertReceiveChannel = apps.get_model("alerts", "AlertReceiveChannel") - CHECKING_TOLERANCE = timezone.timedelta(minutes=5) - CHECKING_TIME = timezone.now() - CHECKING_TOLERANCE - - alert_groups = AlertGroup.all_objects.filter( + alert_groups = AlertGroup.all_objects.using(get_random_readonly_database_key_if_present_otherwise_default()).filter( ~Q(channel__integration=AlertReceiveChannel.INTEGRATION_MAINTENANCE), ~Q(silenced=True, silenced_until__isnull=True), # filter silenced forever alert_groups is_escalation_finished=False, resolved=False, acknowledged=False, root_alert_group=None, - estimate_escalation_finish_time__lte=CHECKING_TIME, + started_at__range=get_auditable_alert_groups_started_at_range(), ) if not alert_groups.exists(): - return + task_logger.info("There are no alert groups to audit, everything is good :)") - exception_template = "Escalation for alert_group {} is not finished at expected time {}, now {}" + alert_group_ids_that_failed_audit: typing.List[str] = [] - now = timezone.now() - exception_text = "\n".join( - exception_template.format(alert_group.pk, alert_group.estimate_escalation_finish_time, now) - for alert_group in alert_groups - ) + for alert_group in alert_groups: + try: + audit_alert_group_escalation(alert_group) + except AlertGroupEscalationPolicyExecutionAuditException: + alert_group_ids_that_failed_audit.append(str(alert_group.id)) - ids = alert_groups.values_list("pk", flat=True) - task_logger.debug(ids) + if alert_group_ids_that_failed_audit: + raise AlertGroupEscalationPolicyExecutionAuditException( + f"The following alert group id(s) failed auditing: {', '.join(alert_group_ids_that_failed_audit)}" + ) - raise Exception(exception_text) + send_alert_group_escalation_auditor_task_heartbeat() diff --git a/engine/apps/alerts/tests/conftest.py b/engine/apps/alerts/tests/conftest.py index 
2550fdef..6038a091 100644 --- a/engine/apps/alerts/tests/conftest.py +++ b/engine/apps/alerts/tests/conftest.py @@ -1,6 +1,9 @@ +import datetime + import pytest from apps.alerts.incident_appearance.templaters import AlertSlackTemplater +from apps.alerts.models import EscalationPolicy @pytest.fixture() @@ -9,3 +12,51 @@ def mock_alert_renderer_render_for(monkeypatch): return "invalid_render_for" monkeypatch.setattr(AlertSlackTemplater, "_render_for", mock_render_for) + + +@pytest.fixture() +def escalation_snapshot_test_setup( + make_organization_and_user, + make_user_for_organization, + make_alert_receive_channel, + make_channel_filter, + make_escalation_chain, + make_escalation_policy, + make_alert_group, +): + organization, user_1 = make_organization_and_user() + user_2 = make_user_for_organization(organization) + + alert_receive_channel = make_alert_receive_channel(organization) + + escalation_chain = make_escalation_chain(organization) + channel_filter = make_channel_filter( + alert_receive_channel, + escalation_chain=escalation_chain, + notification_backends={"BACKEND": {"channel_id": "abc123"}}, + ) + + notify_to_multiple_users_step = make_escalation_policy( + escalation_chain=channel_filter.escalation_chain, + escalation_policy_step=EscalationPolicy.STEP_NOTIFY_MULTIPLE_USERS, + ) + notify_to_multiple_users_step.notify_to_users_queue.set([user_1, user_2]) + wait_step = make_escalation_policy( + escalation_chain=channel_filter.escalation_chain, + escalation_policy_step=EscalationPolicy.STEP_WAIT, + wait_delay=EscalationPolicy.FIFTEEN_MINUTES, + ) + # random time for test + from_time = datetime.time(10, 30) + to_time = datetime.time(18, 45) + notify_if_time_step = make_escalation_policy( + escalation_chain=channel_filter.escalation_chain, + escalation_policy_step=EscalationPolicy.STEP_NOTIFY_IF_TIME, + from_time=from_time, + to_time=to_time, + ) + + alert_group = make_alert_group(alert_receive_channel, channel_filter=channel_filter) + 
alert_group.raw_escalation_snapshot = alert_group.build_raw_escalation_snapshot() + alert_group.save() + return alert_group, notify_to_multiple_users_step, wait_step, notify_if_time_step diff --git a/engine/apps/alerts/tests/test_check_escalation_finished_task.py b/engine/apps/alerts/tests/test_check_escalation_finished_task.py index 2e2ce436..ad057102 100644 --- a/engine/apps/alerts/tests/test_check_escalation_finished_task.py +++ b/engine/apps/alerts/tests/test_check_escalation_finished_task.py @@ -1,45 +1,329 @@ +from unittest.mock import Mock, PropertyMock, patch + import pytest +import requests +from django.test import override_settings from django.utils import timezone -from apps.alerts.models import AlertReceiveChannel -from apps.alerts.tasks import check_escalation_finished_task +from apps.alerts.models import AlertGroup +from apps.alerts.tasks.check_escalation_finished import ( + AlertGroupEscalationPolicyExecutionAuditException, + audit_alert_group_escalation, + check_escalation_finished_task, + send_alert_group_escalation_auditor_task_heartbeat, +) + +MOCKED_HEARTBEAT_URL = "https://hello.com/lsdjjkf" + + +# def _get_relevant_log_record_type() -> int: +# return random.choice([AlertGroupLogRecord.TYPE_ESCALATION_TRIGGERED, AlertGroupLogRecord.TYPE_ESCALATION_FAILED]) + + +def test_send_alert_group_escalation_auditor_task_heartbeat_does_not_call_the_heartbeat_url_if_one_is_not_configured(): + with patch("apps.alerts.tasks.check_escalation_finished.requests") as mock_requests: + send_alert_group_escalation_auditor_task_heartbeat() + mock_requests.get.assert_not_called() + + +@override_settings(ALERT_GROUP_ESCALATION_AUDITOR_CELERY_TASK_HEARTBEAT_URL=MOCKED_HEARTBEAT_URL) +def test_send_alert_group_escalation_auditor_task_heartbeat_calls_the_heartbeat_url_if_one_is_configured(): + with patch("apps.alerts.tasks.check_escalation_finished.requests") as mock_requests: + send_alert_group_escalation_auditor_task_heartbeat() + + 
mock_requests.get.assert_called_once_with(MOCKED_HEARTBEAT_URL) + mock_requests.get.return_value.raise_for_status.assert_called_once_with() + + +@override_settings(ALERT_GROUP_ESCALATION_AUDITOR_CELERY_TASK_HEARTBEAT_URL=MOCKED_HEARTBEAT_URL) +def test_send_alert_group_escalation_auditor_task_heartbeat_raises_an_exception_if_the_heartbeat_url_request_fails(): + with patch("apps.alerts.tasks.check_escalation_finished.requests") as mock_requests: + mock_response = Mock() + mock_response.status_code = 500 + mock_response.raise_for_status.side_effect = requests.exceptions.HTTPError + + mock_requests.get.return_value = mock_response + + with pytest.raises(requests.exceptions.HTTPError): + send_alert_group_escalation_auditor_task_heartbeat() + + mock_requests.get.assert_called_once_with(MOCKED_HEARTBEAT_URL) + mock_requests.get.return_value.raise_for_status.assert_called_once_with() @pytest.mark.django_db -def test_check_escalation_finished_task( +def test_audit_alert_group_escalation_raises_exception_if_the_alert_group_does_not_have_an_escalation_snapshot( + escalation_snapshot_test_setup, +): + alert_group, _, _, _ = escalation_snapshot_test_setup + alert_group.escalation_snapshot = None + + with pytest.raises(AlertGroupEscalationPolicyExecutionAuditException): + audit_alert_group_escalation(alert_group) + + +@pytest.mark.django_db +def test_audit_alert_group_escalation_skips_further_validation_if_the_escalation_policies_snapshots_is_empty( + escalation_snapshot_test_setup, +): + alert_group, _, _, _ = escalation_snapshot_test_setup + + alert_group.escalation_snapshot.escalation_policies_snapshots = [] + audit_alert_group_escalation(alert_group) + + alert_group.escalation_snapshot.escalation_policies_snapshots = None + audit_alert_group_escalation(alert_group) + + +@pytest.mark.django_db +@pytest.mark.parametrize( + "next_step_eta_is_valid_return_value,raises_exception", + [ + (None, False), + (True, False), + (False, True), + ], +) +def 
test_audit_alert_group_escalation_next_step_eta_validation( + escalation_snapshot_test_setup, next_step_eta_is_valid_return_value, raises_exception +): + alert_group, _, _, _ = escalation_snapshot_test_setup + + with patch( + "apps.alerts.escalation_snapshot.snapshot_classes.escalation_snapshot.EscalationSnapshot.next_step_eta_is_valid" + ) as mock_next_step_eta_is_valid: + mock_next_step_eta_is_valid.return_value = next_step_eta_is_valid_return_value + + if raises_exception: + with pytest.raises(AlertGroupEscalationPolicyExecutionAuditException): + audit_alert_group_escalation(alert_group) + else: + try: + audit_alert_group_escalation(alert_group) + except AlertGroupEscalationPolicyExecutionAuditException: + pytest.fail() + + mock_next_step_eta_is_valid.assert_called_once_with() + + +@pytest.mark.django_db +def test_audit_alert_group_escalation_no_executed_escalation_policy_snapshots(escalation_snapshot_test_setup): + alert_group, _, _, _ = escalation_snapshot_test_setup + + with patch( + "apps.alerts.escalation_snapshot.snapshot_classes.escalation_snapshot.EscalationSnapshot.executed_escalation_policy_snapshots", + new_callable=PropertyMock, + ) as mock_executed_escalation_policy_snapshots: + mock_executed_escalation_policy_snapshots.return_value = [] + audit_alert_group_escalation(alert_group) + mock_executed_escalation_policy_snapshots.assert_called_once_with() + + +# # see TODO: comment in engine/apps/alerts/tasks/check_escalation_finished.py +# @pytest.mark.django_db +# def test_audit_alert_group_escalation_all_executed_escalation_policy_snapshots_have_triggered_log_records( +# escalation_snapshot_test_setup, +# make_organization_and_user, +# make_alert_group_log_record, +# ): +# _, user = make_organization_and_user() +# alert_group, _, _, _ = escalation_snapshot_test_setup +# escalation_policies_snapshots = alert_group.escalation_snapshot.escalation_policies_snapshots + +# for escalation_policy_snapshot in escalation_policies_snapshots: +# escalation_policy 
= EscalationPolicy.objects.get(id=escalation_policy_snapshot.id) +# log_record_type = _get_relevant_log_record_type() + +# make_alert_group_log_record(alert_group, log_record_type, user, escalation_policy=escalation_policy) + +# with patch( +# "apps.alerts.escalation_snapshot.snapshot_classes.escalation_snapshot.EscalationSnapshot.executed_escalation_policy_snapshots", +# new_callable=PropertyMock, +# ) as mock_executed_escalation_policy_snapshots: +# mock_executed_escalation_policy_snapshots.return_value = escalation_policies_snapshots +# audit_alert_group_escalation(alert_group) +# mock_executed_escalation_policy_snapshots.assert_called_once_with() + +# see TODO: comment in engine/apps/alerts/tasks/check_escalation_finished.py +# @pytest.mark.django_db +# def test_audit_alert_group_escalation_one_executed_escalation_policy_snapshot_does_not_have_a_triggered_log_record( +# escalation_snapshot_test_setup, +# make_organization_and_user, +# make_alert_group_log_record, +# ): +# _, user = make_organization_and_user() +# alert_group, _, _, _ = escalation_snapshot_test_setup +# escalation_policies_snapshots = alert_group.escalation_snapshot.escalation_policies_snapshots + +# # let's skip creating a relevant alert group log record for the first executed escalation policy +# for idx, escalation_policy_snapshot in enumerate(escalation_policies_snapshots): +# if idx != 0: +# escalation_policy = EscalationPolicy.objects.get(id=escalation_policy_snapshot.id) +# make_alert_group_log_record( +# alert_group, _get_relevant_log_record_type(), user, escalation_policy=escalation_policy +# ) + +# with patch( +# "apps.alerts.escalation_snapshot.snapshot_classes.escalation_snapshot.EscalationSnapshot.executed_escalation_policy_snapshots", +# new_callable=PropertyMock, +# ) as mock_executed_escalation_policy_snapshots: +# mock_executed_escalation_policy_snapshots.return_value = escalation_policies_snapshots + +# with pytest.raises(AlertGroupEscalationPolicyExecutionAuditException): +# 
audit_alert_group_escalation(alert_group) +# mock_executed_escalation_policy_snapshots.assert_called_once_with() + + +@pytest.mark.django_db +def test_check_escalation_finished_task_queries_doesnt_grab_alert_groups_outside_of_date_range( make_organization_and_user, make_alert_receive_channel, make_alert_group, ): - organization, user = make_organization_and_user() - alert_receive_channel = make_alert_receive_channel( - organization, integration=AlertReceiveChannel.INTEGRATION_GRAFANA - ) - alert_group = make_alert_group(alert_receive_channel) + organization, _ = make_organization_and_user() + alert_receive_channel = make_alert_receive_channel(organization) now = timezone.now() + two_days_ago = now - timezone.timedelta(days=2) + two_days_in_future = now + timezone.timedelta(days=2) - # we don't have escalation finish time, seems we cannot calculate it due escalation chain snapshot has uncalculated - # steps or does not exist, no exception is raised - check_escalation_finished_task() + # we can't simply pass started_at to the fixture because started_at is being "auto-set" on the Model + alert_group1 = make_alert_group(alert_receive_channel) + alert_group1.started_at = now - # it's acceptable time for finish escalation, because we have tolerance time 5 min from now, no exception is raised - alert_group.estimate_escalation_finish_time = now - alert_group.save() - check_escalation_finished_task() + alert_group2 = make_alert_group(alert_receive_channel) + alert_group2.started_at = now - timezone.timedelta(days=5) - # it is acceptable time for finish escalation, so no exception is raised - alert_group.estimate_escalation_finish_time = now + timezone.timedelta(minutes=10) - alert_group.save() - check_escalation_finished_task() + alert_group3 = make_alert_group(alert_receive_channel) + alert_group3.started_at = now + timezone.timedelta(days=5) - # escalation is not finished yet and passed more than 5 minutes after estimate time, exception is raised - 
alert_group.estimate_escalation_finish_time = now - timezone.timedelta(minutes=10) - alert_group.save() - with pytest.raises(Exception): - check_escalation_finished_task() + AlertGroup.all_objects.bulk_update([alert_group1, alert_group2, alert_group3], ["started_at"]) - # escalation is finished and we don't care anymore about its finish time, so no exception is raised - alert_group.is_escalation_finished = True - alert_group.save() - check_escalation_finished_task() + with patch( + "apps.alerts.tasks.check_escalation_finished.get_auditable_alert_groups_started_at_range" + ) as mocked_get_auditable_alert_groups_started_at_range: + with patch( + "apps.alerts.tasks.check_escalation_finished.audit_alert_group_escalation" + ) as mocked_audit_alert_group_escalation: + with patch( + "apps.alerts.tasks.check_escalation_finished.send_alert_group_escalation_auditor_task_heartbeat" + ) as mocked_send_alert_group_escalation_auditor_task_heartbeat: + mocked_get_auditable_alert_groups_started_at_range.return_value = (two_days_ago, two_days_in_future) + + check_escalation_finished_task() + + mocked_audit_alert_group_escalation.assert_called_once_with(alert_group1) + mocked_send_alert_group_escalation_auditor_task_heartbeat.assert_called_once_with() + + +@pytest.mark.django_db +def test_check_escalation_finished_task_calls_audit_alert_group_escalation_for_every_alert_group( + make_organization_and_user, + make_alert_receive_channel, + make_alert_group, +): + organization, _ = make_organization_and_user() + alert_receive_channel = make_alert_receive_channel(organization) + + now = timezone.now() + two_days_ago = now - timezone.timedelta(days=2) + two_days_in_future = now + timezone.timedelta(days=2) + + # we can't simply pass started_at to the fixture because started_at is being "auto-set" on the Model + alert_group1 = make_alert_group(alert_receive_channel) + alert_group1.started_at = now + + alert_group2 = make_alert_group(alert_receive_channel) + alert_group2.started_at = now + 
+ alert_group3 = make_alert_group(alert_receive_channel) + alert_group3.started_at = now + + AlertGroup.all_objects.bulk_update([alert_group1, alert_group2, alert_group3], ["started_at"]) + + with patch( + "apps.alerts.tasks.check_escalation_finished.get_auditable_alert_groups_started_at_range" + ) as mocked_get_auditable_alert_groups_started_at_range: + with patch( + "apps.alerts.tasks.check_escalation_finished.audit_alert_group_escalation" + ) as mocked_audit_alert_group_escalation: + with patch( + "apps.alerts.tasks.check_escalation_finished.send_alert_group_escalation_auditor_task_heartbeat" + ) as mocked_send_alert_group_escalation_auditor_task_heartbeat: + mocked_get_auditable_alert_groups_started_at_range.return_value = (two_days_ago, two_days_in_future) + + check_escalation_finished_task() + + mocked_audit_alert_group_escalation.assert_any_call(alert_group1) + mocked_audit_alert_group_escalation.assert_any_call(alert_group2) + mocked_audit_alert_group_escalation.assert_any_call(alert_group3) + mocked_send_alert_group_escalation_auditor_task_heartbeat.assert_called_once_with() + + +@pytest.mark.django_db +def test_check_escalation_finished_task_simply_calls_heartbeat_when_no_alert_groups_found(): + with patch( + "apps.alerts.tasks.check_escalation_finished.audit_alert_group_escalation" + ) as mocked_audit_alert_group_escalation: + with patch( + "apps.alerts.tasks.check_escalation_finished.send_alert_group_escalation_auditor_task_heartbeat" + ) as mocked_send_alert_group_escalation_auditor_task_heartbeat: + check_escalation_finished_task() + mocked_audit_alert_group_escalation.assert_not_called() + mocked_send_alert_group_escalation_auditor_task_heartbeat.assert_called_once_with() + + +@pytest.mark.django_db +def test_check_escalation_finished_task_calls_audit_alert_group_escalation_for_every_alert_group_even_if_one_fails( + make_organization_and_user, + make_alert_receive_channel, + make_alert_group, +): + organization, _ = make_organization_and_user() + 
alert_receive_channel = make_alert_receive_channel(organization) + + now = timezone.now() + two_days_ago = now - timezone.timedelta(days=2) + two_days_in_future = now + timezone.timedelta(days=2) + + # we can't simply pass started_at to the fixture because started_at is being "auto-set" on the Model + alert_group1 = make_alert_group(alert_receive_channel) + alert_group1.started_at = now + + alert_group2 = make_alert_group(alert_receive_channel) + alert_group2.started_at = now + + alert_group3 = make_alert_group(alert_receive_channel) + alert_group3.started_at = now + + AlertGroup.all_objects.bulk_update([alert_group1, alert_group2, alert_group3], ["started_at"]) + + def _mocked_audit_alert_group_escalation(alert_group): + if not alert_group.id == alert_group3.id: + raise AlertGroupEscalationPolicyExecutionAuditException("asdfasdf") + + with patch( + "apps.alerts.tasks.check_escalation_finished.get_auditable_alert_groups_started_at_range" + ) as mocked_get_auditable_alert_groups_started_at_range: + with patch( + "apps.alerts.tasks.check_escalation_finished.audit_alert_group_escalation" + ) as mocked_audit_alert_group_escalation: + with patch( + "apps.alerts.tasks.check_escalation_finished.send_alert_group_escalation_auditor_task_heartbeat" + ) as mocked_send_alert_group_escalation_auditor_task_heartbeat: + mocked_get_auditable_alert_groups_started_at_range.return_value = (two_days_ago, two_days_in_future) + mocked_audit_alert_group_escalation.side_effect = _mocked_audit_alert_group_escalation + + with pytest.raises(AlertGroupEscalationPolicyExecutionAuditException) as exc: + check_escalation_finished_task() + + assert ( + str(exc.value) + == f"The following alert group id(s) failed auditing: {alert_group1.id}, {alert_group2.id}" + ) + + mocked_audit_alert_group_escalation.assert_any_call(alert_group1) + mocked_audit_alert_group_escalation.assert_any_call(alert_group2) + mocked_audit_alert_group_escalation.assert_any_call(alert_group3) + + 
mocked_send_alert_group_escalation_auditor_task_heartbeat.assert_not_called() diff --git a/engine/apps/alerts/tests/test_escalation_policy_snapshot.py b/engine/apps/alerts/tests/test_escalation_policy_snapshot.py index e84050b2..038a93f8 100644 --- a/engine/apps/alerts/tests/test_escalation_policy_snapshot.py +++ b/engine/apps/alerts/tests/test_escalation_policy_snapshot.py @@ -134,7 +134,7 @@ def test_escalation_step_notify_multiple_users( escalation_step_test_setup, make_escalation_policy, ): - organization, user, _, channel_filter, alert_group, reason = escalation_step_test_setup + _, user, _, channel_filter, alert_group, reason = escalation_step_test_setup notify_users_step = make_escalation_policy( escalation_chain=channel_filter.escalation_chain, @@ -386,7 +386,7 @@ def test_escalation_step_notify_if_num_alerts_in_window( ).exists() assert not mocked_execute_tasks.called - organization, user, _, channel_filter, alert_group, reason = escalation_step_test_setup + _, _, _, channel_filter, alert_group, reason = escalation_step_test_setup make_alert(alert_group=alert_group, raw_request_data={}) diff --git a/engine/apps/alerts/tests/test_escalation_snapshot.py b/engine/apps/alerts/tests/test_escalation_snapshot.py index 98aaad03..8ce55b8f 100644 --- a/engine/apps/alerts/tests/test_escalation_snapshot.py +++ b/engine/apps/alerts/tests/test_escalation_snapshot.py @@ -1,5 +1,3 @@ -import datetime - import pytest from django.utils import timezone @@ -11,54 +9,6 @@ from apps.alerts.escalation_snapshot.snapshot_classes import ( from apps.alerts.models import EscalationPolicy -@pytest.fixture() -def escalation_snapshot_test_setup( - make_organization_and_user, - make_user_for_organization, - make_alert_receive_channel, - make_channel_filter, - make_escalation_chain, - make_escalation_policy, - make_alert_group, -): - organization, user_1 = make_organization_and_user() - user_2 = make_user_for_organization(organization) - - alert_receive_channel = 
make_alert_receive_channel(organization) - - escalation_chain = make_escalation_chain(organization) - channel_filter = make_channel_filter( - alert_receive_channel, - escalation_chain=escalation_chain, - notification_backends={"BACKEND": {"channel_id": "abc123"}}, - ) - - notify_to_multiple_users_step = make_escalation_policy( - escalation_chain=channel_filter.escalation_chain, - escalation_policy_step=EscalationPolicy.STEP_NOTIFY_MULTIPLE_USERS, - ) - notify_to_multiple_users_step.notify_to_users_queue.set([user_1, user_2]) - wait_step = make_escalation_policy( - escalation_chain=channel_filter.escalation_chain, - escalation_policy_step=EscalationPolicy.STEP_WAIT, - wait_delay=EscalationPolicy.FIFTEEN_MINUTES, - ) - # random time for test - from_time = datetime.time(10, 30) - to_time = datetime.time(18, 45) - notify_if_time_step = make_escalation_policy( - escalation_chain=channel_filter.escalation_chain, - escalation_policy_step=EscalationPolicy.STEP_NOTIFY_IF_TIME, - from_time=from_time, - to_time=to_time, - ) - - alert_group = make_alert_group(alert_receive_channel, channel_filter=channel_filter) - alert_group.raw_escalation_snapshot = alert_group.build_raw_escalation_snapshot() - alert_group.save() - return alert_group, notify_to_multiple_users_step, wait_step, notify_if_time_step - - @pytest.mark.django_db def test_raw_escalation_snapshot(escalation_snapshot_test_setup): alert_group, notify_to_multiple_users_step, wait_step, notify_if_time_step = escalation_snapshot_test_setup @@ -142,7 +92,7 @@ def test_raw_escalation_snapshot(escalation_snapshot_test_setup): @pytest.mark.django_db def test_serialized_escalation_snapshot(escalation_snapshot_test_setup): - alert_group, notify_to_multiple_users_step, wait_step, notify_if_time_step = escalation_snapshot_test_setup + alert_group, _, _, _ = escalation_snapshot_test_setup escalation_snapshot = alert_group.escalation_snapshot assert isinstance(escalation_snapshot, EscalationSnapshot) assert 
escalation_snapshot.channel_filter_snapshot is not None and isinstance( @@ -163,7 +113,7 @@ def test_serialized_escalation_snapshot(escalation_snapshot_test_setup): @pytest.mark.django_db def test_escalation_snapshot_with_deleted_channel_filter(escalation_snapshot_test_setup): - alert_group, notify_to_multiple_users_step, wait_step, notify_if_time_step = escalation_snapshot_test_setup + alert_group, _, _, _ = escalation_snapshot_test_setup alert_group.channel_filter.delete() escalation_snapshot = alert_group.escalation_snapshot @@ -174,7 +124,7 @@ def test_escalation_snapshot_with_deleted_channel_filter(escalation_snapshot_tes @pytest.mark.django_db def test_change_escalation_snapshot(escalation_snapshot_test_setup): - alert_group, notify_to_multiple_users_step, wait_step, notify_if_time_step = escalation_snapshot_test_setup + alert_group, _, _, _ = escalation_snapshot_test_setup new_active_order = 2 now = timezone.now() @@ -194,7 +144,7 @@ def test_change_escalation_snapshot(escalation_snapshot_test_setup): @pytest.mark.django_db def test_next_escalation_policy_snapshot(escalation_snapshot_test_setup): - alert_group, notify_to_multiple_users_step, wait_step, notify_if_time_step = escalation_snapshot_test_setup + alert_group, _, _, _ = escalation_snapshot_test_setup escalation_snapshot = alert_group.escalation_snapshot assert escalation_snapshot.last_active_escalation_policy_order is None @@ -226,3 +176,39 @@ def test_next_escalation_policy_snapshot(escalation_snapshot_test_setup): is escalation_snapshot.escalation_policies_snapshots[-1] ) assert escalation_snapshot.next_active_escalation_policy_snapshot is None + + +@pytest.mark.django_db +@pytest.mark.parametrize( + "next_step_eta,expected", + [ + (None, None), + (timezone.now() - timezone.timedelta(weeks=50), False), + (timezone.now() - timezone.timedelta(minutes=4), True), + (timezone.now() + timezone.timedelta(minutes=4), True), + ], +) +def test_next_step_eta_is_valid(escalation_snapshot_test_setup, 
next_step_eta, expected) -> None: + alert_group, _, _, _ = escalation_snapshot_test_setup + escalation_snapshot = alert_group.escalation_snapshot + + escalation_snapshot.next_step_eta = next_step_eta + + assert escalation_snapshot.next_step_eta_is_valid() is expected + + +@pytest.mark.django_db +def test_executed_escalation_policy_snapshots(escalation_snapshot_test_setup): + alert_group, _, _, _ = escalation_snapshot_test_setup + escalation_snapshot = alert_group.escalation_snapshot + + escalation_snapshot.last_active_escalation_policy_order = None + assert escalation_snapshot.executed_escalation_policy_snapshots == [] + + escalation_snapshot.last_active_escalation_policy_order = 0 + assert escalation_snapshot.executed_escalation_policy_snapshots == [ + escalation_snapshot.escalation_policies_snapshots[0] + ] + + escalation_snapshot.last_active_escalation_policy_order = len(escalation_snapshot.escalation_policies_snapshots) + assert escalation_snapshot.executed_escalation_policy_snapshots == escalation_snapshot.escalation_policies_snapshots diff --git a/engine/apps/alerts/tests/test_escalation_snapshot_mixin.py b/engine/apps/alerts/tests/test_escalation_snapshot_mixin.py new file mode 100644 index 00000000..4aa984e0 --- /dev/null +++ b/engine/apps/alerts/tests/test_escalation_snapshot_mixin.py @@ -0,0 +1,658 @@ +from unittest.mock import PropertyMock, patch + +import pytest +import pytz +from rest_framework.exceptions import ValidationError + +from apps.alerts.escalation_snapshot.snapshot_classes import EscalationSnapshot +from apps.alerts.models import EscalationPolicy + +MOCK_SLACK_CHANNEL_ID = "asdfljaskdf" +EMPTY_RAW_ESCALATION_SNAPSHOT = { + "channel_filter_snapshot": None, + "escalation_chain_snapshot": None, + "last_active_escalation_policy_order": None, + "escalation_policies_snapshots": [], + "slack_channel_id": None, + "pause_escalation": False, + "next_step_eta": None, +} + + +@patch("apps.alerts.models.alert_group.AlertGroup.slack_channel_id", 
new_callable=PropertyMock) +@pytest.mark.django_db +def test_build_raw_escalation_snapshot_escalation_chain_exists( + mock_alert_group_slack_channel_id, + make_organization_and_user, + make_alert_receive_channel, + make_channel_filter, + make_escalation_chain, + make_escalation_policy, + make_alert_group, +): + mock_alert_group_slack_channel_id.return_value = MOCK_SLACK_CHANNEL_ID + + organization, _ = make_organization_and_user() + alert_receive_channel = make_alert_receive_channel(organization) + escalation_chain = make_escalation_chain(organization=organization) + channel_filter = make_channel_filter(alert_receive_channel, escalation_chain=escalation_chain) + make_escalation_policy( + escalation_chain=channel_filter.escalation_chain, + escalation_policy_step=EscalationPolicy.STEP_WAIT, + wait_delay=EscalationPolicy.FIFTEEN_MINUTES, + ) + + alert_group = make_alert_group(alert_receive_channel, channel_filter=channel_filter) + + expected_snapshot = EscalationSnapshot.serializer( + { + "channel_filter_snapshot": alert_group.channel_filter, + "escalation_chain_snapshot": alert_group.channel_filter.escalation_chain, + "escalation_policies_snapshots": alert_group.channel_filter.escalation_chain.escalation_policies.all(), + "slack_channel_id": MOCK_SLACK_CHANNEL_ID, + } + ) + + assert alert_group.build_raw_escalation_snapshot() == expected_snapshot.data + + +@patch( + "apps.alerts.escalation_snapshot.escalation_snapshot_mixin.EscalationSnapshotMixin.pause_escalation", + new_callable=PropertyMock, +) +@pytest.mark.django_db +def test_build_raw_escalation_snapshot_escalation_chain_does_not_exist_escalation_paused( + mocked_pause_escalation, + make_organization_and_user, + make_alert_receive_channel, + make_channel_filter, + make_escalation_chain, + make_alert_group, +): + mocked_pause_escalation.return_value = True + + organization, _ = make_organization_and_user() + alert_receive_channel = make_alert_receive_channel(organization) + escalation_chain = 
make_escalation_chain(organization=organization) + channel_filter = make_channel_filter(alert_receive_channel, escalation_chain=escalation_chain) + alert_group = make_alert_group(alert_receive_channel, channel_filter=channel_filter) + + assert alert_group.build_raw_escalation_snapshot() == EMPTY_RAW_ESCALATION_SNAPSHOT + + +@pytest.mark.django_db +def test_build_raw_escalation_snapshot_escalation_chain_does_not_exist_no_channel_filter( + make_organization_and_user, + make_alert_receive_channel, + make_alert_group, +): + organization, _ = make_organization_and_user() + alert_receive_channel = make_alert_receive_channel(organization) + alert_group = make_alert_group(alert_receive_channel) + + assert alert_group.build_raw_escalation_snapshot() == EMPTY_RAW_ESCALATION_SNAPSHOT + + +@pytest.mark.django_db +def test_build_raw_escalation_snapshot_escalation_chain_does_not_exist_no_channel_filter_escalation_chain( + make_organization_and_user, + make_alert_receive_channel, + make_channel_filter, + make_alert_group, +): + organization, _ = make_organization_and_user() + alert_receive_channel = make_alert_receive_channel(organization) + channel_filter = make_channel_filter(alert_receive_channel) + alert_group = make_alert_group(alert_receive_channel, channel_filter=channel_filter) + + assert alert_group.build_raw_escalation_snapshot() == EMPTY_RAW_ESCALATION_SNAPSHOT + + +@patch( + "apps.alerts.escalation_snapshot.escalation_snapshot_mixin.EscalationSnapshotMixin.channel_filter_snapshot", + new_callable=PropertyMock, +) +@pytest.mark.django_db +def test_channel_filter_with_respect_to_escalation_snapshot( + mock_channel_filter_snapshot, + make_organization_and_user, + make_alert_receive_channel, + make_channel_filter, + make_alert_group, +): + channel_filter_snapshot = "asdfasdfadsfadsf" + mock_channel_filter_snapshot.return_value = channel_filter_snapshot + + organization, _ = make_organization_and_user() + alert_receive_channel = make_alert_receive_channel(organization) + 
channel_filter = make_channel_filter(alert_receive_channel) + alert_group = make_alert_group(alert_receive_channel, channel_filter=channel_filter) + + assert alert_group.channel_filter_with_respect_to_escalation_snapshot == channel_filter_snapshot + + +@patch( + "apps.alerts.escalation_snapshot.escalation_snapshot_mixin.EscalationSnapshotMixin.channel_filter_snapshot", + new_callable=PropertyMock, +) +@pytest.mark.django_db +def test_channel_filter_with_respect_to_escalation_snapshot_no_channel_filter_snapshot( + mock_channel_filter_snapshot, + make_organization_and_user, + make_alert_receive_channel, + make_channel_filter, + make_alert_group, +): + mock_channel_filter_snapshot.return_value = None + + organization, _ = make_organization_and_user() + alert_receive_channel = make_alert_receive_channel(organization) + channel_filter = make_channel_filter(alert_receive_channel) + alert_group = make_alert_group(alert_receive_channel, channel_filter=channel_filter) + + assert alert_group.channel_filter_with_respect_to_escalation_snapshot == channel_filter + + +@patch( + "apps.alerts.escalation_snapshot.escalation_snapshot_mixin.EscalationSnapshotMixin.escalation_chain_snapshot", + new_callable=PropertyMock, +) +@pytest.mark.django_db +def test_escalation_chain_with_respect_to_escalation_snapshot( + mock_escalation_chain_snapshot, + make_organization_and_user, + make_alert_receive_channel, + make_channel_filter, + make_alert_group, +): + escalation_chain_snapshot = "asdfasdfadsfadsf" + mock_escalation_chain_snapshot.return_value = escalation_chain_snapshot + + organization, _ = make_organization_and_user() + alert_receive_channel = make_alert_receive_channel(organization) + channel_filter = make_channel_filter(alert_receive_channel) + alert_group = make_alert_group(alert_receive_channel, channel_filter=channel_filter) + + assert alert_group.escalation_chain_with_respect_to_escalation_snapshot == escalation_chain_snapshot + + +@patch( + 
"apps.alerts.escalation_snapshot.escalation_snapshot_mixin.EscalationSnapshotMixin.escalation_chain_snapshot", + new_callable=PropertyMock, +) +@pytest.mark.django_db +def test_escalation_chain_with_respect_to_escalation_snapshot_no_escalation_chain_snapshot( + mock_escalation_chain_snapshot, + make_organization_and_user, + make_alert_receive_channel, + make_channel_filter, + make_escalation_chain, + make_alert_group, +): + mock_escalation_chain_snapshot.return_value = None + + organization, _ = make_organization_and_user() + alert_receive_channel = make_alert_receive_channel(organization) + escalation_chain = make_escalation_chain(organization=organization) + channel_filter = make_channel_filter(alert_receive_channel, escalation_chain=escalation_chain) + + alert_group = make_alert_group(alert_receive_channel, channel_filter=channel_filter) + + assert alert_group.escalation_chain_with_respect_to_escalation_snapshot == escalation_chain + + alert_group = make_alert_group(alert_receive_channel) + + assert alert_group.channel_filter is None + assert alert_group.escalation_chain_with_respect_to_escalation_snapshot is None + + +@patch( + "apps.alerts.escalation_snapshot.escalation_snapshot_mixin.EscalationSnapshotMixin.escalation_chain_snapshot", + new_callable=PropertyMock, +) +@pytest.mark.django_db +def test_escalation_chain_with_respect_to_escalation_snapshot_no_escalation_chain_snapshot_and_no_channel_filter( + mock_escalation_chain_snapshot, + make_organization_and_user, + make_alert_receive_channel, + make_alert_group, +): + mock_escalation_chain_snapshot.return_value = None + + organization, _ = make_organization_and_user() + alert_receive_channel = make_alert_receive_channel(organization) + alert_group = make_alert_group(alert_receive_channel) + + assert alert_group.escalation_chain_with_respect_to_escalation_snapshot is None + + +@pytest.mark.django_db +def test_channel_filter_snapshot( + make_organization_and_user, + make_alert_receive_channel, + 
make_channel_filter, + make_escalation_chain, + make_escalation_policy, + make_alert_group, +): + organization, _ = make_organization_and_user() + alert_receive_channel = make_alert_receive_channel(organization) + escalation_chain = make_escalation_chain(organization=organization) + channel_filter = make_channel_filter(alert_receive_channel, escalation_chain=escalation_chain) + make_escalation_policy( + escalation_chain=channel_filter.escalation_chain, + escalation_policy_step=EscalationPolicy.STEP_WAIT, + wait_delay=EscalationPolicy.FIFTEEN_MINUTES, + ) + + alert_group = make_alert_group(alert_receive_channel, channel_filter=channel_filter) + alert_group.raw_escalation_snapshot = alert_group.build_raw_escalation_snapshot() + + assert alert_group.channel_filter_snapshot.id == channel_filter.id + + +@pytest.mark.django_db +def test_channel_filter_snapshot_no_escalation_chain_exists( + make_organization_and_user, + make_alert_receive_channel, + make_channel_filter, + make_alert_group, +): + organization, _ = make_organization_and_user() + alert_receive_channel = make_alert_receive_channel(organization) + channel_filter = make_channel_filter(alert_receive_channel) + alert_group = make_alert_group(alert_receive_channel, channel_filter=channel_filter) + + alert_group.raw_escalation_snapshot = alert_group.build_raw_escalation_snapshot() + + assert alert_group.raw_escalation_snapshot["channel_filter_snapshot"] is None + assert alert_group.channel_filter_snapshot is None + + +@pytest.mark.django_db +def test_channel_filter_snapshot_no_alert_group_raw_escalation_snapshot( + make_organization_and_user, + make_alert_receive_channel, + make_channel_filter, + make_alert_group, +): + organization, _ = make_organization_and_user() + alert_receive_channel = make_alert_receive_channel(organization) + channel_filter = make_channel_filter(alert_receive_channel) + alert_group = make_alert_group(alert_receive_channel, channel_filter=channel_filter) + + assert 
alert_group.channel_filter_snapshot is None + + +@pytest.mark.django_db +def test_escalation_chain_snapshot( + make_organization_and_user, + make_alert_receive_channel, + make_channel_filter, + make_escalation_chain, + make_escalation_policy, + make_alert_group, +): + organization, _ = make_organization_and_user() + alert_receive_channel = make_alert_receive_channel(organization) + escalation_chain = make_escalation_chain(organization=organization) + channel_filter = make_channel_filter(alert_receive_channel, escalation_chain=escalation_chain) + make_escalation_policy( + escalation_chain=channel_filter.escalation_chain, + escalation_policy_step=EscalationPolicy.STEP_WAIT, + wait_delay=EscalationPolicy.FIFTEEN_MINUTES, + ) + + alert_group = make_alert_group(alert_receive_channel, channel_filter=channel_filter) + alert_group.raw_escalation_snapshot = alert_group.build_raw_escalation_snapshot() + + assert alert_group.escalation_chain_snapshot.id == escalation_chain.id + + +@pytest.mark.django_db +def test_escalation_chain_snapshot_no_escalation_chain_exists( + make_organization_and_user, + make_alert_receive_channel, + make_channel_filter, + make_alert_group, +): + organization, _ = make_organization_and_user() + alert_receive_channel = make_alert_receive_channel(organization) + channel_filter = make_channel_filter(alert_receive_channel) + alert_group = make_alert_group(alert_receive_channel, channel_filter=channel_filter) + + alert_group.raw_escalation_snapshot = alert_group.build_raw_escalation_snapshot() + + assert alert_group.raw_escalation_snapshot["escalation_chain_snapshot"] is None + assert alert_group.escalation_chain_snapshot is None + + +@pytest.mark.django_db +def test_escalation_chain_snapshot_no_alert_group_raw_escalation_snapshot( + make_organization_and_user, + make_alert_receive_channel, + make_channel_filter, + make_alert_group, +): + organization, _ = make_organization_and_user() + alert_receive_channel = make_alert_receive_channel(organization) + 
channel_filter = make_channel_filter(alert_receive_channel) + alert_group = make_alert_group(alert_receive_channel, channel_filter=channel_filter) + + assert alert_group.escalation_chain_snapshot is None + + +@pytest.mark.django_db +def test_escalation_snapshot( + make_organization_and_user, + make_alert_receive_channel, + make_channel_filter, + make_alert_group, +): + organization, _ = make_organization_and_user() + alert_receive_channel = make_alert_receive_channel(organization) + channel_filter = make_channel_filter(alert_receive_channel) + alert_group = make_alert_group(alert_receive_channel, channel_filter=channel_filter) + alert_group.raw_escalation_snapshot = alert_group.build_raw_escalation_snapshot() + + return_value = "asdfasdfasdf" + with patch( + "apps.alerts.escalation_snapshot.escalation_snapshot_mixin.EscalationSnapshotMixin._deserialize_escalation_snapshot", + return_value=return_value, + ): + assert alert_group.escalation_snapshot == return_value + + +@pytest.mark.django_db +def test_escalation_snapshot_validation_error( + make_organization_and_user, + make_alert_receive_channel, + make_channel_filter, + make_alert_group, +): + organization, _ = make_organization_and_user() + alert_receive_channel = make_alert_receive_channel(organization) + channel_filter = make_channel_filter(alert_receive_channel) + alert_group = make_alert_group(alert_receive_channel, channel_filter=channel_filter) + alert_group.raw_escalation_snapshot = alert_group.build_raw_escalation_snapshot() + + with patch( + "apps.alerts.escalation_snapshot.escalation_snapshot_mixin.EscalationSnapshotMixin._deserialize_escalation_snapshot", + side_effect=ValidationError("asdfasdf"), + ): + assert alert_group.escalation_snapshot is None + + +@pytest.mark.django_db +def test_escalation_snapshot_no_alert_group_raw_escalation_snapshot( + make_organization_and_user, + make_alert_receive_channel, + make_channel_filter, + make_alert_group, +): + organization, _ = make_organization_and_user() + 
alert_receive_channel = make_alert_receive_channel(organization) + channel_filter = make_channel_filter(alert_receive_channel) + alert_group = make_alert_group(alert_receive_channel, channel_filter=channel_filter) + + assert alert_group.escalation_snapshot is None + + +@pytest.mark.django_db +def test_escalation_snapshot_empty_escalation_policies_snapshot( + make_organization_and_user, + make_alert_receive_channel, + make_channel_filter, + make_alert_group, +): + organization, _ = make_organization_and_user() + alert_receive_channel = make_alert_receive_channel(organization) + channel_filter = make_channel_filter(alert_receive_channel) + alert_group = make_alert_group(alert_receive_channel, channel_filter=channel_filter) + alert_group.raw_escalation_snapshot = alert_group.build_raw_escalation_snapshot() + + assert alert_group.raw_escalation_snapshot is not None + assert alert_group.has_escalation_policies_snapshots is False + + +@pytest.mark.django_db +def test_escalation_snapshot_nonempty_escalation_policies_snapshot( + make_organization_and_user, + make_alert_receive_channel, + make_channel_filter, + make_escalation_chain, + make_escalation_policy, + make_alert_group, +): + organization, _ = make_organization_and_user() + alert_receive_channel = make_alert_receive_channel(organization) + escalation_chain = make_escalation_chain(organization=organization) + channel_filter = make_channel_filter(alert_receive_channel, escalation_chain=escalation_chain) + make_escalation_policy( + escalation_chain=channel_filter.escalation_chain, + escalation_policy_step=EscalationPolicy.STEP_WAIT, + wait_delay=EscalationPolicy.FIFTEEN_MINUTES, + ) + + alert_group = make_alert_group(alert_receive_channel, channel_filter=channel_filter) + alert_group.raw_escalation_snapshot = alert_group.build_raw_escalation_snapshot() + + assert alert_group.raw_escalation_snapshot is not None + assert alert_group.has_escalation_policies_snapshots is True + + +@pytest.mark.django_db +def 
test_has_escalation_policies_snapshots_no_alert_group_raw_escalation_snapshot( + make_organization_and_user, + make_alert_receive_channel, + make_channel_filter, + make_alert_group, +): + organization, _ = make_organization_and_user() + alert_receive_channel = make_alert_receive_channel(organization) + channel_filter = make_channel_filter(alert_receive_channel) + alert_group = make_alert_group(alert_receive_channel, channel_filter=channel_filter) + + assert alert_group.raw_escalation_snapshot is None + assert alert_group.has_escalation_policies_snapshots is False + + +@patch("apps.alerts.models.alert_group.AlertGroup.slack_channel_id", new_callable=PropertyMock) +@pytest.mark.django_db +def test_deserialize_escalation_snapshot( + mock_alert_group_slack_channel_id, + make_organization_and_user, + make_alert_receive_channel, + make_channel_filter, + make_escalation_chain, + make_escalation_policy, + make_alert_group, +): + mock_alert_group_slack_channel_id.return_value = MOCK_SLACK_CHANNEL_ID + + organization, _ = make_organization_and_user() + alert_receive_channel = make_alert_receive_channel(organization) + escalation_chain = make_escalation_chain(organization=organization) + channel_filter = make_channel_filter(alert_receive_channel, escalation_chain=escalation_chain) + escalation_policy = make_escalation_policy( + escalation_chain=channel_filter.escalation_chain, + escalation_policy_step=EscalationPolicy.STEP_WAIT, + wait_delay=EscalationPolicy.FIFTEEN_MINUTES, + ) + + alert_group = make_alert_group(alert_receive_channel, channel_filter=channel_filter) + alert_group.raw_escalation_snapshot = alert_group.build_raw_escalation_snapshot() + + deserialized_escalation_snapshot = alert_group._deserialize_escalation_snapshot(alert_group.raw_escalation_snapshot) + + assert deserialized_escalation_snapshot.alert_group == alert_group + assert deserialized_escalation_snapshot.channel_filter_snapshot.id == channel_filter.id + assert 
deserialized_escalation_snapshot.escalation_chain_snapshot.id == escalation_chain.id + assert deserialized_escalation_snapshot.last_active_escalation_policy_order is None + assert len(deserialized_escalation_snapshot.escalation_policies_snapshots) == 1 + assert deserialized_escalation_snapshot.escalation_policies_snapshots[0].id == escalation_policy.id + assert deserialized_escalation_snapshot.slack_channel_id == MOCK_SLACK_CHANNEL_ID + assert deserialized_escalation_snapshot.pause_escalation is False + assert deserialized_escalation_snapshot.next_step_eta is None + assert deserialized_escalation_snapshot.stop_escalation is False + + +@pytest.mark.django_db +def test_escalation_chain_exists( + make_organization_and_user, + make_alert_receive_channel, + make_channel_filter, + make_escalation_chain, + make_alert_group, +): + organization, _ = make_organization_and_user() + alert_receive_channel = make_alert_receive_channel(organization) + escalation_chain = make_escalation_chain(organization=organization) + channel_filter = make_channel_filter(alert_receive_channel, escalation_chain=escalation_chain) + alert_group = make_alert_group(alert_receive_channel, channel_filter=channel_filter) + + assert alert_group.pause_escalation is False + assert alert_group.escalation_chain_exists is True + + +@patch( + "apps.alerts.escalation_snapshot.escalation_snapshot_mixin.EscalationSnapshotMixin.pause_escalation", + new_callable=PropertyMock, +) +@pytest.mark.django_db +def test_escalation_chain_exists_paused_escalation( + mocked_pause_escalation, + make_organization_and_user, + make_alert_receive_channel, + make_alert_group, +): + mocked_pause_escalation.return_value = True + + organization, _ = make_organization_and_user() + alert_receive_channel = make_alert_receive_channel(organization) + alert_group = make_alert_group(alert_receive_channel) + + assert alert_group.pause_escalation is True + assert alert_group.escalation_chain_exists is False + + +@pytest.mark.django_db +def 
test_escalation_chain_exists_no_channel_filter( + make_organization_and_user, + make_alert_receive_channel, + make_alert_group, +): + organization, _ = make_organization_and_user() + alert_receive_channel = make_alert_receive_channel(organization) + alert_group = make_alert_group(alert_receive_channel) + + assert alert_group.pause_escalation is False + assert alert_group.channel_filter is None + assert alert_group.escalation_chain_exists is False + + +@pytest.mark.django_db +def test_escalation_chain_exists_no_channel_filter_escalation_chain( + make_organization_and_user, + make_alert_receive_channel, + make_channel_filter, + make_alert_group, +): + organization, _ = make_organization_and_user() + alert_receive_channel = make_alert_receive_channel(organization) + channel_filter = make_channel_filter(alert_receive_channel) + alert_group = make_alert_group(alert_receive_channel, channel_filter=channel_filter) + + assert alert_group.pause_escalation is False + assert alert_group.channel_filter == channel_filter + assert alert_group.channel_filter.escalation_chain is None + assert alert_group.escalation_chain_exists is False + + +@pytest.mark.django_db +def test_pause_escalation_no_raw_escalation_snapshot( + make_organization_and_user, + make_alert_receive_channel, + make_alert_group, +): + organization, _ = make_organization_and_user() + alert_receive_channel = make_alert_receive_channel(organization) + alert_group = make_alert_group(alert_receive_channel) + + assert alert_group.raw_escalation_snapshot is None + assert alert_group.pause_escalation is False + + +@pytest.mark.django_db +def test_pause_escalation_raw_escalation_snapshot_exists( + make_organization_and_user, + make_alert_receive_channel, + make_alert_group, +): + organization, _ = make_organization_and_user() + alert_receive_channel = make_alert_receive_channel(organization) + alert_group = make_alert_group(alert_receive_channel) + alert_group.raw_escalation_snapshot = 
alert_group.build_raw_escalation_snapshot() + + assert alert_group.raw_escalation_snapshot is not None + assert alert_group.raw_escalation_snapshot["pause_escalation"] is False + + alert_group.raw_escalation_snapshot["pause_escalation"] = True + + assert alert_group.pause_escalation is True + + +@pytest.mark.django_db +def test_next_step_eta_no_raw_escalation_snapshot( + make_organization_and_user, + make_alert_receive_channel, + make_alert_group, +): + organization, _ = make_organization_and_user() + alert_receive_channel = make_alert_receive_channel(organization) + alert_group = make_alert_group(alert_receive_channel) + + assert alert_group.raw_escalation_snapshot is None + assert alert_group.next_step_eta is None + + +@pytest.mark.django_db +def test_next_step_eta_no_next_step_eta( + make_organization_and_user, + make_alert_receive_channel, + make_alert_group, +): + organization, _ = make_organization_and_user() + alert_receive_channel = make_alert_receive_channel(organization) + alert_group = make_alert_group(alert_receive_channel) + alert_group.raw_escalation_snapshot = alert_group.build_raw_escalation_snapshot() + + assert alert_group.raw_escalation_snapshot is not None + assert alert_group.raw_escalation_snapshot["next_step_eta"] is None + assert alert_group.next_step_eta is None + + +@patch("apps.alerts.escalation_snapshot.escalation_snapshot_mixin.parse") +@pytest.mark.django_db +def test_next_step_eta( + mock_dateutil_parser, + make_organization_and_user, + make_alert_receive_channel, + make_alert_group, +): + mocked_raw_date = "mcvnmcvmnvc" + mocked_parsed_date = "asdfasdfaf" + mock_dateutil_parser.return_value.replace.return_value = mocked_parsed_date + + organization, _ = make_organization_and_user() + alert_receive_channel = make_alert_receive_channel(organization) + alert_group = make_alert_group(alert_receive_channel) + alert_group.raw_escalation_snapshot = alert_group.build_raw_escalation_snapshot() + 
alert_group.raw_escalation_snapshot["next_step_eta"] = mocked_raw_date + + assert alert_group.raw_escalation_snapshot is not None + assert alert_group.raw_escalation_snapshot["next_step_eta"] is mocked_raw_date + assert alert_group.next_step_eta == mocked_parsed_date + + mock_dateutil_parser.assert_called_once_with(mocked_raw_date) + mock_dateutil_parser.return_value.replace.assert_called_once_with(tzinfo=pytz.UTC) diff --git a/engine/common/database.py b/engine/common/database.py index dd40a39e..d2ca1929 100644 --- a/engine/common/database.py +++ b/engine/common/database.py @@ -5,11 +5,11 @@ from django.conf import settings def get_random_readonly_database_key_if_present_otherwise_default() -> str: """ - This function returns a string, representing a key in the DATABASES django settings. - If settings.READONLY_DATABASES is set, and non-empty, it randomly chooses one of the read-only databases, + This function returns a string, representing a key in the `DATABASES` django settings. + If `settings.READONLY_DATABASES` is set, and non-empty, it randomly chooses one of the read-only databases, otherwise it falls back to "default". 
- This is primarily intended to be used for django's QuerySet.using() function + This is primarily intended to be used for django's `QuerySet.using()` function """ using_db = "default" if hasattr(settings, "READONLY_DATABASES") and len(settings.READONLY_DATABASES) > 0: diff --git a/engine/engine/management/commands/restart_escalation.py b/engine/engine/management/commands/restart_escalation.py index 6c4c0823..8ceaf01d 100644 --- a/engine/engine/management/commands/restart_escalation.py +++ b/engine/engine/management/commands/restart_escalation.py @@ -49,9 +49,6 @@ class Command(BaseCommand): alert_group.unsilence_task_uuid = task_id escalation_start_time = max(now, alert_group.silenced_until) - alert_group.estimate_escalation_finish_time = alert_group.calculate_eta_for_finish_escalation( - start_time=escalation_start_time, - ) alert_groups_to_update.append(alert_group) tasks.append( @@ -65,9 +62,6 @@ class Command(BaseCommand): # otherwise start escalate_alert_group task else: if alert_group.escalation_snapshot: - alert_group.estimate_escalation_finish_time = alert_group.calculate_eta_for_finish_escalation( - escalation_started=True, - ) alert_group.active_escalation_id = task_id alert_groups_to_update.append(alert_group) @@ -82,7 +76,7 @@ class Command(BaseCommand): AlertGroup.all_objects.bulk_update( alert_groups_to_update, - ["estimate_escalation_finish_time", "active_escalation_id", "unsilence_task_uuid"], + ["active_escalation_id", "unsilence_task_uuid"], batch_size=5000, ) diff --git a/engine/engine/management/commands/start_celery.py b/engine/engine/management/commands/start_celery.py index e61a9430..23ced0ec 100644 --- a/engine/engine/management/commands/start_celery.py +++ b/engine/engine/management/commands/start_celery.py @@ -1,9 +1,12 @@ +import os import shlex import subprocess from django.core.management.base import BaseCommand from django.utils import autoreload +from common.utils import getenv_boolean + WORKER_ID = 0 @@ -11,8 +14,18 @@ def 
restart_celery(*args, **kwargs): global WORKER_ID kill_worker_cmd = "celery -A engine control shutdown" subprocess.call(shlex.split(kill_worker_cmd)) - start_worker_cmd = "celery -A engine worker -l info --concurrency=3 -Q celery,retry -n {}".format(WORKER_ID) - subprocess.call(shlex.split(start_worker_cmd)) + + queues = os.environ.get("CELERY_WORKER_QUEUE", "celery,retry") + max_tasks_per_child = os.environ.get("CELERY_WORKER_MAX_TASKS_PER_CHILD", 100) + concurrency = os.environ.get("CELERY_WORKER_CONCURRENCY", 3) + log_level = "debug" if getenv_boolean("CELERY_WORKER_DEBUG_LOGS", False) else "info" + + celery_args = f"-A engine worker -l {log_level} --concurrency={concurrency} -Q {queues} --max-tasks-per-child={max_tasks_per_child} -n {WORKER_ID}" + + if getenv_boolean("CELERY_WORKER_BEAT_ENABLED", False): + celery_args += " --beat" + + subprocess.call(shlex.split(f"celery {celery_args}")) WORKER_ID = 1 + WORKER_ID diff --git a/engine/requirements.txt b/engine/requirements.txt index 5a8a1837..bc2769f4 100644 --- a/engine/requirements.txt +++ b/engine/requirements.txt @@ -51,3 +51,4 @@ pyroscope-io==0.8.1 django-dbconn-retry==0.1.7 django-ipware==4.0.2 django-anymail==8.6 +django-deprecate-fields==0.1.1 diff --git a/engine/settings/base.py b/engine/settings/base.py index dc611872..68c36f2e 100644 --- a/engine/settings/base.py +++ b/engine/settings/base.py @@ -395,6 +395,10 @@ CELERY_MAX_TASKS_PER_CHILD = 1 CELERY_WORKER_SEND_TASK_EVENTS = True CELERY_TASK_SEND_SENT_EVENT = True +ALERT_GROUP_ESCALATION_AUDITOR_CELERY_TASK_HEARTBEAT_URL = os.getenv( + "ALERT_GROUP_ESCALATION_AUDITOR_CELERY_TASK_HEARTBEAT_URL", None +) + CELERY_BEAT_SCHEDULE = { "restore_heartbeat_tasks": { "task": "apps.heartbeat.tasks.restore_heartbeat_tasks", @@ -403,7 +407,11 @@ CELERY_BEAT_SCHEDULE = { }, "check_escalations": { "task": "apps.alerts.tasks.check_escalation_finished.check_escalation_finished_task", - "schedule": 10 * 60, + # the task should be executed a minute or two less than the 
integration's configured interval + # + # ex. if the integration is configured to expect a heartbeat every 15 minutes then this value should be set + # to something like 13 * 60 (every 13 minutes) + "schedule": getenv_integer("ALERT_GROUP_ESCALATION_AUDITOR_CELERY_TASK_HEARTBEAT_INTERVAL", 13 * 60), "args": (), }, "start_refresh_ical_files": {