731 lines
36 KiB
Python
731 lines
36 KiB
Python
import typing
|
|
from functools import partial
|
|
from uuid import uuid4
|
|
|
|
from celery.exceptions import Retry
|
|
from django.conf import settings
|
|
from django.db import transaction
|
|
from django.db.models import Count
|
|
from django.utils import timezone
|
|
from kombu.utils.uuid import uuid as celery_uuid
|
|
from telegram.error import RetryAfter
|
|
|
|
from apps.alerts.constants import NEXT_ESCALATION_DELAY
|
|
from apps.alerts.tasks.send_update_log_report_signal import send_update_log_report_signal
|
|
from apps.base.messaging import get_messaging_backend_from_id
|
|
from apps.metrics_exporter.tasks import update_metrics_for_user
|
|
from apps.phone_notifications.phone_backend import PhoneBackend
|
|
from common.custom_celery_tasks import shared_dedicated_queue_retry_task
|
|
|
|
from .task_logger import task_logger
|
|
|
|
if typing.TYPE_CHECKING:
|
|
from apps.alerts.models import AlertGroup, UserNotificationBundle
|
|
from apps.base.models import UserNotificationPolicy
|
|
from apps.user_management.models import User
|
|
|
|
|
|
# Maximum age, in hours, of a notification log record for which perform_notification keeps re-scheduling
# itself (Telegram rate limit, Slack message not created yet). Past that age a FAILED log record is written.
RETRY_TIMEOUT_HOURS = 1
|
|
|
|
|
|
def schedule_send_bundled_notification_task(
    user_notification_bundle: "UserNotificationBundle", alert_group: "AlertGroup"
):
    """
    Enqueue `send_bundled_notification` for the given bundle and log what was scheduled.

    The task receives the bundle id as its only argument and is queued with the bundle's stored `eta` and
    `notification_task_id` as the Celery eta and task id. `alert_group` is only used for the log line.
    """
    scheduling_options = {
        "eta": user_notification_bundle.eta,
        "task_id": user_notification_bundle.notification_task_id,
    }
    send_bundled_notification.apply_async((user_notification_bundle.id,), **scheduling_options)

    log_message = (
        f"Scheduled send_bundled_notification task {user_notification_bundle.notification_task_id}, "
        f"user_notification_bundle: {user_notification_bundle.id}, alert_group {alert_group.id}, "
        f"eta: {user_notification_bundle.eta}"
    )
    task_logger.info(log_message)
|
|
|
|
|
|
def schedule_perform_notification_task(
    log_record_pk: int, alert_group_pk: int, use_default_notification_policy_fallback: bool
):
    """
    Enqueue `perform_notification` for a log record and log the created task.

    `alert_group_pk` is not passed to the task; it only appears in the log line.
    """
    task_args = (log_record_pk, use_default_notification_policy_fallback)
    task = perform_notification.apply_async(task_args)

    log_message = (
        f"Created perform_notification task {task} log_record={log_record_pk} " f"alert_group={alert_group_pk}"
    )
    task_logger.info(log_message)
|
|
|
|
|
|
def build_notification_reason_for_log_record(
    notification_policies: typing.List["UserNotificationPolicy"], reason: typing.Optional[str]
) -> str:
    """
    Build the text stored as the reason of a notification log record.

    The result is made of up to two parts, in this order:
    - "Reason: <reason>" followed by a newline, when `reason` is not None;
    - "Further notification plan: <labels>", when at least one policy has the NOTIFY step. The labels are
      the channel labels of those policies, de-duplicated and kept in first-seen order.

    Returns an empty string when neither part applies.
    """
    from apps.base.models import UserNotificationPolicy

    # dict.fromkeys keeps the first occurrence of every channel id and preserves insertion order.
    notify_channel_ids = list(
        dict.fromkeys(
            policy.notify_by for policy in notification_policies if policy.step == UserNotificationPolicy.Step.NOTIFY
        )
    )

    parts = []
    if reason is not None:
        parts.append("Reason: " + reason + "\n")
    if notify_channel_ids:
        channel_labels = ", ".join(
            UserNotificationPolicy.NotificationChannel(channel_id).label for channel_id in notify_channel_ids
        )
        parts.append("Further notification plan: " + channel_labels)
    return "".join(parts)
