import typing

from django.conf import settings
from django.core.cache import cache
from django.db.models import Count, Q

from apps.alerts.constants import AlertGroupState
from apps.metrics_exporter.constants import (
    METRICS_ORGANIZATIONS_IDS,
    METRICS_ORGANIZATIONS_IDS_CACHE_TIMEOUT,
    NO_SERVICE_VALUE,
    SERVICE_LABEL,
    AlertGroupsResponseTimeMetricsDict,
    AlertGroupsTotalMetricsDict,
    RecalculateOrgMetricsDict,
    UserWasNotifiedOfAlertGroupsMetricsDict,
)
from apps.metrics_exporter.helpers import (
    get_default_states_dict,
    get_metric_alert_groups_response_time_key,
    get_metric_alert_groups_total_key,
    get_metric_calculation_started_key,
    get_metric_user_was_notified_of_alert_groups_key,
    get_metrics_cache_timer_key,
    get_metrics_recalculation_timeout,
    get_organization_ids,
    get_organization_ids_from_db,
    get_response_time_period,
    is_allowed_to_start_metrics_calculation,
    metrics_update_user_cache,
)
from apps.metrics_exporter.metrics_cache_manager import MetricsCacheManager
from common.custom_celery_tasks import shared_dedicated_queue_retry_task
from common.database import get_random_readonly_database_key_if_present_otherwise_default


@shared_dedicated_queue_retry_task(
    autoretry_for=(Exception,), retry_backoff=True, max_retries=1 if settings.DEBUG else None
)
def save_organizations_ids_in_cache():
    """Refresh the cached list of organization ids used by the metrics exporter."""
    organizations_ids = get_organization_ids_from_db()
    # BUGFIX: cache.set signature is (key, value, timeout) — the key constant must
    # come first, otherwise the ids list is stored under an unretrievable key.
    cache.set(METRICS_ORGANIZATIONS_IDS, organizations_ids, METRICS_ORGANIZATIONS_IDS_CACHE_TIMEOUT)


@shared_dedicated_queue_retry_task(
    autoretry_for=(Exception,), retry_backoff=True, max_retries=1 if settings.DEBUG else None
)
def start_calculate_and_cache_metrics(metrics_to_recalculate: list[RecalculateOrgMetricsDict]):
    """Start calculation metrics for each object in metrics_to_recalculate"""
    for counter, recalculation_data in enumerate(metrics_to_recalculate):
        if not is_allowed_to_start_metrics_calculation(**recalculation_data):
            continue
        # start immediately if recalculation starting has been forced;
        # otherwise stagger tasks by their position to spread the load
        countdown = 0 if recalculation_data.get("force") else counter
        calculate_and_cache_metrics.apply_async(kwargs=recalculation_data, countdown=countdown)
        calculate_and_cache_user_was_notified_metric.apply_async(
            (recalculation_data["organization_id"],), countdown=countdown
        )


@shared_dedicated_queue_retry_task(autoretry_for=(Exception,), retry_backoff=True, max_retries=0)
def start_recalculation_for_new_metric(metric_name):
    """
    Kick off the first calculation of a newly introduced metric for every organization.

    A short-lived cache flag guards against starting the same recalculation twice.
    """
    TEN_MINUTES = 600
    calculation_started_key = get_metric_calculation_started_key(metric_name)
    is_calculation_started = cache.get(calculation_started_key)
    if is_calculation_started:
        # another worker already started this recalculation within the last ten minutes
        return
    cache.set(calculation_started_key, True, timeout=TEN_MINUTES)
    org_ids = set(get_organization_ids())
    countdown = 0
    for counter, organization_id in enumerate(org_ids):
        # batch tasks in groups of 10 per second of countdown to spread the load
        if counter % 10 == 0:
            countdown += 1
        calculate_and_cache_user_was_notified_metric.apply_async((organization_id,), countdown=countdown)


