# oncall-engine/engine/apps/alerts/tasks/notify_user.py
#
# 731 lines
# 36 KiB
# Python

import typing
from functools import partial
from uuid import uuid4
from celery.exceptions import Retry
from django.conf import settings
from django.db import transaction
from django.db.models import Count
from django.utils import timezone
from kombu.utils.uuid import uuid as celery_uuid
from telegram.error import RetryAfter
from apps.alerts.constants import NEXT_ESCALATION_DELAY
from apps.alerts.tasks.send_update_log_report_signal import send_update_log_report_signal
from apps.base.messaging import get_messaging_backend_from_id
from apps.metrics_exporter.tasks import update_metrics_for_user
from apps.phone_notifications.phone_backend import PhoneBackend
from common.custom_celery_tasks import shared_dedicated_queue_retry_task
from .task_logger import task_logger
if typing.TYPE_CHECKING:
from apps.alerts.models import AlertGroup, UserNotificationBundle
from apps.base.models import UserNotificationPolicy
from apps.user_management.models import User
# Maximum age, in hours, of a notification log record for which perform_notification keeps
# re-queueing itself (Telegram rate limit, Slack message not created yet). Once the log record
# is older than this, a FAILED log record is written instead of retrying again.
RETRY_TIMEOUT_HOURS = 1
def schedule_send_bundled_notification_task(
    user_notification_bundle: "UserNotificationBundle", alert_group: "AlertGroup"
):
    """
    Enqueue `send_bundled_notification` for the given bundle and log what was scheduled.

    The task is queued with the bundle's own `eta` and `notification_task_id`.
    `alert_group` is not passed to the task; it only appears in the log line.
    """
    bundle_id = user_notification_bundle.id
    scheduled_task_id = user_notification_bundle.notification_task_id
    scheduled_eta = user_notification_bundle.eta

    send_bundled_notification.apply_async((bundle_id,), eta=scheduled_eta, task_id=scheduled_task_id)
    task_logger.info(
        f"Scheduled send_bundled_notification task {scheduled_task_id}, "
        f"user_notification_bundle: {bundle_id}, alert_group {alert_group.id}, "
        f"eta: {scheduled_eta}"
    )
def schedule_perform_notification_task(
    log_record_pk: int, alert_group_pk: int, use_default_notification_policy_fallback: bool
):
    """
    Enqueue `perform_notification` for a log record and log the created task.

    `alert_group_pk` is not forwarded to the task; it is only included in the log line.
    """
    task_args = (log_record_pk, use_default_notification_policy_fallback)
    async_result = perform_notification.apply_async(task_args)
    task_logger.info(
        f"Created perform_notification task {async_result} log_record={log_record_pk} alert_group={alert_group_pk}"
    )
def build_notification_reason_for_log_record(
    notification_policies: typing.List["UserNotificationPolicy"], reason: typing.Optional[str]
) -> str:
    """
    Build the reason text for a notification log record.

    The result is the concatenation of two optional parts:
    - "Reason: <reason>" followed by a newline, when `reason` is not None;
    - "Further notification plan: <labels>", listing the label of every distinct channel used
      by the NOTIFY steps of `notification_policies`, in first-seen order.

    Returns an empty string when neither part applies.
    """
    from apps.base.models import UserNotificationPolicy

    # dict.fromkeys drops repeated channels while keeping the order they first appear in
    channel_ids = list(
        dict.fromkeys(
            policy.notify_by
            for policy in notification_policies
            if policy.step == UserNotificationPolicy.Step.NOTIFY
        )
    )
    plan = ", ".join(
        UserNotificationPolicy.NotificationChannel(channel_id).label for channel_id in channel_ids
    )

    parts = []
    if reason is not None:
        parts.append("Reason: " + reason + "\n")
    if channel_ids:
        parts.append("Further notification plan: " + plan)
    return "".join(parts)
