oncall-engine/engine/apps/metrics_exporter/tasks.py
Yulya Artyukhina 191814b25e
User notifications bundle (#4457)
# What this PR does
This PR adds two new models: UserNotificationBundle and
BundledNotification (proposals for naming are welcome).

`UserNotificationBundle` manages the information about last notification
time and scheduled notification task for bundled notifications. It is
unique per user + notification_channel + notification importance.

`BundledNotification` contains the notification policy and the alert group
that triggered the notification. The BundledNotification instance is created
in `notify_user_task` for every notification that should be bundled,
and is attached to UserNotificationBundle via a ForeignKey relation.

How it works:
If the user was notified recently (within the last two minutes) by the
current notification channel, and this channel is bundlable,
BundledNotification instance will be created and attached to the
UserNotificationBundle instance, and `send_bundled_notification` task
will be scheduled to execute in 2 min.
In `send_bundled_notification` task we get all BundledNotification
attached to the current UserNotificationBundle instance, check if alert
groups are still active and if there is only one notification - perform
regular notification by calling `perform_notification` task, otherwise
call "notify_by_<channel>_bundle" method for the current notification
channel.

PR with method to send notification bundle by SMS -
https://github.com/grafana/oncall/pull/4624

**This feature is disabled by default by feature flag. Public docs will
be added in a separate PR with enabling this feature.**
## Which issue(s) this PR closes
related to https://github.com/grafana/oncall-private/issues/2712

## Checklist

- [x] Unit, integration, and e2e (if applicable) tests updated
- [x] Documentation added (or `pr:no public docs` PR label added if not
required)
- [x] Added the relevant release notes label (see labels prefixed w/
`release:`). These labels dictate how your PR will
    show up in the autogenerated release notes.
2024-07-16 11:24:08 +00:00

292 lines
12 KiB
Python

import typing
from django.conf import settings
from django.core.cache import cache
from django.db.models import Count, Q
from apps.alerts.constants import AlertGroupState
from apps.metrics_exporter.constants import (
METRICS_ORGANIZATIONS_IDS,
METRICS_ORGANIZATIONS_IDS_CACHE_TIMEOUT,
NO_SERVICE_VALUE,
SERVICE_LABEL,
AlertGroupsResponseTimeMetricsDict,
AlertGroupsTotalMetricsDict,
RecalculateOrgMetricsDict,
UserWasNotifiedOfAlertGroupsMetricsDict,
)
from apps.metrics_exporter.helpers import (
get_default_states_dict,
get_metric_alert_groups_response_time_key,
get_metric_alert_groups_total_key,
get_metric_calculation_started_key,
get_metric_user_was_notified_of_alert_groups_key,
get_metrics_cache_timer_key,
get_metrics_recalculation_timeout,
get_organization_ids,
get_organization_ids_from_db,
get_response_time_period,
is_allowed_to_start_metrics_calculation,
metrics_update_user_cache,
)
from apps.metrics_exporter.metrics_cache_manager import MetricsCacheManager
from common.custom_celery_tasks import shared_dedicated_queue_retry_task
from common.database import get_random_readonly_database_key_if_present_otherwise_default
@shared_dedicated_queue_retry_task(
    autoretry_for=(Exception,), retry_backoff=True, max_retries=1 if settings.DEBUG else None
)
def save_organizations_ids_in_cache():
    """Fetch all organization ids from the database and store them in the cache.

    The cached list lives under the ``METRICS_ORGANIZATIONS_IDS`` key for
    ``METRICS_ORGANIZATIONS_IDS_CACHE_TIMEOUT`` seconds.
    """
    organizations_ids = get_organization_ids_from_db()
    # Bug fix: Django's cache API is cache.set(key, value, timeout) — the
    # original call passed the value (the id list) as the key and the key
    # constant as the value, so the cached entry was never readable under
    # METRICS_ORGANIZATIONS_IDS.
    cache.set(METRICS_ORGANIZATIONS_IDS, organizations_ids, METRICS_ORGANIZATIONS_IDS_CACHE_TIMEOUT)
@shared_dedicated_queue_retry_task(
    autoretry_for=(Exception,), retry_backoff=True, max_retries=1 if settings.DEBUG else None
)
def start_calculate_and_cache_metrics(metrics_to_recalculate: list[RecalculateOrgMetricsDict]):
    """Schedule metric recalculation tasks for each entry in ``metrics_to_recalculate``.

    Entries that fail the ``is_allowed_to_start_metrics_calculation`` check are
    skipped. Allowed entries are staggered by using their list position as the
    task countdown; a forced recalculation starts immediately (countdown 0).
    """
    for position, org_metrics in enumerate(metrics_to_recalculate):
        if not is_allowed_to_start_metrics_calculation(**org_metrics):
            continue
        # start immediately if recalculation starting has been forced
        delay = 0 if org_metrics.get("force") else position
        calculate_and_cache_metrics.apply_async(kwargs=org_metrics, countdown=delay)
        calculate_and_cache_user_was_notified_metric.apply_async(
            (org_metrics["organization_id"],), countdown=delay
        )
@shared_dedicated_queue_retry_task(autoretry_for=(Exception,), retry_backoff=True, max_retries=0)
def start_recalculation_for_new_metric(metric_name):
    """Kick off the initial calculation of a newly introduced metric for all organizations.

    A cache flag keyed on the metric name acts as a lock so the fan-out only
    happens once; scheduling is spread out in batches of 10 organizations per
    second of countdown.
    """
    TEN_MINUTES = 600
    lock_key = get_metric_calculation_started_key(metric_name)
    if cache.get(lock_key):
        # another worker already started the fan-out for this metric
        return
    cache.set(lock_key, True, timeout=TEN_MINUTES)

    delay = 0
    for index, org_id in enumerate(set(get_organization_ids())):
        # bump the countdown once per batch of 10 organizations
        if index % 10 == 0:
            delay += 1
        calculate_and_cache_user_was_notified_metric.apply_async((org_id,), countdown=delay)
@shared_dedicated_queue_retry_task(
    autoretry_for=(Exception,), retry_backoff=True, max_retries=1 if settings.DEBUG else None
)
def calculate_and_cache_metrics(organization_id, force=False):
    """
    Calculate integrations metrics for organization.

    Builds two per-integration payloads and stores both in the cache:
    "alert groups total" (alert-group counts per state, grouped by the value
    of the service label) and "alert groups response time" (response times of
    alert groups started within the configured period, grouped the same way).

    Args:
        organization_id: pk of the organization to recalculate metrics for.
        force: True when the recalculation was forced; resets the
            "forced_started" flag on the metrics cache timer at the end.
    """
    from apps.alerts.models import AlertGroup, AlertReceiveChannel
    from apps.user_management.models import Organization

    ONE_HOUR = 3600
    TWO_HOURS = 7200

    organization = Organization.objects.filter(pk=organization_id).first()
    if not organization:
        # organization no longer exists — nothing to recalculate
        return

    # all non-maintenance integrations of the organization; reads may go to a
    # read-only replica when one is configured
    integrations = (
        AlertReceiveChannel.objects.using(get_random_readonly_database_key_if_present_otherwise_default())
        .filter(~Q(integration=AlertReceiveChannel.INTEGRATION_MAINTENANCE) & Q(organization_id=organization_id))
        .select_related("team")
    )

    # only alert groups started on/after this moment count towards response time
    response_time_period = get_response_time_period()

    instance_slug = organization.stack_slug
    instance_id = organization.stack_id
    instance_org_id = organization.org_id

    # integration id -> metric payload
    metric_alert_group_total: typing.Dict[int, AlertGroupsTotalMetricsDict] = {}
    metric_alert_group_response_time: typing.Dict[int, AlertGroupsResponseTimeMetricsDict] = {}

    # state name -> ORM filter selecting alert groups currently in that state
    states = {
        AlertGroupState.FIRING.value: AlertGroup.get_new_state_filter(),
        AlertGroupState.SILENCED.value: AlertGroup.get_silenced_state_filter(),
        AlertGroupState.ACKNOWLEDGED.value: AlertGroup.get_acknowledged_state_filter(),
        AlertGroupState.RESOLVED.value: AlertGroup.get_resolved_state_filter(),
    }

    for integration in integrations:
        metric_alert_group_total_data = {
            "integration_name": integration.emojized_verbal_name,
            "team_name": integration.team_name,
            "team_id": integration.team_id_or_no_team,
            "org_id": instance_org_id,
            "slug": instance_slug,
            "id": instance_id,
            "services": {
                # alert groups without a service label are counted under this key
                NO_SERVICE_VALUE: get_default_states_dict(),
            },
        }
        # calculate states
        for state, alert_group_filter in states.items():
            # count alert groups with `service_name` label group by label value
            alert_group_count_by_service = (
                integration.alert_groups.filter(
                    alert_group_filter,
                    labels__organization=organization,
                    labels__key_name=SERVICE_LABEL,
                )
                .values("labels__value_name")
                .annotate(count=Count("id"))
            )
            for value in alert_group_count_by_service:
                metric_alert_group_total_data["services"].setdefault(
                    value["labels__value_name"],
                    get_default_states_dict(),
                )[state] += value["count"]
            # count alert groups without `service_name` label
            alert_groups_count_without_service = integration.alert_groups.filter(
                alert_group_filter,
                ~Q(labels__key_name=SERVICE_LABEL),
            ).count()
            metric_alert_group_total_data["services"][NO_SERVICE_VALUE][state] += alert_groups_count_without_service
        metric_alert_group_total[integration.id] = metric_alert_group_total_data

        # calculate response time metric
        metric_response_time_data = {
            "integration_name": integration.emojized_verbal_name,
            "team_name": integration.team_name,
            "team_id": integration.team_id_or_no_team,
            "org_id": instance_org_id,
            "slug": instance_slug,
            "id": instance_id,
            "services": {NO_SERVICE_VALUE: []},
        }
        # filter response time by services
        response_time_by_service = integration.alert_groups.filter(
            started_at__gte=response_time_period,
            response_time__isnull=False,
            labels__organization=organization,
            labels__key_name=SERVICE_LABEL,
        ).values_list("id", "labels__value_name", "response_time")
        for _, service_name, response_time in response_time_by_service:
            metric_response_time_data["services"].setdefault(service_name, [])
            metric_response_time_data["services"][service_name].append(response_time.total_seconds())
        # response times of alert groups that do NOT have a service label
        # (everything in the period minus the ids already counted above)
        no_service_response_time = (
            integration.alert_groups.filter(
                started_at__gte=response_time_period,
                response_time__isnull=False,
            )
            .exclude(id__in=[i[0] for i in response_time_by_service])
            .values_list("response_time", flat=True)
        )
        # NOTE(review): values here are truncated with int(), while the
        # per-service branch above appends float seconds — looks inconsistent;
        # confirm whether the truncation is intentional before changing it.
        no_service_response_time_seconds = [
            int(response_time.total_seconds()) for response_time in no_service_response_time
        ]
        metric_response_time_data["services"][NO_SERVICE_VALUE] = no_service_response_time_seconds
        metric_alert_group_response_time[integration.id] = metric_response_time_data

    metric_alert_groups_total_key = get_metric_alert_groups_total_key(organization_id)
    metric_alert_groups_response_time_key = get_metric_alert_groups_response_time_key(organization_id)

    # keep the cached payloads alive longer than the recalculation period so
    # readers never observe a gap between recalculations
    recalculate_timeout = get_metrics_recalculation_timeout()
    metrics_cache_timeout = recalculate_timeout + TWO_HOURS
    cache.set(metric_alert_groups_total_key, metric_alert_group_total, timeout=metrics_cache_timeout)
    cache.set(metric_alert_groups_response_time_key, metric_alert_group_response_time, timeout=metrics_cache_timeout)
    if force:
        # clear the "forced recalculation in progress" flag on the cache timer.
        # NOTE(review): cache.get can return None if the timer entry expired,
        # which would raise TypeError on the assignment below — confirm the
        # timer is guaranteed to exist whenever force=True.
        metrics_cache_timer_key = get_metrics_cache_timer_key(organization_id)
        metrics_cache_timer = cache.get(metrics_cache_timer_key)
        metrics_cache_timer["forced_started"] = False
        cache.set(metrics_cache_timer_key, metrics_cache_timer, timeout=recalculate_timeout - ONE_HOUR)
@shared_dedicated_queue_retry_task(
    autoretry_for=(Exception,), retry_backoff=True, max_retries=1 if settings.DEBUG else None
)
def calculate_and_cache_user_was_notified_metric(organization_id):
    """
    Calculate metric "user_was_notified_of_alert_groups" for organization.

    For every user of the organization with at least one personal log record,
    count the distinct alert groups for which a personal notification was
    triggered, and cache the resulting per-user payloads.
    """
    from apps.base.models import UserNotificationPolicyLogRecord
    from apps.user_management.models import Organization, User

    TWO_HOURS = 7200

    organization = Organization.objects.filter(pk=organization_id).first()
    if not organization:
        # organization no longer exists — nothing to calculate
        return

    # only users that have any personal log records at all; reads may go to a
    # read-only replica when one is configured
    users_with_logs = (
        User.objects.using(get_random_readonly_database_key_if_present_otherwise_default())
        .filter(organization_id=organization_id)
        .annotate(num_logs=Count("personal_log_records"))
        .filter(num_logs__gte=1)
    )

    stack_slug = organization.stack_slug
    stack_id = organization.stack_id
    grafana_org_id = organization.org_id

    metric_user_was_notified: typing.Dict[int, UserWasNotifiedOfAlertGroupsMetricsDict] = {}
    for user in users_with_logs:
        # number of distinct alert groups the user was actually notified about
        notified_alert_groups = (
            user.personal_log_records.filter(type=UserNotificationPolicyLogRecord.TYPE_PERSONAL_NOTIFICATION_TRIGGERED)
            .values("alert_group")
            .distinct()
            .count()
        )
        if notified_alert_groups == 0:  # means that user has no successful notifications
            continue
        metric_user_was_notified[user.id] = {
            "user_username": user.username,
            "org_id": grafana_org_id,
            "slug": stack_slug,
            "id": stack_id,
            "counter": notified_alert_groups,
        }

    metric_user_was_notified_key = get_metric_user_was_notified_of_alert_groups_key(organization_id)
    # keep the cached value alive longer than the recalculation period so
    # readers never observe a gap between recalculations
    recalculate_timeout = get_metrics_recalculation_timeout()
    cache.set(metric_user_was_notified_key, metric_user_was_notified, timeout=recalculate_timeout + TWO_HOURS)
@shared_dedicated_queue_retry_task(
    autoretry_for=(Exception,), retry_backoff=True, max_retries=1 if settings.DEBUG else 10
)
def update_metrics_for_alert_group(alert_group_id, organization_id, previous_state, new_state):
    """Update the cached alert-group metrics after a state transition.

    Response time is only reported for the first action on the alert group,
    i.e. when it transitions out of FIRING and has never been restarted.
    """
    from apps.alerts.models import AlertGroup

    alert_group = AlertGroup.objects.get(pk=alert_group_id)

    # only consider response time from the first action
    if previous_state == AlertGroupState.FIRING and not alert_group.restarted_at:
        updated_response_time = alert_group.response_time
    else:
        updated_response_time = None

    # resolve the service label value, falling back to the "no service" bucket
    service_label = alert_group.labels.filter(key_name=SERVICE_LABEL).first()
    service_name = NO_SERVICE_VALUE if service_label is None else service_label.value_name

    MetricsCacheManager.metrics_update_cache_for_alert_group(
        integration_id=alert_group.channel_id,
        organization_id=organization_id,
        old_state=previous_state,
        new_state=new_state,
        response_time=updated_response_time,
        started_at=alert_group.started_at,
        service_name=service_name,
    )
@shared_dedicated_queue_retry_task(
    autoretry_for=(Exception,), retry_backoff=True, max_retries=1 if settings.DEBUG else 10
)
def update_metrics_for_user(user_id, counter=1):
    """Increment the cached "user was notified" metric for a single user by ``counter``."""
    from apps.user_management.models import User

    metrics_update_user_cache(User.objects.get(id=user_id), counter)