@shared_dedicated_queue_retry_task(
    autoretry_for=(Exception,), retry_backoff=True, max_retries=1 if settings.DEBUG else None
)
def calculate_and_cache_metrics(organization_id, force=False):
    """
    Calculate integrations metrics for organization.
    """
    from apps.alerts.models import AlertGroup, AlertReceiveChannel
    from apps.user_management.models import Organization

    ONE_HOUR = 3600
    TWO_HOURS = 7200

    organization = Organization.objects.filter(pk=organization_id).first()
    if not organization:
        # organization was deleted; nothing to calculate
        return

    # exclude maintenance integrations; read from a replica when one is available
    integrations = (
        AlertReceiveChannel.objects.using(get_random_readonly_database_key_if_present_otherwise_default())
        .filter(~Q(integration=AlertReceiveChannel.INTEGRATION_MAINTENANCE) & Q(organization_id=organization_id))
        .select_related("team")
    )

    response_time_period = get_response_time_period()

    instance_slug = organization.stack_slug
    instance_id = organization.stack_id
    instance_org_id = organization.org_id

    metric_alert_group_total: typing.Dict[int, AlertGroupsTotalMetricsDict] = {}
    metric_alert_group_response_time: typing.Dict[int, AlertGroupsResponseTimeMetricsDict] = {}

    states = {
        AlertGroupState.FIRING.value: AlertGroup.get_new_state_filter(),
        AlertGroupState.SILENCED.value: AlertGroup.get_silenced_state_filter(),
        AlertGroupState.ACKNOWLEDGED.value: AlertGroup.get_acknowledged_state_filter(),
        AlertGroupState.RESOLVED.value: AlertGroup.get_resolved_state_filter(),
    }

    for integration in integrations:
        metric_alert_group_total_data = {
            "integration_name": integration.emojized_verbal_name,
            "team_name": integration.team_name,
            "team_id": integration.team_id_or_no_team,
            "org_id": instance_org_id,
            "slug": instance_slug,
            "id": instance_id,
            "services": {
                NO_SERVICE_VALUE: get_default_states_dict(),
            },
        }
        # calculate states
        for state, alert_group_filter in states.items():
            # count alert groups with `service_name` label group by label value
            alert_group_count_by_service = (
                integration.alert_groups.filter(
                    alert_group_filter,
                    labels__organization=organization,
                    labels__key_name=SERVICE_LABEL,
                )
                .values("labels__value_name")
                .annotate(count=Count("id"))
            )
            for value in alert_group_count_by_service:
                metric_alert_group_total_data["services"].setdefault(
                    value["labels__value_name"],
                    get_default_states_dict(),
                )[state] += value["count"]
            # count alert groups without `service_name` label
            alert_groups_count_without_service = integration.alert_groups.filter(
                alert_group_filter,
                ~Q(labels__key_name=SERVICE_LABEL),
            ).count()
            metric_alert_group_total_data["services"][NO_SERVICE_VALUE][state] += alert_groups_count_without_service
        metric_alert_group_total[integration.id] = metric_alert_group_total_data

        # calculate response time metric
        metric_response_time_data = {
            "integration_name": integration.emojized_verbal_name,
            "team_name": integration.team_name,
            "team_id": integration.team_id_or_no_team,
            "org_id": instance_org_id,
            "slug": instance_slug,
            "id": instance_id,
            "services": {NO_SERVICE_VALUE: []},
        }

        # filter response time by services
        response_time_by_service = integration.alert_groups.filter(
            started_at__gte=response_time_period,
            response_time__isnull=False,
            labels__organization=organization,
            labels__key_name=SERVICE_LABEL,
        ).values_list("id", "labels__value_name", "response_time")

        for _, service_name, response_time in response_time_by_service:
            metric_response_time_data["services"].setdefault(service_name, [])
            metric_response_time_data["services"][service_name].append(response_time.total_seconds())

        # response times of alert groups that carry no `service_name` label
        no_service_response_time = (
            integration.alert_groups.filter(
                started_at__gte=response_time_period,
                response_time__isnull=False,
            )
            .exclude(id__in=[i[0] for i in response_time_by_service])
            .values_list("response_time", flat=True)
        )

        # NOTE(review): no-service values are truncated to int while per-service
        # values above stay float — looks inconsistent, confirm before unifying
        no_service_response_time_seconds = [
            int(response_time.total_seconds()) for response_time in no_service_response_time
        ]
        metric_response_time_data["services"][NO_SERVICE_VALUE] = no_service_response_time_seconds

        metric_alert_group_response_time[integration.id] = metric_response_time_data

    metric_alert_groups_total_key = get_metric_alert_groups_total_key(organization_id)
    metric_alert_groups_response_time_key = get_metric_alert_groups_response_time_key(organization_id)

    recalculate_timeout = get_metrics_recalculation_timeout()
    # keep cached metrics alive a bit longer than the recalculation period
    metrics_cache_timeout = recalculate_timeout + TWO_HOURS
    cache.set(metric_alert_groups_total_key, metric_alert_group_total, timeout=metrics_cache_timeout)
    cache.set(metric_alert_groups_response_time_key, metric_alert_group_response_time, timeout=metrics_cache_timeout)
    if force:
        metrics_cache_timer_key = get_metrics_cache_timer_key(organization_id)
        metrics_cache_timer = cache.get(metrics_cache_timer_key)
        # ROBUSTNESS: the timer entry may have expired since the forced start;
        # previously an expired entry raised TypeError and crash-looped the task
        if metrics_cache_timer is not None:
            metrics_cache_timer["forced_started"] = False
            cache.set(metrics_cache_timer_key, metrics_cache_timer, timeout=recalculate_timeout - ONE_HOUR)