|
|
|
|
|
|
def update_metric_if_needed(user: "User", active_alert_group_ids: typing.Collection[int]):
    """
    Queue `update_metrics_for_user` when the user has alert groups with exactly one "triggered" log record.

    Among `active_alert_group_ids`, counts the alert groups for which `user` has exactly one personal log
    record of type TYPE_PERSONAL_NOTIFICATION_TRIGGERED. If that count is positive, the metrics task is queued
    with `(user.id, count)`; otherwise nothing happens.

    `active_alert_group_ids` may be any sized collection of ids (the caller in this module passes a set); it is
    only used in an `__in` lookup.
    """
    from apps.base.models import UserNotificationPolicyLogRecord

    # get count of alert groups with only one personal log record with type "triggered"
    alert_groups_with_one_log = (
        user.personal_log_records.filter(
            type=UserNotificationPolicyLogRecord.TYPE_PERSONAL_NOTIFICATION_TRIGGERED,
            alert_group_id__in=active_alert_group_ids,
        )
        .values("alert_group")
        .annotate(count=Count("alert_group"))
        .filter(count=1)
        .count()
    )

    if alert_groups_with_one_log > 0:
        update_metrics_for_user.apply_async((user.id, alert_groups_with_one_log))
|
|
|
|
|
|
@shared_dedicated_queue_retry_task(
    autoretry_for=(Exception,), retry_backoff=True, max_retries=1 if settings.DEBUG else None
)
def notify_user_task(
    user_pk,
    alert_group_pk,
    previous_notification_policy_pk=None,
    reason=None,
    prevent_posting_to_thread=False,
    notify_even_acknowledged=False,
    important=False,
    notify_anyway=False,
):
    """
    Process one step of a user's personal notification chain for an alert group.

    Each run handles a single notification policy step and, unless the chain is finished, re-schedules itself
    for the next step with a fresh task id stored on the user's UserHasNotification row.

    Args:
        user_pk: pk of the user to notify.
        alert_group_pk: pk of the alert group the notification is about.
        previous_notification_policy_pk: None on the first run (policies are then loaded via
            `user.get_notification_policies_or_use_default_fallback`); otherwise the pk of the policy handled
            by the previous run, whose `.next()` becomes the current step.
        reason: free text merged into the log record reason on the first run; reset to None on later runs.
        prevent_posting_to_thread: stored as `slack_prevent_posting` on created log records and forwarded to
            the next run.
        notify_even_acknowledged: keep notifying although the alert group is acknowledged. It is switched off
            when the user acknowledged the alert group after the last direct paging log record for them.
        important: selects which notification policies / notification bundle are used.
        notify_anyway: keep notifying although the alert group is silenced.

    Returns:
        A short message string for some early exits (missing alert group or user, alert group no longer
        needs notifications), otherwise None.
    """
    from apps.alerts.models import AlertGroup, AlertGroupLogRecord, UserHasNotification, UserNotificationBundle
    from apps.base.models import UserNotificationPolicy, UserNotificationPolicyLogRecord
    from apps.user_management.models import User

    try:
        alert_group = AlertGroup.objects.get(pk=alert_group_pk)
    except AlertGroup.DoesNotExist:
        return f"notify_user_task: alert_group {alert_group_pk} doesn't exist"

    countdown = 0
    stop_escalation = False
    log_record = None
    is_notification_bundled = False
    user_notification_bundle = None

    with transaction.atomic():
        try:
            user = User.objects.get(pk=user_pk)
        except User.DoesNotExist:
            return f"notify_user_task: user {user_pk} doesn't exist"

        if not user.is_notification_allowed:
            task_logger.info(f"notify_user_task: user {user.pk} notification is not allowed")
            UserNotificationPolicyLogRecord(
                author=user,
                type=UserNotificationPolicyLogRecord.TYPE_PERSONAL_NOTIFICATION_FAILED,
                reason="notification is not allowed for user",
                alert_group=alert_group,
                notification_error_code=UserNotificationPolicyLogRecord.ERROR_NOTIFICATION_FORBIDDEN,
            ).save()
            return

        user_has_notification, _ = UserHasNotification.objects.get_or_create(
            user=user,
            alert_group=alert_group,
        )

        # re-read the row with a lock so the active task id cannot change under us within this transaction
        user_has_notification = UserHasNotification.objects.filter(pk=user_has_notification.pk).select_for_update()[0]
        using_fallback_default_notification_policy_step = False

        if previous_notification_policy_pk is None:
            # first run of the chain: start from the first configured (or fallback) policy
            (
                using_fallback_default_notification_policy_step,
                notification_policies,
            ) = user.get_notification_policies_or_use_default_fallback(important=important)
            if not notification_policies:
                task_logger.info(
                    f"notify_user_task: Failed to notify. No notification policies. user_id={user_pk} alert_group_id={alert_group_pk} important={important}"
                )
                return
            reason = build_notification_reason_for_log_record(notification_policies, reason)
            notification_policy = notification_policies[0]
        else:
            # follow-up run: only the task whose id is stored as the active one may continue the chain
            if notify_user_task.request.id != user_has_notification.active_notification_policy_id:
                task_logger.info(
                    f"notify_user_task: active_notification_policy_id mismatch. "
                    f"Duplication or non-active escalation triggered. "
                    f"Active: {user_has_notification.active_notification_policy_id}"
                )
                return

            try:
                notification_policy = UserNotificationPolicy.objects.get(pk=previous_notification_policy_pk)
                if notification_policy.user != user:
                    # the previous policy belongs to another user: continue from this user's policy
                    # with the same order
                    notification_policy = UserNotificationPolicy.objects.get(
                        order=notification_policy.order, user=user, important=important
                    )
                notification_policy = notification_policy.next()
            except UserNotificationPolicy.DoesNotExist:
                task_logger.info(
                    f"notify_user_task: Notification policy {previous_notification_policy_pk} has been deleted"
                )
                return
            reason = None

        def _create_user_notification_policy_log_record(**kwargs):
            # builds (does not save) a log record carrying the fallback-policy flag of this run
            return UserNotificationPolicyLogRecord(
                **kwargs,
                using_fallback_default_notification_policy_step=using_fallback_default_notification_policy_step,
            )

        def _create_notification_finished_user_notification_policy_log_record():
            # builds (does not save) the FINISHED record for the current policy
            return _create_user_notification_policy_log_record(
                author=user,
                type=UserNotificationPolicyLogRecord.TYPE_PERSONAL_NOTIFICATION_FINISHED,
                notification_policy=notification_policy,
                alert_group=alert_group,
                slack_prevent_posting=prevent_posting_to_thread,
            )

        if notification_policy is None:
            # no next step: the personal notification chain is over
            stop_escalation = True
            log_record = _create_notification_finished_user_notification_policy_log_record()
            task_logger.info(f"Personal escalation exceeded. User: {user.pk}, alert_group: {alert_group.pk}")
        else:
            if (
                # don't force notify direct paged user who has already acknowledged the alert group
                # after the last time they were paged.
                notify_even_acknowledged
                and (
                    direct_paging_log_record := alert_group.log_records.filter(
                        type=AlertGroupLogRecord.TYPE_DIRECT_PAGING, step_specific_info__user=user.public_primary_key
                    ).last()
                )
                and alert_group.log_records.filter(
                    type=AlertGroupLogRecord.TYPE_ACK, author=user, created_at__gte=direct_paging_log_record.created_at
                ).exists()
            ):
                notify_even_acknowledged = False
                # fixed: the message used to end with a stray ")"
                task_logger.info(f"notify_even_acknowledged=False for user {user.pk}, alert_group {alert_group.pk}")

            if (
                (alert_group.acknowledged and not notify_even_acknowledged)
                or (alert_group.silenced and not notify_anyway)
                or alert_group.resolved
                or alert_group.wiped_at
                or alert_group.root_alert_group
            ):
                task_logger.info(
                    f"notify_user_task: skip notification user {user.pk}, alert_group {alert_group.pk} is "
                    f"{alert_group.state} and/or attached or wiped"
                )
                return "Acknowledged, resolved, silenced, attached or wiped."

            if notification_policy.step == UserNotificationPolicy.Step.WAIT:
                # WAIT only delays the next run; nothing is sent for this step
                countdown = (
                    notification_policy.wait_delay.total_seconds() if notification_policy.wait_delay is not None else 0
                )
                log_record = _create_user_notification_policy_log_record(
                    author=user,
                    type=UserNotificationPolicyLogRecord.TYPE_PERSONAL_NOTIFICATION_TRIGGERED,
                    notification_policy=notification_policy,
                    alert_group=alert_group,
                    slack_prevent_posting=prevent_posting_to_thread,
                    notification_step=notification_policy.step,
                )
                task_logger.info(f"notify_user_task: Waiting {countdown} to notify user {user.pk}")
            elif notification_policy.step == UserNotificationPolicy.Step.NOTIFY:
                user_to_be_notified_in_slack = (
                    notification_policy.notify_by == UserNotificationPolicy.NotificationChannel.SLACK
                )
                if user_to_be_notified_in_slack and alert_group.notify_in_slack_enabled is False:
                    log_record = _create_user_notification_policy_log_record(
                        author=user,
                        type=UserNotificationPolicyLogRecord.TYPE_PERSONAL_NOTIFICATION_FAILED,
                        notification_policy=notification_policy,
                        alert_group=alert_group,
                        reason="Alert group Slack notifications are disabled",
                        slack_prevent_posting=prevent_posting_to_thread,
                        notification_step=notification_policy.step,
                        notification_channel=notification_policy.notify_by,
                        notification_error_code=UserNotificationPolicyLogRecord.ERROR_NOTIFICATION_POSTING_TO_SLACK_IS_DISABLED,
                    )
                else:
                    if (
                        settings.FEATURE_NOTIFICATION_BUNDLE_ENABLED
                        and UserNotificationBundle.notification_is_bundleable(notification_policy.notify_by)
                    ):
                        user_notification_bundle, _ = UserNotificationBundle.objects.select_for_update().get_or_create(
                            user=user, important=important, notification_channel=notification_policy.notify_by
                        )
                        # check if notification needs to be bundled
                        if user_notification_bundle.notified_recently():
                            user_notification_bundle.append_notification(alert_group, notification_policy)
                            # schedule send_bundled_notification task if it hasn't been scheduled or the task eta is
                            # outdated
                            eta_is_valid = user_notification_bundle.eta_is_valid()
                            if not eta_is_valid:
                                task_logger.warning(
                                    f"ETA is not valid - {user_notification_bundle.eta}, "
                                    f"user_notification_bundle {user_notification_bundle.id}, "
                                    f"task_id {user_notification_bundle.notification_task_id}. "
                                    f"Rescheduling the send_bundled_notification task"
                                )
                            if not user_notification_bundle.notification_task_id or not eta_is_valid:
                                user_notification_bundle.notification_task_id = celery_uuid()
                                user_notification_bundle.eta = user_notification_bundle.get_notification_eta()
                                user_notification_bundle.save(update_fields=["notification_task_id", "eta"])

                                transaction.on_commit(
                                    partial(
                                        schedule_send_bundled_notification_task, user_notification_bundle, alert_group
                                    )
                                )
                            is_notification_bundled = True

                    if not is_notification_bundled:
                        log_record = _create_user_notification_policy_log_record(
                            author=user,
                            type=UserNotificationPolicyLogRecord.TYPE_PERSONAL_NOTIFICATION_TRIGGERED,
                            notification_policy=notification_policy,
                            alert_group=alert_group,
                            reason=reason,
                            slack_prevent_posting=prevent_posting_to_thread,
                            notification_step=notification_policy.step,
                            notification_channel=notification_policy.notify_by,
                        )
        if log_record:  # log_record is None if user notification policy step is unspecified
            # if this is the first notification step, and user hasn't been notified for this alert group - update metric
            if (
                log_record.type == UserNotificationPolicyLogRecord.TYPE_PERSONAL_NOTIFICATION_TRIGGERED
                and previous_notification_policy_pk is None
                and not user.personal_log_records.filter(
                    type=UserNotificationPolicyLogRecord.TYPE_PERSONAL_NOTIFICATION_TRIGGERED,
                    alert_group_id=alert_group_pk,
                ).exists()
            ):
                update_metrics_for_user.apply_async((user.id,))
            log_record.save()

        if using_fallback_default_notification_policy_step:
            # if we are using default notification policy, we're done escalating.. there's no further notification
            # policy steps in this case. Kick off the perform_notification task, create the
            # TYPE_PERSONAL_NOTIFICATION_FINISHED log record, and reset the active_notification_policy_id
            # NOTE(review): log_record is still None here if the notification was bundled or the step is
            # neither WAIT nor NOTIFY; `log_record.pk` would then raise - confirm that cannot happen for the
            # fallback policy.
            transaction.on_commit(
                partial(
                    schedule_perform_notification_task,
                    log_record.pk,
                    alert_group_pk,
                    using_fallback_default_notification_policy_step,
                )
            )
            # fixed: the helper only instantiates the record; without save() the FINISHED record was
            # silently discarded instead of being created as the comment above describes.
            _create_notification_finished_user_notification_policy_log_record().save()
            user_has_notification.update_active_task_id(None)
        elif not stop_escalation:
            # if the step is NOTIFY and notification was not bundled, perform regular notification
            # and update time when user was notified
            if notification_policy.step != UserNotificationPolicy.Step.WAIT and not is_notification_bundled:
                transaction.on_commit(
                    partial(
                        schedule_perform_notification_task,
                        log_record.pk,
                        alert_group_pk,
                        using_fallback_default_notification_policy_step,
                    )
                )

                if user_notification_bundle:
                    user_notification_bundle.last_notified_at = timezone.now()
                    user_notification_bundle.save(update_fields=["last_notified_at"])

            # register the id of the follow-up task before scheduling it, so that only that task is
            # accepted as the active one on the next run
            task_id = celery_uuid()
            user_has_notification.update_active_task_id(task_id)

            transaction.on_commit(
                partial(
                    notify_user_task.apply_async,
                    (user.pk, alert_group.pk, notification_policy.pk, reason),
                    {
                        "notify_even_acknowledged": notify_even_acknowledged,
                        "notify_anyway": notify_anyway,
                        "prevent_posting_to_thread": prevent_posting_to_thread,
                    },
                    countdown=countdown + NEXT_ESCALATION_DELAY,
                    task_id=task_id,
                )
            )
        else:
            user_has_notification.update_active_task_id(None)
