User notifications bundle (#4457)

# What this PR does
This PR adds two new models: UserNotificationBundle and
BundledNotification (proposals for naming are welcome).

`UserNotificationBundle` manages the information about last notification
time and scheduled notification task for bundled notifications. It is
unique per user + notification_channel + notification importance.

`BundledNotification` contains notification policy and alert group, that
triggered the notification. The BundledNotification instance is created
in `notify_user_task` for every notification, that should be bundled,
and is attached to UserNotificationBundle by ForeignKey connection.

How it works:
If the user was notified recently (within the last two minutes) by the
current notification channel, and this channel is bundlable,
BundledNotification instance will be created and attached to the
UserNotificationBundle instance, and `send_bundled_notification` task
will be scheduled to execute in 2 min.
In `send_bundled_notification` task we get all BundledNotification
attached to the current UserNotificationBundle instance, check if alert
groups are still active and if there is only one notification - perform
regular notification by calling `perform_notification` task, otherwise
call "notify_by_<channel>_bundle" method for the current notification
channel.

PR with method to send notification bundle by SMS -
https://github.com/grafana/oncall/pull/4624

**This feature is disabled by default by feature flag. Public docs will
be added in a separate PR with enabling this feature.**
## Which issue(s) this PR closes
related to https://github.com/grafana/oncall-private/issues/2712

## Checklist

- [x] Unit, integration, and e2e (if applicable) tests updated
- [x] Documentation added (or `pr:no public docs` PR label added if not
required)
- [x] Added the relevant release notes label (see labels prefixed w/
`release:`). These labels dictate how your PR will
    show up in the autogenerated release notes.
This commit is contained in:
Yulya Artyukhina 2024-07-16 13:24:08 +02:00 committed by GitHub
parent 93a7c645fd
commit 191814b25e
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
16 changed files with 746 additions and 95 deletions

View file

@ -16,6 +16,8 @@ TASK_DELAY_SECONDS = 1
NEXT_ESCALATION_DELAY = 5
BUNDLED_NOTIFICATION_DELAY_SECONDS = 60 * 2 # 2 min
# AlertGroup states verbal
class AlertGroupState(str, Enum):

View file

@ -0,0 +1,45 @@
# Generated by Django 4.2.10 on 2024-06-20 11:00
import apps.base.models.user_notification_policy
from django.db import migrations, models
import django.db.models.deletion
class Migration(migrations.Migration):
dependencies = [
('base', '0005_drop_unused_dynamic_settings'),
('user_management', '0022_alter_team_unique_together'),
('alerts', '0053_channelfilter_filtering_labels'),
]
operations = [
migrations.CreateModel(
name='UserNotificationBundle',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('important', models.BooleanField()),
('notification_channel', models.PositiveSmallIntegerField(default=None, null=True, validators=[apps.base.models.user_notification_policy.validate_channel_choice])),
('last_notified_at', models.DateTimeField(default=None, null=True)),
('notification_task_id', models.CharField(default=None, max_length=100, null=True)),
('eta', models.DateTimeField(default=None, null=True)),
('user', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='notification_bundles', to='user_management.user')),
],
),
migrations.CreateModel(
name='BundledNotification',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('created_at', models.DateTimeField(auto_now_add=True)),
('bundle_uuid', models.CharField(db_index=True, default=None, max_length=100, null=True)),
('alert_group', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='alerts.alertgroup')),
('alert_receive_channel', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='alerts.alertreceivechannel')),
('notification_bundle', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='notifications', to='alerts.usernotificationbundle')),
('notification_policy', models.ForeignKey(null=True, on_delete=django.db.models.deletion.SET_NULL, to='base.usernotificationpolicy')),
],
),
migrations.AddConstraint(
model_name='usernotificationbundle',
constraint=models.UniqueConstraint(fields=('user', 'important', 'notification_channel'), name='unique_user_notification_bundle'),
),
]

View file

@ -15,3 +15,4 @@ from .invitation import Invitation # noqa: F401
from .maintainable_object import MaintainableObject # noqa: F401
from .resolution_note import ResolutionNote, ResolutionNoteSlackMessage # noqa: F401
from .user_has_notification import UserHasNotification # noqa: F401
from .user_notification_bundle import BundledNotification, UserNotificationBundle # noqa: F401

View file

@ -16,3 +16,11 @@ class UserHasNotification(models.Model):
class Meta:
unique_together = ("user", "alert_group")
def update_active_task_id(self, task_id):
"""
`active_notification_policy_id` keeps celery task_id of the next scheduled `notify_user_task`
for the current user
"""
self.active_notification_policy_id = task_id
self.save(update_fields=["active_notification_policy_id"])

View file

