From 191814b25ea43c317837eb0da9a18bee4f31f519 Mon Sep 17 00:00:00 2001 From: Yulya Artyukhina Date: Tue, 16 Jul 2024 13:24:08 +0200 Subject: [PATCH] User notifications bundle (#4457) # What this PR does This PR adds two new models: UserNotificationBundle and BundledNotification (proposals for naming are welcome). `UserNotificationBundle` manages the information about last notification time and scheduled notification task for bundled notifications. It is unique per user + notification_channel + notification importance. `BundledNotification` contains notification policy and alert group, that triggered the notification. The BundledNotification instance is created in `notify_user_task` for every notification, that should be bundled, and is attached to UserNotificationBundle by ForeignKey connection. How it works: If the user was notified recently (within the last two minutes) by the current notification channel, and this channel is bundlable, BundledNotification instance will be created and attached to the UserNotificationBundle instance, and `send_bundled_notification` task will be scheduled to execute in 2 min. In `send_bundled_notification` task we get all BundledNotification attached to the current UserNotificationBundle instance, check if alert groups are still active and if there is only one notification - perform regular notification by calling `perform_notification` task, otherwise call "notify_by__bundle" method for the current notification channel. PR with method to send notification bundle by SMS - https://github.com/grafana/oncall/pull/4624 **This feature is disabled by default by feature flag. Public docs will be added in a separate PR with enabling this feature.** ## Which issue(s) this PR closes related to https://github.com/grafana/oncall-private/issues/2712 ## Checklist - [x] Unit, integration, and e2e (if applicable) tests updated - [x] Documentation added (or `pr:no public docs` PR label added if not required) - [x] Added the relevant release notes label (see labels prefixed w/ `release:`). These labels dictate how your PR will show up in the autogenerated release notes. --- engine/apps/alerts/constants.py | 2 + ...tionbundle_bundlednotification_and_more.py | 45 +++ engine/apps/alerts/models/__init__.py | 1 + .../alerts/models/user_has_notification.py | 8 + .../alerts/models/user_notification_bundle.py | 87 ++++ engine/apps/alerts/paging.py | 3 +- engine/apps/alerts/tasks/notify_user.py | 374 ++++++++++++++---- engine/apps/alerts/tests/factories.py | 6 + engine/apps/alerts/tests/test_notify_user.py | 207 +++++++++- engine/apps/metrics_exporter/helpers.py | 9 +- engine/apps/metrics_exporter/tasks.py | 4 +- .../tests/test_update_metrics_cache.py | 77 ++++ .../apps/phone_notifications/phone_backend.py | 4 + engine/conftest.py | 12 + engine/settings/base.py | 1 + engine/settings/celery_task_routes.py | 1 + 16 files changed, 746 insertions(+), 95 deletions(-) create mode 100644 engine/apps/alerts/migrations/0054_usernotificationbundle_bundlednotification_and_more.py create mode 100644 engine/apps/alerts/models/user_notification_bundle.py diff --git a/engine/apps/alerts/constants.py b/engine/apps/alerts/constants.py index f5c9f196..496836ca 100644 --- a/engine/apps/alerts/constants.py +++ b/engine/apps/alerts/constants.py @@ -16,6 +16,8 @@ TASK_DELAY_SECONDS = 1 NEXT_ESCALATION_DELAY = 5 +BUNDLED_NOTIFICATION_DELAY_SECONDS = 60 * 2 # 2 min + # AlertGroup states verbal class AlertGroupState(str, Enum): diff --git a/engine/apps/alerts/migrations/0054_usernotificationbundle_bundlednotification_and_more.py b/engine/apps/alerts/migrations/0054_usernotificationbundle_bundlednotification_and_more.py new file mode 100644 index 00000000..a4d4fcc8 --- /dev/null +++ b/engine/apps/alerts/migrations/0054_usernotificationbundle_bundlednotification_and_more.py @@ -0,0 +1,45 @@ +# Generated by Django 4.2.10 on 2024-06-20 11:00 + +import apps.base.models.user_notification_policy +from django.db import migrations, models +import django.db.models.deletion + + +class Migration(migrations.Migration): + + dependencies = [ + ('base', '0005_drop_unused_dynamic_settings'), + ('user_management', '0022_alter_team_unique_together'), + ('alerts', '0053_channelfilter_filtering_labels'), + ] + + operations = [ + migrations.CreateModel( + name='UserNotificationBundle', + fields=[ + ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('important', models.BooleanField()), + ('notification_channel', models.PositiveSmallIntegerField(default=None, null=True, validators=[apps.base.models.user_notification_policy.validate_channel_choice])), + ('last_notified_at', models.DateTimeField(default=None, null=True)), + ('notification_task_id', models.CharField(default=None, max_length=100, null=True)), + ('eta', models.DateTimeField(default=None, null=True)), + ('user', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='notification_bundles', to='user_management.user')), + ], + ), + migrations.CreateModel( + name='BundledNotification', + fields=[ + ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('created_at', models.DateTimeField(auto_now_add=True)), + ('bundle_uuid', models.CharField(db_index=True, default=None, max_length=100, null=True)), + ('alert_group', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='alerts.alertgroup')), + ('alert_receive_channel', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='alerts.alertreceivechannel')), + ('notification_bundle', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='notifications', to='alerts.usernotificationbundle')), + ('notification_policy', models.ForeignKey(null=True, on_delete=django.db.models.deletion.SET_NULL, to='base.usernotificationpolicy')), + ], + ), + migrations.AddConstraint( + model_name='usernotificationbundle', + constraint=models.UniqueConstraint(fields=('user', 'important', 'notification_channel'), name='unique_user_notification_bundle'), + ), + ] diff --git a/engine/apps/alerts/models/__init__.py b/engine/apps/alerts/models/__init__.py index c537fc31..51b44158 100644 --- a/engine/apps/alerts/models/__init__.py +++ b/engine/apps/alerts/models/__init__.py @@ -15,3 +15,4 @@ from .invitation import Invitation # noqa: F401 from .maintainable_object import MaintainableObject # noqa: F401 from .resolution_note import ResolutionNote, ResolutionNoteSlackMessage # noqa: F401 from .user_has_notification import UserHasNotification # noqa: F401 +from .user_notification_bundle import BundledNotification, UserNotificationBundle # noqa: F401 diff --git a/engine/apps/alerts/models/user_has_notification.py b/engine/apps/alerts/models/user_has_notification.py index 967a93cf..1dc51d0b 100644 --- a/engine/apps/alerts/models/user_has_notification.py +++ b/engine/apps/alerts/models/user_has_notification.py @@ -16,3 +16,11 @@ class UserHasNotification(models.Model): class Meta: unique_together = ("user", "alert_group") + + def update_active_task_id(self, task_id): + """ + `active_notification_policy_id` keeps celery task_id of the next scheduled `notify_user_task` + for the current user + """ + self.active_notification_policy_id = task_id + self.save(update_fields=["active_notification_policy_id"]) diff --git a/engine/apps/alerts/models/user_notification_bundle.py b/engine/apps/alerts/models/user_notification_bundle.py new file mode 100644 index 00000000..4c793637 --- /dev/null +++ b/engine/apps/alerts/models/user_notification_bundle.py @@ -0,0 +1,87 @@ +import datetime +import typing + +from django.db import models +from django.utils import timezone + +from apps.alerts.constants import BUNDLED_NOTIFICATION_DELAY_SECONDS +from apps.base.models import UserNotificationPolicy +from apps.base.models.user_notification_policy import validate_channel_choice + +if typing.TYPE_CHECKING: + from django.db.models.manager import RelatedManager + + from apps.alerts.models import AlertGroup, AlertReceiveChannel + from apps.user_management.models import User + + +class UserNotificationBundle(models.Model): + user: "User" + notifications: "RelatedManager['BundledNotification']" + + NOTIFICATION_CHANNELS_TO_BUNDLE = [ + UserNotificationPolicy.NotificationChannel.SMS, + ] + + user = models.ForeignKey("user_management.User", on_delete=models.CASCADE, related_name="notification_bundles") + important = models.BooleanField() + notification_channel = models.PositiveSmallIntegerField( + validators=[validate_channel_choice], null=True, default=None + ) + last_notified_at = models.DateTimeField(default=None, null=True) + notification_task_id = models.CharField(max_length=100, null=True, default=None) + # estimated time of arrival for scheduled send_bundled_notification task + eta = models.DateTimeField(default=None, null=True) + + class Meta: + constraints = [ + models.UniqueConstraint( + fields=["user", "important", "notification_channel"], name="unique_user_notification_bundle" + ) + ] + + def notified_recently(self) -> bool: + return ( + timezone.now() - self.last_notified_at < timezone.timedelta(seconds=BUNDLED_NOTIFICATION_DELAY_SECONDS) + if self.last_notified_at + else False + ) + + def eta_is_valid(self) -> bool: + """ + `eta` shows eta of scheduled send_bundled_notification task and should never be less than the current time + (with a 1 minute buffer provided). + `eta` is None means that there is no scheduled task. + """ + if not self.eta or self.eta + timezone.timedelta(minutes=1) >= timezone.now(): + return True + return False + + def get_notification_eta(self) -> datetime.datetime: + last_notified = self.last_notified_at if self.last_notified_at else timezone.now() + return last_notified + timezone.timedelta(seconds=BUNDLED_NOTIFICATION_DELAY_SECONDS) + + def append_notification(self, alert_group: "AlertGroup", notification_policy: "UserNotificationPolicy"): + self.notifications.create( + alert_group=alert_group, notification_policy=notification_policy, alert_receive_channel=alert_group.channel + ) + + @classmethod + def notification_is_bundleable(cls, notification_channel): + return notification_channel in cls.NOTIFICATION_CHANNELS_TO_BUNDLE + + +class BundledNotification(models.Model): + alert_group: "AlertGroup" + alert_receive_channel: "AlertReceiveChannel" + notification_policy: typing.Optional["UserNotificationPolicy"] + notification_bundle: "UserNotificationBundle" + + alert_group = models.ForeignKey("alerts.AlertGroup", on_delete=models.CASCADE) + alert_receive_channel = models.ForeignKey("alerts.AlertReceiveChannel", on_delete=models.CASCADE) + notification_policy = models.ForeignKey("base.UserNotificationPolicy", on_delete=models.SET_NULL, null=True) + notification_bundle = models.ForeignKey( + UserNotificationBundle, on_delete=models.CASCADE, related_name="notifications" + ) + created_at = models.DateTimeField(auto_now_add=True) + bundle_uuid = models.CharField(max_length=100, null=True, default=None, db_index=True) diff --git a/engine/apps/alerts/paging.py b/engine/apps/alerts/paging.py index 6a3cadd3..b03f52d7 100644 --- a/engine/apps/alerts/paging.py +++ b/engine/apps/alerts/paging.py @@ -187,8 +187,7 @@ def unpage_user(alert_group: AlertGroup, user: User, from_user: User) -> None: user_has_notification = UserHasNotification.objects.filter( user=user, alert_group=alert_group ).select_for_update()[0] - user_has_notification.active_notification_policy_id = None - user_has_notification.save(update_fields=["active_notification_policy_id"]) + user_has_notification.update_active_task_id(task_id=None) except IndexError: return finally: diff --git a/engine/apps/alerts/tasks/notify_user.py b/engine/apps/alerts/tasks/notify_user.py index cdfd35d6..bbcec36d 100644 --- a/engine/apps/alerts/tasks/notify_user.py +++ b/engine/apps/alerts/tasks/notify_user.py @@ -1,23 +1,94 @@ -import time import typing from functools import partial +from uuid import uuid4 from celery.exceptions import Retry from django.conf import settings from django.db import transaction +from django.db.models import Count from django.utils import timezone from kombu.utils.uuid import uuid as celery_uuid from telegram.error import RetryAfter from apps.alerts.constants import NEXT_ESCALATION_DELAY -from apps.alerts.signals import user_notification_action_triggered_signal +from apps.alerts.tasks.send_update_log_report_signal import send_update_log_report_signal from apps.base.messaging import get_messaging_backend_from_id from apps.metrics_exporter.tasks import update_metrics_for_user from apps.phone_notifications.phone_backend import PhoneBackend from common.custom_celery_tasks import shared_dedicated_queue_retry_task +from .compare_escalations import compare_escalations from .task_logger import task_logger +if typing.TYPE_CHECKING: + from apps.alerts.models import AlertGroup, UserNotificationBundle + from apps.base.models import UserNotificationPolicy + from apps.user_management.models import User + + +def schedule_send_bundled_notification_task( + user_notification_bundle: "UserNotificationBundle", alert_group: "AlertGroup" +): + """Schedule a task to send bundled notifications""" + send_bundled_notification.apply_async( + (user_notification_bundle.id,), + eta=user_notification_bundle.eta, + task_id=user_notification_bundle.notification_task_id, + ) + task_logger.info( + f"Scheduled send_bundled_notification task {user_notification_bundle.notification_task_id}, " + f"user_notification_bundle: {user_notification_bundle.id}, alert_group {alert_group.id}, " + f"eta: {user_notification_bundle.eta}" + ) + + +def schedule_perform_notification_task( + log_record_pk: int, alert_group_pk: int, use_default_notification_policy_fallback: bool +): + task = perform_notification.apply_async((log_record_pk, use_default_notification_policy_fallback)) + task_logger.info( + f"Created perform_notification task {task} log_record={log_record_pk} " f"alert_group={alert_group_pk}" + ) + + +def build_notification_reason_for_log_record( + notification_policies: typing.List["UserNotificationPolicy"], reason: typing.Optional[str] +) -> str: + from apps.base.models import UserNotificationPolicy + + # Here we collect a brief overview of notification steps configured for user to send it to thread. + collected_steps_ids = [] + for next_notification_policy in notification_policies: + if next_notification_policy.step == UserNotificationPolicy.Step.NOTIFY: + if next_notification_policy.notify_by not in collected_steps_ids: + collected_steps_ids.append(next_notification_policy.notify_by) + + collected_steps = ", ".join( + UserNotificationPolicy.NotificationChannel(step_id).label for step_id in collected_steps_ids + ) + reason = ("Reason: " + reason + "\n") if reason is not None else "" + reason += ("Further notification plan: " + collected_steps) if len(collected_steps_ids) > 0 else "" + return reason + + +def update_metric_if_needed(user: "User", active_alert_group_ids: typing.List[int]): + from apps.base.models import UserNotificationPolicyLogRecord + + # get count of alert groups with only one personal log record with type "triggered" + alert_groups_with_one_log = ( + user.personal_log_records.filter( + type=UserNotificationPolicyLogRecord.TYPE_PERSONAL_NOTIFICATION_TRIGGERED, + alert_group_id__in=active_alert_group_ids, + ) + .values("alert_group") + .annotate(count=Count("alert_group")) + .filter(count=1) + .count() + ) + + if alert_groups_with_one_log > 0: + update_metrics_for_user.apply_async((user.id, alert_groups_with_one_log)) + @shared_dedicated_queue_retry_task( autoretry_for=(Exception,), retry_backoff=True, max_retries=1 if settings.DEBUG else None @@ -32,7 +103,7 @@ def notify_user_task( important=False, notify_anyway=False, ): - from apps.alerts.models import AlertGroup, UserHasNotification + from apps.alerts.models import AlertGroup, UserHasNotification, UserNotificationBundle from apps.base.models import UserNotificationPolicy, UserNotificationPolicyLogRecord from apps.user_management.models import User @@ -44,6 +115,8 @@ def notify_user_task( countdown = 0 stop_escalation = False log_record = None + is_notification_bundled = False + user_notification_bundle = None with transaction.atomic(): try: @@ -51,8 +124,6 @@ def notify_user_task( except User.DoesNotExist: return f"notify_user_task: user {user_pk} doesn't exist" - organization = alert_group.channel.organization - if not user.is_notification_allowed: task_logger.info(f"notify_user_task: user {user.pk} notification is not allowed") UserNotificationPolicyLogRecord( @@ -82,21 +153,8 @@ def notify_user_task( f"notify_user_task: Failed to notify. No notification policies. user_id={user_pk} alert_group_id={alert_group_pk} important={important}" ) return - - # Here we collect a brief overview of notification steps configured for user to send it to thread. - collected_steps_ids = [] - for next_notification_policy in notification_policies: - if next_notification_policy.step == UserNotificationPolicy.Step.NOTIFY: - if next_notification_policy.notify_by not in collected_steps_ids: - collected_steps_ids.append(next_notification_policy.notify_by) - + reason = build_notification_reason_for_log_record(notification_policies, reason) notification_policy = notification_policies[0] - - collected_steps = ", ".join( - UserNotificationPolicy.NotificationChannel(step_id).label for step_id in collected_steps_ids - ) - reason = ("Reason: " + reason + "\n") if reason is not None else "" - reason += ("Further notification plan: " + collected_steps) if len(collected_steps_ids) > 0 else "" else: if notify_user_task.request.id != user_has_notification.active_notification_policy_id: task_logger.info( @@ -108,14 +166,14 @@ def notify_user_task( try: notification_policy = UserNotificationPolicy.objects.get(pk=previous_notification_policy_pk) - if notification_policy.user.organization != organization: + if notification_policy.user != user: notification_policy = UserNotificationPolicy.objects.get( order=notification_policy.order, user=user, important=important ) notification_policy = notification_policy.next() except UserNotificationPolicy.DoesNotExist: task_logger.info( - f"notify_user_taskLNotification policy {previous_notification_policy_pk} has been deleted" + f"notify_user_task: Notification policy {previous_notification_policy_pk} has been deleted" ) return reason = None @@ -138,27 +196,25 @@ def notify_user_task( if notification_policy is None: stop_escalation = True log_record = _create_notification_finished_user_notification_policy_log_record() + task_logger.info(f"Personal escalation exceeded. User: {user.pk}, alert_group: {alert_group.pk}") else: if ( (alert_group.acknowledged and not notify_even_acknowledged) + or (alert_group.silenced and not notify_anyway) or alert_group.resolved or alert_group.wiped_at or alert_group.root_alert_group ): - return "Acknowledged, resolved, attached or wiped." - - if alert_group.silenced and not notify_anyway: task_logger.info( - f"notify_user_task: skip notification user {user.pk} because alert_group {alert_group.pk} is silenced" + f"notify_user_task: skip notification user {user.pk}, alert_group {alert_group.pk} is " + f"{alert_group.state} and/or attached or wiped" ) - return + return "Acknowledged, resolved, silenced, attached or wiped." if notification_policy.step == UserNotificationPolicy.Step.WAIT: - if notification_policy.wait_delay is not None: - delay_in_seconds = notification_policy.wait_delay.total_seconds() - else: - delay_in_seconds = 0 - countdown = delay_in_seconds + countdown = ( + notification_policy.wait_delay.total_seconds() if notification_policy.wait_delay is not None else 0 + ) log_record = _create_user_notification_policy_log_record( author=user, type=UserNotificationPolicyLogRecord.TYPE_PERSONAL_NOTIFICATION_TRIGGERED, @@ -167,7 +223,7 @@ def notify_user_task( slack_prevent_posting=prevent_posting_to_thread, notification_step=notification_policy.step, ) - task_logger.info(f"notify_user_task: Waiting {delay_in_seconds} to notify user {user.pk}") + task_logger.info(f"notify_user_task: Waiting {countdown} to notify user {user.pk}") elif notification_policy.step == UserNotificationPolicy.Step.NOTIFY: user_to_be_notified_in_slack = ( notification_policy.notify_by == UserNotificationPolicy.NotificationChannel.SLACK @@ -185,17 +241,49 @@ def notify_user_task( notification_error_code=UserNotificationPolicyLogRecord.ERROR_NOTIFICATION_POSTING_TO_SLACK_IS_DISABLED, ) else: - log_record = _create_user_notification_policy_log_record( - author=user, - type=UserNotificationPolicyLogRecord.TYPE_PERSONAL_NOTIFICATION_TRIGGERED, - notification_policy=notification_policy, - alert_group=alert_group, - reason=reason, - slack_prevent_posting=prevent_posting_to_thread, - notification_step=notification_policy.step, - notification_channel=notification_policy.notify_by, - ) + if ( + settings.FEATURE_NOTIFICATION_BUNDLE_ENABLED + and UserNotificationBundle.notification_is_bundleable(notification_policy.notify_by) + ): + user_notification_bundle, _ = UserNotificationBundle.objects.select_for_update().get_or_create( + user=user, important=important, notification_channel=notification_policy.notify_by + ) + # check if notification needs to be bundled + if user_notification_bundle.notified_recently(): + user_notification_bundle.append_notification(alert_group, notification_policy) + # schedule send_bundled_notification task if it hasn't been scheduled or the task eta is + # outdated + eta_is_valid = user_notification_bundle.eta_is_valid() + if not eta_is_valid: + task_logger.warning( + f"ETA is not valid - {user_notification_bundle.eta}, " + f"user_notification_bundle {user_notification_bundle.id}, " + f"task_id {user_notification_bundle.notification_task_id}. " + f"Rescheduling the send_bundled_notification task" + ) + if not user_notification_bundle.notification_task_id or not eta_is_valid: + user_notification_bundle.notification_task_id = celery_uuid() + user_notification_bundle.eta = user_notification_bundle.get_notification_eta() + user_notification_bundle.save(update_fields=["notification_task_id", "eta"]) + transaction.on_commit( + partial( + schedule_send_bundled_notification_task, user_notification_bundle, alert_group + ) + ) + is_notification_bundled = True + + if not is_notification_bundled: + log_record = _create_user_notification_policy_log_record( + author=user, + type=UserNotificationPolicyLogRecord.TYPE_PERSONAL_NOTIFICATION_TRIGGERED, + notification_policy=notification_policy, + alert_group=alert_group, + reason=reason, + slack_prevent_posting=prevent_posting_to_thread, + notification_step=notification_policy.step, + notification_channel=notification_policy.notify_by, + ) if log_record: # log_record is None if user notification policy step is unspecified # if this is the first notification step, and user hasn't been notified for this alert group - update metric if ( @@ -207,48 +295,41 @@ def notify_user_task( ).exists() ): update_metrics_for_user.apply_async((user.id,)) - log_record.save() - if notify_user_task.request.retries == 0: - transaction.on_commit(partial(send_user_notification_signal.apply_async, (log_record.pk,))) - - def _create_perform_notification_task(log_record_pk, alert_group_pk, use_default_notification_policy_fallback): - task = perform_notification.apply_async((log_record_pk, use_default_notification_policy_fallback)) - task_logger.info( - f"Created perform_notification task {task} log_record={log_record_pk} " f"alert_group={alert_group_pk}" - ) - - def _update_user_has_notification_active_notification_policy_id(active_policy_id: typing.Optional[str]) -> None: - user_has_notification.active_notification_policy_id = active_policy_id - user_has_notification.save(update_fields=["active_notification_policy_id"]) - - def _reset_user_has_notification_active_notification_policy_id() -> None: - _update_user_has_notification_active_notification_policy_id(None) - - create_perform_notification_task = partial( - _create_perform_notification_task, - log_record.pk, - alert_group_pk, - using_fallback_default_notification_policy_step, - ) if using_fallback_default_notification_policy_step: # if we are using default notification policy, we're done escalating.. there's no further notification # policy steps in this case. Kick off the perform_notification task, create the # TYPE_PERSONAL_NOTIFICATION_FINISHED log record, and reset the active_notification_policy_id - transaction.on_commit(create_perform_notification_task) + transaction.on_commit( + partial( + schedule_perform_notification_task, + log_record.pk, + alert_group_pk, + using_fallback_default_notification_policy_step, + ) + ) _create_notification_finished_user_notification_policy_log_record() - _reset_user_has_notification_active_notification_policy_id() + user_has_notification.update_active_task_id(None) elif not stop_escalation: - if notification_policy.step != UserNotificationPolicy.Step.WAIT: - transaction.on_commit(create_perform_notification_task) + # if the step is NOTIFY and notification was not not bundled, perform regular notification + # and update time when user was notified + if notification_policy.step != UserNotificationPolicy.Step.WAIT and not is_notification_bundled: + transaction.on_commit( + partial( + schedule_perform_notification_task, + log_record.pk, + alert_group_pk, + using_fallback_default_notification_policy_step, + ) + ) + + if user_notification_bundle: + user_notification_bundle.last_notified_at = timezone.now() + user_notification_bundle.save(update_fields=["last_notified_at"]) - delay = NEXT_ESCALATION_DELAY - if countdown is not None: - delay += countdown task_id = celery_uuid() - - _update_user_has_notification_active_notification_policy_id(task_id) + user_has_notification.update_active_task_id(task_id) transaction.on_commit( partial( @@ -259,12 +340,12 @@ def notify_user_task( "notify_anyway": notify_anyway, "prevent_posting_to_thread": prevent_posting_to_thread, }, - countdown=delay, + countdown=countdown + NEXT_ESCALATION_DELAY, task_id=task_id, ) ) else: - _reset_user_has_notification_active_notification_policy_id() + user_has_notification.update_active_task_id(None) @shared_dedicated_queue_retry_task( @@ -458,18 +539,137 @@ def perform_notification(log_record_pk, use_default_notification_policy_fallback backend.notify_user(user, alert_group, notification_policy) +@shared_dedicated_queue_retry_task( + autoretry_for=(Exception,), retry_backoff=True, max_retries=1 if settings.DEBUG else None +) +def send_bundled_notification(user_notification_bundle_id: int): + """ + The task filters bundled notifications, attached to the current user_notification_bundle, by active alert groups, + creates notification log records and updates user_notification_bundle. + If there are no active alert groups - nothing else happens. If there is only one active alert group - regular + notification will be performed (called perform_notification task). Otherwise - "send bundled notification" method of + the current notification channel will be called. + """ + from apps.alerts.models import AlertGroup, UserNotificationBundle + from apps.base.models import UserNotificationPolicy, UserNotificationPolicyLogRecord + + task_logger.info(f"Start send_bundled_notification for user_notification_bundle {user_notification_bundle_id}") + with transaction.atomic(): + try: + user_notification_bundle = UserNotificationBundle.objects.filter( + pk=user_notification_bundle_id + ).select_for_update()[0] + except IndexError: + task_logger.info( + f"user_notification_bundle {user_notification_bundle_id} doesn't exist. " + f"The user associated with this notification bundle may have been deleted." + ) + return + + if not compare_escalations(send_bundled_notification.request.id, user_notification_bundle.notification_task_id): + task_logger.info( + f"send_bundled_notification: notification_task_id mismatch. " + f"Duplication or non-active notification triggered. " + f"Active: {user_notification_bundle.notification_task_id}" + ) + return + + notifications = user_notification_bundle.notifications.filter(bundle_uuid__isnull=True).select_related( + "alert_group" + ) + + log_records_to_create: typing.List["UserNotificationPolicyLogRecord"] = [] + skip_notification_ids: typing.List[int] = [] + active_alert_group_ids: typing.Set[int] = set() + log_record_notification_triggered = None + is_notification_allowed = user_notification_bundle.user.is_notification_allowed + + # create logs + for notification in notifications: + if notification.alert_group.status != AlertGroup.NEW: + task_logger.info(f"alert_group {notification.alert_group_id} is not active, skip notification") + skip_notification_ids.append(notification.id) + continue + elif not is_notification_allowed: + log_record_notification_failed = UserNotificationPolicyLogRecord( + author=user_notification_bundle.user, + type=UserNotificationPolicyLogRecord.TYPE_PERSONAL_NOTIFICATION_FAILED, + reason="notification is not allowed for user", + alert_group=notification.alert_group, + notification_error_code=UserNotificationPolicyLogRecord.ERROR_NOTIFICATION_FORBIDDEN, + ) + log_records_to_create.append(log_record_notification_failed) + active_alert_group_ids.add(notification.alert_group_id) + continue + + # collect notifications for active alert groups + active_alert_group_ids.add(notification.alert_group_id) + + log_record_notification_triggered = UserNotificationPolicyLogRecord( + author=user_notification_bundle.user, + type=UserNotificationPolicyLogRecord.TYPE_PERSONAL_NOTIFICATION_TRIGGERED, + alert_group=notification.alert_group, + notification_step=UserNotificationPolicy.Step.NOTIFY, + notification_channel=user_notification_bundle.notification_channel, + ) + log_records_to_create.append(log_record_notification_triggered) + + if len(log_records_to_create) == 1 and log_record_notification_triggered: + # perform regular notification + log_record_notification_triggered.save() + task_logger.info( + f"there is only one alert group in bundled notification, perform regular notification. " + f"alert_group {log_record_notification_triggered.alert_group_id}" + ) + transaction.on_commit( + partial( + schedule_perform_notification_task, + log_record_notification_triggered.pk, + log_record_notification_triggered.alert_group_id, + ) + ) + notifications.delete() + else: + UserNotificationPolicyLogRecord.objects.bulk_create(log_records_to_create, batch_size=5000) + + if not active_alert_group_ids or not is_notification_allowed: + task_logger.info( + f"no alert groups to notify about or notification is not allowed for user " + f"{user_notification_bundle.user_id}" + ) + notifications.delete() + else: + notifications.filter(id__in=skip_notification_ids).delete() + bundle_uuid = uuid4() + notifications.update(bundle_uuid=bundle_uuid) + task_logger.info( + f"perform bundled notification for alert groups with ids: {active_alert_group_ids}, " + f"bundle_uuid: {bundle_uuid}" + ) + if user_notification_bundle.notification_channel == UserNotificationPolicy.NotificationChannel.SMS: + PhoneBackend.notify_by_sms_bundle_async(user_notification_bundle.user, bundle_uuid) + + user_notification_bundle.notification_task_id = None + user_notification_bundle.last_notified_at = timezone.now() + user_notification_bundle.eta = None + user_notification_bundle.save(update_fields=["notification_task_id", "last_notified_at", "eta"]) + + for alert_group_id in active_alert_group_ids: + transaction.on_commit(partial(send_update_log_report_signal.apply_async, (None, alert_group_id))) + + # update metric + transaction.on_commit(partial(update_metric_if_needed, user_notification_bundle.user, active_alert_group_ids)) + + task_logger.info(f"Finished send_bundled_notification for user_notification_bundle {user_notification_bundle_id}") + + +# deprecated @shared_dedicated_queue_retry_task( autoretry_for=(Exception,), retry_backoff=True, max_retries=0 if settings.DEBUG else None ) def send_user_notification_signal(log_record_pk): - start_time = time.time() - - from apps.base.models import UserNotificationPolicyLogRecord - - task_logger.debug(f"LOG RECORD PK: {log_record_pk}") - task_logger.debug(f"LOG RECORD LAST: {UserNotificationPolicyLogRecord.objects.last()}") - - log_record = UserNotificationPolicyLogRecord.objects.get(pk=log_record_pk) - user_notification_action_triggered_signal.send(sender=send_user_notification_signal, log_record=log_record) - - task_logger.debug("--- USER SIGNAL TOOK %s seconds ---" % (time.time() - start_time)) + # Triggers user_notification_action_triggered_signal + # This signal is only connected to UserSlackRepresentative and triggers posting message to Slack about some + # FAILED notifications (see NotificationDeliveryStep and ERRORS_TO_SEND_IN_SLACK_CHANNEL). + # No need to call it here. + pass diff --git a/engine/apps/alerts/tests/factories.py b/engine/apps/alerts/tests/factories.py index 2b72c805..f07ef900 100644 --- a/engine/apps/alerts/tests/factories.py +++ b/engine/apps/alerts/tests/factories.py @@ -13,6 +13,7 @@ from apps.alerts.models import ( Invitation, ResolutionNote, ResolutionNoteSlackMessage, + UserNotificationBundle, ) from common.utils import UniqueFaker @@ -85,3 +86,8 @@ class CustomActionFactory(factory.DjangoModelFactory): class InvitationFactory(factory.DjangoModelFactory): class Meta: model = Invitation + + +class UserNotificationBundleFactory(factory.DjangoModelFactory): + class Meta: + model = UserNotificationBundle diff --git a/engine/apps/alerts/tests/test_notify_user.py b/engine/apps/alerts/tests/test_notify_user.py index 7dd2b812..e482f896 100644 --- a/engine/apps/alerts/tests/test_notify_user.py +++ b/engine/apps/alerts/tests/test_notify_user.py @@ -1,10 +1,11 @@ from unittest.mock import patch import pytest +from django.utils import timezone from telegram.error import RetryAfter from apps.alerts.models import AlertGroup -from apps.alerts.tasks.notify_user import notify_user_task, perform_notification +from apps.alerts.tasks.notify_user import notify_user_task, perform_notification, send_bundled_notification from apps.api.permissions import LegacyAccessControlRole from apps.base.models.user_notification_policy import UserNotificationPolicy from apps.base.models.user_notification_policy_log_record import UserNotificationPolicyLogRecord @@ -369,3 +370,207 @@ def test_perform_notification_use_default_notification_policy_fallback( perform_notification(log_record.pk, True) mock_notify_user.assert_called_once_with(user, alert_group, fallback_notification_policy) + + +@pytest.mark.django_db +def test_notify_user_task_notification_bundle_is_enabled( + make_organization_and_user, + make_user_for_organization, + make_user_notification_policy, + make_alert_receive_channel, + make_alert_group, + settings, +): + settings.FEATURE_NOTIFICATION_BUNDLE_ENABLED = True + organization, user_1 = make_organization_and_user() + user_2 = make_user_for_organization(organization) + make_user_notification_policy( + user=user_1, + step=UserNotificationPolicy.Step.NOTIFY, + notify_by=UserNotificationPolicy.NotificationChannel.SMS, # channel is in NOTIFICATION_CHANNELS_TO_BUNDLE + ) + make_user_notification_policy( + user=user_1, + step=UserNotificationPolicy.Step.NOTIFY, + notify_by=UserNotificationPolicy.NotificationChannel.SMS, + important=True, + ) + make_user_notification_policy( + user=user_2, + step=UserNotificationPolicy.Step.NOTIFY, + notify_by=UserNotificationPolicy.NotificationChannel.SLACK, # channel is not in NOTIFICATION_CHANNELS_TO_BUNDLE + ) + alert_receive_channel = make_alert_receive_channel(organization=organization) + alert_group_1 = make_alert_group(alert_receive_channel=alert_receive_channel) + alert_group_2 = make_alert_group(alert_receive_channel=alert_receive_channel) + assert not user_1.notification_bundles.exists() + # send 1st notification to user_1, check notification_bundle was created + # without scheduling send_bundled_notification task + notify_user_task(user_1.id, alert_group_1.id) + assert user_1.notification_bundles.count() == 1 + notification_bundle = user_1.notification_bundles.first() + assert notification_bundle.notification_task_id is None + assert not notification_bundle.notifications.exists() + # send 2nd notification to user_1, check bundled notification was attached to notification_bundle + # and send_bundled_notification was scheduled + notify_user_task(user_1.id, alert_group_2.id) + notification_bundle.refresh_from_db() + assert notification_bundle.notifications.count() == 1 + assert notification_bundle.notification_task_id is not None + # send important notification to user_1, check new notification_bundle was created + notify_user_task(user_1.id, alert_group_1.id, important=True) + assert user_1.notification_bundles.count() == 2 + important_notification_bundle = user_1.notification_bundles.get(important=True) + assert important_notification_bundle.notification_task_id is None + assert not important_notification_bundle.notifications.exists() + # send notification to user_2 (notification channel is not in NOTIFICATION_CHANNELS_TO_BUNDLE), + # check notification_bundle was not created + notify_user_task(user_2.id, alert_group_1.id) + assert not user_2.notification_bundles.exists() + + +@pytest.mark.django_db +def test_notify_user_task_notification_bundle_is_not_enabled( + make_organization_and_user, + make_user_notification_policy, + make_alert_receive_channel, + make_alert_group, + settings, +): + settings.FEATURE_NOTIFICATION_BUNDLE_ENABLED = False + organization, user = make_organization_and_user() + make_user_notification_policy( + user=user, + step=UserNotificationPolicy.Step.NOTIFY, + notify_by=UserNotificationPolicy.NotificationChannel.SMS, # channel is in NOTIFICATION_CHANNELS_TO_BUNDLE + ) + alert_receive_channel = make_alert_receive_channel(organization=organization) + alert_group = make_alert_group(alert_receive_channel=alert_receive_channel) + + # send notification, check notification_bundle was not created + notify_user_task(user.id, alert_group.id) + assert not user.notification_bundles.exists() + + +@pytest.mark.django_db +def test_send_bundle_notification( + make_organization_and_user, + make_user_notification_policy, + make_user_notification_bundle, + make_alert_receive_channel, + make_alert_group, + settings, + caplog, +): + settings.FEATURE_NOTIFICATION_BUNDLE_ENABLED = True + organization, user = make_organization_and_user() + notification_policy = make_user_notification_policy( + user=user, + step=UserNotificationPolicy.Step.NOTIFY, + notify_by=UserNotificationPolicy.NotificationChannel.SMS, # channel is in NOTIFICATION_CHANNELS_TO_BUNDLE + ) + alert_receive_channel = make_alert_receive_channel(organization=organization) + alert_group_1 = make_alert_group(alert_receive_channel=alert_receive_channel) + alert_group_2 = make_alert_group(alert_receive_channel=alert_receive_channel) + alert_group_3 = make_alert_group(alert_receive_channel=alert_receive_channel) + notification_bundle = make_user_notification_bundle( + user, UserNotificationPolicy.NotificationChannel.SMS, notification_task_id="test_task_id", eta=timezone.now() + ) + notification_bundle.append_notification(alert_group_1, notification_policy) + notification_bundle.append_notification(alert_group_2, notification_policy) + notification_bundle.append_notification(alert_group_3, notification_policy) + assert notification_bundle.notifications.filter(bundle_uuid__isnull=True).count() == 3 + alert_group_3.resolve() + with patch("apps.alerts.tasks.notify_user.compare_escalations", return_value=True): + # send notification for 2 active alert groups + send_bundled_notification(notification_bundle.id) + assert f"alert_group {alert_group_3.id} is not active, skip notification" in caplog.text + assert "perform bundled notification for alert groups with ids:" in caplog.text + # check bundle_uuid was set, notification for resolved alert group was deleted + assert notification_bundle.notifications.filter(bundle_uuid__isnull=True).count() == 0 + assert notification_bundle.notifications.all().count() == 2 + assert not notification_bundle.notifications.filter(alert_group=alert_group_3).exists() + + # send notification for 1 active alert group + notification_bundle.notifications.update(bundle_uuid=None) + alert_group_2.resolve() + send_bundled_notification(notification_bundle.id) + assert f"alert_group {alert_group_2.id} is not active, skip notification" in caplog.text + assert ( + f"there is only one alert group in bundled notification, perform regular notification. " + f"alert_group {alert_group_1.id}" + ) in caplog.text + # check all notifications were deleted + assert notification_bundle.notifications.all().count() == 0 + + # send notification for 0 active alert group + notification_bundle.append_notification(alert_group_1, notification_policy) + alert_group_1.resolve() + send_bundled_notification(notification_bundle.id) + assert f"alert_group {alert_group_1.id} is not active, skip notification" in caplog.text + assert f"no alert groups to notify about or notification is not allowed for user {user.id}" in caplog.text + # check all notifications were deleted + assert notification_bundle.notifications.all().count() == 0 + + +@pytest.mark.django_db +def test_send_bundle_notification_task_id_mismatch( + make_organization_and_user, + make_user_notification_bundle, + settings, + caplog, +): + settings.FEATURE_NOTIFICATION_BUNDLE_ENABLED = True + organization, user = make_organization_and_user() + notification_bundle = make_user_notification_bundle( + user, UserNotificationPolicy.NotificationChannel.SMS, notification_task_id="test_task_id", eta=timezone.now() + ) + send_bundled_notification(notification_bundle.id) + assert ( + f"send_bundled_notification: notification_task_id mismatch. " + f"Duplication or non-active notification triggered. " + f"Active: {notification_bundle.notification_task_id}" + ) in caplog.text + + +@pytest.mark.django_db +def test_notify_user_task_notification_bundle_eta_is_outdated( + make_organization_and_user, + make_user_for_organization, + make_user_notification_policy, + make_user_notification_bundle, + make_alert_receive_channel, + make_alert_group, + settings, +): + settings.FEATURE_NOTIFICATION_BUNDLE_ENABLED = True + organization, user = make_organization_and_user() + notification_policy = make_user_notification_policy( + user=user, + step=UserNotificationPolicy.Step.NOTIFY, + notify_by=UserNotificationPolicy.NotificationChannel.SMS, # channel is in NOTIFICATION_CHANNELS_TO_BUNDLE + ) + alert_receive_channel = make_alert_receive_channel(organization=organization) + alert_group_1 = make_alert_group(alert_receive_channel=alert_receive_channel) + alert_group_2 = make_alert_group(alert_receive_channel=alert_receive_channel) + now = timezone.now() + outdated_eta = now - timezone.timedelta(minutes=5) + test_task_id = "test_task_id" + notification_bundle = make_user_notification_bundle( + user, + UserNotificationPolicy.NotificationChannel.SMS, + eta=outdated_eta, + notification_task_id=test_task_id, + last_notified_at=now, + ) + notification_bundle.append_notification(alert_group_1, notification_policy) + assert not notification_bundle.eta_is_valid() + assert notification_bundle.notifications.count() == 1 + + # call notify_user_task and check that new notification task for notification_bundle was scheduled + notify_user_task(user.id, alert_group_2.id) + notification_bundle.refresh_from_db() + assert notification_bundle.eta_is_valid() + assert notification_bundle.notification_task_id != test_task_id + assert notification_bundle.last_notified_at == now + assert notification_bundle.notifications.count() == 2 diff --git a/engine/apps/metrics_exporter/helpers.py b/engine/apps/metrics_exporter/helpers.py index 7baf85a6..db1164bc 100644 --- a/engine/apps/metrics_exporter/helpers.py +++ b/engine/apps/metrics_exporter/helpers.py @@ -327,8 +327,11 @@ def metrics_update_alert_groups_response_time_cache(integrations_response_time: cache.set(metric_alert_groups_response_time_key, metric_alert_groups_response_time, timeout=metrics_cache_timeout) -def metrics_update_user_cache(user): - """Update "user_was_notified_of_alert_groups" metric cache.""" +def metrics_update_user_cache(user, counter=1): + """ + Increase "user_was_notified_of_alert_groups" metric cache by counter. + Counter shows how many alert groups user was notified of. + """ metrics_cache_timeout = get_metrics_cache_timeout(user.organization_id) metric_user_was_notified_key = get_metric_user_was_notified_of_alert_groups_key(user.organization_id) metric_user_was_notified: typing.Dict[int, UserWasNotifiedOfAlertGroupsMetricsDict] = cache.get( @@ -344,6 +347,6 @@ def metrics_update_user_cache(user): "id": user.organization.stack_id, "counter": 0, }, - )["counter"] += 1 + )["counter"] += counter cache.set(metric_user_was_notified_key, metric_user_was_notified, timeout=metrics_cache_timeout) diff --git a/engine/apps/metrics_exporter/tasks.py b/engine/apps/metrics_exporter/tasks.py index af1f6393..1663ddec 100644 --- a/engine/apps/metrics_exporter/tasks.py +++ b/engine/apps/metrics_exporter/tasks.py @@ -285,8 +285,8 @@ def update_metrics_for_alert_group(alert_group_id, organization_id, previous_sta @shared_dedicated_queue_retry_task( autoretry_for=(Exception,), retry_backoff=True, max_retries=1 if settings.DEBUG else 10 ) -def update_metrics_for_user(user_id): +def update_metrics_for_user(user_id, counter=1): from apps.user_management.models import User user = User.objects.get(id=user_id) - metrics_update_user_cache(user) + metrics_update_user_cache(user, counter) diff --git a/engine/apps/metrics_exporter/tests/test_update_metrics_cache.py b/engine/apps/metrics_exporter/tests/test_update_metrics_cache.py index 7292f953..84a5a342 100644 --- a/engine/apps/metrics_exporter/tests/test_update_metrics_cache.py +++ b/engine/apps/metrics_exporter/tests/test_update_metrics_cache.py @@ -6,6 +6,7 @@ from django.test import override_settings from apps.alerts.signals import alert_group_created_signal from apps.alerts.tasks import notify_user_task +from apps.alerts.tasks.notify_user import update_metric_if_needed from apps.base.models import UserNotificationPolicy, UserNotificationPolicyLogRecord from apps.metrics_exporter.constants import NO_SERVICE_VALUE, SERVICE_LABEL from apps.metrics_exporter.helpers import ( @@ -607,6 +608,82 @@ def test_update_metrics_cache_on_user_notification( arg_idx = get_called_arg_index_and_compare_results() +@override_settings(CELERY_TASK_ALWAYS_EAGER=True) +@pytest.mark.django_db +def test_update_metric_if_needed( + mock_apply_async, + make_organization, + make_user_for_organization, + make_alert_receive_channel, + make_alert_group, + make_user_notification_policy_log_record, + make_user_was_notified_metrics_cache_params, + monkeypatch, + mock_get_metrics_cache, +): + organization = make_organization( + org_id=METRICS_TEST_ORG_ID, + stack_slug=METRICS_TEST_INSTANCE_SLUG, + stack_id=METRICS_TEST_INSTANCE_ID, + ) + alert_receive_channel = make_alert_receive_channel( + organization, + verbal_name=METRICS_TEST_INTEGRATION_NAME, + ) + user = make_user_for_organization(organization, username=METRICS_TEST_USER_USERNAME) + + alert_group_1 = make_alert_group(alert_receive_channel) + alert_group_2 = make_alert_group(alert_receive_channel) + alert_group_3 = make_alert_group(alert_receive_channel) + + # create 1 log record for alert_group_1, 2 log records for alert_group_2 and 3 log records for alert_group_3 + for idx, alert_group in enumerate([alert_group_1, alert_group_2, alert_group_3], start=1): + for _ in range(idx): + make_user_notification_policy_log_record( + type=UserNotificationPolicyLogRecord.TYPE_PERSONAL_NOTIFICATION_TRIGGERED, + author=user, + alert_group=alert_group, + ) + + metrics_cache = make_user_was_notified_metrics_cache_params(user.id, organization.id) + monkeypatch.setattr(cache, "get", metrics_cache) + + metric_user_was_notified_key = get_metric_user_was_notified_of_alert_groups_key(organization.id) + + expected_result_metric_user_was_notified = { + user.id: { + "org_id": organization.org_id, + "slug": organization.stack_slug, + "id": organization.stack_id, + "user_username": METRICS_TEST_USER_USERNAME, + "counter": 1, + } + } + + with patch("apps.metrics_exporter.tasks.cache.set") as mock_cache_set: + # check current metric value + assert cache.get(metric_user_was_notified_key, {}) == expected_result_metric_user_was_notified + + expected_result_metric_user_was_notified[user.id]["counter"] += 1 + update_metric_if_needed(user, [alert_group_1.id, alert_group_2.id, alert_group_3.id]) + + # check user_was_notified_of_alert_groups metric counter was increased by 1 + # (for alert_group_1 that has only one log record with type "triggered") + mock_cache_set.assert_called_once() + mock_cache_set_called_args = mock_cache_set.call_args_list + assert mock_cache_set_called_args[0].args[0] == metric_user_was_notified_key + assert mock_cache_set_called_args[0].args[1] == expected_result_metric_user_was_notified + # create new log record for alert_group_1 + make_user_notification_policy_log_record( + type=UserNotificationPolicyLogRecord.TYPE_PERSONAL_NOTIFICATION_TRIGGERED, + author=user, + alert_group=alert_group_1, + ) + # check metric was not updated + update_metric_if_needed(user, [alert_group_1.id, alert_group_2.id, alert_group_3.id]) + mock_cache_set.assert_called_once() + + @pytest.mark.django_db def test_metrics_add_integrations_to_cache(make_organization, make_alert_receive_channel): organization = make_organization( diff --git a/engine/apps/phone_notifications/phone_backend.py b/engine/apps/phone_notifications/phone_backend.py index c3db1094..71ccf626 100644 --- a/engine/apps/phone_notifications/phone_backend.py +++ b/engine/apps/phone_notifications/phone_backend.py @@ -193,6 +193,10 @@ class PhoneBackend: log_record.save() user_notification_action_triggered_signal.send(sender=PhoneBackend.notify_by_sms, log_record=log_record) + @staticmethod + def notify_by_sms_bundle_async(user, bundle_uuid): + pass # todo: will be added in a separate PR + def _notify_by_provider_sms(self, user, message) -> Optional[ProviderSMS]: """ _notify_by_provider_sms sends a notification sms using configured phone provider. diff --git a/engine/conftest.py b/engine/conftest.py index 899679fd..c4de1730 100644 --- a/engine/conftest.py +++ b/engine/conftest.py @@ -37,6 +37,7 @@ from apps.alerts.tests.factories import ( InvitationFactory, ResolutionNoteFactory, ResolutionNoteSlackMessageFactory, + UserNotificationBundleFactory, ) from apps.api.permissions import ( ACTION_PREFIX, @@ -145,6 +146,7 @@ register(LabelKeyFactory) register(LabelValueFactory) register(AlertReceiveChannelAssociatedLabelFactory) register(GoogleOAuth2UserFactory) +register(UserNotificationBundleFactory) IS_RBAC_ENABLED = os.getenv("ONCALL_TESTING_RBAC_ENABLED", "True") == "True" @@ -1077,3 +1079,13 @@ def make_google_oauth2_user_for_user(): return GoogleOAuth2UserFactory(user=user) return _make_google_oauth2_user_for_user + + +@pytest.fixture +def make_user_notification_bundle(): + def _make_user_notification_bundle(user, notification_channel, important=False, **kwargs): + return UserNotificationBundleFactory( + user=user, notification_channel=notification_channel, important=important, **kwargs + ) + + return _make_user_notification_bundle diff --git a/engine/settings/base.py b/engine/settings/base.py index 6638d16b..3e40442b 100644 --- a/engine/settings/base.py +++ b/engine/settings/base.py @@ -72,6 +72,7 @@ FEATURE_LABELS_ENABLED_FOR_ALL = getenv_boolean("FEATURE_LABELS_ENABLED_FOR_ALL" # Enable labels feature for organizations from the list. Use OnCall organization ID, for this flag FEATURE_LABELS_ENABLED_PER_ORG = getenv_list("FEATURE_LABELS_ENABLED_PER_ORG", default=list()) FEATURE_ALERT_GROUP_SEARCH_ENABLED = getenv_boolean("FEATURE_ALERT_GROUP_SEARCH_ENABLED", default=False) +FEATURE_NOTIFICATION_BUNDLE_ENABLED = getenv_boolean("FEATURE_NOTIFICATION_BUNDLE_ENABLED", default=False) TWILIO_API_KEY_SID = os.environ.get("TWILIO_API_KEY_SID") TWILIO_API_KEY_SECRET = os.environ.get("TWILIO_API_KEY_SECRET") diff --git a/engine/settings/celery_task_routes.py b/engine/settings/celery_task_routes.py index 9d8e0537..fa747165 100644 --- a/engine/settings/celery_task_routes.py +++ b/engine/settings/celery_task_routes.py @@ -105,6 +105,7 @@ CELERY_TASK_ROUTES = { "apps.alerts.tasks.notify_ical_schedule_shift.notify_ical_schedule_shift": {"queue": "critical"}, "apps.alerts.tasks.notify_user.notify_user_task": {"queue": "critical"}, "apps.alerts.tasks.notify_user.perform_notification": {"queue": "critical"}, + "apps.alerts.tasks.notify_user.send_bundled_notification": {"queue": "critical"}, "apps.alerts.tasks.notify_user.send_user_notification_signal": {"queue": "critical"}, "apps.alerts.tasks.resolve_alert_group_by_source_if_needed.resolve_alert_group_by_source_if_needed": { "queue": "critical"