def update_metric_if_needed(user: "User", active_alert_group_ids: typing.Collection[int]):
    """
    Enqueue a metrics update for alert groups the user has been notified about exactly once.

    Among `active_alert_group_ids`, counts the alert groups for which `user` has exactly one
    personal log record of type TYPE_PERSONAL_NOTIFICATION_TRIGGERED. When that count is
    positive, `update_metrics_for_user` is queued with the user id and the count; otherwise
    nothing happens.

    `active_alert_group_ids` may be any collection of ids (the bundled-notification task passes
    a set), hence the `Collection` annotation rather than `List`.
    """
    from apps.base.models import UserNotificationPolicyLogRecord

    triggered_records = user.personal_log_records.filter(
        type=UserNotificationPolicyLogRecord.TYPE_PERSONAL_NOTIFICATION_TRIGGERED,
        alert_group_id__in=active_alert_group_ids,
    )
    # group by alert group and keep only the groups that have a single "triggered" record
    alert_groups_with_one_log = (
        triggered_records.values("alert_group").annotate(count=Count("alert_group")).filter(count=1).count()
    )

    if alert_groups_with_one_log > 0:
        update_metrics_for_user.apply_async((user.id, alert_groups_with_one_log))
@shared_dedicated_queue_retry_task(
autoretry_for=(Exception,), retry_backoff=True, max_retries=1 if settings.DEBUG else None
)
def notify_user_task(
user_pk,
alert_group_pk,
previous_notification_policy_pk=None,
reason=None,
prevent_posting_to_thread=False,
notify_even_acknowledged=False,
important=False,
notify_anyway=False,
):
# Celery task that processes one step of a user's personal notification policy for an alert
# group and, unless the chain is finished, schedules itself again for the following step.
#
# Parameters:
#   user_pk / alert_group_pk: primary keys of the user to notify and of the alert group.
#   previous_notification_policy_pk: pk of the policy step handled by the previous run of this
#       task; None means this is the first run for this user / alert group.
#   reason: free text; only used on the first run, where it is expanded by
#       build_notification_reason_for_log_record. It is reset to None on later runs.
#   prevent_posting_to_thread: stored as `slack_prevent_posting` on the created log records.
#   notify_even_acknowledged: keep notifying although the alert group is acknowledged.
#   important: selects the "important" notification policies of the user.
#   notify_anyway: keep notifying although the alert group is silenced.
#
# Returns a short explanatory string on some early exits, otherwise None.
from apps.alerts.models import AlertGroup, AlertGroupLogRecord, UserHasNotification, UserNotificationBundle
from apps.base.models import UserNotificationPolicy, UserNotificationPolicyLogRecord
from apps.user_management.models import User
try:
alert_group = AlertGroup.objects.get(pk=alert_group_pk)
except AlertGroup.DoesNotExist:
return f"notify_user_task: alert_group {alert_group_pk} doesn't exist"
# countdown: extra delay (seconds) before the next run, only non-zero for WAIT steps
countdown = 0
# stop_escalation: set when there is no further policy step to process
stop_escalation = False
# log_record: stays None when the step creates no record (bundled notification, unspecified step)
log_record = None
is_notification_bundled = False
user_notification_bundle = None
with transaction.atomic():
try:
user = User.objects.get(pk=user_pk)
except User.DoesNotExist:
return f"notify_user_task: user {user_pk} doesn't exist"
if not user.is_notification_allowed:
task_logger.info(f"notify_user_task: user {user.pk} notification is not allowed")
UserNotificationPolicyLogRecord(
author=user,
type=UserNotificationPolicyLogRecord.TYPE_PERSONAL_NOTIFICATION_FAILED,
reason="notification is not allowed for user",
alert_group=alert_group,
notification_error_code=UserNotificationPolicyLogRecord.ERROR_NOTIFICATION_FORBIDDEN,
).save()
return
user_has_notification, _ = UserHasNotification.objects.get_or_create(
user=user,
alert_group=alert_group,
)
# re-read the row with SELECT ... FOR UPDATE so that it is locked for the rest of the transaction
user_has_notification = UserHasNotification.objects.filter(pk=user_has_notification.pk).select_for_update()[0]
using_fallback_default_notification_policy_step = False
if previous_notification_policy_pk is None:
# first run: start from the first configured policy (or the default fallback one)
(
using_fallback_default_notification_policy_step,
notification_policies,
) = user.get_notification_policies_or_use_default_fallback(important=important)
if not notification_policies:
task_logger.info(
f"notify_user_task: Failed to notify. No notification policies. user_id={user_pk} alert_group_id={alert_group_pk} important={important}"
)
return
reason = build_notification_reason_for_log_record(notification_policies, reason)
notification_policy = notification_policies[0]
else:
# follow-up run: only the task whose Celery id is recorded on user_has_notification may
# continue; any other run is treated as a duplicate or a superseded escalation.
# (presumably update_active_task_id is what stores that id — verify against the model)
if notify_user_task.request.id != user_has_notification.active_notification_policy_id:
task_logger.info(
f"notify_user_task: active_notification_policy_id mismatch. "
f"Duplication or non-active escalation triggered. "
f"Active: {user_has_notification.active_notification_policy_id}"
)
return
try:
notification_policy = UserNotificationPolicy.objects.get(pk=previous_notification_policy_pk)
# the previous step belongs to a different user: switch to this user's own policy
# that has the same order and importance
if notification_policy.user != user:
notification_policy = UserNotificationPolicy.objects.get(
order=notification_policy.order, user=user, important=important
)
# move on to the step after the previous one; a None result is handled below as
# "no more steps"
notification_policy = notification_policy.next()
except UserNotificationPolicy.DoesNotExist:
task_logger.info(
f"notify_user_task: Notification policy {previous_notification_policy_pk} has been deleted"
)
return
# the reason text is only attached to the first step's log record
reason = None
# Builds (does not save) a log record, always tagging it with the fallback-policy flag.
def _create_user_notification_policy_log_record(**kwargs):
return UserNotificationPolicyLogRecord(
**kwargs,
using_fallback_default_notification_policy_step=using_fallback_default_notification_policy_step,
)
# Builds (does not save) the "notification finished" log record for the current policy step.
def _create_notification_finished_user_notification_policy_log_record():
return _create_user_notification_policy_log_record(
author=user,
type=UserNotificationPolicyLogRecord.TYPE_PERSONAL_NOTIFICATION_FINISHED,
notification_policy=notification_policy,
alert_group=alert_group,
slack_prevent_posting=prevent_posting_to_thread,
)
if notification_policy is None:
# no step left: record that the personal escalation is finished
stop_escalation = True
log_record = _create_notification_finished_user_notification_policy_log_record()
task_logger.info(f"Personal escalation exceeded. User: {user.pk}, alert_group: {alert_group.pk}")
else:
if (
# don't force notify direct paged user who has already acknowledged the alert group
# after the last time they were paged.
notify_even_acknowledged
and (
direct_paging_log_record := alert_group.log_records.filter(
type=AlertGroupLogRecord.TYPE_DIRECT_PAGING, step_specific_info__user=user.public_primary_key
).last()
)
and alert_group.log_records.filter(
type=AlertGroupLogRecord.TYPE_ACK, author=user, created_at__gte=direct_paging_log_record.created_at
).exists()
):
notify_even_acknowledged = False
# NOTE(review): the log message below ends with an unmatched ")" — looks unintentional
task_logger.info(f"notify_even_acknowledged=False for user {user.pk}, alert_group {alert_group.pk})")
# stop here when the alert group is in a state that should not produce notifications
if (
(alert_group.acknowledged and not notify_even_acknowledged)
or (alert_group.silenced and not notify_anyway)
or alert_group.resolved
or alert_group.wiped_at
or alert_group.root_alert_group
):
task_logger.info(
f"notify_user_task: skip notification user {user.pk}, alert_group {alert_group.pk} is "
f"{alert_group.state} and/or attached or wiped"
)
return "Acknowledged, resolved, silenced, attached or wiped."
if notification_policy.step == UserNotificationPolicy.Step.WAIT:
# WAIT step: nothing is sent; the delay is applied to the countdown of the next run
countdown = (
notification_policy.wait_delay.total_seconds() if notification_policy.wait_delay is not None else 0
)
log_record = _create_user_notification_policy_log_record(
author=user,
type=UserNotificationPolicyLogRecord.TYPE_PERSONAL_NOTIFICATION_TRIGGERED,
notification_policy=notification_policy,
alert_group=alert_group,
slack_prevent_posting=prevent_posting_to_thread,
notification_step=notification_policy.step,
)
task_logger.info(f"notify_user_task: Waiting {countdown} to notify user {user.pk}")
elif notification_policy.step == UserNotificationPolicy.Step.NOTIFY:
user_to_be_notified_in_slack = (
notification_policy.notify_by == UserNotificationPolicy.NotificationChannel.SLACK
)
if user_to_be_notified_in_slack and alert_group.notify_in_slack_enabled is False:
log_record = _create_user_notification_policy_log_record(
author=user,
type=UserNotificationPolicyLogRecord.TYPE_PERSONAL_NOTIFICATION_FAILED,
notification_policy=notification_policy,
alert_group=alert_group,
reason="Alert group Slack notifications are disabled",
slack_prevent_posting=prevent_posting_to_thread,
notification_step=notification_policy.step,
notification_channel=notification_policy.notify_by,
notification_error_code=UserNotificationPolicyLogRecord.ERROR_NOTIFICATION_POSTING_TO_SLACK_IS_DISABLED,
)
else:
if (
settings.FEATURE_NOTIFICATION_BUNDLE_ENABLED
and UserNotificationBundle.notification_is_bundleable(notification_policy.notify_by)
):
# one bundle per (user, important, channel); the row is locked while it is inspected
user_notification_bundle, _ = UserNotificationBundle.objects.select_for_update().get_or_create(
user=user, important=important, notification_channel=notification_policy.notify_by
)
# check if notification needs to be bundled
if user_notification_bundle.notified_recently():
user_notification_bundle.append_notification(alert_group, notification_policy)
# schedule send_bundled_notification task if it hasn't been scheduled or the task eta is
# outdated
eta_is_valid = user_notification_bundle.eta_is_valid()
if not eta_is_valid:
task_logger.warning(
f"ETA is not valid - {user_notification_bundle.eta}, "
f"user_notification_bundle {user_notification_bundle.id}, "
f"task_id {user_notification_bundle.notification_task_id}. "
f"Rescheduling the send_bundled_notification task"
)
if not user_notification_bundle.notification_task_id or not eta_is_valid:
# a fresh task id is stored on the bundle; send_bundled_notification compares its
# own request id with it before doing any work
user_notification_bundle.notification_task_id = celery_uuid()
user_notification_bundle.eta = user_notification_bundle.get_notification_eta()
user_notification_bundle.save(update_fields=["notification_task_id", "eta"])
transaction.on_commit(
partial(
schedule_send_bundled_notification_task, user_notification_bundle, alert_group
)
)
is_notification_bundled = True
if not is_notification_bundled:
log_record = _create_user_notification_policy_log_record(
author=user,
type=UserNotificationPolicyLogRecord.TYPE_PERSONAL_NOTIFICATION_TRIGGERED,
notification_policy=notification_policy,
alert_group=alert_group,
reason=reason,
slack_prevent_posting=prevent_posting_to_thread,
notification_step=notification_policy.step,
notification_channel=notification_policy.notify_by,
)
if log_record: # log_record is None if user notification policy step is unspecified
# if this is the first notification step, and user hasn't been notified for this alert group - update metric
# (this check has to run before log_record.save() below, otherwise the record being saved
# could itself satisfy the exists() query)
if (
log_record.type == UserNotificationPolicyLogRecord.TYPE_PERSONAL_NOTIFICATION_TRIGGERED
and previous_notification_policy_pk is None
and not user.personal_log_records.filter(
type=UserNotificationPolicyLogRecord.TYPE_PERSONAL_NOTIFICATION_TRIGGERED,
alert_group_id=alert_group_pk,
).exists()
):
update_metrics_for_user.apply_async((user.id,))
log_record.save()
if using_fallback_default_notification_policy_step:
# if we are using default notification policy, we're done escalating.. there's no further notification
# policy steps in this case. Kick off the perform_notification task, create the
# TYPE_PERSONAL_NOTIFICATION_FINISHED log record, and reset the active_notification_policy_id
transaction.on_commit(
partial(
schedule_perform_notification_task,
log_record.pk,
alert_group_pk,
using_fallback_default_notification_policy_step,
)
)
# NOTE(review): the helper below only instantiates the FINISHED record and its return value is
# discarded; no .save() is called on it here, so the record does not appear to be persisted.
# Confirm whether that is intended.
_create_notification_finished_user_notification_policy_log_record()
user_has_notification.update_active_task_id(None)
elif not stop_escalation:
# if the step is NOTIFY and notification was not bundled, perform regular notification
# and update time when user was notified
if notification_policy.step != UserNotificationPolicy.Step.WAIT and not is_notification_bundled:
transaction.on_commit(
partial(
schedule_perform_notification_task,
log_record.pk,
alert_group_pk,
using_fallback_default_notification_policy_step,
)
)
if user_notification_bundle:
user_notification_bundle.last_notified_at = timezone.now()
user_notification_bundle.save(update_fields=["last_notified_at"])
# schedule the next run of this task under a new task id and remember that id, so that
# only that run passes the active-id check at the top of the follow-up branch
task_id = celery_uuid()
user_has_notification.update_active_task_id(task_id)
transaction.on_commit(
partial(
notify_user_task.apply_async,
(user.pk, alert_group.pk, notification_policy.pk, reason),
{
"notify_even_acknowledged": notify_even_acknowledged,
"notify_anyway": notify_anyway,
"prevent_posting_to_thread": prevent_posting_to_thread,
},
# WAIT delay (0 for other steps) plus the fixed pause between steps
countdown=countdown + NEXT_ESCALATION_DELAY,
task_id=task_id,
)
)
else:
# escalation is over: clear the active task id
user_has_notification.update_active_task_id(None)
@shared_dedicated_queue_retry_task(
autoretry_for=(Exception,),
retry_backoff=True,
dont_autoretry_for=(Retry,),
max_retries=1 if settings.DEBUG else None,
)
def perform_notification(log_record_pk, use_default_notification_policy_fallback):
# Celery task that delivers the single notification described by a
# UserNotificationPolicyLogRecord, using the channel of the associated notification policy.
#
# Parameters:
#   log_record_pk: pk of the "triggered" log record created by the scheduling task.
#   use_default_notification_policy_fallback: when true, the policy is obtained from
#       UserNotificationPolicy.get_default_fallback_policy(user) instead of the log record.
#
# Every path that decides not to (or cannot) notify writes a new log record explaining why
# and returns; nothing is returned to the caller.
from apps.base.models import UserNotificationPolicy, UserNotificationPolicyLogRecord
from apps.telegram.models import TelegramToUserConnector
task_logger.info(f"perform_notification: log_record {log_record_pk}")
try:
log_record = UserNotificationPolicyLogRecord.objects.get(pk=log_record_pk)
except UserNotificationPolicyLogRecord.DoesNotExist:
task_logger.warning(
f"perform_notification: log_record {log_record_pk} doesn't exist. Skipping remainder of task. "
"The alert group associated with this log record may have been deleted."
)
return
task_logger.info(f"perform_notification: found record for {log_record_pk}")
user = log_record.author
alert_group = log_record.alert_group
notification_policy = (
UserNotificationPolicy.get_default_fallback_policy(user)
if use_default_notification_policy_fallback
else log_record.notification_policy
)
notification_channel = notification_policy.notify_by if notification_policy else None
# both an author and a policy are required to notify
if user is None or notification_policy is None:
UserNotificationPolicyLogRecord(
author=user,
type=UserNotificationPolicyLogRecord.TYPE_PERSONAL_NOTIFICATION_FAILED,
# the fallback policy is never attached to a log record here
# (presumably because it is not a stored row — TODO confirm)
notification_policy=notification_policy if not use_default_notification_policy_fallback else None,
reason="Expected data is missing",
alert_group=alert_group,
notification_step=notification_policy.step if notification_policy else None,
notification_channel=notification_channel,
notification_error_code=None,
).save()
return
if not user.is_notification_allowed:
UserNotificationPolicyLogRecord(
author=user,
type=UserNotificationPolicyLogRecord.TYPE_PERSONAL_NOTIFICATION_FAILED,
reason="notification is not allowed for user",
alert_group=alert_group,
notification_error_code=UserNotificationPolicyLogRecord.ERROR_NOTIFICATION_FORBIDDEN,
).save()
return
if alert_group.resolved:
# skip notification if alert group was resolved
UserNotificationPolicyLogRecord(
author=user,
type=UserNotificationPolicyLogRecord.TYPE_PERSONAL_NOTIFICATION_FAILED,
notification_policy=notification_policy if not use_default_notification_policy_fallback else None,
reason="Skipped notification because alert group is resolved",
alert_group=alert_group,
notification_step=notification_policy.step if notification_policy else None,
notification_channel=notification_channel,
notification_error_code=None,
).save()
return
# dispatch on the notification channel
if notification_channel == UserNotificationPolicy.NotificationChannel.SMS:
phone_backend = PhoneBackend()
phone_backend.notify_by_sms(user, alert_group, notification_policy)
elif notification_channel == UserNotificationPolicy.NotificationChannel.PHONE_CALL:
phone_backend = PhoneBackend()
phone_backend.notify_by_call(user, alert_group, notification_policy)
elif notification_channel == UserNotificationPolicy.NotificationChannel.TELEGRAM:
try:
TelegramToUserConnector.notify_user(user, alert_group, notification_policy)
except RetryAfter as e:
task_logger.exception(f"Telegram API rate limit exceeded. Retry after {e.retry_after} seconds.")
# check how much time has passed since log record was created
# to prevent eternal loop of restarting perform_notification task
# NOTE(review): timedelta is reached through django.utils.timezone rather than the datetime
# module; this relies on that module exposing `timedelta` — confirm it is safe to depend on
if timezone.now() < log_record.created_at + timezone.timedelta(hours=RETRY_TIMEOUT_HOURS):
# wait as long as Telegram asked for, or 3 seconds if the exception carries no value
countdown = getattr(e, "retry_after", 3)
perform_notification.apply_async(
(log_record_pk, use_default_notification_policy_fallback), countdown=countdown
)
else:
task_logger.debug(
f"telegram notification for alert_group {alert_group.pk} failed because of rate limit"
)
UserNotificationPolicyLogRecord(
author=user,
type=UserNotificationPolicyLogRecord.TYPE_PERSONAL_NOTIFICATION_FAILED,
notification_policy=notification_policy,
reason="Telegram rate limit exceeded",
alert_group=alert_group,
notification_step=notification_policy.step,
notification_channel=notification_channel,
notification_error_code=UserNotificationPolicyLogRecord.ERROR_NOTIFICATION_IN_TELEGRAM_RATELIMIT,
).save()
return
elif notification_channel == UserNotificationPolicy.NotificationChannel.SLACK:
# TODO: refactor checking the possibility of sending a notification in slack
# Code below is not consistent.
# We check various slack reasons to skip escalation in this task, in send_slack_notification,
# before and after posting of slack message.
if alert_group.skip_escalation_in_slack:
# generic Slack error code unless the skip reason maps to a more specific one
notification_error_code = UserNotificationPolicyLogRecord.ERROR_NOTIFICATION_IN_SLACK
if alert_group.reason_to_skip_escalation == alert_group.RATE_LIMITED:
notification_error_code = UserNotificationPolicyLogRecord.ERROR_NOTIFICATION_IN_SLACK_RATELIMIT
elif alert_group.reason_to_skip_escalation == alert_group.CHANNEL_ARCHIVED:
notification_error_code = (
UserNotificationPolicyLogRecord.ERROR_NOTIFICATION_IN_SLACK_CHANNEL_IS_ARCHIVED
)
elif alert_group.reason_to_skip_escalation == alert_group.ACCOUNT_INACTIVE:
notification_error_code = UserNotificationPolicyLogRecord.ERROR_NOTIFICATION_IN_SLACK_TOKEN_ERROR
task_logger.debug(
f"send_slack_notification for alert_group {alert_group.pk} failed because escalation in slack is "
f"skipped, reason: '{alert_group.get_reason_to_skip_escalation_display()}'"
)
UserNotificationPolicyLogRecord(
author=user,
type=UserNotificationPolicyLogRecord.TYPE_PERSONAL_NOTIFICATION_FAILED,
notification_policy=notification_policy,
reason=f"Skipped escalation in Slack, reason: '{alert_group.get_reason_to_skip_escalation_display()}'",
alert_group=alert_group,
notification_step=notification_policy.step,
notification_channel=notification_channel,
notification_error_code=notification_error_code,
).save()
return
if alert_group.notify_in_slack_enabled is True:
# we cannot notify users in Slack if their team does not have Slack integration
if alert_group.channel.organization.slack_team_identity is None:
task_logger.debug(
f"send_slack_notification for alert_group {alert_group.pk} failed because slack team identity "
f"does not exist."
)
UserNotificationPolicyLogRecord(
author=user,
type=UserNotificationPolicyLogRecord.TYPE_PERSONAL_NOTIFICATION_FAILED,
notification_policy=notification_policy,
reason="Slack team identity does not exist",
alert_group=alert_group,
notification_step=notification_policy.step,
notification_channel=notification_channel,
notification_error_code=UserNotificationPolicyLogRecord.ERROR_NOTIFICATION_IN_SLACK_TOKEN_ERROR,
).save()
return
if log_record.slack_prevent_posting:
task_logger.debug(
f"send_slack_notification for alert_group {alert_group.pk} failed because slack posting is disabled."
)
# NOTE(review): this record is typed SUCCESS although it carries an error code and the log line
# above says "failed"; every other record with an error code in this task is typed FAILED.
# Confirm that SUCCESS is intended.
UserNotificationPolicyLogRecord(
author=user,
type=UserNotificationPolicyLogRecord.TYPE_PERSONAL_NOTIFICATION_SUCCESS,
notification_policy=notification_policy,
reason="Prevented from posting in Slack",
alert_group=alert_group,
notification_step=notification_policy.step,
notification_channel=notification_channel,
notification_error_code=UserNotificationPolicyLogRecord.ERROR_NOTIFICATION_POSTING_TO_SLACK_IS_DISABLED,
).save()
return
if alert_group.slack_message:
alert_group.slack_message.send_slack_notification(user, alert_group, notification_policy)
task_logger.debug(f"Finished send_slack_notification for alert_group {alert_group.pk}.")
# check how much time has passed since log record was created
# to prevent eternal loop of restarting perform_notification task
elif timezone.now() < log_record.created_at + timezone.timedelta(hours=RETRY_TIMEOUT_HOURS):
task_logger.debug(
f"send_slack_notification for alert_group {alert_group.pk} failed because slack message "
f"does not exist. Restarting perform_notification."
)
# the alert group has no Slack message yet: try again in a minute
restart_delay_seconds = 60
perform_notification.apply_async(
(log_record_pk, use_default_notification_policy_fallback), countdown=restart_delay_seconds
)
else:
task_logger.debug(
f"send_slack_notification for alert_group {alert_group.pk} failed because slack message "
f"after {RETRY_TIMEOUT_HOURS} hours still does not exist"
)
UserNotificationPolicyLogRecord(
author=user,
type=UserNotificationPolicyLogRecord.TYPE_PERSONAL_NOTIFICATION_FAILED,
notification_policy=notification_policy,
reason="Slack message does not exist",
alert_group=alert_group,
notification_step=notification_policy.step,
notification_channel=notification_channel,
notification_error_code=UserNotificationPolicyLogRecord.ERROR_NOTIFICATION_IN_SLACK,
).save()
else:
# any other channel: look up a messaging backend by the name of the NotificationChannel member
try:
backend_id = UserNotificationPolicy.NotificationChannel(notification_policy.notify_by).name
backend = get_messaging_backend_from_id(backend_id)
except ValueError:
# treated the same as "no backend found"
backend = None
if backend is None:
task_logger.debug("notify_user failed because messaging backend is not available")
UserNotificationPolicyLogRecord(
author=user,
type=UserNotificationPolicyLogRecord.TYPE_PERSONAL_NOTIFICATION_FAILED,
notification_policy=notification_policy,
reason="Messaging backend not available",
alert_group=alert_group,
notification_step=notification_policy.step,
notification_channel=notification_channel,
notification_error_code=UserNotificationPolicyLogRecord.ERROR_NOTIFICATION_MESSAGING_BACKEND_ERROR,
).save()
return
backend.notify_user(user, alert_group, notification_policy)
@shared_dedicated_queue_retry_task(
autoretry_for=(Exception,), retry_backoff=True, max_retries=1 if settings.DEBUG else None
)
def send_bundled_notification(user_notification_bundle_id: int):
"""
The task filters bundled notifications, attached to the current user_notification_bundle, by active alert groups,
creates notification log records and updates user_notification_bundle.
If there are no active alert groups - nothing else happens. If there is only one active alert group - regular
notification will be performed (called perform_notification task). Otherwise - "send bundled notification" method of
the current notification channel will be called.
"""
from apps.alerts.models import AlertGroup, UserNotificationBundle
from apps.base.models import UserNotificationPolicy, UserNotificationPolicyLogRecord
task_logger.info(f"Start send_bundled_notification for user_notification_bundle {user_notification_bundle_id}")
with transaction.atomic():
try:
# fetch the bundle with SELECT ... FOR UPDATE; indexing an empty queryset raises IndexError
user_notification_bundle = UserNotificationBundle.objects.filter(
pk=user_notification_bundle_id
).select_for_update()[0]
except IndexError:
task_logger.info(
f"user_notification_bundle {user_notification_bundle_id} doesn't exist. "
f"The user associated with this notification bundle may have been deleted."
)
return
# only the task whose Celery id is stored on the bundle is allowed to send it
if send_bundled_notification.request.id != user_notification_bundle.notification_task_id:
task_logger.info(
f"send_bundled_notification: notification_task_id mismatch. "
f"Duplication or non-active notification triggered. "
f"Active: {user_notification_bundle.notification_task_id}"
)
return
# notifications of this bundle that have not been assigned a bundle_uuid yet
notifications = user_notification_bundle.notifications.filter(bundle_uuid__isnull=True).select_related(
"alert_group"
)
log_records_to_create: typing.List["UserNotificationPolicyLogRecord"] = []
skip_notification_ids: typing.List[int] = []
active_alert_group_ids: typing.Set[int] = set()
# last "triggered" record built in the loop below; None if no such record was built
log_record_notification_triggered = None
is_notification_allowed = user_notification_bundle.user.is_notification_allowed
# identifier shared by all notifications sent together in this run
bundle_uuid = uuid4()
# create logs
for notification in notifications:
if notification.alert_group.status != AlertGroup.NEW:
task_logger.info(f"alert_group {notification.alert_group_id} is not active, skip notification")
skip_notification_ids.append(notification.id)
continue
elif not is_notification_allowed:
log_record_notification_failed = UserNotificationPolicyLogRecord(
author=user_notification_bundle.user,
type=UserNotificationPolicyLogRecord.TYPE_PERSONAL_NOTIFICATION_FAILED,
reason="notification is not allowed for user",
alert_group=notification.alert_group,
notification_error_code=UserNotificationPolicyLogRecord.ERROR_NOTIFICATION_FORBIDDEN,
)
log_records_to_create.append(log_record_notification_failed)
active_alert_group_ids.add(notification.alert_group_id)
continue
# collect notifications for active alert groups
active_alert_group_ids.add(notification.alert_group_id)
log_record_notification_triggered = UserNotificationPolicyLogRecord(
author=user_notification_bundle.user,
type=UserNotificationPolicyLogRecord.TYPE_PERSONAL_NOTIFICATION_TRIGGERED,
alert_group=notification.alert_group,
notification_policy=notification.notification_policy,
notification_step=UserNotificationPolicy.Step.NOTIFY,
notification_channel=user_notification_bundle.notification_channel,
)
log_records_to_create.append(log_record_notification_triggered)
# delete non-active notifications and update bundle_uuid for the rest notifications
if not is_notification_allowed:
notifications.delete()
else:
notifications.filter(id__in=skip_notification_ids).delete()
notifications.update(bundle_uuid=bundle_uuid)
if len(log_records_to_create) == 1 and log_record_notification_triggered:
# perform regular notification
log_record_notification_triggered.save()
task_logger.info(
f"there is only one alert group in bundled notification, perform regular notification. "
f"alert_group {log_record_notification_triggered.alert_group_id}"
)
transaction.on_commit(
partial(
schedule_perform_notification_task,
log_record_notification_triggered.pk,
log_record_notification_triggered.alert_group_id,
# use_default_notification_policy_fallback
False,
)
)
else:
UserNotificationPolicyLogRecord.objects.bulk_create(log_records_to_create, batch_size=5000)
if not active_alert_group_ids or not is_notification_allowed:
task_logger.info(
f"no alert groups to notify about or notification is not allowed for user "
f"{user_notification_bundle.user_id}"
)
else:
task_logger.info(
f"perform bundled notification for alert groups with ids: {active_alert_group_ids}, "
f"bundle_uuid: {bundle_uuid}"
)
# SMS is the only channel with a bundled send implemented in this task
if user_notification_bundle.notification_channel == UserNotificationPolicy.NotificationChannel.SMS:
PhoneBackend.notify_by_sms_bundle_async(user_notification_bundle.user, bundle_uuid)
# clear the scheduling fields and record the time of this run on the bundle
user_notification_bundle.notification_task_id = None
user_notification_bundle.last_notified_at = timezone.now()
user_notification_bundle.eta = None
user_notification_bundle.save(update_fields=["notification_task_id", "last_notified_at", "eta"])
for alert_group_id in active_alert_group_ids:
transaction.on_commit(partial(send_update_log_report_signal.apply_async, (None, alert_group_id)))
# update metric
# (active_alert_group_ids is a set here)
transaction.on_commit(partial(update_metric_if_needed, user_notification_bundle.user, active_alert_group_ids))
task_logger.info(f"Finished send_bundled_notification for user_notification_bundle {user_notification_bundle_id}")
# deprecated
@shared_dedicated_queue_retry_task(
    autoretry_for=(Exception,),
    retry_backoff=True,
    max_retries=0 if settings.DEBUG else None,
)
def send_user_notification_signal(log_record_pk):
    """
    Deprecated task; the body is intentionally empty.

    Triggers user_notification_action_triggered_signal.
    This signal is only connected to UserSlackRepresentative and triggers posting message to
    Slack about some FAILED notifications (see NotificationDeliveryStep and
    ERRORS_TO_SEND_IN_SLACK_CHANNEL). No need to call it here.
    """
    return None