@ -0,0 +1,87 @@
import datetime
import typing
from django.db import models
from django.utils import timezone
from apps.alerts.constants import BUNDLED_NOTIFICATION_DELAY_SECONDS
from apps.base.models import UserNotificationPolicy
from apps.base.models.user_notification_policy import validate_channel_choice
if typing.TYPE_CHECKING:
from django.db.models.manager import RelatedManager
from apps.alerts.models import AlertGroup, AlertReceiveChannel
from apps.user_management.models import User
class UserNotificationBundle(models.Model):
user: "User"
notifications: "RelatedManager['BundledNotification']"
NOTIFICATION_CHANNELS_TO_BUNDLE = [
UserNotificationPolicy.NotificationChannel.SMS,
]
user = models.ForeignKey("user_management.User", on_delete=models.CASCADE, related_name="notification_bundles")
important = models.BooleanField()
notification_channel = models.PositiveSmallIntegerField(
validators=[validate_channel_choice], null=True, default=None
)
last_notified_at = models.DateTimeField(default=None, null=True)
notification_task_id = models.CharField(max_length=100, null=True, default=None)
# estimated time of arrival for scheduled send_bundled_notification task
eta = models.DateTimeField(default=None, null=True)
class Meta:
constraints = [
models.UniqueConstraint(
fields=["user", "important", "notification_channel"], name="unique_user_notification_bundle"
)
]
def notified_recently(self) -> bool:
return (
timezone.now() - self.last_notified_at < timezone.timedelta(seconds=BUNDLED_NOTIFICATION_DELAY_SECONDS)
if self.last_notified_at
else False
)
def eta_is_valid(self) -> bool:
"""
`eta` shows eta of scheduled send_bundled_notification task and should never be less than the current time
(with a 1 minute buffer provided).
`eta` is None means that there is no scheduled task.
"""
if not self.eta or self.eta + timezone.timedelta(minutes=1) >= timezone.now():
return True
return False
def get_notification_eta(self) -> datetime.datetime:
last_notified = self.last_notified_at if self.last_notified_at else timezone.now()
return last_notified + timezone.timedelta(seconds=BUNDLED_NOTIFICATION_DELAY_SECONDS)
def append_notification(self, alert_group: "AlertGroup", notification_policy: "UserNotificationPolicy"):
self.notifications.create(
alert_group=alert_group, notification_policy=notification_policy, alert_receive_channel=alert_group.channel
)
@classmethod
def notification_is_bundleable(cls, notification_channel):
return notification_channel in cls.NOTIFICATION_CHANNELS_TO_BUNDLE
class BundledNotification(models.Model):
alert_group: "AlertGroup"
alert_receive_channel: "AlertReceiveChannel"
notification_policy: typing.Optional["UserNotificationPolicy"]
notification_bundle: "UserNotificationBundle"
alert_group = models.ForeignKey("alerts.AlertGroup", on_delete=models.CASCADE)
alert_receive_channel = models.ForeignKey("alerts.AlertReceiveChannel", on_delete=models.CASCADE)
notification_policy = models.ForeignKey("base.UserNotificationPolicy", on_delete=models.SET_NULL, null=True)
notification_bundle = models.ForeignKey(
UserNotificationBundle, on_delete=models.CASCADE, related_name="notifications"
)
created_at = models.DateTimeField(auto_now_add=True)
bundle_uuid = models.CharField(max_length=100, null=True, default=None, db_index=True)

View file

@ -187,8 +187,7 @@ def unpage_user(alert_group: AlertGroup, user: User, from_user: User) -> None:
user_has_notification = UserHasNotification.objects.filter(
user=user, alert_group=alert_group
).select_for_update()[0]
user_has_notification.active_notification_policy_id = None
user_has_notification.save(update_fields=["active_notification_policy_id"])
user_has_notification.update_active_task_id(task_id=None)
except IndexError:
return
finally:

View file

