diff --git a/engine/apps/alerts/constants.py b/engine/apps/alerts/constants.py index f5c9f196..496836ca 100644 --- a/engine/apps/alerts/constants.py +++ b/engine/apps/alerts/constants.py @@ -16,6 +16,8 @@ TASK_DELAY_SECONDS = 1 NEXT_ESCALATION_DELAY = 5 +BUNDLED_NOTIFICATION_DELAY_SECONDS = 60 * 2 # 2 min + # AlertGroup states verbal class AlertGroupState(str, Enum): diff --git a/engine/apps/alerts/migrations/0054_usernotificationbundle_bundlednotification_and_more.py b/engine/apps/alerts/migrations/0054_usernotificationbundle_bundlednotification_and_more.py new file mode 100644 index 00000000..a4d4fcc8 --- /dev/null +++ b/engine/apps/alerts/migrations/0054_usernotificationbundle_bundlednotification_and_more.py @@ -0,0 +1,45 @@ +# Generated by Django 4.2.10 on 2024-06-20 11:00 + +import apps.base.models.user_notification_policy +from django.db import migrations, models +import django.db.models.deletion + + +class Migration(migrations.Migration): + + dependencies = [ + ('base', '0005_drop_unused_dynamic_settings'), + ('user_management', '0022_alter_team_unique_together'), + ('alerts', '0053_channelfilter_filtering_labels'), + ] + + operations = [ + migrations.CreateModel( + name='UserNotificationBundle', + fields=[ + ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('important', models.BooleanField()), + ('notification_channel', models.PositiveSmallIntegerField(default=None, null=True, validators=[apps.base.models.user_notification_policy.validate_channel_choice])), + ('last_notified_at', models.DateTimeField(default=None, null=True)), + ('notification_task_id', models.CharField(default=None, max_length=100, null=True)), + ('eta', models.DateTimeField(default=None, null=True)), + ('user', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='notification_bundles', to='user_management.user')), + ], + ), + migrations.CreateModel( + name='BundledNotification', + fields=[ + ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('created_at', models.DateTimeField(auto_now_add=True)), + ('bundle_uuid', models.CharField(db_index=True, default=None, max_length=100, null=True)), + ('alert_group', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='alerts.alertgroup')), + ('alert_receive_channel', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='alerts.alertreceivechannel')), + ('notification_bundle', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='notifications', to='alerts.usernotificationbundle')), + ('notification_policy', models.ForeignKey(null=True, on_delete=django.db.models.deletion.SET_NULL, to='base.usernotificationpolicy')), + ], + ), + migrations.AddConstraint( + model_name='usernotificationbundle', + constraint=models.UniqueConstraint(fields=('user', 'important', 'notification_channel'), name='unique_user_notification_bundle'), + ), + ] diff --git a/engine/apps/alerts/models/__init__.py b/engine/apps/alerts/models/__init__.py index c537fc31..51b44158 100644 --- a/engine/apps/alerts/models/__init__.py +++ b/engine/apps/alerts/models/__init__.py @@ -15,3 +15,4 @@ from .invitation import Invitation # noqa: F401 from .maintainable_object import MaintainableObject # noqa: F401 from .resolution_note import ResolutionNote, ResolutionNoteSlackMessage # noqa: F401 from .user_has_notification import UserHasNotification # noqa: F401 +from .user_notification_bundle import BundledNotification, UserNotificationBundle # noqa: F401 diff --git a/engine/apps/alerts/models/user_has_notification.py b/engine/apps/alerts/models/user_has_notification.py index 967a93cf..1dc51d0b 100644 --- a/engine/apps/alerts/models/user_has_notification.py +++ b/engine/apps/alerts/models/user_has_notification.py @@ -16,3 +16,11 @@ class UserHasNotification(models.Model): class Meta: unique_together = ("user", "alert_group") + + def update_active_task_id(self, task_id): + """ + `active_notification_policy_id` keeps celery task_id of the next scheduled `notify_user_task` + for the current user + """ + self.active_notification_policy_id = task_id + self.save(update_fields=["active_notification_policy_id"]) diff --git a/engine/apps/alerts/models/user_notification_bundle.py b/engine/apps/alerts/models/user_notification_bundle.py new file mode 100644 index 00000000..4c793637 --- /dev/null +++ b/engine/apps/alerts/models/user_notification_bundle.py @@ -0,0 +1,87 @@ +import datetime +import typing + +from django.db import models +from django.utils import timezone + +from apps.alerts.constants import BUNDLED_NOTIFICATION_DELAY_SECONDS +from apps.base.models import UserNotificationPolicy +from apps.base.models.user_notification_policy import validate_channel_choice + +if typing.TYPE_CHECKING: + from django.db.models.manager import RelatedManager + + from apps.alerts.models import AlertGroup, AlertReceiveChannel + from apps.user_management.models import User + + +class UserNotificationBundle(models.Model): + user: "User" + notifications: "RelatedManager['BundledNotification']" + + NOTIFICATION_CHANNELS_TO_BUNDLE = [ + UserNotificationPolicy.NotificationChannel.SMS, + ] + + user = models.ForeignKey("user_management.User", on_delete=models.CASCADE, related_name="notification_bundles") + important = models.BooleanField() + notification_channel = models.PositiveSmallIntegerField( + validators=[validate_channel_choice], null=True, default=None + ) + last_notified_at = models.DateTimeField(default=None, null=True) + notification_task_id = models.CharField(max_length=100, null=True, default=None) + # estimated time of arrival for scheduled send_bundled_notification task + eta = models.DateTimeField(default=None, null=True) + + class Meta: + constraints = [ + models.UniqueConstraint( + fields=["user", "important", "notification_channel"], name="unique_user_notification_bundle" + ) + ] + + def notified_recently(self) -> bool: + return ( + timezone.now() - self.last_notified_at < timezone.timedelta(seconds=BUNDLED_NOTIFICATION_DELAY_SECONDS) + if self.last_notified_at + else False + ) + + def eta_is_valid(self) -> bool: + """ + `eta` shows eta of scheduled send_bundled_notification task and should never be less than the current time + (with a 1 minute buffer provided). + `eta` is None means that there is no scheduled task. + """ + if not self.eta or self.eta + timezone.timedelta(minutes=1) >= timezone.now(): + return True + return False + + def get_notification_eta(self) -> datetime.datetime: + last_notified = self.last_notified_at if self.last_notified_at else timezone.now() + return last_notified + timezone.timedelta(seconds=BUNDLED_NOTIFICATION_DELAY_SECONDS) + + def append_notification(self, alert_group: "AlertGroup", notification_policy: "UserNotificationPolicy"): + self.notifications.create( + alert_group=alert_group, notification_policy=notification_policy, alert_receive_channel=alert_group.channel + ) + + @classmethod + def notification_is_bundleable(cls, notification_channel): + return notification_channel in cls.NOTIFICATION_CHANNELS_TO_BUNDLE + + +class BundledNotification(models.Model): + alert_group: "AlertGroup" + alert_receive_channel: "AlertReceiveChannel" + notification_policy: typing.Optional["UserNotificationPolicy"] + notification_bundle: "UserNotificationBundle" + + alert_group = models.ForeignKey("alerts.AlertGroup", on_delete=models.CASCADE) + alert_receive_channel = models.ForeignKey("alerts.AlertReceiveChannel", on_delete=models.CASCADE) + notification_policy = models.ForeignKey("base.UserNotificationPolicy", on_delete=models.SET_NULL, null=True) + notification_bundle = models.ForeignKey( + UserNotificationBundle, on_delete=models.CASCADE, related_name="notifications" + ) + created_at = models.DateTimeField(auto_now_add=True) + bundle_uuid = models.CharField(max_length=100, null=True, default=None, db_index=True) diff --git a/engine/apps/alerts/paging.py b/engine/apps/alerts/paging.py index 6a3cadd3..b03f52d7 100644 --- a/engine/apps/alerts/paging.py +++ b/engine/apps/alerts/paging.py @@ -187,8 +187,7 @@ def unpage_user(alert_group: AlertGroup, user: User, from_user: User) -> None: user_has_notification = UserHasNotification.objects.filter( user=user, alert_group=alert_group ).select_for_update()[0] - user_has_notification.active_notification_policy_id = None - user_has_notification.save(update_fields=["active_notification_policy_id"]) + user_has_notification.update_active_task_id(task_id=None) except IndexError: return finally: diff --git a/engine/apps/alerts/tasks/notify_user.py b/engine/apps/alerts/tasks/notify_user.py index cdfd35d6..bbcec36d 100644 --- a/engine/apps/alerts/tasks/notify_user.py +++ b/engine/apps/alerts/tasks/notify_user.py @@ -1,23 +1,94 @@ -import time import typing from functools import partial +from uuid import uuid4 from celery.exceptions import Retry from django.conf import settings from django.db import transaction +from django.db.models import Count from django.utils import timezone from kombu.utils.uuid import uuid as celery_uuid from telegram.error import RetryAfter from apps.alerts.constants import NEXT_ESCALATION_DELAY -from apps.alerts.signals import user_notification_action_triggered_signal +from apps.alerts.tasks.send_update_log_report_signal import send_update_log_report_signal from apps.base.messaging import get_messaging_backend_from_id from apps.metrics_exporter.tasks import update_metrics_for_user from apps.phone_notifications.phone_backend import PhoneBackend from common.custom_celery_tasks import shared_dedicated_queue_retry_task +from .compare_escalations import compare_escalations from .task_logger import task_logger +if typing.TYPE_CHECKING: + from apps.alerts.models import AlertGroup, UserNotificationBundle + from apps.base.models import UserNotificationPolicy + from apps.user_management.models import User + + +def schedule_send_bundled_notification_task( + user_notification_bundle: "UserNotificationBundle", alert_group: "AlertGroup" +): + """Schedule a task to send bundled notifications""" + send_bundled_notification.apply_async( + (user_notification_bundle.id,), + eta=user_notification_bundle.eta, + task_id=user_notification_bundle.notification_task_id, + ) + task_logger.info( + f"Scheduled send_bundled_notification task {user_notification_bundle.notification_task_id}, " + f"user_notification_bundle: {user_notification_bundle.id}, alert_group {alert_group.id}, " + f"eta: {user_notification_bundle.eta}" + ) + + +def schedule_perform_notification_task( + log_record_pk: int, alert_group_pk: int, use_default_notification_policy_fallback: bool +): + task = perform_notification.apply_async((log_record_pk, use_default_notification_policy_fallback)) + task_logger.info( + f"Created perform_notification task {task} log_record={log_record_pk} " f"alert_group={alert_group_pk}" + ) + + +def build_notification_reason_for_log_record( + notification_policies: typing.List["UserNotificationPolicy"], reason: typing.Optional[str] +) -> str: + from apps.base.models import UserNotificationPolicy + + # Here we collect a brief overview of notification steps configured for user to send it to thread. + collected_steps_ids = [] + for next_notification_policy in notification_policies: + if next_notification_policy.step == UserNotificationPolicy.Step.NOTIFY: + if next_notification_policy.notify_by not in collected_steps_ids: + collected_steps_ids.append(next_notification_policy.notify_by) + + collected_steps = ", ".join( + UserNotificationPolicy.NotificationChannel(step_id).label for step_id in collected_steps_ids + ) + reason = ("Reason: " + reason + "\n") if reason is not None else "" + reason += ("Further notification plan: " + collected_steps) if len(collected_steps_ids) > 0 else "" + return reason + + +def update_metric_if_needed(user: "User", active_alert_group_ids: typing.List[int]): + from apps.base.models import UserNotificationPolicyLogRecord + + # get count of alert groups with only one personal log record with type "triggered" + alert_groups_with_one_log = ( + user.personal_log_records.filter( + type=UserNotificationPolicyLogRecord.TYPE_PERSONAL_NOTIFICATION_TRIGGERED, + alert_group_id__in=active_alert_group_ids, + ) + .values("alert_group") + .annotate(count=Count("alert_group")) + .filter(count=1) + .count() + ) + + if alert_groups_with_one_log > 0: + update_metrics_for_user.apply_async((user.id, alert_groups_with_one_log)) + @shared_dedicated_queue_retry_task( autoretry_for=(Exception,), retry_backoff=True, max_retries=1 if settings.DEBUG else None @@ -32,7 +103,7 @@ def notify_user_task( important=False, notify_anyway=False, ): - from apps.alerts.models import AlertGroup, UserHasNotification + from apps.alerts.models import AlertGroup, UserHasNotification, UserNotificationBundle from apps.base.models import UserNotificationPolicy, UserNotificationPolicyLogRecord from apps.user_management.models import User @@ -44,6 +115,8 @@ def notify_user_task( countdown = 0 stop_escalation = False log_record = None + is_notification_bundled = False + user_notification_bundle = None with transaction.atomic(): try: @@ -51,8 +124,6 @@ def notify_user_task( except User.DoesNotExist: return f"notify_user_task: user {user_pk} doesn't exist" - organization = alert_group.channel.organization - if not user.is_notification_allowed: task_logger.info(f"notify_user_task: user {user.pk} notification is not allowed") UserNotificationPolicyLogRecord( @@ -82,21 +153,8 @@ def notify_user_task( f"notify_user_task: Failed to notify. No notification policies. user_id={user_pk} alert_group_id={alert_group_pk} important={important}" ) return - - # Here we collect a brief overview of notification steps configured for user to send it to thread. - collected_steps_ids = [] - for next_notification_policy in notification_policies: - if next_notification_policy.step == UserNotificationPolicy.Step.NOTIFY: - if next_notification_policy.notify_by not in collected_steps_ids: - collected_steps_ids.append(next_notification_policy.notify_by) - + reason = build_notification_reason_for_log_record(notification_policies, reason) notification_policy = notification_policies[0] - - collected_steps = ", ".join( - UserNotificationPolicy.NotificationChannel(step_id).label for step_id in collected_steps_ids - ) - reason = ("Reason: " + reason + "\n") if reason is not None else "" - reason += ("Further notification plan: " + collected_steps) if len(collected_steps_ids) > 0 else "" else: if notify_user_task.request.id != user_has_notification.active_notification_policy_id: task_logger.info( @@ -108,14 +166,14 @@ def notify_user_task( try: notification_policy = UserNotificationPolicy.objects.get(pk=previous_notification_policy_pk) - if notification_policy.user.organization != organization: + if notification_policy.user != user: notification_policy = UserNotificationPolicy.objects.get( order=notification_policy.order, user=user, important=important ) notification_policy = notification_policy.next() except UserNotificationPolicy.DoesNotExist: task_logger.info( - f"notify_user_taskLNotification policy {previous_notification_policy_pk} has been deleted" + f"notify_user_task: Notification policy {previous_notification_policy_pk} has been deleted" ) return reason = None @@ -138,27 +196,25 @@ def notify_user_task( if notification_policy is None: stop_escalation = True log_record = _create_notification_finished_user_notification_policy_log_record() + task_logger.info(f"Personal escalation exceeded. User: {user.pk}, alert_group: {alert_group.pk}") else: if ( (alert_group.acknowledged and not notify_even_acknowledged) + or (alert_group.silenced and not notify_anyway) or alert_group.resolved or alert_group.wiped_at or alert_group.root_alert_group ): - return "Acknowledged, resolved, attached or wiped." - - if alert_group.silenced and not notify_anyway: task_logger.info( - f"notify_user_task: skip notification user {user.pk} because alert_group {alert_group.pk} is silenced" + f"notify_user_task: skip notification user {user.pk}, alert_group {alert_group.pk} is " + f"{alert_group.state} and/or attached or wiped" ) - return + return "Acknowledged, resolved, silenced, attached or wiped." if notification_policy.step == UserNotificationPolicy.Step.WAIT: - if notification_policy.wait_delay is not None: - delay_in_seconds = notification_policy.wait_delay.total_seconds() - else: - delay_in_seconds = 0 - countdown = delay_in_seconds + countdown = ( + notification_policy.wait_delay.total_seconds() if notification_policy.wait_delay is not None else 0 + ) log_record = _create_user_notification_policy_log_record( author=user, type=UserNotificationPolicyLogRecord.TYPE_PERSONAL_NOTIFICATION_TRIGGERED, @@ -167,7 +223,7 @@ def notify_user_task( slack_prevent_posting=prevent_posting_to_thread, notification_step=notification_policy.step, ) - task_logger.info(f"notify_user_task: Waiting {delay_in_seconds} to notify user {user.pk}") + task_logger.info(f"notify_user_task: Waiting {countdown} to notify user {user.pk}") elif notification_policy.step == UserNotificationPolicy.Step.NOTIFY: user_to_be_notified_in_slack = ( notification_policy.notify_by == UserNotificationPolicy.NotificationChannel.SLACK @@ -185,17 +241,49 @@ def notify_user_task( notification_error_code=UserNotificationPolicyLogRecord.ERROR_NOTIFICATION_POSTING_TO_SLACK_IS_DISABLED, ) else: - log_record = _create_user_notification_policy_log_record( - author=user, - type=UserNotificationPolicyLogRecord.TYPE_PERSONAL_NOTIFICATION_TRIGGERED, - notification_policy=notification_policy, - alert_group=alert_group, - reason=reason, - slack_prevent_posting=prevent_posting_to_thread, - notification_step=notification_policy.step, - notification_channel=notification_policy.notify_by, - ) + if ( + settings.FEATURE_NOTIFICATION_BUNDLE_ENABLED + and UserNotificationBundle.notification_is_bundleable(notification_policy.notify_by) + ): + user_notification_bundle, _ = UserNotificationBundle.objects.select_for_update().get_or_create( + user=user, important=important, notification_channel=notification_policy.notify_by + ) + # check if notification needs to be bundled + if user_notification_bundle.notified_recently(): + user_notification_bundle.append_notification(alert_group, notification_policy) + # schedule send_bundled_notification task if it hasn't been scheduled or the task eta is + # outdated + eta_is_valid = user_notification_bundle.eta_is_valid() + if not eta_is_valid: + task_logger.warning( + f"ETA is not valid - {user_notification_bundle.eta}, " + f"user_notification_bundle {user_notification_bundle.id}, " + f"task_id {user_notification_bundle.notification_task_id}. " + f"Rescheduling the send_bundled_notification task" + ) + if not user_notification_bundle.notification_task_id or not eta_is_valid: + user_notification_bundle.notification_task_id = celery_uuid() + user_notification_bundle.eta = user_notification_bundle.get_notification_eta() + user_notification_bundle.save(update_fields=["notification_task_id", "eta"]) + transaction.on_commit( + partial( + schedule_send_bundled_notification_task, user_notification_bundle, alert_group + ) + ) + is_notification_bundled = True + + if not is_notification_bundled: + log_record = _create_user_notification_policy_log_record( + author=user, + type=UserNotificationPolicyLogRecord.TYPE_PERSONAL_NOTIFICATION_TRIGGERED, + notification_policy=notification_policy, + alert_group=alert_group, + reason=reason, + slack_prevent_posting=prevent_posting_to_thread, + notification_step=notification_policy.step, + notification_channel=notification_policy.notify_by, + ) if log_record: # log_record is None if user notification policy step is unspecified # if this is the first notification step, and user hasn't been notified for this alert group - update metric if ( @@ -207,48 +295,41 @@ def notify_user_task( ).exists() ): update_metrics_for_user.apply_async((user.id,)) - log_record.save() - if notify_user_task.request.retries == 0: - transaction.on_commit(partial(send_user_notification_signal.apply_async, (log_record.pk,))) - - def _create_perform_notification_task(log_record_pk, alert_group_pk, use_default_notification_policy_fallback): - task = perform_notification.apply_async((log_record_pk, use_default_notification_policy_fallback)) - task_logger.info( - f"Created perform_notification task {task} log_record={log_record_pk} " f"alert_group={alert_group_pk}" - ) - - def _update_user_has_notification_active_notification_policy_id(active_policy_id: typing.Optional[str]) -> None: - user_has_notification.active_notification_policy_id = active_policy_id - user_has_notification.save(update_fields=["active_notification_policy_id"]) - - def _reset_user_has_notification_active_notification_policy_id() -> None: - _update_user_has_notification_active_notification_policy_id(None) - - create_perform_notification_task = partial( - _create_perform_notification_task, - log_record.pk, - alert_group_pk, - using_fallback_default_notification_policy_step, - ) if using_fallback_default_notification_policy_step: # if we are using default notification policy, we're done escalating.. there's no further notification # policy steps in this case. Kick off the perform_notification task, create the # TYPE_PERSONAL_NOTIFICATION_FINISHED log record, and reset the active_notification_policy_id - transaction.on_commit(create_perform_notification_task) + transaction.on_commit( + partial( + schedule_perform_notification_task, + log_record.pk, + alert_group_pk, + using_fallback_default_notification_policy_step, + ) + ) _create_notification_finished_user_notification_policy_log_record() - _reset_user_has_notification_active_notification_policy_id() + user_has_notification.update_active_task_id(None) elif not stop_escalation: - if notification_policy.step != UserNotificationPolicy.Step.WAIT: - transaction.on_commit(create_perform_notification_task) + # if the step is NOTIFY and notification was not not bundled, perform regular notification + # and update time when user was notified + if notification_policy.step != UserNotificationPolicy.Step.WAIT and not is_notification_bundled: + transaction.on_commit( + partial( + schedule_perform_notification_task, + log_record.pk, + alert_group_pk, + using_fallback_default_notification_policy_step, + ) + ) + + if user_notification_bundle: + user_notification_bundle.last_notified_at = timezone.now() + user_notification_bundle.save(update_fields=["last_notified_at"]) - delay = NEXT_ESCALATION_DELAY - if countdown is not None: - delay += countdown task_id = celery_uuid() - - _update_user_has_notification_active_notification_policy_id(task_id) + user_has_notification.update_active_task_id(task_id) transaction.on_commit( partial( @@ -259,12 +340,12 @@ def notify_user_task( "notify_anyway": notify_anyway, "prevent_posting_to_thread": prevent_posting_to_thread, }, - countdown=delay, + countdown=countdown + NEXT_ESCALATION_DELAY, task_id=task_id, ) ) else: - _reset_user_has_notification_active_notification_policy_id() + user_has_notification.update_active_task_id(None) @shared_dedicated_queue_retry_task( @@ -458,18 +539,137 @@ def perform_notification(log_record_pk, use_default_notification_policy_fallback backend.notify_user(user, alert_group, notification_policy) +@shared_dedicated_queue_retry_task( + autoretry_for=(Exception,), retry_backoff=True, max_retries=1 if settings.DEBUG else None +) +def send_bundled_notification(user_notification_bundle_id: int): + """ + The task filters bundled notifications, attached to the current user_notification_bundle, by active alert groups, + creates notification log records and updates user_notification_bundle. + If there are no active alert groups - nothing else happens. If there is only one active alert group - regular + notification will be performed (called perform_notification task). Otherwise - "send bundled notification" method of + the current notification channel will be called. + """ + from apps.alerts.models import AlertGroup, UserNotificationBundle + from apps.base.models import UserNotificationPolicy, UserNotificationPolicyLogRecord + + task_logger.info(f"Start send_bundled_notification for user_notification_bundle {user_notification_bundle_id}") + with transaction.atomic(): + try: + user_notification_bundle = UserNotificationBundle.objects.filter( + pk=user_notification_bundle_id + ).select_for_update()[0] + except IndexError: + task_logger.info( + f"user_notification_bundle {user_notification_bundle_id} doesn't exist. " + f"The user associated with this notification bundle may have been deleted." + ) + return + + if not compare_escalations(send_bundled_notification.request.id, user_notification_bundle.notification_task_id): + task_logger.info( + f"send_bundled_notification: notification_task_id mismatch. " + f"Duplication or non-active notification triggered. " + f"Active: {user_notification_bundle.notification_task_id}" + ) + return + + notifications = user_notification_bundle.notifications.filter(bundle_uuid__isnull=True).select_related( + "alert_group" + ) + + log_records_to_create: typing.List["UserNotificationPolicyLogRecord"] = [] + skip_notification_ids: typing.List[int] = [] + active_alert_group_ids: typing.Set[int] = set() + log_record_notification_triggered = None + is_notification_allowed = user_notification_bundle.user.is_notification_allowed + + # create logs + for notification in notifications: + if notification.alert_group.status != AlertGroup.NEW: + task_logger.info(f"alert_group {notification.alert_group_id} is not active, skip notification") + skip_notification_ids.append(notification.id) + continue + elif not is_notification_allowed: + log_record_notification_failed = UserNotificationPolicyLogRecord( + author=user_notification_bundle.user, + type=UserNotificationPolicyLogRecord.TYPE_PERSONAL_NOTIFICATION_FAILED, + reason="notification is not allowed for user", + alert_group=notification.alert_group, + notification_error_code=UserNotificationPolicyLogRecord.ERROR_NOTIFICATION_FORBIDDEN, + ) + log_records_to_create.append(log_record_notification_failed) + active_alert_group_ids.add(notification.alert_group_id) + continue + + # collect notifications for active alert groups + active_alert_group_ids.add(notification.alert_group_id) + + log_record_notification_triggered = UserNotificationPolicyLogRecord( + author=user_notification_bundle.user, + type=UserNotificationPolicyLogRecord.TYPE_PERSONAL_NOTIFICATION_TRIGGERED, + alert_group=notification.alert_group, + notification_step=UserNotificationPolicy.Step.NOTIFY, + notification_channel=user_notification_bundle.notification_channel, + ) + log_records_to_create.append(log_record_notification_triggered) + + if len(log_records_to_create) == 1 and log_record_notification_triggered: + # perform regular notification + log_record_notification_triggered.save() + task_logger.info( + f"there is only one alert group in bundled notification, perform regular notification. " + f"alert_group {log_record_notification_triggered.alert_group_id}" + ) + transaction.on_commit( + partial( + schedule_perform_notification_task, + log_record_notification_triggered.pk, + log_record_notification_triggered.alert_group_id, + ) + ) + notifications.delete() + else: + UserNotificationPolicyLogRecord.objects.bulk_create(log_records_to_create, batch_size=5000) + + if not active_alert_group_ids or not is_notification_allowed: + task_logger.info( + f"no alert groups to notify about or notification is not allowed for user " + f"{user_notification_bundle.user_id}" + ) + notifications.delete() + else: + notifications.filter(id__in=skip_notification_ids).delete() + bundle_uuid = uuid4() + notifications.update(bundle_uuid=bundle_uuid) + task_logger.info( + f"perform bundled notification for alert groups with ids: {active_alert_group_ids}, " + f"bundle_uuid: {bundle_uuid}" + ) + if user_notification_bundle.notification_channel == UserNotificationPolicy.NotificationChannel.SMS: + PhoneBackend.notify_by_sms_bundle_async(user_notification_bundle.user, bundle_uuid) + + user_notification_bundle.notification_task_id = None + user_notification_bundle.last_notified_at = timezone.now() + user_notification_bundle.eta = None + user_notification_bundle.save(update_fields=["notification_task_id", "last_notified_at", "eta"]) + + for alert_group_id in active_alert_group_ids: + transaction.on_commit(partial(send_update_log_report_signal.apply_async, (None, alert_group_id))) + + # update metric + transaction.on_commit(partial(update_metric_if_needed, user_notification_bundle.user, active_alert_group_ids)) + + task_logger.info(f"Finished send_bundled_notification for user_notification_bundle {user_notification_bundle_id}") + + +# deprecated @shared_dedicated_queue_retry_task( autoretry_for=(Exception,), retry_backoff=True, max_retries=0 if settings.DEBUG else None ) def send_user_notification_signal(log_record_pk): - start_time = time.time() - - from apps.base.models import UserNotificationPolicyLogRecord - - task_logger.debug(f"LOG RECORD PK: {log_record_pk}") - task_logger.debug(f"LOG RECORD LAST: {UserNotificationPolicyLogRecord.objects.last()}") - - log_record = UserNotificationPolicyLogRecord.objects.get(pk=log_record_pk) - user_notification_action_triggered_signal.send(sender=send_user_notification_signal, log_record=log_record) - - task_logger.debug("--- USER SIGNAL TOOK %s seconds ---" % (time.time() - start_time)) + # Triggers user_notification_action_triggered_signal + # This signal is only connected to UserSlackRepresentative and triggers posting message to Slack about some + # FAILED notifications (see NotificationDeliveryStep and ERRORS_TO_SEND_IN_SLACK_CHANNEL). + # No need to call it here. + pass diff --git a/engine/apps/alerts/tests/factories.py b/engine/apps/alerts/tests/factories.py index 2b72c805..f07ef900 100644 --- a/engine/apps/alerts/tests/factories.py +++ b/engine/apps/alerts/tests/factories.py @@ -13,6 +13,7 @@ from apps.alerts.models import ( Invitation, ResolutionNote, ResolutionNoteSlackMessage, + UserNotificationBundle, ) from common.utils import UniqueFaker @@ -85,3 +86,8 @@ class CustomActionFactory(factory.DjangoModelFactory): class InvitationFactory(factory.DjangoModelFactory): class Meta: model = Invitation + + +class UserNotificationBundleFactory(factory.DjangoModelFactory): + class Meta: + model = UserNotificationBundle diff --git a/engine/apps/alerts/tests/test_notify_user.py b/engine/apps/alerts/tests/test_notify_user.py index 7dd2b812..e482f896 100644 --- a/engine/apps/alerts/tests/test_notify_user.py +++ b/engine/apps/alerts/tests/test_notify_user.py @@ -1,10 +1,11 @@ from unittest.mock import patch import pytest +from django.utils import timezone from telegram.error import RetryAfter from apps.alerts.models import AlertGroup -from apps.alerts.tasks.notify_user import notify_user_task, perform_notification +from apps.alerts.tasks.notify_user import notify_user_task, perform_notification, send_bundled_notification from apps.api.permissions import LegacyAccessControlRole from apps.base.models.user_notification_policy import UserNotificationPolicy from apps.base.models.user_notification_policy_log_record import UserNotificationPolicyLogRecord @@ -369,3 +370,207 @@ def test_perform_notification_use_default_notification_policy_fallback( perform_notification(log_record.pk, True) mock_notify_user.assert_called_once_with(user, alert_group, fallback_notification_policy) + + +@pytest.mark.django_db +def test_notify_user_task_notification_bundle_is_enabled( + make_organization_and_user, + make_user_for_organization, + make_user_notification_policy, + make_alert_receive_channel, + make_alert_group, + settings, +): + settings.FEATURE_NOTIFICATION_BUNDLE_ENABLED = True + organization, user_1 = make_organization_and_user() + user_2 = make_user_for_organization(organization) + make_user_notification_policy( + user=user_1, + step=UserNotificationPolicy.Step.NOTIFY, + notify_by=UserNotificationPolicy.NotificationChannel.SMS, # channel is in NOTIFICATION_CHANNELS_TO_BUNDLE + ) + make_user_notification_policy( + user=user_1, + step=UserNotificationPolicy.Step.NOTIFY, + notify_by=UserNotificationPolicy.NotificationChannel.SMS, + important=True, + ) + make_user_notification_policy( + user=user_2, + step=UserNotificationPolicy.Step.NOTIFY, + notify_by=UserNotificationPolicy.NotificationChannel.SLACK, # channel is not in NOTIFICATION_CHANNELS_TO_BUNDLE + ) + alert_receive_channel = make_alert_receive_channel(organization=organization) + alert_group_1 = make_alert_group(alert_receive_channel=alert_receive_channel) + alert_group_2 = make_alert_group(alert_receive_channel=alert_receive_channel) + assert not user_1.notification_bundles.exists() + # send 1st notification to user_1, check notification_bundle was created + # without scheduling send_bundled_notification task + notify_user_task(user_1.id, alert_group_1.id) + assert user_1.notification_bundles.count() == 1 + notification_bundle = user_1.notification_bundles.first() + assert notification_bundle.notification_task_id is None + assert not notification_bundle.notifications.exists() + # send 2nd notification to user_1, check bundled notification was attached to notification_bundle + # and send_bundled_notification was scheduled + notify_user_task(user_1.id, alert_group_2.id) + notification_bundle.refresh_from_db() + assert notification_bundle.notifications.count() == 1 + assert notification_bundle.notification_task_id is not None + # send important notification to user_1, check new notification_bundle was created + notify_user_task(user_1.id, alert_group_1.id, important=True) + assert user_1.notification_bundles.count() == 2 + important_notification_bundle = user_1.notification_bundles.get(important=True) + assert important_notification_bundle.notification_task_id is None + assert not important_notification_bundle.notifications.exists() + # send notification to user_2 (notification channel is not in NOTIFICATION_CHANNELS_TO_BUNDLE), + # check notification_bundle was not created + notify_user_task(user_2.id, alert_group_1.id) + assert not user_2.notification_bundles.exists() + + +@pytest.mark.django_db +def test_notify_user_task_notification_bundle_is_not_enabled( + make_organization_and_user, + make_user_notification_policy, + make_alert_receive_channel, + make_alert_group, + settings, +): + settings.FEATURE_NOTIFICATION_BUNDLE_ENABLED = False + organization, user = make_organization_and_user() + make_user_notification_policy( + user=user, + step=UserNotificationPolicy.Step.NOTIFY, + notify_by=UserNotificationPolicy.NotificationChannel.SMS, # channel is in NOTIFICATION_CHANNELS_TO_BUNDLE + ) + alert_receive_channel = make_alert_receive_channel(organization=organization) + alert_group = make_alert_group(alert_receive_channel=alert_receive_channel) + + # send notification, check notification_bundle was not created + notify_user_task(user.id, alert_group.id) + assert not user.notification_bundles.exists() + + +@pytest.mark.django_db +def test_send_bundle_notification( + make_organization_and_user, + make_user_notification_policy, + make_user_notification_bundle, + make_alert_receive_channel, + make_alert_group, + settings, + caplog, +): + settings.FEATURE_NOTIFICATION_BUNDLE_ENABLED = True + organization, user = make_organization_and_user() + notification_policy = make_user_notification_policy( + user=user, + step=UserNotificationPolicy.Step.NOTIFY, + notify_by=UserNotificationPolicy.NotificationChannel.SMS, # channel is in NOTIFICATION_CHANNELS_TO_BUNDLE + ) + alert_receive_channel = make_alert_receive_channel(organization=organization) + alert_group_1 = make_alert_group(alert_receive_channel=alert_receive_channel) + alert_group_2 = make_alert_group(alert_receive_channel=alert_receive_channel) + alert_group_3 = make_alert_group(alert_receive_channel=alert_receive_channel) + notification_bundle = make_user_notification_bundle( + user, UserNotificationPolicy.NotificationChannel.SMS, notification_task_id="test_task_id", eta=timezone.now() + ) + notification_bundle.append_notification(alert_group_1, notification_policy) + notification_bundle.append_notification(alert_group_2, notification_policy) + notification_bundle.append_notification(alert_group_3, notification_policy) + assert notification_bundle.notifications.filter(bundle_uuid__isnull=True).count() == 3 + alert_group_3.resolve() + with patch("apps.alerts.tasks.notify_user.compare_escalations", return_value=True): + # send notification for 2 active alert groups + send_bundled_notification(notification_bundle.id) + assert f"alert_group {alert_group_3.id} is not active, skip notification" in caplog.text + assert "perform bundled notification for alert groups with ids:" in caplog.text + # check bundle_uuid was set, notification for resolved alert group was deleted + assert notification_bundle.notifications.filter(bundle_uuid__isnull=True).count() == 0 + assert notification_bundle.notifications.all().count() == 2 + assert not notification_bundle.notifications.filter(alert_group=alert_group_3).exists() + + # send notification for 1 active alert group + notification_bundle.notifications.update(bundle_uuid=None) + alert_group_2.resolve() + send_bundled_notification(notification_bundle.id) + assert f"alert_group {alert_group_2.id} is not active, skip notification" in caplog.text + assert ( + f"there is only one alert group in bundled notification, perform regular notification. " + f"alert_group {alert_group_1.id}" + ) in caplog.text + # check all notifications were deleted + assert notification_bundle.notifications.all().count() == 0 + + # send notification for 0 active alert group + notification_bundle.append_notification(alert_group_1, notification_policy) + alert_group_1.resolve() + send_bundled_notification(notification_bundle.id) + assert f"alert_group {alert_group_1.id} is not active, skip notification" in caplog.text + assert f"no alert groups to notify about or notification is not allowed for user {user.id}" in caplog.text + # check all notifications were deleted + assert notification_bundle.notifications.all().count() == 0 + + +@pytest.mark.django_db +def test_send_bundle_notification_task_id_mismatch( + make_organization_and_user, + make_user_notification_bundle, + settings, + caplog, +): + settings.FEATURE_NOTIFICATION_BUNDLE_ENABLED = True + organization, user = make_organization_and_user() + notification_bundle = make_user_notification_bundle( + user, UserNotificationPolicy.NotificationChannel.SMS, notification_task_id="test_task_id", eta=timezone.now() + ) + send_bundled_notification(notification_bundle.id) + assert ( + f"send_bundled_notification: notification_task_id mismatch. " + f"Duplication or non-active notification triggered. " + f"Active: {notification_bundle.notification_task_id}" + ) in caplog.text + + +@pytest.mark.django_db +def test_notify_user_task_notification_bundle_eta_is_outdated( + make_organization_and_user, + make_user_for_organization, + make_user_notification_policy, + make_user_notification_bundle, + make_alert_receive_channel, + make_alert_group, + settings, +): + settings.FEATURE_NOTIFICATION_BUNDLE_ENABLED = True + organization, user = make_organization_and_user() + notification_policy = make_user_notification_policy( + user=user, + step=UserNotificationPolicy.Step.NOTIFY, + notify_by=UserNotificationPolicy.NotificationChannel.SMS, # channel is in NOTIFICATION_CHANNELS_TO_BUNDLE + ) + alert_receive_channel = make_alert_receive_channel(organization=organization) + alert_group_1 = make_alert_group(alert_receive_channel=alert_receive_channel) + alert_group_2 = make_alert_group(alert_receive_channel=alert_receive_channel) + now = timezone.now() + outdated_eta = now - timezone.timedelta(minutes=5) + test_task_id = "test_task_id" + notification_bundle = make_user_notification_bundle( + user, + UserNotificationPolicy.NotificationChannel.SMS, + eta=outdated_eta, + notification_task_id=test_task_id, + last_notified_at=now, + ) + notification_bundle.append_notification(alert_group_1, notification_policy) + assert not notification_bundle.eta_is_valid() + assert notification_bundle.notifications.count() == 1 + + # call notify_user_task and check that new notification task for notification_bundle was scheduled + notify_user_task(user.id, alert_group_2.id) + notification_bundle.refresh_from_db() + assert notification_bundle.eta_is_valid() + assert notification_bundle.notification_task_id != test_task_id + assert notification_bundle.last_notified_at == now + assert notification_bundle.notifications.count() == 2 diff --git a/engine/apps/metrics_exporter/helpers.py b/engine/apps/metrics_exporter/helpers.py index 7baf85a6..db1164bc 100644 --- a/engine/apps/metrics_exporter/helpers.py +++ b/engine/apps/metrics_exporter/helpers.py @@ -327,8 +327,11 @@ def metrics_update_alert_groups_response_time_cache(integrations_response_time: cache.set(metric_alert_groups_response_time_key, metric_alert_groups_response_time, timeout=metrics_cache_timeout) -def metrics_update_user_cache(user): - """Update "user_was_notified_of_alert_groups" metric cache.""" +def metrics_update_user_cache(user, counter=1): + """ + Increase "user_was_notified_of_alert_groups" metric cache by counter. + Counter shows how many alert groups user was notified of. + """ metrics_cache_timeout = get_metrics_cache_timeout(user.organization_id) metric_user_was_notified_key = get_metric_user_was_notified_of_alert_groups_key(user.organization_id) metric_user_was_notified: typing.Dict[int, UserWasNotifiedOfAlertGroupsMetricsDict] = cache.get( @@ -344,6 +347,6 @@ def metrics_update_user_cache(user): "id": user.organization.stack_id, "counter": 0, }, - )["counter"] += 1 + )["counter"] += counter cache.set(metric_user_was_notified_key, metric_user_was_notified, timeout=metrics_cache_timeout) diff --git a/engine/apps/metrics_exporter/tasks.py b/engine/apps/metrics_exporter/tasks.py index af1f6393..1663ddec 100644 --- a/engine/apps/metrics_exporter/tasks.py +++ b/engine/apps/metrics_exporter/tasks.py @@ -285,8 +285,8 @@ def update_metrics_for_alert_group(alert_group_id, organization_id, previous_sta @shared_dedicated_queue_retry_task( autoretry_for=(Exception,), retry_backoff=True, max_retries=1 if settings.DEBUG else 10 ) -def update_metrics_for_user(user_id): +def update_metrics_for_user(user_id, counter=1): from apps.user_management.models import User user = User.objects.get(id=user_id) - metrics_update_user_cache(user) + metrics_update_user_cache(user, counter) diff --git a/engine/apps/metrics_exporter/tests/test_update_metrics_cache.py b/engine/apps/metrics_exporter/tests/test_update_metrics_cache.py index 7292f953..84a5a342 100644 --- a/engine/apps/metrics_exporter/tests/test_update_metrics_cache.py +++ b/engine/apps/metrics_exporter/tests/test_update_metrics_cache.py @@ -6,6 +6,7 @@ from django.test import override_settings from apps.alerts.signals import alert_group_created_signal from apps.alerts.tasks import notify_user_task +from apps.alerts.tasks.notify_user import update_metric_if_needed from apps.base.models import UserNotificationPolicy, UserNotificationPolicyLogRecord from apps.metrics_exporter.constants import NO_SERVICE_VALUE, SERVICE_LABEL from apps.metrics_exporter.helpers import ( @@ -607,6 +608,82 @@ def test_update_metrics_cache_on_user_notification( arg_idx = get_called_arg_index_and_compare_results() +@override_settings(CELERY_TASK_ALWAYS_EAGER=True) +@pytest.mark.django_db +def test_update_metric_if_needed( + mock_apply_async, + make_organization, + make_user_for_organization, + make_alert_receive_channel, + make_alert_group, + make_user_notification_policy_log_record, + make_user_was_notified_metrics_cache_params, + monkeypatch, + mock_get_metrics_cache, +): + organization = make_organization( + org_id=METRICS_TEST_ORG_ID, + stack_slug=METRICS_TEST_INSTANCE_SLUG, + stack_id=METRICS_TEST_INSTANCE_ID, + ) + alert_receive_channel = make_alert_receive_channel( + organization, + verbal_name=METRICS_TEST_INTEGRATION_NAME, + ) + user = make_user_for_organization(organization, username=METRICS_TEST_USER_USERNAME) + + alert_group_1 = make_alert_group(alert_receive_channel) + alert_group_2 = make_alert_group(alert_receive_channel) + alert_group_3 = make_alert_group(alert_receive_channel) + + # create 1 log record for alert_group_1, 2 log records for alert_group_2 and 3 log records for alert_group_3 + for idx, alert_group in enumerate([alert_group_1, alert_group_2, alert_group_3], start=1): + for _ in range(idx): + make_user_notification_policy_log_record( + type=UserNotificationPolicyLogRecord.TYPE_PERSONAL_NOTIFICATION_TRIGGERED, + author=user, + alert_group=alert_group, + ) + + metrics_cache = make_user_was_notified_metrics_cache_params(user.id, organization.id) + monkeypatch.setattr(cache, "get", metrics_cache) + + metric_user_was_notified_key = get_metric_user_was_notified_of_alert_groups_key(organization.id) + + expected_result_metric_user_was_notified = { + user.id: { + "org_id": organization.org_id, + "slug": organization.stack_slug, + "id": organization.stack_id, + "user_username": METRICS_TEST_USER_USERNAME, + "counter": 1, + } + } + + with patch("apps.metrics_exporter.tasks.cache.set") as mock_cache_set: + # check current metric value + assert cache.get(metric_user_was_notified_key, {}) == expected_result_metric_user_was_notified + + expected_result_metric_user_was_notified[user.id]["counter"] += 1 + update_metric_if_needed(user, [alert_group_1.id, alert_group_2.id, alert_group_3.id]) + + # check user_was_notified_of_alert_groups metric counter was increased by 1 + # (for alert_group_1 that has only one log record with type "triggered") + mock_cache_set.assert_called_once() + mock_cache_set_called_args = mock_cache_set.call_args_list + assert mock_cache_set_called_args[0].args[0] == metric_user_was_notified_key + assert mock_cache_set_called_args[0].args[1] == expected_result_metric_user_was_notified + # create new log record for alert_group_1 + make_user_notification_policy_log_record( + type=UserNotificationPolicyLogRecord.TYPE_PERSONAL_NOTIFICATION_TRIGGERED, + author=user, + alert_group=alert_group_1, + ) + # check metric was not updated + update_metric_if_needed(user, [alert_group_1.id, alert_group_2.id, alert_group_3.id]) + mock_cache_set.assert_called_once() + + @pytest.mark.django_db def test_metrics_add_integrations_to_cache(make_organization, make_alert_receive_channel): organization = make_organization( diff --git a/engine/apps/phone_notifications/phone_backend.py b/engine/apps/phone_notifications/phone_backend.py index c3db1094..71ccf626 100644 --- a/engine/apps/phone_notifications/phone_backend.py +++ b/engine/apps/phone_notifications/phone_backend.py @@ -193,6 +193,10 @@ class PhoneBackend: log_record.save() user_notification_action_triggered_signal.send(sender=PhoneBackend.notify_by_sms, log_record=log_record) + @staticmethod + def notify_by_sms_bundle_async(user, bundle_uuid): + pass # todo: will be added in a separate PR + def _notify_by_provider_sms(self, user, message) -> Optional[ProviderSMS]: """ _notify_by_provider_sms sends a notification sms using configured phone provider. diff --git a/engine/conftest.py b/engine/conftest.py index 899679fd..c4de1730 100644 --- a/engine/conftest.py +++ b/engine/conftest.py @@ -37,6 +37,7 @@ from apps.alerts.tests.factories import ( InvitationFactory, ResolutionNoteFactory, ResolutionNoteSlackMessageFactory, + UserNotificationBundleFactory, ) from apps.api.permissions import ( ACTION_PREFIX, @@ -145,6 +146,7 @@ register(LabelKeyFactory) register(LabelValueFactory) register(AlertReceiveChannelAssociatedLabelFactory) register(GoogleOAuth2UserFactory) +register(UserNotificationBundleFactory) IS_RBAC_ENABLED = os.getenv("ONCALL_TESTING_RBAC_ENABLED", "True") == "True" @@ -1077,3 +1079,13 @@ def make_google_oauth2_user_for_user(): return GoogleOAuth2UserFactory(user=user) return _make_google_oauth2_user_for_user + + +@pytest.fixture +def make_user_notification_bundle(): + def _make_user_notification_bundle(user, notification_channel, important=False, **kwargs): + return UserNotificationBundleFactory( + user=user, notification_channel=notification_channel, important=important, **kwargs + ) + + return _make_user_notification_bundle diff --git a/engine/settings/base.py b/engine/settings/base.py index 6638d16b..3e40442b 100644 --- a/engine/settings/base.py +++ b/engine/settings/base.py @@ -72,6 +72,7 @@ FEATURE_LABELS_ENABLED_FOR_ALL = getenv_boolean("FEATURE_LABELS_ENABLED_FOR_ALL" # Enable labels feature for organizations from the list. Use OnCall organization ID, for this flag FEATURE_LABELS_ENABLED_PER_ORG = getenv_list("FEATURE_LABELS_ENABLED_PER_ORG", default=list()) FEATURE_ALERT_GROUP_SEARCH_ENABLED = getenv_boolean("FEATURE_ALERT_GROUP_SEARCH_ENABLED", default=False) +FEATURE_NOTIFICATION_BUNDLE_ENABLED = getenv_boolean("FEATURE_NOTIFICATION_BUNDLE_ENABLED", default=False) TWILIO_API_KEY_SID = os.environ.get("TWILIO_API_KEY_SID") TWILIO_API_KEY_SECRET = os.environ.get("TWILIO_API_KEY_SECRET") diff --git a/engine/settings/celery_task_routes.py b/engine/settings/celery_task_routes.py index 9d8e0537..fa747165 100644 --- a/engine/settings/celery_task_routes.py +++ b/engine/settings/celery_task_routes.py @@ -105,6 +105,7 @@ CELERY_TASK_ROUTES = { "apps.alerts.tasks.notify_ical_schedule_shift.notify_ical_schedule_shift": {"queue": "critical"}, "apps.alerts.tasks.notify_user.notify_user_task": {"queue": "critical"}, "apps.alerts.tasks.notify_user.perform_notification": {"queue": "critical"}, + "apps.alerts.tasks.notify_user.send_bundled_notification": {"queue": "critical"}, "apps.alerts.tasks.notify_user.send_user_notification_signal": {"queue": "critical"}, "apps.alerts.tasks.resolve_alert_group_by_source_if_needed.resolve_alert_group_by_source_if_needed": { "queue": "critical"