|
|
|
|
|
|
@shared_dedicated_queue_retry_task(
    autoretry_for=(Exception,),
    retry_backoff=True,
    dont_autoretry_for=(Retry,),
    max_retries=1 if settings.DEBUG else None,
)
def perform_notification(log_record_pk, use_default_notification_policy_fallback):
    """
    Deliver the notification described by a notification log record through its channel.

    Args:
        log_record_pk: pk of the UserNotificationPolicyLogRecord that triggered the notification. If the
            record no longer exists the task logs a warning and stops.
        use_default_notification_policy_fallback: when true, the policy is taken from
            `UserNotificationPolicy.get_default_fallback_policy(user)` instead of the log record.

    The task writes a FAILED log record and stops when the user or policy is missing, when notifications are
    not allowed for the user, or when the alert group is resolved. Otherwise it dispatches on the policy's
    channel (SMS, phone call, Telegram, Slack, or a messaging backend looked up by channel name).

    For Telegram rate limits and for a Slack message that does not exist yet, the task re-schedules itself
    while the log record is younger than RETRY_TIMEOUT_HOURS; after that a FAILED log record is written.
    """
    # datetime.timedelta is used directly: django.utils.timezone only exposes `timedelta` as a side effect
    # of its own imports, it is not part of that module's documented API.
    from datetime import timedelta

    from apps.base.models import UserNotificationPolicy, UserNotificationPolicyLogRecord
    from apps.telegram.models import TelegramToUserConnector

    task_logger.info(f"perform_notification: log_record {log_record_pk}")

    try:
        log_record = UserNotificationPolicyLogRecord.objects.get(pk=log_record_pk)
    except UserNotificationPolicyLogRecord.DoesNotExist:
        task_logger.warning(
            f"perform_notification: log_record {log_record_pk} doesn't exist. Skipping remainder of task. "
            "The alert group associated with this log record may have been deleted."
        )
        return

    task_logger.info(f"perform_notification: found record for {log_record_pk}")

    def _within_retry_window():
        # check how much time has passed since log record was created
        # to prevent eternal loop of restarting perform_notification task
        return timezone.now() < log_record.created_at + timedelta(hours=RETRY_TIMEOUT_HOURS)

    user = log_record.author
    alert_group = log_record.alert_group
    notification_policy = (
        UserNotificationPolicy.get_default_fallback_policy(user)
        if use_default_notification_policy_fallback
        else log_record.notification_policy
    )
    notification_channel = notification_policy.notify_by if notification_policy else None

    if user is None or notification_policy is None:
        UserNotificationPolicyLogRecord(
            author=user,
            type=UserNotificationPolicyLogRecord.TYPE_PERSONAL_NOTIFICATION_FAILED,
            notification_policy=notification_policy if not use_default_notification_policy_fallback else None,
            reason="Expected data is missing",
            alert_group=alert_group,
            notification_step=notification_policy.step if notification_policy else None,
            notification_channel=notification_channel,
            notification_error_code=None,
        ).save()
        return

    if not user.is_notification_allowed:
        UserNotificationPolicyLogRecord(
            author=user,
            type=UserNotificationPolicyLogRecord.TYPE_PERSONAL_NOTIFICATION_FAILED,
            reason="notification is not allowed for user",
            alert_group=alert_group,
            notification_error_code=UserNotificationPolicyLogRecord.ERROR_NOTIFICATION_FORBIDDEN,
        ).save()
        return

    if alert_group.resolved:
        # skip notification if alert group was resolved
        UserNotificationPolicyLogRecord(
            author=user,
            type=UserNotificationPolicyLogRecord.TYPE_PERSONAL_NOTIFICATION_FAILED,
            notification_policy=notification_policy if not use_default_notification_policy_fallback else None,
            reason="Skipped notification because alert group is resolved",
            alert_group=alert_group,
            notification_step=notification_policy.step if notification_policy else None,
            notification_channel=notification_channel,
            notification_error_code=None,
        ).save()
        return

    if notification_channel == UserNotificationPolicy.NotificationChannel.SMS:
        phone_backend = PhoneBackend()
        phone_backend.notify_by_sms(user, alert_group, notification_policy)

    elif notification_channel == UserNotificationPolicy.NotificationChannel.PHONE_CALL:
        phone_backend = PhoneBackend()
        phone_backend.notify_by_call(user, alert_group, notification_policy)

    elif notification_channel == UserNotificationPolicy.NotificationChannel.TELEGRAM:
        try:
            TelegramToUserConnector.notify_user(user, alert_group, notification_policy)
        except RetryAfter as e:
            task_logger.exception(f"Telegram API rate limit exceeded. Retry after {e.retry_after} seconds.")
            if _within_retry_window():
                # retry after the delay requested by Telegram (3 seconds if the error carries none)
                countdown = getattr(e, "retry_after", 3)
                perform_notification.apply_async(
                    (log_record_pk, use_default_notification_policy_fallback), countdown=countdown
                )
            else:
                task_logger.debug(
                    f"telegram notification for alert_group {alert_group.pk} failed because of rate limit"
                )
                UserNotificationPolicyLogRecord(
                    author=user,
                    type=UserNotificationPolicyLogRecord.TYPE_PERSONAL_NOTIFICATION_FAILED,
                    notification_policy=notification_policy,
                    reason="Telegram rate limit exceeded",
                    alert_group=alert_group,
                    notification_step=notification_policy.step,
                    notification_channel=notification_channel,
                    notification_error_code=UserNotificationPolicyLogRecord.ERROR_NOTIFICATION_IN_TELEGRAM_RATELIMIT,
                ).save()
            return

    elif notification_channel == UserNotificationPolicy.NotificationChannel.SLACK:
        # TODO: refactor checking the possibility of sending a notification in slack
        # Code below is not consistent.
        # We check various slack reasons to skip escalation in this task, in send_slack_notification,
        # before and after posting of slack message.
        if alert_group.skip_escalation_in_slack:
            # generic Slack error unless the skip reason maps to a more specific code
            notification_error_code = UserNotificationPolicyLogRecord.ERROR_NOTIFICATION_IN_SLACK
            if alert_group.reason_to_skip_escalation == alert_group.RATE_LIMITED:
                notification_error_code = UserNotificationPolicyLogRecord.ERROR_NOTIFICATION_IN_SLACK_RATELIMIT
            elif alert_group.reason_to_skip_escalation == alert_group.CHANNEL_ARCHIVED:
                notification_error_code = (
                    UserNotificationPolicyLogRecord.ERROR_NOTIFICATION_IN_SLACK_CHANNEL_IS_ARCHIVED
                )
            elif alert_group.reason_to_skip_escalation == alert_group.ACCOUNT_INACTIVE:
                notification_error_code = UserNotificationPolicyLogRecord.ERROR_NOTIFICATION_IN_SLACK_TOKEN_ERROR
            task_logger.debug(
                f"send_slack_notification for alert_group {alert_group.pk} failed because escalation in slack is "
                f"skipped, reason: '{alert_group.get_reason_to_skip_escalation_display()}'"
            )
            UserNotificationPolicyLogRecord(
                author=user,
                type=UserNotificationPolicyLogRecord.TYPE_PERSONAL_NOTIFICATION_FAILED,
                notification_policy=notification_policy,
                reason=f"Skipped escalation in Slack, reason: '{alert_group.get_reason_to_skip_escalation_display()}'",
                alert_group=alert_group,
                notification_step=notification_policy.step,
                notification_channel=notification_channel,
                notification_error_code=notification_error_code,
            ).save()
            return

        if alert_group.notify_in_slack_enabled is True:
            # we cannot notify users in Slack if their team does not have Slack integration
            if alert_group.channel.organization.slack_team_identity is None:
                task_logger.debug(
                    f"send_slack_notification for alert_group {alert_group.pk} failed because slack team identity "
                    f"does not exist."
                )
                UserNotificationPolicyLogRecord(
                    author=user,
                    type=UserNotificationPolicyLogRecord.TYPE_PERSONAL_NOTIFICATION_FAILED,
                    notification_policy=notification_policy,
                    reason="Slack team identity does not exist",
                    alert_group=alert_group,
                    notification_step=notification_policy.step,
                    notification_channel=notification_channel,
                    notification_error_code=UserNotificationPolicyLogRecord.ERROR_NOTIFICATION_IN_SLACK_TOKEN_ERROR,
                ).save()
                return

            if log_record.slack_prevent_posting:
                task_logger.debug(
                    f"send_slack_notification for alert_group {alert_group.pk} failed because slack posting is disabled."
                )
                # recorded as SUCCESS (with the "posting disabled" error code): nothing is posted on purpose
                UserNotificationPolicyLogRecord(
                    author=user,
                    type=UserNotificationPolicyLogRecord.TYPE_PERSONAL_NOTIFICATION_SUCCESS,
                    notification_policy=notification_policy,
                    reason="Prevented from posting in Slack",
                    alert_group=alert_group,
                    notification_step=notification_policy.step,
                    notification_channel=notification_channel,
                    notification_error_code=UserNotificationPolicyLogRecord.ERROR_NOTIFICATION_POSTING_TO_SLACK_IS_DISABLED,
                ).save()
                return

            if alert_group.slack_message:
                alert_group.slack_message.send_slack_notification(user, alert_group, notification_policy)
                task_logger.debug(f"Finished send_slack_notification for alert_group {alert_group.pk}.")

            elif _within_retry_window():
                task_logger.debug(
                    f"send_slack_notification for alert_group {alert_group.pk} failed because slack message "
                    f"does not exist. Restarting perform_notification."
                )
                restart_delay_seconds = 60
                perform_notification.apply_async(
                    (log_record_pk, use_default_notification_policy_fallback), countdown=restart_delay_seconds
                )
            else:
                task_logger.debug(
                    f"send_slack_notification for alert_group {alert_group.pk} failed because slack message "
                    f"after {RETRY_TIMEOUT_HOURS} hours still does not exist"
                )
                UserNotificationPolicyLogRecord(
                    author=user,
                    type=UserNotificationPolicyLogRecord.TYPE_PERSONAL_NOTIFICATION_FAILED,
                    notification_policy=notification_policy,
                    reason="Slack message does not exist",
                    alert_group=alert_group,
                    notification_step=notification_policy.step,
                    notification_channel=notification_channel,
                    notification_error_code=UserNotificationPolicyLogRecord.ERROR_NOTIFICATION_IN_SLACK,
                ).save()
    else:
        # any other channel is served by a messaging backend looked up by the channel's enum name
        try:
            backend_id = UserNotificationPolicy.NotificationChannel(notification_policy.notify_by).name
            backend = get_messaging_backend_from_id(backend_id)
        except ValueError:
            backend = None

        if backend is None:
            task_logger.debug("notify_user failed because messaging backend is not available")
            UserNotificationPolicyLogRecord(
                author=user,
                type=UserNotificationPolicyLogRecord.TYPE_PERSONAL_NOTIFICATION_FAILED,
                notification_policy=notification_policy,
                reason="Messaging backend not available",
                alert_group=alert_group,
                notification_step=notification_policy.step,
                notification_channel=notification_channel,
                notification_error_code=UserNotificationPolicyLogRecord.ERROR_NOTIFICATION_MESSAGING_BACKEND_ERROR,
            ).save()
            return
        backend.notify_user(user, alert_group, notification_policy)