@ -1,23 +1,94 @@
import time
import typing
from functools import partial
from uuid import uuid4
from celery.exceptions import Retry
from django.conf import settings
from django.db import transaction
from django.db.models import Count
from django.utils import timezone
from kombu.utils.uuid import uuid as celery_uuid
from telegram.error import RetryAfter
from apps.alerts.constants import NEXT_ESCALATION_DELAY
from apps.alerts.signals import user_notification_action_triggered_signal
from apps.alerts.tasks.send_update_log_report_signal import send_update_log_report_signal
from apps.base.messaging import get_messaging_backend_from_id
from apps.metrics_exporter.tasks import update_metrics_for_user
from apps.phone_notifications.phone_backend import PhoneBackend
from common.custom_celery_tasks import shared_dedicated_queue_retry_task
from .compare_escalations import compare_escalations
from .task_logger import task_logger
if typing.TYPE_CHECKING:
from apps.alerts.models import AlertGroup, UserNotificationBundle
from apps.base.models import UserNotificationPolicy
from apps.user_management.models import User
def schedule_send_bundled_notification_task(
user_notification_bundle: "UserNotificationBundle", alert_group: "AlertGroup"
):
"""Schedule a task to send bundled notifications"""
send_bundled_notification.apply_async(
(user_notification_bundle.id,),
eta=user_notification_bundle.eta,
task_id=user_notification_bundle.notification_task_id,
)
task_logger.info(
f"Scheduled send_bundled_notification task {user_notification_bundle.notification_task_id}, "
f"user_notification_bundle: {user_notification_bundle.id}, alert_group {alert_group.id}, "
f"eta: {user_notification_bundle.eta}"
)
def schedule_perform_notification_task(
log_record_pk: int, alert_group_pk: int, use_default_notification_policy_fallback: bool
):
task = perform_notification.apply_async((log_record_pk, use_default_notification_policy_fallback))
task_logger.info(
f"Created perform_notification task {task} log_record={log_record_pk} " f"alert_group={alert_group_pk}"
)
def build_notification_reason_for_log_record(
notification_policies: typing.List["UserNotificationPolicy"], reason: typing.Optional[str]
) -> str:
from apps.base.models import UserNotificationPolicy
# Here we collect a brief overview of notification steps configured for user to send it to thread.
collected_steps_ids = []
for next_notification_policy in notification_policies:
if next_notification_policy.step == UserNotificationPolicy.Step.NOTIFY:
if next_notification_policy.notify_by not in collected_steps_ids:
collected_steps_ids.append(next_notification_policy.notify_by)
collected_steps = ", ".join(
UserNotificationPolicy.NotificationChannel(step_id).label for step_id in collected_steps_ids
)
reason = ("Reason: " + reason + "\n") if reason is not None else ""
reason += ("Further notification plan: " + collected_steps) if len(collected_steps_ids) > 0 else ""
return reason
def update_metric_if_needed(user: "User", active_alert_group_ids: typing.List[int]):
from apps.base.models import UserNotificationPolicyLogRecord
# get count of alert groups with only one personal log record with type "triggered"
alert_groups_with_one_log = (
user.personal_log_records.filter(
type=UserNotificationPolicyLogRecord.TYPE_PERSONAL_NOTIFICATION_TRIGGERED,
alert_group_id__in=active_alert_group_ids,
)
.values("alert_group")
.annotate(count=Count("alert_group"))
.filter(count=1)
.count()
)
if alert_groups_with_one_log > 0:
update_metrics_for_user.apply_async((user.id, alert_groups_with_one_log))
@shared_dedicated_queue_retry_task(
autoretry_for=(Exception,), retry_backoff=True, max_retries=1 if settings.DEBUG else None
@ -32,7 +103,7 @@ def notify_user_task(
important=False,
notify_anyway=False,
):
from apps.alerts.models import AlertGroup, UserHasNotification
from apps.alerts.models import AlertGroup, UserHasNotification, UserNotificationBundle
from apps.base.models import UserNotificationPolicy, UserNotificationPolicyLogRecord
from apps.user_management.models import User
@ -44,6 +115,8 @@ def notify_user_task(
countdown = 0
stop_escalation = False
log_record = None
is_notification_bundled = False
user_notification_bundle = None
with transaction.atomic():
try:
@ -51,8 +124,6 @@ def notify_user_task(
except User.DoesNotExist:
return f"notify_user_task: user {user_pk} doesn't exist"
organization = alert_group.channel.organization
if not user.is_notification_allowed:
task_logger.info(f"notify_user_task: user {user.pk} notification is not allowed")
UserNotificationPolicyLogRecord(
@ -82,21 +153,8 @@ def notify_user_task(
f"notify_user_task: Failed to notify. No notification policies. user_id={user_pk} alert_group_id={alert_group_pk} important={important}"
)
return
# Here we collect a brief overview of notification steps configured for user to send it to thread.
collected_steps_ids = []
for next_notification_policy in notification_policies:
if next_notification_policy.step == UserNotificationPolicy.Step.NOTIFY:
if next_notification_policy.notify_by not in collected_steps_ids:
collected_steps_ids.append(next_notification_policy.notify_by)
reason = build_notification_reason_for_log_record(notification_policies, reason)
notification_policy = notification_policies[0]
collected_steps = ", ".join(
UserNotificationPolicy.NotificationChannel(step_id).label for step_id in collected_steps_ids
)
reason = ("Reason: " + reason + "\n") if reason is not None else ""
reason += ("Further notification plan: " + collected_steps) if len(collected_steps_ids) > 0 else ""
else:
if notify_user_task.request.id != user_has_notification.active_notification_policy_id:
task_logger.info(
@ -108,14 +166,14 @@ def notify_user_task(
try:
notification_policy = UserNotificationPolicy.objects.get(pk=previous_notification_policy_pk)
if notification_policy.user.organization != organization:
if notification_policy.user != user:
notification_policy = UserNotificationPolicy.objects.get(
order=notification_policy.order, user=user, important=important
)
notification_policy = notification_policy.next()
except UserNotificationPolicy.DoesNotExist:
task_logger.info(
f"notify_user_taskLNotification policy {previous_notification_policy_pk} has been deleted"
f"notify_user_task: Notification policy {previous_notification_policy_pk} has been deleted"
)
return
reason = None
@ -138,27 +196,25 @@ def notify_user_task(
if notification_policy is None:
stop_escalation = True
log_record = _create_notification_finished_user_notification_policy_log_record()
task_logger.info(f"Personal escalation exceeded. User: {user.pk}, alert_group: {alert_group.pk}")
else:
if (
(alert_group.acknowledged and not notify_even_acknowledged)
or (alert_group.silenced and not notify_anyway)
or alert_group.resolved
or alert_group.wiped_at
or alert_group.root_alert_group
):
return "Acknowledged, resolved, attached or wiped."
if alert_group.silenced and not notify_anyway:
task_logger.info(
f"notify_user_task: skip notification user {user.pk} because alert_group {alert_group.pk} is silenced"
f"notify_user_task: skip notification user {user.pk}, alert_group {alert_group.pk} is "
f"{alert_group.state} and/or attached or wiped"
)
return
return "Acknowledged, resolved, silenced, attached or wiped."
if notification_policy.step == UserNotificationPolicy.Step.WAIT:
if notification_policy.wait_delay is not None:
delay_in_seconds = notification_policy.wait_delay.total_seconds()
else:
delay_in_seconds = 0
countdown = delay_in_seconds
countdown = (
notification_policy.wait_delay.total_seconds() if notification_policy.wait_delay is not None else 0
)
log_record = _create_user_notification_policy_log_record(
author=user,
type=UserNotificationPolicyLogRecord.TYPE_PERSONAL_NOTIFICATION_TRIGGERED,
@ -167,7 +223,7 @@ def notify_user_task(
slack_prevent_posting=prevent_posting_to_thread,
notification_step=notification_policy.step,
)
task_logger.info(f"notify_user_task: Waiting {delay_in_seconds} to notify user {user.pk}")
task_logger.info(f"notify_user_task: Waiting {countdown} to notify user {user.pk}")
elif notification_policy.step == UserNotificationPolicy.Step.NOTIFY:
user_to_be_notified_in_slack = (
notification_policy.notify_by == UserNotificationPolicy.NotificationChannel.SLACK
@ -185,17 +241,49 @@ def notify_user_task(
notification_error_code=UserNotificationPolicyLogRecord.ERROR_NOTIFICATION_POSTING_TO_SLACK_IS_DISABLED,
)
else:
log_record = _create_user_notification_policy_log_record(
author=user,
type=UserNotificationPolicyLogRecord.TYPE_PERSONAL_NOTIFICATION_TRIGGERED,
notification_policy=notification_policy,
alert_group=alert_group,
reason=reason,
slack_prevent_posting=prevent_posting_to_thread,
notification_step=notification_policy.step,
notification_channel=notification_policy.notify_by,
)
if (
settings.FEATURE_NOTIFICATION_BUNDLE_ENABLED
and UserNotificationBundle.notification_is_bundleable(notification_policy.notify_by)
):
user_notification_bundle, _ = UserNotificationBundle.objects.select_for_update().get_or_create(
user=user, important=important, notification_channel=notification_policy.notify_by
)
# check if notification needs to be bundled
if user_notification_bundle.notified_recently():
user_notification_bundle.append_notification(alert_group, notification_policy)
# schedule send_bundled_notification task if it hasn't been scheduled or the task eta is
# outdated
eta_is_valid = user_notification_bundle.eta_is_valid()
if not eta_is_valid:
task_logger.warning(
f"ETA is not valid - {user_notification_bundle.eta}, "
f"user_notification_bundle {user_notification_bundle.id}, "
f"task_id {user_notification_bundle.notification_task_id}. "
f"Rescheduling the send_bundled_notification task"
)
if not user_notification_bundle.notification_task_id or not eta_is_valid:
user_notification_bundle.notification_task_id = celery_uuid()
user_notification_bundle.eta = user_notification_bundle.get_notification_eta()
user_notification_bundle.save(update_fields=["notification_task_id", "eta"])
transaction.on_commit(
partial(
schedule_send_bundled_notification_task, user_notification_bundle, alert_group
)
)
is_notification_bundled = True
if not is_notification_bundled:
log_record = _create_user_notification_policy_log_record(
author=user,
type=UserNotificationPolicyLogRecord.TYPE_PERSONAL_NOTIFICATION_TRIGGERED,
notification_policy=notification_policy,
alert_group=alert_group,
reason=reason,
slack_prevent_posting=prevent_posting_to_thread,
notification_step=notification_policy.step,
notification_channel=notification_policy.notify_by,
)
if log_record: # log_record is None if user notification policy step is unspecified
# if this is the first notification step, and user hasn't been notified for this alert group - update metric
if (
@ -207,48 +295,41 @@ def notify_user_task(
).exists()
):
update_metrics_for_user.apply_async((user.id,))
log_record.save()
if notify_user_task.request.retries == 0:
transaction.on_commit(partial(send_user_notification_signal.apply_async, (log_record.pk,)))
def _create_perform_notification_task(log_record_pk, alert_group_pk, use_default_notification_policy_fallback):
task = perform_notification.apply_async((log_record_pk, use_default_notification_policy_fallback))
task_logger.info(
f"Created perform_notification task {task} log_record={log_record_pk} " f"alert_group={alert_group_pk}"
)
def _update_user_has_notification_active_notification_policy_id(active_policy_id: typing.Optional[str]) -> None:
user_has_notification.active_notification_policy_id = active_policy_id
user_has_notification.save(update_fields=["active_notification_policy_id"])
def _reset_user_has_notification_active_notification_policy_id() -> None:
_update_user_has_notification_active_notification_policy_id(None)
create_perform_notification_task = partial(
_create_perform_notification_task,
log_record.pk,
alert_group_pk,
using_fallback_default_notification_policy_step,
)
if using_fallback_default_notification_policy_step:
# if we are using default notification policy, we're done escalating.. there's no further notification
# policy steps in this case. Kick off the perform_notification task, create the
# TYPE_PERSONAL_NOTIFICATION_FINISHED log record, and reset the active_notification_policy_id
transaction.on_commit(create_perform_notification_task)
transaction.on_commit(
partial(
schedule_perform_notification_task,
log_record.pk,
alert_group_pk,
using_fallback_default_notification_policy_step,
)
)
_create_notification_finished_user_notification_policy_log_record()
_reset_user_has_notification_active_notification_policy_id()
user_has_notification.update_active_task_id(None)
elif not stop_escalation:
if notification_policy.step != UserNotificationPolicy.Step.WAIT:
transaction.on_commit(create_perform_notification_task)
# if the step is NOTIFY and notification was not not bundled, perform regular notification
# and update time when user was notified
if notification_policy.step != UserNotificationPolicy.Step.WAIT and not is_notification_bundled:
transaction.on_commit(
partial(
schedule_perform_notification_task,
log_record.pk,
alert_group_pk,
using_fallback_default_notification_policy_step,
)
)
if user_notification_bundle:
user_notification_bundle.last_notified_at = timezone.now()
user_notification_bundle.save(update_fields=["last_notified_at"])
delay = NEXT_ESCALATION_DELAY
if countdown is not None:
delay += countdown
task_id = celery_uuid()
_update_user_has_notification_active_notification_policy_id(task_id)
user_has_notification.update_active_task_id(task_id)
transaction.on_commit(
partial(
@ -259,12 +340,12 @@ def notify_user_task(
"notify_anyway": notify_anyway,
"prevent_posting_to_thread": prevent_posting_to_thread,
},
countdown=delay,
countdown=countdown + NEXT_ESCALATION_DELAY,
task_id=task_id,
)
)
else:
_reset_user_has_notification_active_notification_policy_id()
user_has_notification.update_active_task_id(None)
@shared_dedicated_queue_retry_task(
@ -458,18 +539,137 @@ def perform_notification(log_record_pk, use_default_notification_policy_fallback
backend.notify_user(user, alert_group, notification_policy)
@shared_dedicated_queue_retry_task(
autoretry_for=(Exception,), retry_backoff=True, max_retries=1 if settings.DEBUG else None
)
def send_bundled_notification(user_notification_bundle_id: int):
"""
The task filters bundled notifications, attached to the current user_notification_bundle, by active alert groups,
creates notification log records and updates user_notification_bundle.
If there are no active alert groups - nothing else happens. If there is only one active alert group - regular
notification will be performed (called perform_notification task). Otherwise - "send bundled notification" method of
the current notification channel will be called.
"""
from apps.alerts.models import AlertGroup, UserNotificationBundle
from apps.base.models import UserNotificationPolicy, UserNotificationPolicyLogRecord
task_logger.info(f"Start send_bundled_notification for user_notification_bundle {user_notification_bundle_id}")
with transaction.atomic():
try:
user_notification_bundle = UserNotificationBundle.objects.filter(
pk=user_notification_bundle_id
).select_for_update()[0]
except IndexError:
task_logger.info(
f"user_notification_bundle {user_notification_bundle_id} doesn't exist. "
f"The user associated with this notification bundle may have been deleted."
)
return
if not compare_escalations(send_bundled_notification.request.id, user_notification_bundle.notification_task_id):
task_logger.info(
f"send_bundled_notification: notification_task_id mismatch. "
f"Duplication or non-active notification triggered. "
f"Active: {user_notification_bundle.notification_task_id}"
)
return
notifications = user_notification_bundle.notifications.filter(bundle_uuid__isnull=True).select_related(
"alert_group"
)
log_records_to_create: typing.List["UserNotificationPolicyLogRecord"] = []
skip_notification_ids: typing.List[int] = []
active_alert_group_ids: typing.Set[int] = set()
log_record_notification_triggered = None
is_notification_allowed = user_notification_bundle.user.is_notification_allowed
# create logs
for notification in notifications:
if notification.alert_group.status != AlertGroup.NEW:
task_logger.info(f"alert_group {notification.alert_group_id} is not active, skip notification")
skip_notification_ids.append(notification.id)
continue
elif not is_notification_allowed:
log_record_notification_failed = UserNotificationPolicyLogRecord(
author=user_notification_bundle.user,
type=UserNotificationPolicyLogRecord.TYPE_PERSONAL_NOTIFICATION_FAILED,
reason="notification is not allowed for user",
alert_group=notification.alert_group,
notification_error_code=UserNotificationPolicyLogRecord.ERROR_NOTIFICATION_FORBIDDEN,
)
log_records_to_create.append(log_record_notification_failed)
active_alert_group_ids.add(notification.alert_group_id)
continue
# collect notifications for active alert groups
active_alert_group_ids.add(notification.alert_group_id)
log_record_notification_triggered = UserNotificationPolicyLogRecord(
author=user_notification_bundle.user,
type=UserNotificationPolicyLogRecord.TYPE_PERSONAL_NOTIFICATION_TRIGGERED,
alert_group=notification.alert_group,
notification_step=UserNotificationPolicy.Step.NOTIFY,
notification_channel=user_notification_bundle.notification_channel,
)
log_records_to_create.append(log_record_notification_triggered)
if len(log_records_to_create) == 1 and log_record_notification_triggered:
# perform regular notification
log_record_notification_triggered.save()
task_logger.info(
f"there is only one alert group in bundled notification, perform regular notification. "
f"alert_group {log_record_notification_triggered.alert_group_id}"
)
transaction.on_commit(
partial(
schedule_perform_notification_task,
log_record_notification_triggered.pk,
log_record_notification_triggered.alert_group_id,
)
)
notifications.delete()
else:
UserNotificationPolicyLogRecord.objects.bulk_create(log_records_to_create, batch_size=5000)
if not active_alert_group_ids or not is_notification_allowed:
task_logger.info(
f"no alert groups to notify about or notification is not allowed for user "
f"{user_notification_bundle.user_id}"
)
notifications.delete()
else:
notifications.filter(id__in=skip_notification_ids).delete()
bundle_uuid = uuid4()
notifications.update(bundle_uuid=bundle_uuid)
task_logger.info(
f"perform bundled notification for alert groups with ids: {active_alert_group_ids}, "
f"bundle_uuid: {bundle_uuid}"
)
if user_notification_bundle.notification_channel == UserNotificationPolicy.NotificationChannel.SMS:
PhoneBackend.notify_by_sms_bundle_async(user_notification_bundle.user, bundle_uuid)
user_notification_bundle.notification_task_id = None
user_notification_bundle.last_notified_at = timezone.now()
user_notification_bundle.eta = None
user_notification_bundle.save(update_fields=["notification_task_id", "last_notified_at", "eta"])
for alert_group_id in active_alert_group_ids:
transaction.on_commit(partial(send_update_log_report_signal.apply_async, (None, alert_group_id)))
# update metric
transaction.on_commit(partial(update_metric_if_needed, user_notification_bundle.user, active_alert_group_ids))
task_logger.info(f"Finished send_bundled_notification for user_notification_bundle {user_notification_bundle_id}")
# deprecated
@shared_dedicated_queue_retry_task(
autoretry_for=(Exception,), retry_backoff=True, max_retries=0 if settings.DEBUG else None
)
def send_user_notification_signal(log_record_pk):
start_time = time.time()
from apps.base.models import UserNotificationPolicyLogRecord
task_logger.debug(f"LOG RECORD PK: {log_record_pk}")
task_logger.debug(f"LOG RECORD LAST: {UserNotificationPolicyLogRecord.objects.last()}")
log_record = UserNotificationPolicyLogRecord.objects.get(pk=log_record_pk)
user_notification_action_triggered_signal.send(sender=send_user_notification_signal, log_record=log_record)
task_logger.debug("--- USER SIGNAL TOOK %s seconds ---" % (time.time() - start_time))
# Triggers user_notification_action_triggered_signal
# This signal is only connected to UserSlackRepresentative and triggers posting message to Slack about some
# FAILED notifications (see NotificationDeliveryStep and ERRORS_TO_SEND_IN_SLACK_CHANNEL).
# No need to call it here.
pass

View file

@ -13,6 +13,7 @@ from apps.alerts.models import (
Invitation,
ResolutionNote,
ResolutionNoteSlackMessage,
UserNotificationBundle,
)
from common.utils import UniqueFaker
@ -85,3 +86,8 @@ class CustomActionFactory(factory.DjangoModelFactory):
class InvitationFactory(factory.DjangoModelFactory):
class Meta:
model = Invitation
class UserNotificationBundleFactory(factory.DjangoModelFactory):
class Meta:
model = UserNotificationBundle

View file

@ -1,10 +1,11 @@
from unittest.mock import patch
import pytest
from django.utils import timezone
from telegram.error import RetryAfter
from apps.alerts.models import AlertGroup
from apps.alerts.tasks.notify_user import notify_user_task, perform_notification
from apps.alerts.tasks.notify_user import notify_user_task, perform_notification, send_bundled_notification
from apps.api.permissions import LegacyAccessControlRole
from apps.base.models.user_notification_policy import UserNotificationPolicy
from apps.base.models.user_notification_policy_log_record import UserNotificationPolicyLogRecord
@ -369,3 +370,207 @@ def test_perform_notification_use_default_notification_policy_fallback(
perform_notification(log_record.pk, True)
mock_notify_user.assert_called_once_with(user, alert_group, fallback_notification_policy)
@pytest.mark.django_db
def test_notify_user_task_notification_bundle_is_enabled(
make_organization_and_user,
make_user_for_organization,
make_user_notification_policy,
make_alert_receive_channel,
make_alert_group,
settings,
):
settings.FEATURE_NOTIFICATION_BUNDLE_ENABLED = True
organization, user_1 = make_organization_and_user()
user_2 = make_user_for_organization(organization)
make_user_notification_policy(
user=user_1,
step=UserNotificationPolicy.Step.NOTIFY,
notify_by=UserNotificationPolicy.NotificationChannel.SMS, # channel is in NOTIFICATION_CHANNELS_TO_BUNDLE
)
make_user_notification_policy(
user=user_1,
step=UserNotificationPolicy.Step.NOTIFY,
notify_by=UserNotificationPolicy.NotificationChannel.SMS,
important=True,
)
make_user_notification_policy(
user=user_2,
step=UserNotificationPolicy.Step.NOTIFY,
notify_by=UserNotificationPolicy.NotificationChannel.SLACK, # channel is not in NOTIFICATION_CHANNELS_TO_BUNDLE
)
alert_receive_channel = make_alert_receive_channel(organization=organization)
alert_group_1 = make_alert_group(alert_receive_channel=alert_receive_channel)
alert_group_2 = make_alert_group(alert_receive_channel=alert_receive_channel)
assert not user_1.notification_bundles.exists()
# send 1st notification to user_1, check notification_bundle was created
# without scheduling send_bundled_notification task
notify_user_task(user_1.id, alert_group_1.id)
assert user_1.notification_bundles.count() == 1
notification_bundle = user_1.notification_bundles.first()
assert notification_bundle.notification_task_id is None
assert not notification_bundle.notifications.exists()
# send 2nd notification to user_1, check bundled notification was attached to notification_bundle
# and send_bundled_notification was scheduled
notify_user_task(user_1.id, alert_group_2.id)
notification_bundle.refresh_from_db()
assert notification_bundle.notifications.count() == 1
assert notification_bundle.notification_task_id is not None
# send important notification to user_1, check new notification_bundle was created
notify_user_task(user_1.id, alert_group_1.id, important=True)
assert user_1.notification_bundles.count() == 2
important_notification_bundle = user_1.notification_bundles.get(important=True)
assert important_notification_bundle.notification_task_id is None
assert not important_notification_bundle.notifications.exists()
# send notification to user_2 (notification channel is not in NOTIFICATION_CHANNELS_TO_BUNDLE),
# check notification_bundle was not created
notify_user_task(user_2.id, alert_group_1.id)
assert not user_2.notification_bundles.exists()
@pytest.mark.django_db
def test_notify_user_task_notification_bundle_is_not_enabled(
make_organization_and_user,
make_user_notification_policy,
make_alert_receive_channel,
make_alert_group,
settings,
):
settings.FEATURE_NOTIFICATION_BUNDLE_ENABLED = False
organization, user = make_organization_and_user()
make_user_notification_policy(
user=user,
step=UserNotificationPolicy.Step.NOTIFY,
notify_by=UserNotificationPolicy.NotificationChannel.SMS, # channel is in NOTIFICATION_CHANNELS_TO_BUNDLE
)
alert_receive_channel = make_alert_receive_channel(organization=organization)
alert_group = make_alert_group(alert_receive_channel=alert_receive_channel)
# send notification, check notification_bundle was not created
notify_user_task(user.id, alert_group.id)
assert not user.notification_bundles.exists()
@pytest.mark.django_db
def test_send_bundle_notification(
make_organization_and_user,
make_user_notification_policy,
make_user_notification_bundle,
make_alert_receive_channel,
make_alert_group,
settings,
caplog,
):
settings.FEATURE_NOTIFICATION_BUNDLE_ENABLED = True
organization, user = make_organization_and_user()
notification_policy = make_user_notification_policy(
user=user,
step=UserNotificationPolicy.Step.NOTIFY,
notify_by=UserNotificationPolicy.NotificationChannel.SMS, # channel is in NOTIFICATION_CHANNELS_TO_BUNDLE
)
alert_receive_channel = make_alert_receive_channel(organization=organization)
alert_group_1 = make_alert_group(alert_receive_channel=alert_receive_channel)
alert_group_2 = make_alert_group(alert_receive_channel=alert_receive_channel)
alert_group_3 = make_alert_group(alert_receive_channel=alert_receive_channel)
notification_bundle = make_user_notification_bundle(
user, UserNotificationPolicy.NotificationChannel.SMS, notification_task_id="test_task_id", eta=timezone.now()
)
notification_bundle.append_notification(alert_group_1, notification_policy)
notification_bundle.append_notification(alert_group_2, notification_policy)
notification_bundle.append_notification(alert_group_3, notification_policy)
assert notification_bundle.notifications.filter(bundle_uuid__isnull=True).count() == 3
alert_group_3.resolve()
with patch("apps.alerts.tasks.notify_user.compare_escalations", return_value=True):
# send notification for 2 active alert groups
send_bundled_notification(notification_bundle.id)
assert f"alert_group {alert_group_3.id} is not active, skip notification" in caplog.text
assert "perform bundled notification for alert groups with ids:" in caplog.text
# check bundle_uuid was set, notification for resolved alert group was deleted
assert notification_bundle.notifications.filter(bundle_uuid__isnull=True).count() == 0
assert notification_bundle.notifications.all().count() == 2
assert not notification_bundle.notifications.filter(alert_group=alert_group_3).exists()
# send notification for 1 active alert group
notification_bundle.notifications.update(bundle_uuid=None)
alert_group_2.resolve()
send_bundled_notification(notification_bundle.id)
assert f"alert_group {alert_group_2.id} is not active, skip notification" in caplog.text
assert (
f"there is only one alert group in bundled notification, perform regular notification. "
f"alert_group {alert_group_1.id}"
) in caplog.text
# check all notifications were deleted
assert notification_bundle.notifications.all().count() == 0
# send notification for 0 active alert group
notification_bundle.append_notification(alert_group_1, notification_policy)
alert_group_1.resolve()
send_bundled_notification(notification_bundle.id)
assert f"alert_group {alert_group_1.id} is not active, skip notification" in caplog.text
assert f"no alert groups to notify about or notification is not allowed for user {user.id}" in caplog.text
# check all notifications were deleted
assert notification_bundle.notifications.all().count() == 0
@pytest.mark.django_db
def test_send_bundle_notification_task_id_mismatch(
make_organization_and_user,
make_user_notification_bundle,
settings,
caplog,
):
settings.FEATURE_NOTIFICATION_BUNDLE_ENABLED = True
organization, user = make_organization_and_user()
notification_bundle = make_user_notification_bundle(
user, UserNotificationPolicy.NotificationChannel.SMS, notification_task_id="test_task_id", eta=timezone.now()
)
send_bundled_notification(notification_bundle.id)
assert (
f"send_bundled_notification: notification_task_id mismatch. "
f"Duplication or non-active notification triggered. "
f"Active: {notification_bundle.notification_task_id}"
) in caplog.text
@pytest.mark.django_db
def test_notify_user_task_notification_bundle_eta_is_outdated(
make_organization_and_user,
make_user_for_organization,
make_user_notification_policy,
make_user_notification_bundle,
make_alert_receive_channel,
make_alert_group,
settings,
):
settings.FEATURE_NOTIFICATION_BUNDLE_ENABLED = True
organization, user = make_organization_and_user()
notification_policy = make_user_notification_policy(
user=user,
step=UserNotificationPolicy.Step.NOTIFY,
notify_by=UserNotificationPolicy.NotificationChannel.SMS, # channel is in NOTIFICATION_CHANNELS_TO_BUNDLE
)
alert_receive_channel = make_alert_receive_channel(organization=organization)
alert_group_1 = make_alert_group(alert_receive_channel=alert_receive_channel)
alert_group_2 = make_alert_group(alert_receive_channel=alert_receive_channel)
now = timezone.now()
outdated_eta = now - timezone.timedelta(minutes=5)
test_task_id = "test_task_id"
notification_bundle = make_user_notification_bundle(
user,
UserNotificationPolicy.NotificationChannel.SMS,
eta=outdated_eta,
notification_task_id=test_task_id,
last_notified_at=now,
)
notification_bundle.append_notification(alert_group_1, notification_policy)
assert not notification_bundle.eta_is_valid()
assert notification_bundle.notifications.count() == 1
# call notify_user_task and check that new notification task for notification_bundle was scheduled
notify_user_task(user.id, alert_group_2.id)
notification_bundle.refresh_from_db()
assert notification_bundle.eta_is_valid()
assert notification_bundle.notification_task_id != test_task_id
assert notification_bundle.last_notified_at == now
assert notification_bundle.notifications.count() == 2

View file

@ -327,8 +327,11 @@ def metrics_update_alert_groups_response_time_cache(integrations_response_time:
cache.set(metric_alert_groups_response_time_key, metric_alert_groups_response_time, timeout=metrics_cache_timeout)
def metrics_update_user_cache(user):
"""Update "user_was_notified_of_alert_groups" metric cache."""
def metrics_update_user_cache(user, counter=1):
"""
Increase "user_was_notified_of_alert_groups" metric cache by counter.
Counter shows how many alert groups user was notified of.
"""
metrics_cache_timeout = get_metrics_cache_timeout(user.organization_id)
metric_user_was_notified_key = get_metric_user_was_notified_of_alert_groups_key(user.organization_id)
metric_user_was_notified: typing.Dict[int, UserWasNotifiedOfAlertGroupsMetricsDict] = cache.get(
@ -344,6 +347,6 @@ def metrics_update_user_cache(user):
"id": user.organization.stack_id,
"counter": 0,
},
)["counter"] += 1
)["counter"] += counter
cache.set(metric_user_was_notified_key, metric_user_was_notified, timeout=metrics_cache_timeout)

View file

@ -285,8 +285,8 @@ def update_metrics_for_alert_group(alert_group_id, organization_id, previous_sta
@shared_dedicated_queue_retry_task(
autoretry_for=(Exception,), retry_backoff=True, max_retries=1 if settings.DEBUG else 10
)
def update_metrics_for_user(user_id):
def update_metrics_for_user(user_id, counter=1):
from apps.user_management.models import User
user = User.objects.get(id=user_id)
metrics_update_user_cache(user)
metrics_update_user_cache(user, counter)

View file

@ -6,6 +6,7 @@ from django.test import override_settings
from apps.alerts.signals import alert_group_created_signal
from apps.alerts.tasks import notify_user_task
from apps.alerts.tasks.notify_user import update_metric_if_needed
from apps.base.models import UserNotificationPolicy, UserNotificationPolicyLogRecord
from apps.metrics_exporter.constants import NO_SERVICE_VALUE, SERVICE_LABEL
from apps.metrics_exporter.helpers import (
@ -607,6 +608,82 @@ def test_update_metrics_cache_on_user_notification(
arg_idx = get_called_arg_index_and_compare_results()
@override_settings(CELERY_TASK_ALWAYS_EAGER=True)
@pytest.mark.django_db
def test_update_metric_if_needed(
mock_apply_async,
make_organization,
make_user_for_organization,
make_alert_receive_channel,
make_alert_group,
make_user_notification_policy_log_record,
make_user_was_notified_metrics_cache_params,
monkeypatch,
mock_get_metrics_cache,
):
organization = make_organization(
org_id=METRICS_TEST_ORG_ID,
stack_slug=METRICS_TEST_INSTANCE_SLUG,
stack_id=METRICS_TEST_INSTANCE_ID,
)
alert_receive_channel = make_alert_receive_channel(
organization,
verbal_name=METRICS_TEST_INTEGRATION_NAME,
)
user = make_user_for_organization(organization, username=METRICS_TEST_USER_USERNAME)
alert_group_1 = make_alert_group(alert_receive_channel)
alert_group_2 = make_alert_group(alert_receive_channel)
alert_group_3 = make_alert_group(alert_receive_channel)
# create 1 log record for alert_group_1, 2 log records for alert_group_2 and 3 log records for alert_group_3
for idx, alert_group in enumerate([alert_group_1, alert_group_2, alert_group_3], start=1):
for _ in range(idx):
make_user_notification_policy_log_record(
type=UserNotificationPolicyLogRecord.TYPE_PERSONAL_NOTIFICATION_TRIGGERED,
author=user,
alert_group=alert_group,
)
metrics_cache = make_user_was_notified_metrics_cache_params(user.id, organization.id)
monkeypatch.setattr(cache, "get", metrics_cache)
metric_user_was_notified_key = get_metric_user_was_notified_of_alert_groups_key(organization.id)
expected_result_metric_user_was_notified = {
user.id: {
"org_id": organization.org_id,
"slug": organization.stack_slug,
"id": organization.stack_id,
"user_username": METRICS_TEST_USER_USERNAME,
"counter": 1,
}
}
with patch("apps.metrics_exporter.tasks.cache.set") as mock_cache_set:
# check current metric value
assert cache.get(metric_user_was_notified_key, {}) == expected_result_metric_user_was_notified
expected_result_metric_user_was_notified[user.id]["counter"] += 1
update_metric_if_needed(user, [alert_group_1.id, alert_group_2.id, alert_group_3.id])
# check user_was_notified_of_alert_groups metric counter was increased by 1
# (for alert_group_1 that has only one log record with type "triggered")
mock_cache_set.assert_called_once()
mock_cache_set_called_args = mock_cache_set.call_args_list
assert mock_cache_set_called_args[0].args[0] == metric_user_was_notified_key
assert mock_cache_set_called_args[0].args[1] == expected_result_metric_user_was_notified
# create new log record for alert_group_1
make_user_notification_policy_log_record(
type=UserNotificationPolicyLogRecord.TYPE_PERSONAL_NOTIFICATION_TRIGGERED,
author=user,
alert_group=alert_group_1,
)
# check metric was not updated
update_metric_if_needed(user, [alert_group_1.id, alert_group_2.id, alert_group_3.id])
mock_cache_set.assert_called_once()
@pytest.mark.django_db
def test_metrics_add_integrations_to_cache(make_organization, make_alert_receive_channel):
organization = make_organization(

View file

@ -193,6 +193,10 @@ class PhoneBackend:
log_record.save()
user_notification_action_triggered_signal.send(sender=PhoneBackend.notify_by_sms, log_record=log_record)
@staticmethod
def notify_by_sms_bundle_async(user, bundle_uuid):
pass # todo: will be added in a separate PR
def _notify_by_provider_sms(self, user, message) -> Optional[ProviderSMS]:
"""
_notify_by_provider_sms sends a notification sms using configured phone provider.

View file

@ -37,6 +37,7 @@ from apps.alerts.tests.factories import (
InvitationFactory,
ResolutionNoteFactory,
ResolutionNoteSlackMessageFactory,
UserNotificationBundleFactory,
)
from apps.api.permissions import (
ACTION_PREFIX,
@ -145,6 +146,7 @@ register(LabelKeyFactory)
register(LabelValueFactory)
register(AlertReceiveChannelAssociatedLabelFactory)
register(GoogleOAuth2UserFactory)
register(UserNotificationBundleFactory)
IS_RBAC_ENABLED = os.getenv("ONCALL_TESTING_RBAC_ENABLED", "True") == "True"
@ -1077,3 +1079,13 @@ def make_google_oauth2_user_for_user():
return GoogleOAuth2UserFactory(user=user)
return _make_google_oauth2_user_for_user
@pytest.fixture
def make_user_notification_bundle():
def _make_user_notification_bundle(user, notification_channel, important=False, **kwargs):
return UserNotificationBundleFactory(
user=user, notification_channel=notification_channel, important=important, **kwargs
)
return _make_user_notification_bundle

View file

@ -72,6 +72,7 @@ FEATURE_LABELS_ENABLED_FOR_ALL = getenv_boolean("FEATURE_LABELS_ENABLED_FOR_ALL"
# Enable labels feature for organizations from the list. Use OnCall organization ID, for this flag
FEATURE_LABELS_ENABLED_PER_ORG = getenv_list("FEATURE_LABELS_ENABLED_PER_ORG", default=list())
FEATURE_ALERT_GROUP_SEARCH_ENABLED = getenv_boolean("FEATURE_ALERT_GROUP_SEARCH_ENABLED", default=False)
FEATURE_NOTIFICATION_BUNDLE_ENABLED = getenv_boolean("FEATURE_NOTIFICATION_BUNDLE_ENABLED", default=False)
TWILIO_API_KEY_SID = os.environ.get("TWILIO_API_KEY_SID")
TWILIO_API_KEY_SECRET = os.environ.get("TWILIO_API_KEY_SECRET")

View file

@ -105,6 +105,7 @@ CELERY_TASK_ROUTES = {
"apps.alerts.tasks.notify_ical_schedule_shift.notify_ical_schedule_shift": {"queue": "critical"},
"apps.alerts.tasks.notify_user.notify_user_task": {"queue": "critical"},
"apps.alerts.tasks.notify_user.perform_notification": {"queue": "critical"},
"apps.alerts.tasks.notify_user.send_bundled_notification": {"queue": "critical"},
"apps.alerts.tasks.notify_user.send_user_notification_signal": {"queue": "critical"},
"apps.alerts.tasks.resolve_alert_group_by_source_if_needed.resolve_alert_group_by_source_if_needed": {
"queue": "critical"