@shared_dedicated_queue_retry_task(
    autoretry_for=(Exception,), retry_backoff=True, max_retries=1 if settings.DEBUG else None
)
def calculate_and_cache_user_was_notified_metric(organization_id):
    """
    Calculate metric "user_was_notified_of_alert_groups" for organization.
    """
    from apps.base.models import UserNotificationPolicyLogRecord
    from apps.user_management.models import Organization, User

    TWO_HOURS = 7200

    organization = Organization.objects.filter(pk=organization_id).first()
    if not organization:
        # organization was deleted; nothing to calculate
        return

    # only users with at least one personal notification log record; replica read when available
    users = (
        User.objects.using(get_random_readonly_database_key_if_present_otherwise_default())
        .filter(organization_id=organization_id)
        .annotate(num_logs=Count("personal_log_records"))
        .filter(num_logs__gte=1)
    )

    instance_slug = organization.stack_slug
    instance_id = organization.stack_id
    instance_org_id = organization.org_id

    metric_user_was_notified: typing.Dict[int, UserWasNotifiedOfAlertGroupsMetricsDict] = {}

    for user in users:
        # count distinct alert groups the user got a triggered personal notification for
        counter = (
            user.personal_log_records.filter(type=UserNotificationPolicyLogRecord.TYPE_PERSONAL_NOTIFICATION_TRIGGERED)
            .values("alert_group")
            .distinct()
            .count()
        )
        if counter == 0:  # means that user has no successful notifications
            continue
        metric_user_was_notified[user.id] = {
            "user_username": user.username,
            "org_id": instance_org_id,
            "slug": instance_slug,
            "id": instance_id,
            "counter": counter,
        }

    metric_user_was_notified_key = get_metric_user_was_notified_of_alert_groups_key(organization_id)

    recalculate_timeout = get_metrics_recalculation_timeout()
    # keep cached metrics alive a bit longer than the recalculation period
    metrics_cache_timeout = recalculate_timeout + TWO_HOURS
    cache.set(metric_user_was_notified_key, metric_user_was_notified, timeout=metrics_cache_timeout)


@shared_dedicated_queue_retry_task(
    autoretry_for=(Exception,), retry_backoff=True, max_retries=1 if settings.DEBUG else 10
)
def update_metrics_for_alert_group(alert_group_id, organization_id, previous_state, new_state):
    """Incrementally update cached per-integration metrics after an alert group state change."""
    from apps.alerts.models import AlertGroup

    alert_group = AlertGroup.objects.get(pk=alert_group_id)
    updated_response_time = alert_group.response_time
    if previous_state != AlertGroupState.FIRING or alert_group.restarted_at:
        # only consider response time from the first action
        updated_response_time = None
    service_label = alert_group.labels.filter(key_name=SERVICE_LABEL).first()
    service_name = service_label.value_name if service_label else NO_SERVICE_VALUE
    MetricsCacheManager.metrics_update_cache_for_alert_group(
        integration_id=alert_group.channel_id,
        organization_id=organization_id,
        old_state=previous_state,
        new_state=new_state,
        response_time=updated_response_time,
        started_at=alert_group.started_at,
        service_name=service_name,
    )


@shared_dedicated_queue_retry_task(
    autoretry_for=(Exception,), retry_backoff=True, max_retries=1 if settings.DEBUG else 10
)
def update_metrics_for_user(user_id, counter=1):
    """Incrementally bump the cached "user was notified" metric for a single user."""
    from apps.user_management.models import User

    user = User.objects.get(id=user_id)
    metrics_update_user_cache(user, counter)