|
|
|
|
|
|
@shared_dedicated_queue_retry_task(
    autoretry_for=(Exception,), retry_backoff=True, max_retries=1 if settings.DEBUG else None
)
def send_bundled_notification(user_notification_bundle_id: int):
    """
    Send the notifications collected in a user notification bundle.

    Only the task whose id matches the bundle's `notification_task_id` does any work. For every collected
    notification that has not been assigned a bundle uuid yet:
    - notifications whose alert group is not in the NEW status are dropped;
    - if notifications are not allowed for the user, a FAILED log record is prepared;
    - otherwise a TRIGGERED log record is prepared.

    When exactly one log record was prepared and it is a TRIGGERED one, a regular `perform_notification` task
    is scheduled for it. Otherwise the log records are bulk-created and, if there is something to notify
    about and the user may be notified, the bundled send of the channel is invoked (only SMS is handled here).

    Finally the bundle's task id and eta are reset, `last_notified_at` is updated, and log report / metric
    updates are registered to run on commit.
    """
    from apps.alerts.models import AlertGroup, UserNotificationBundle
    from apps.base.models import UserNotificationPolicy, UserNotificationPolicyLogRecord

    task_logger.info(f"Start send_bundled_notification for user_notification_bundle {user_notification_bundle_id}")
    with transaction.atomic():
        # lock the bundle row for the duration of the transaction
        locked_bundles = UserNotificationBundle.objects.filter(pk=user_notification_bundle_id).select_for_update()
        try:
            user_notification_bundle = locked_bundles[0]
        except IndexError:
            task_logger.info(
                f"user_notification_bundle {user_notification_bundle_id} doesn't exist. "
                f"The user associated with this notification bundle may have been deleted."
            )
            return

        if send_bundled_notification.request.id != user_notification_bundle.notification_task_id:
            task_logger.info(
                f"send_bundled_notification: notification_task_id mismatch. "
                f"Duplication or non-active notification triggered. "
                f"Active: {user_notification_bundle.notification_task_id}"
            )
            return

        # notifications not yet assigned to any sent bundle
        pending_notifications = user_notification_bundle.notifications.filter(bundle_uuid__isnull=True)
        pending_notifications = pending_notifications.select_related("alert_group")

        new_log_records: typing.List["UserNotificationPolicyLogRecord"] = []
        inactive_notification_ids: typing.List[int] = []
        active_alert_group_ids: typing.Set[int] = set()
        log_record_notification_triggered = None
        bundle_user = user_notification_bundle.user
        user_can_be_notified = bundle_user.is_notification_allowed
        bundle_uuid = uuid4()

        # prepare log records
        for notification in pending_notifications:
            if notification.alert_group.status != AlertGroup.NEW:
                task_logger.info(f"alert_group {notification.alert_group_id} is not active, skip notification")
                inactive_notification_ids.append(notification.id)
                continue

            # from here on the alert group is active
            active_alert_group_ids.add(notification.alert_group_id)

            if not user_can_be_notified:
                new_log_records.append(
                    UserNotificationPolicyLogRecord(
                        author=bundle_user,
                        type=UserNotificationPolicyLogRecord.TYPE_PERSONAL_NOTIFICATION_FAILED,
                        reason="notification is not allowed for user",
                        alert_group=notification.alert_group,
                        notification_error_code=UserNotificationPolicyLogRecord.ERROR_NOTIFICATION_FORBIDDEN,
                    )
                )
                continue

            log_record_notification_triggered = UserNotificationPolicyLogRecord(
                author=bundle_user,
                type=UserNotificationPolicyLogRecord.TYPE_PERSONAL_NOTIFICATION_TRIGGERED,
                alert_group=notification.alert_group,
                notification_policy=notification.notification_policy,
                notification_step=UserNotificationPolicy.Step.NOTIFY,
                notification_channel=user_notification_bundle.notification_channel,
            )
            new_log_records.append(log_record_notification_triggered)

        if user_can_be_notified:
            # drop notifications for non-active alert groups, tag the remaining ones with this bundle's uuid
            pending_notifications.filter(id__in=inactive_notification_ids).delete()
            pending_notifications.update(bundle_uuid=bundle_uuid)
        else:
            # nothing will be sent: drop everything that was pending
            pending_notifications.delete()

        only_one_triggered_record = len(new_log_records) == 1 and log_record_notification_triggered is not None
        if only_one_triggered_record:
            # a single alert group does not need a bundle: fall back to the regular notification
            log_record_notification_triggered.save()
            task_logger.info(
                f"there is only one alert group in bundled notification, perform regular notification. "
                f"alert_group {log_record_notification_triggered.alert_group_id}"
            )
            transaction.on_commit(
                partial(
                    schedule_perform_notification_task,
                    log_record_notification_triggered.pk,
                    log_record_notification_triggered.alert_group_id,
                    False,
                )
            )
        else:
            UserNotificationPolicyLogRecord.objects.bulk_create(new_log_records, batch_size=5000)

            if active_alert_group_ids and user_can_be_notified:
                task_logger.info(
                    f"perform bundled notification for alert groups with ids: {active_alert_group_ids}, "
                    f"bundle_uuid: {bundle_uuid}"
                )
                if user_notification_bundle.notification_channel == UserNotificationPolicy.NotificationChannel.SMS:
                    PhoneBackend.notify_by_sms_bundle_async(bundle_user, bundle_uuid)
            else:
                task_logger.info(
                    f"no alert groups to notify about or notification is not allowed for user "
                    f"{user_notification_bundle.user_id}"
                )

        # the bundle is free again: a later notification may schedule a new task for it
        user_notification_bundle.notification_task_id = None
        user_notification_bundle.last_notified_at = timezone.now()
        user_notification_bundle.eta = None
        user_notification_bundle.save(update_fields=["notification_task_id", "last_notified_at", "eta"])

        for alert_group_id in active_alert_group_ids:
            transaction.on_commit(partial(send_update_log_report_signal.apply_async, (None, alert_group_id)))

        # update metric
        transaction.on_commit(partial(update_metric_if_needed, bundle_user, active_alert_group_ids))

    task_logger.info(f"Finished send_bundled_notification for user_notification_bundle {user_notification_bundle_id}")
|
|
|
|
|
|
# deprecated
|
|
@shared_dedicated_queue_retry_task(
    autoretry_for=(Exception,), retry_backoff=True, max_retries=0 if settings.DEBUG else None
)
def send_user_notification_signal(log_record_pk):
    """
    Deprecated task kept as a no-op.

    It used to trigger user_notification_action_triggered_signal. That signal is only connected to
    UserSlackRepresentative and triggers posting a message to Slack about some FAILED notifications
    (see NotificationDeliveryStep and ERRORS_TO_SEND_IN_SLACK_CHANNEL), so there is no need to call it here.
    `log_record_pk` is accepted and ignored.
    """
    return None
|