From 15ef692009846af7d2e9dddb4de51a0e6aed21af Mon Sep 17 00:00:00 2001 From: Yulya Artyukhina Date: Thu, 25 May 2023 20:26:13 +0200 Subject: [PATCH] OnCall prometheus metrics exporter (#1605) # What this PR does Add OnCall prometheus metrics exporter ## Which issue(s) this PR fixes ## Checklist - [x] Tests updated - [ ] Documentation added - [ ] `CHANGELOG.md` updated --------- Co-authored-by: Joey Orlando Co-authored-by: Matias Bordese --- CHANGELOG.md | 4 + engine/apps/alerts/constants.py | 11 + .../migrations/0016_auto_20230523_1355.py | 23 + engine/apps/alerts/models/alert_group.py | 354 +++++++++++--- .../alerts/models/alert_receive_channel.py | 22 +- engine/apps/alerts/tasks/unsilence.py | 8 + engine/apps/alerts/tests/test_silence.py | 1 + engine/apps/metrics_exporter/__init__.py | 0 engine/apps/metrics_exporter/constants.py | 51 ++ engine/apps/metrics_exporter/helpers.py | 227 +++++++++ .../metrics_exporter/metrics_cache_manager.py | 86 ++++ .../metrics_exporter/metrics_collectors.py | 122 +++++ engine/apps/metrics_exporter/tasks.py | 151 ++++++ .../apps/metrics_exporter/tests/__init__.py | 0 .../apps/metrics_exporter/tests/conftest.py | 100 ++++ .../tests/test_calculation_metrics.py | 115 +++++ .../tests/test_metrics_collectors.py | 33 ++ .../tests/test_update_metics_cache.py | 452 ++++++++++++++++++ engine/apps/metrics_exporter/urls.py | 7 + engine/apps/metrics_exporter/views.py | 11 + engine/apps/user_management/models/team.py | 16 + engine/conftest.py | 11 + engine/requirements.txt | 1 + engine/settings/base.py | 5 + engine/settings/prod_without_db.py | 3 + 25 files changed, 1739 insertions(+), 75 deletions(-) create mode 100644 engine/apps/alerts/migrations/0016_auto_20230523_1355.py create mode 100644 engine/apps/metrics_exporter/__init__.py create mode 100644 engine/apps/metrics_exporter/constants.py create mode 100644 engine/apps/metrics_exporter/helpers.py create mode 100644 engine/apps/metrics_exporter/metrics_cache_manager.py create mode 100644 engine/apps/metrics_exporter/metrics_collectors.py create mode 100644 engine/apps/metrics_exporter/tasks.py create mode 100644 engine/apps/metrics_exporter/tests/__init__.py create mode 100644 engine/apps/metrics_exporter/tests/conftest.py create mode 100644 engine/apps/metrics_exporter/tests/test_calculation_metrics.py create mode 100644 engine/apps/metrics_exporter/tests/test_metrics_collectors.py create mode 100644 engine/apps/metrics_exporter/tests/test_update_metics_cache.py create mode 100644 engine/apps/metrics_exporter/urls.py create mode 100644 engine/apps/metrics_exporter/views.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 82f7a8cd..82383ff0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Unreleased +### Added + +- Prometheus exporter backend for alert groups related metrics + ### Fixed - Fix error when updating closed modal window in Slack by @vadimkerr ([#2019](https://github.com/grafana/oncall/pull/2019)) diff --git a/engine/apps/alerts/constants.py b/engine/apps/alerts/constants.py index 38508e52..714312f2 100644 --- a/engine/apps/alerts/constants.py +++ b/engine/apps/alerts/constants.py @@ -1,3 +1,6 @@ +from enum import Enum + + class ActionSource: ( SLACK, @@ -10,3 +13,11 @@ class ActionSource: TASK_DELAY_SECONDS = 1 NEXT_ESCALATION_DELAY = 5 + + +# AlertGroup states verbal +class AlertGroupState(str, Enum): + FIRING = "firing" + ACKNOWLEDGED = "acknowledged" + RESOLVED = "resolved" + SILENCED = "silenced" diff 
--git a/engine/apps/alerts/migrations/0016_auto_20230523_1355.py b/engine/apps/alerts/migrations/0016_auto_20230523_1355.py new file mode 100644 index 00000000..eb7736b1 --- /dev/null +++ b/engine/apps/alerts/migrations/0016_auto_20230523_1355.py @@ -0,0 +1,23 @@ +# Generated by Django 3.2.19 on 2023-05-23 13:55 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('alerts', '0015_auto_20230508_1641'), + ] + + operations = [ + migrations.AddField( + model_name='alertgroup', + name='response_time', + field=models.DurationField(default=None, null=True), + ), + migrations.AddField( + model_name='alertgroup', + name='restarted_at', + field=models.DateTimeField(blank=True, default=None, null=True), + ), + ] diff --git a/engine/apps/alerts/models/alert_group.py b/engine/apps/alerts/models/alert_group.py index fb517562..ecf0bbca 100644 --- a/engine/apps/alerts/models/alert_group.py +++ b/engine/apps/alerts/models/alert_group.py @@ -11,16 +11,20 @@ from django.conf import settings from django.core.validators import MinLengthValidator from django.db import IntegrityError, models, transaction from django.db.models import JSONField, Q, QuerySet +from django.db.models.signals import post_save +from django.dispatch import receiver from django.utils import timezone from django.utils.functional import cached_property from django_deprecate_fields import deprecate_field +from apps.alerts.constants import AlertGroupState from apps.alerts.escalation_snapshot import EscalationSnapshotMixin from apps.alerts.incident_appearance.renderers.constants import DEFAULT_BACKUP_TITLE from apps.alerts.incident_appearance.renderers.slack_renderer import AlertGroupSlackRenderer from apps.alerts.incident_log_builder import IncidentLogBuilder from apps.alerts.signals import alert_group_action_triggered_signal, alert_group_created_signal from apps.alerts.tasks import acknowledge_reminder_task, call_ack_url, send_alert_group_signal, unsilence_task +from apps.metrics_exporter.metrics_cache_manager import MetricsCacheManager from apps.slack.slack_formatter import SlackFormatter from apps.user_management.models import User from common.public_primary_keys import generate_public_primary_key, increase_public_primary_key_length @@ -263,6 +267,10 @@ class AlertGroup(AlertGroupSlackRenderingMixin, EscalationSnapshotMixin, models. silenced_until = models.DateTimeField(blank=True, null=True) unsilence_task_uuid = models.CharField(max_length=100, null=True, default=None) + restarted_at = models.DateTimeField(blank=True, null=True, default=None) + + response_time = models.DurationField(null=True, default=None) + @property def is_silenced_forever(self): return self.silenced and self.silenced_until is None @@ -502,9 +510,35 @@ class AlertGroup(AlertGroupSlackRenderingMixin, EscalationSnapshotMixin, models. 
def happened_while_maintenance(self): return self.root_alert_group is not None and self.root_alert_group.maintenance_uuid is not None + def _get_response_time(self): + """Return response_time based on current alert group status.""" + response_time = None + timestamps = (self.acknowledged_at, self.resolved_at, self.silenced_at, self.wiped_at) + min_timestamp = min((ts for ts in timestamps if ts), default=None) + if min_timestamp: + response_time = min_timestamp - self.started_at + return response_time + + def _update_metrics(self, organization_id, previous_state, state): + """Update metrics cache for response time and state as needed.""" + updated_response_time = self.response_time + if previous_state != AlertGroupState.FIRING or self.restarted_at: + # only consider response time from the first action + updated_response_time = None + MetricsCacheManager.metrics_update_cache_for_alert_group( + self.channel_id, + organization_id=organization_id, + old_state=previous_state, + new_state=state, + response_time=updated_response_time, + started_at=self.started_at, + ) + def acknowledge_by_user(self, user: User, action_source: Optional[str] = None) -> None: AlertGroupLogRecord = apps.get_model("alerts", "AlertGroupLogRecord") + initial_state = self.state logger.debug(f"Started acknowledge_by_user for alert_group {self.pk}") + # if incident was silenced or resolved, unsilence/unresolve it without starting escalation if self.silenced: self.un_silence() @@ -519,6 +553,9 @@ class AlertGroup(AlertGroupSlackRenderingMixin, EscalationSnapshotMixin, models. self.log_records.create(type=AlertGroupLogRecord.TYPE_UN_RESOLVED, author=user, reason="Acknowledge button") self.acknowledge(acknowledged_by_user=user, acknowledged_by=AlertGroup.USER) + # Update alert group state and response time metrics cache + self._update_metrics(organization_id=user.organization_id, previous_state=initial_state, state=self.state) + self.stop_escalation() if self.is_root_alert_group: self.start_ack_reminder(user) @@ -546,6 +583,7 @@ class AlertGroup(AlertGroupSlackRenderingMixin, EscalationSnapshotMixin, models. def acknowledge_by_source(self): AlertGroupLogRecord = apps.get_model("alerts", "AlertGroupLogRecord") + initial_state = self.state # if incident was silenced, unsilence it without starting escalation if self.silenced: @@ -556,6 +594,10 @@ class AlertGroup(AlertGroupSlackRenderingMixin, EscalationSnapshotMixin, models. reason="Acknowledge by source", ) self.acknowledge(acknowledged_by=AlertGroup.SOURCE) + # Update alert group state and response time metrics cache + self._update_metrics( + organization_id=self.channel.organization_id, previous_state=initial_state, state=self.state + ) self.stop_escalation() log_record = self.log_records.create(type=AlertGroupLogRecord.TYPE_ACK) @@ -576,9 +618,12 @@ class AlertGroup(AlertGroupSlackRenderingMixin, EscalationSnapshotMixin, models. def un_acknowledge_by_user(self, user: User, action_source: Optional[str] = None) -> None: AlertGroupLogRecord = apps.get_model("alerts", "AlertGroupLogRecord") - + initial_state = self.state logger.debug(f"Started un_acknowledge_by_user for alert_group {self.pk}") + self.unacknowledge() + # Update alert group state metric cache + self._update_metrics(organization_id=user.organization_id, previous_state=initial_state, state=self.state) if self.is_root_alert_group: self.start_escalation_if_needed() @@ -601,6 +646,7 @@ class AlertGroup(AlertGroupSlackRenderingMixin, EscalationSnapshotMixin, models. 
def resolve_by_user(self, user: User, action_source: Optional[str] = None) -> None: AlertGroupLogRecord = apps.get_model("alerts", "AlertGroupLogRecord") + initial_state = self.state # if incident was silenced, unsilence it without starting escalation if self.silenced: @@ -612,6 +658,8 @@ class AlertGroup(AlertGroupSlackRenderingMixin, EscalationSnapshotMixin, models. reason="Resolve button", ) self.resolve(resolved_by=AlertGroup.USER, resolved_by_user=user) + # Update alert group state and response time metrics cache + self._update_metrics(organization_id=user.organization_id, previous_state=initial_state, state=self.state) self.stop_escalation() log_record = self.log_records.create(type=AlertGroupLogRecord.TYPE_RESOLVED, author=user) @@ -631,6 +679,8 @@ class AlertGroup(AlertGroupSlackRenderingMixin, EscalationSnapshotMixin, models. def resolve_by_source(self): AlertGroupLogRecord = apps.get_model("alerts", "AlertGroupLogRecord") + initial_state = self.state + # if incident was silenced, unsilence it without starting escalation if self.silenced: self.un_silence() @@ -640,6 +690,10 @@ class AlertGroup(AlertGroupSlackRenderingMixin, EscalationSnapshotMixin, models. reason="Resolve by source", ) self.resolve(resolved_by=AlertGroup.SOURCE) + # Update alert group state and response time metrics cache + self._update_metrics( + organization_id=self.channel.organization_id, previous_state=initial_state, state=self.state + ) self.stop_escalation() log_record = self.log_records.create(type=AlertGroupLogRecord.TYPE_RESOLVED) @@ -657,6 +711,7 @@ class AlertGroup(AlertGroupSlackRenderingMixin, EscalationSnapshotMixin, models. for dependent_alert_group in self.dependent_alert_groups.all(): dependent_alert_group.resolve_by_source() + # deprecated def resolve_by_archivation(self): AlertGroupLogRecord = apps.get_model("alerts", "AlertGroupLogRecord") # if incident was silenced, unsilence it without starting escalation @@ -690,8 +745,13 @@ class AlertGroup(AlertGroupSlackRenderingMixin, EscalationSnapshotMixin, models. def resolve_by_last_step(self): AlertGroupLogRecord = apps.get_model("alerts", "AlertGroupLogRecord") + initial_state = self.state self.resolve(resolved_by=AlertGroup.LAST_STEP) + # Update alert group state and response time metrics cache + self._update_metrics( + organization_id=self.channel.organization_id, previous_state=initial_state, state=self.state + ) self.stop_escalation() log_record = self.log_records.create(type=AlertGroupLogRecord.TYPE_RESOLVED) @@ -735,7 +795,11 @@ class AlertGroup(AlertGroupSlackRenderingMixin, EscalationSnapshotMixin, models. AlertGroupLogRecord = apps.get_model("alerts", "AlertGroupLogRecord") if self.wiped_at is None: + initial_state = self.state self.unresolve() + # Update alert group state metric cache + self._update_metrics(organization_id=user.organization_id, previous_state=initial_state, state=self.state) + log_record = self.log_records.create(type=AlertGroupLogRecord.TYPE_UN_RESOLVED, author=user) if self.is_root_alert_group: @@ -906,6 +970,8 @@ class AlertGroup(AlertGroupSlackRenderingMixin, EscalationSnapshotMixin, models. 
def silence_by_user(self, user: User, silence_delay: Optional[int], action_source: Optional[str] = None) -> None: AlertGroupLogRecord = apps.get_model("alerts", "AlertGroupLogRecord") + initial_state = self.state + if self.resolved: self.unresolve() self.log_records.create(type=AlertGroupLogRecord.TYPE_UN_RESOLVED, author=user, reason="Silence button") @@ -935,6 +1001,8 @@ class AlertGroup(AlertGroupSlackRenderingMixin, EscalationSnapshotMixin, models. silenced_until = None self.silence(silenced_at=now, silenced_until=silenced_until, silenced_by_user=user) + # Update alert group state and response time metrics cache + self._update_metrics(organization_id=user.organization_id, previous_state=initial_state, state=self.state) log_record = self.log_records.create( type=AlertGroupLogRecord.TYPE_SILENCE, @@ -959,8 +1027,12 @@ class AlertGroup(AlertGroupSlackRenderingMixin, EscalationSnapshotMixin, models. def un_silence_by_user(self, user: User, action_source: Optional[str] = None) -> None: AlertGroupLogRecord = apps.get_model("alerts", "AlertGroupLogRecord") + initial_state = self.state self.un_silence() + # Update alert group state metric cache + self._update_metrics(organization_id=user.organization_id, previous_state=initial_state, state=self.state) + if self.is_root_alert_group: self.start_escalation_if_needed() @@ -988,6 +1060,7 @@ class AlertGroup(AlertGroupSlackRenderingMixin, EscalationSnapshotMixin, models. def wipe_by_user(self, user: User) -> None: AlertGroupLogRecord = apps.get_model("alerts", "AlertGroupLogRecord") + initial_state = self.state if not self.wiped_at: self.resolve(resolved_by=AlertGroup.WIPED) @@ -996,10 +1069,19 @@ class AlertGroup(AlertGroupSlackRenderingMixin, EscalationSnapshotMixin, models. self.web_title_cache = None self.wiped_at = timezone.now() self.wiped_by = user + update_fields = ["distinction", "web_title_cache", "wiped_at", "wiped_by"] + + if self.response_time is None: + self.response_time = self._get_response_time() + update_fields += ["response_time"] + for alert in self.alerts.all(): alert.wipe(wiped_by=self.wiped_by, wiped_at=self.wiped_at) - self.save(update_fields=["distinction", "web_title_cache", "wiped_at", "wiped_by"]) + self.save(update_fields=update_fields) + + # Update alert group state and response time metrics cache + self._update_metrics(organization_id=user.organization_id, previous_state=initial_state, state=self.state) log_record = self.log_records.create( type=AlertGroupLogRecord.TYPE_WIPED, @@ -1023,6 +1105,8 @@ class AlertGroup(AlertGroupSlackRenderingMixin, EscalationSnapshotMixin, models. def delete_by_user(self, user: User): AlertGroupLogRecord = apps.get_model("alerts", "AlertGroupLogRecord") + initial_state = self.state + self.stop_escalation() # prevent creating multiple logs # filter instead of get_or_create cause it can be multiple logs of this type due deleting error @@ -1052,6 +1136,8 @@ class AlertGroup(AlertGroupSlackRenderingMixin, EscalationSnapshotMixin, models. dependent_alerts = list(self.dependent_alert_groups.all()) self.hard_delete() + # Update alert group state metric cache + self._update_metrics(organization_id=user.organization_id, previous_state=initial_state, state=None) for dependent_alert_group in dependent_alerts: # unattach dependent incidents dependent_alert_group.un_attach_by_delete() @@ -1076,31 +1162,52 @@ class AlertGroup(AlertGroupSlackRenderingMixin, EscalationSnapshotMixin, models. 
AlertGroupLogRecord = apps.get_model("alerts", "AlertGroupLogRecord") # it is needed to unserolve those alert_groups which were resolved to build proper log. - alert_groups_to_unresolve_before_acknowledge = alert_groups_to_acknowledge.filter(resolved=True) + alert_groups_to_unresolve_before_acknowledge = alert_groups_to_acknowledge.filter(resolved=models.Value("1")) # it is needed to unsilence those alert_groups which were silenced to build proper log. - alert_groups_to_unsilence_before_acknowledge = alert_groups_to_acknowledge.filter(silenced=True) + alert_groups_to_unsilence_before_acknowledge = alert_groups_to_acknowledge.filter(silenced=models.Value("1")) # convert current qs to list to prevent changes by update alert_groups_to_acknowledge_list = list(alert_groups_to_acknowledge) alert_groups_to_unresolve_before_acknowledge_list = list(alert_groups_to_unresolve_before_acknowledge) alert_groups_to_unsilence_before_acknowledge_list = list(alert_groups_to_unsilence_before_acknowledge) - alert_groups_to_acknowledge.update( - acknowledged=True, - resolved=False, - resolved_at=None, - resolved_by=AlertGroup.NOT_YET, - resolved_by_user=None, - silenced_until=None, - silenced_by_user=None, - silenced_at=None, - silenced=False, - acknowledged_at=timezone.now(), - acknowledged_by_user=user, - acknowledged_by=AlertGroup.USER, - is_escalation_finished=True, - ) + previous_states = [] + for alert_group in alert_groups_to_acknowledge_list: + previous_states.append(alert_group.state) + alert_group.acknowledged = True + alert_group.resolved = False + alert_group.resolved_at = None + alert_group.resolved_by = AlertGroup.NOT_YET + alert_group.resolved_by_user = None + alert_group.silenced_until = None + alert_group.silenced_by_user = None + alert_group.silenced_at = None + alert_group.silenced = False + alert_group.acknowledged_at = timezone.now() + alert_group.acknowledged_by_user = user + alert_group.acknowledged_by = AlertGroup.USER + alert_group.is_escalation_finished = True + if alert_group.response_time is None: + alert_group.response_time = alert_group._get_response_time() + + fields_to_update = [ + "acknowledged", + "resolved", + "resolved_at", + "resolved_by", + "resolved_by_user", + "silenced_until", + "silenced_by_user", + "silenced_at", + "silenced", + "acknowledged_at", + "acknowledged_by_user", + "acknowledged_by", + "is_escalation_finished", + "response_time", + ] + AlertGroup.all_objects.bulk_update(alert_groups_to_acknowledge_list, fields=fields_to_update, batch_size=100) for alert_group in alert_groups_to_unresolve_before_acknowledge_list: alert_group.log_records.create( @@ -1114,7 +1221,13 @@ class AlertGroup(AlertGroupSlackRenderingMixin, EscalationSnapshotMixin, models. type=AlertGroupLogRecord.TYPE_UN_SILENCE, author=user, reason="Bulk action acknowledge" ) - for alert_group in alert_groups_to_acknowledge_list: + for alert_group, previous_state in zip(alert_groups_to_acknowledge_list, previous_states): + # update metrics cache + alert_group._update_metrics( + organization_id=user.organization_id, + previous_state=previous_state, + state=AlertGroupState.ACKNOWLEDGED, + ) if alert_group.is_root_alert_group: alert_group.start_ack_reminder(user) @@ -1147,31 +1260,55 @@ class AlertGroup(AlertGroupSlackRenderingMixin, EscalationSnapshotMixin, models. AlertGroupLogRecord = apps.get_model("alerts", "AlertGroupLogRecord") # it is needed to unsilence those alert_groups which were silenced to build proper log. 
- alert_groups_to_unsilence_before_resolve = alert_groups_to_resolve.filter(silenced=True) + alert_groups_to_unsilence_before_resolve = alert_groups_to_resolve.filter(silenced=models.Value("1")) # convert current qs to list to prevent changes by update alert_groups_to_resolve_list = list(alert_groups_to_resolve) alert_groups_to_unsilence_before_resolve_list = list(alert_groups_to_unsilence_before_resolve) - alert_groups_to_resolve.update( - resolved=True, - resolved_at=timezone.now(), - is_open_for_grouping=None, - resolved_by_user=user, - resolved_by=AlertGroup.USER, - is_escalation_finished=True, - silenced_until=None, - silenced_by_user=None, - silenced_at=None, - silenced=False, - ) + previous_states = [] + for alert_group in alert_groups_to_resolve_list: + previous_states.append(alert_group.state) + alert_group.resolved = True + alert_group.resolved_at = timezone.now() + alert_group.is_open_for_grouping = None + alert_group.resolved_by_user = user + alert_group.resolved_by = AlertGroup.USER + alert_group.is_escalation_finished = True + alert_group.silenced_until = None + alert_group.silenced_by_user = None + alert_group.silenced_at = None + alert_group.silenced = False + if alert_group.response_time is None: + alert_group.response_time = alert_group._get_response_time() + + fields_to_update = [ + "resolved", + "resolved_at", + "resolved_by", + "resolved_by_user", + "is_open_for_grouping", + "silenced_until", + "silenced_by_user", + "silenced_at", + "silenced", + "is_escalation_finished", + "response_time", + ] + AlertGroup.all_objects.bulk_update(alert_groups_to_resolve_list, fields=fields_to_update, batch_size=100) for alert_group in alert_groups_to_unsilence_before_resolve_list: alert_group.log_records.create( type=AlertGroupLogRecord.TYPE_UN_SILENCE, author=user, reason="Bulk action resolve" ) - for alert_group in alert_groups_to_resolve_list: + for alert_group, previous_state in zip(alert_groups_to_resolve_list, previous_states): + # update metrics cache + alert_group._update_metrics( + organization_id=user.organization_id, + previous_state=previous_state, + state=AlertGroupState.RESOLVED, + ) log_record = alert_group.log_records.create(type=AlertGroupLogRecord.TYPE_RESOLVED, author=user) send_alert_group_signal.apply_async((log_record.pk,)) @@ -1223,10 +1360,17 @@ class AlertGroup(AlertGroupSlackRenderingMixin, EscalationSnapshotMixin, models. silenced_by_user=None, silenced_at=None, silenced=False, + restarted_at=timezone.now(), ) # unacknowledge alert groups for alert_group in alert_groups_to_restart_unack_list: + # update metrics cache (note alert_group.state is the original alert group's state) + alert_group._update_metrics( + organization_id=user.organization_id, + previous_state=alert_group.state, + state=AlertGroupState.FIRING, + ) log_record = alert_group.log_records.create( type=AlertGroupLogRecord.TYPE_UN_ACK, author=user, @@ -1259,10 +1403,17 @@ class AlertGroup(AlertGroupSlackRenderingMixin, EscalationSnapshotMixin, models. 
silenced_by_user=None, silenced_at=None, silenced=False, + restarted_at=timezone.now(), ) # unresolve alert groups for alert_group in alert_groups_to_restart_unresolve_list: + # update metrics cache (note alert_group.state is the original alert group's state) + alert_group._update_metrics( + organization_id=user.organization_id, + previous_state=alert_group.state, + state=AlertGroupState.FIRING, + ) log_record = alert_group.log_records.create( type=AlertGroupLogRecord.TYPE_UN_RESOLVED, author=user, @@ -1295,10 +1446,17 @@ class AlertGroup(AlertGroupSlackRenderingMixin, EscalationSnapshotMixin, models. silenced_by_user=None, silenced_at=None, silenced=False, + restarted_at=timezone.now(), ) # unsilence alert groups for alert_group in alert_groups_to_restart_unsilence_list: + # update metrics cache (note alert_group.state is the original alert group's state) + alert_group._update_metrics( + organization_id=user.organization_id, + previous_state=alert_group.state, + state=AlertGroupState.FIRING, + ) log_record = alert_group.log_records.create( type=AlertGroupLogRecord.TYPE_UN_SILENCE, author=user, reason="Bulk action restart" ) @@ -1363,38 +1521,45 @@ class AlertGroup(AlertGroupSlackRenderingMixin, EscalationSnapshotMixin, models. alert_groups_to_unacknowledge_before_silence_list = list(alert_groups_to_unacknowledge_before_silence) alert_groups_to_unresolve_before_silence_list = list(alert_groups_to_unresolve_before_silence) - if silence_for_period: - alert_groups_to_silence.update( - acknowledged=False, - acknowledged_at=None, - acknowledged_by_user=None, - acknowledged_by=AlertGroup.NOT_YET, - resolved=False, - resolved_at=None, - resolved_by_user=None, - resolved_by=AlertGroup.NOT_YET, - silenced=True, - silenced_at=now, - silenced_until=silenced_until, - silenced_by_user=user, - ) - else: - alert_groups_to_silence.update( - acknowledged=False, - acknowledged_at=None, - acknowledged_by_user=None, - acknowledged_by=AlertGroup.NOT_YET, - resolved=False, - resolved_at=None, - resolved_by_user=None, - resolved_by=AlertGroup.NOT_YET, - silenced=True, - silenced_at=now, - silenced_until=silenced_until, - silenced_by_user=user, - is_escalation_finished=True, - ) + previous_states = [] + for alert_group in alert_groups_to_silence_list: + previous_states.append(alert_group.state) + alert_group.acknowledged = False + alert_group.acknowledged_at = None + alert_group.acknowledged_by_user = None + alert_group.acknowledged_by = AlertGroup.NOT_YET + alert_group.resolved = False + alert_group.resolved_at = None + alert_group.resolved_by_user = None + alert_group.resolved_by = AlertGroup.NOT_YET + alert_group.silenced = True + alert_group.silenced_at = now + alert_group.silenced_until = silenced_until + alert_group.silenced_by_user = user + if not silence_for_period: + alert_group.is_escalation_finished = True + if alert_group.response_time is None: + alert_group.response_time = alert_group._get_response_time() + fields_to_update = [ + "acknowledged", + "acknowledged_at", + "acknowledged_by_user", + "acknowledged_by", + "resolved", + "resolved_at", + "resolved_by_user", + "resolved_by", + "silenced", + "silenced_at", + "silenced_until", + "silenced_by_user", + "is_escalation_finished", + "response_time", + ] + AlertGroup.all_objects.bulk_update(alert_groups_to_silence_list, fields=fields_to_update, batch_size=100) + + # create log records for alert_group in alert_groups_to_unresolve_before_silence_list: alert_group.log_records.create( type=AlertGroupLogRecord.TYPE_UN_RESOLVED, @@ -1416,7 +1581,13 @@ class 
AlertGroup(AlertGroupSlackRenderingMixin, EscalationSnapshotMixin, models. reason="Bulk action silence", ) - for alert_group in alert_groups_to_silence_list: + for alert_group, previous_state in zip(alert_groups_to_silence_list, previous_states): + # update metrics cache + alert_group._update_metrics( + organization_id=user.organization_id, + previous_state=previous_state, + state=AlertGroupState.SILENCED, + ) log_record = alert_group.log_records.create( type=AlertGroupLogRecord.TYPE_SILENCE, author=user, @@ -1498,7 +1669,12 @@ class AlertGroup(AlertGroupSlackRenderingMixin, EscalationSnapshotMixin, models. for k, v in kwargs.items(): setattr(self, k, v) - self.save(update_fields=["acknowledged", "acknowledged_at", *kwargs.keys()]) + update_fields = ["acknowledged", "acknowledged_at", *kwargs.keys()] + if self.response_time is None: + self.response_time = self._get_response_time() + update_fields += ["response_time"] + + self.save(update_fields=update_fields) def unacknowledge(self): self.un_silence() @@ -1518,7 +1694,12 @@ class AlertGroup(AlertGroupSlackRenderingMixin, EscalationSnapshotMixin, models. for k, v in kwargs.items(): setattr(self, k, v) - self.save(update_fields=["resolved", "resolved_at", "is_open_for_grouping", *kwargs.keys()]) + update_fields = ["resolved", "resolved_at", "is_open_for_grouping", *kwargs.keys()] + if self.response_time is None: + self.response_time = self._get_response_time() + update_fields += ["response_time"] + + self.save(update_fields=update_fields) def unresolve(self): self.unacknowledge() @@ -1538,7 +1719,12 @@ class AlertGroup(AlertGroupSlackRenderingMixin, EscalationSnapshotMixin, models. for k, v in kwargs.items(): setattr(self, k, v) - self.save(update_fields=["silenced", *kwargs.keys()]) + update_fields = ["silenced", *kwargs.keys()] + if self.response_time is None: + self.response_time = self._get_response_time() + update_fields += ["response_time"] + + self.save(update_fields=update_fields) def un_silence(self): self.silenced_until = None @@ -1546,8 +1732,16 @@ class AlertGroup(AlertGroupSlackRenderingMixin, EscalationSnapshotMixin, models. self.silenced_at = None self.silenced = False self.unsilence_task_uuid = None + self.restarted_at = timezone.now() self.save( - update_fields=["silenced_until", "silenced", "silenced_by_user", "silenced_at", "unsilence_task_uuid"] + update_fields=[ + "silenced_until", + "silenced", + "silenced_by_user", + "silenced_at", + "unsilence_task_uuid", + "restarted_at", + ] ) def archive(self): @@ -1621,13 +1815,13 @@ class AlertGroup(AlertGroupSlackRenderingMixin, EscalationSnapshotMixin, models. @property def state(self): if self.resolved: - return "resolved" + return AlertGroupState.RESOLVED elif self.acknowledged: - return "acknowledged" + return AlertGroupState.ACKNOWLEDGED elif self.silenced: - return "silenced" + return AlertGroupState.SILENCED else: - return "new" + return AlertGroupState.FIRING @property def notify_in_slack_enabled(self): @@ -1675,3 +1869,15 @@ class AlertGroup(AlertGroupSlackRenderingMixin, EscalationSnapshotMixin, models. 
) return stop_escalation_log + + +@receiver(post_save, sender=AlertGroup) +def listen_for_alertgroup_model_save(sender, instance, created, *args, **kwargs): + if created and not instance.is_maintenance_incident: + # Update alert group state and response time metrics cache + instance._update_metrics( + organization_id=instance.channel.organization_id, previous_state=None, state=AlertGroupState.FIRING + ) + + +post_save.connect(listen_for_alertgroup_model_save, AlertGroup) diff --git a/engine/apps/alerts/models/alert_receive_channel.py b/engine/apps/alerts/models/alert_receive_channel.py index 41df3b0e..f06a3ddc 100644 --- a/engine/apps/alerts/models/alert_receive_channel.py +++ b/engine/apps/alerts/models/alert_receive_channel.py @@ -23,6 +23,11 @@ from apps.base.messaging import get_messaging_backend_from_id from apps.base.utils import live_settings from apps.integrations.metadata import heartbeat from apps.integrations.tasks import create_alert, create_alertmanager_alerts +from apps.metrics_exporter.helpers import ( + metrics_add_integration_to_cache, + metrics_remove_deleted_integration_from_cache, + metrics_update_integration_cache, +) from apps.slack.constants import SLACK_RATE_LIMIT_DELAY, SLACK_RATE_LIMIT_TIMEOUT from apps.slack.tasks import post_slack_rate_limit_message from apps.slack.utils import post_message_to_channel @@ -256,7 +261,15 @@ class AlertReceiveChannel(IntegrationOptionsMixin, MaintainableObject): def grafana_alerting_sync_manager(self): return GrafanaAlertingSyncManager(self) - @property + @cached_property + def team_name(self): + return self.team.name if self.team else "No team" + + @cached_property + def team_id_or_no_team(self): + return self.team_id if self.team else "no_team" + + @cached_property def emojized_verbal_name(self): return emoji.emojize(self.verbal_name, use_aliases=True) @@ -615,6 +628,13 @@ def listen_for_alertreceivechannel_model_save(sender, instance, created, *args, heartbeat = IntegrationHeartBeat.objects.create(alert_receive_channel=instance, timeout_seconds=TEN_MINUTES) write_resource_insight_log(instance=heartbeat, author=instance.author, event=EntityEvent.CREATED) + metrics_add_integration_to_cache(instance) + + elif instance.deleted_at: + metrics_remove_deleted_integration_from_cache(instance) + else: + metrics_update_integration_cache(instance) + if instance.integration == AlertReceiveChannel.INTEGRATION_GRAFANA_ALERTING: if created: instance.grafana_alerting_sync_manager.create_contact_points() diff --git a/engine/apps/alerts/tasks/unsilence.py b/engine/apps/alerts/tasks/unsilence.py index 3f110929..626f437e 100644 --- a/engine/apps/alerts/tasks/unsilence.py +++ b/engine/apps/alerts/tasks/unsilence.py @@ -30,8 +30,16 @@ def unsilence_task(alert_group_pk): ) return if alert_group.status == AlertGroup.SILENCED and alert_group.is_root_alert_group: + initial_state = alert_group.state task_logger.info(f"unsilence alert_group {alert_group_pk} and start escalation if needed") + alert_group.un_silence() + # update metrics + alert_group._update_metrics( + organization_id=alert_group.channel.organization_id, + previous_state=initial_state, + state=alert_group.state, + ) alert_group.start_escalation_if_needed() un_silence_log_record = AlertGroupLogRecord( type=AlertGroupLogRecord.TYPE_UN_SILENCE, diff --git a/engine/apps/alerts/tests/test_silence.py b/engine/apps/alerts/tests/test_silence.py index b4e0d4fc..d72d8602 100644 --- a/engine/apps/alerts/tests/test_silence.py +++ b/engine/apps/alerts/tests/test_silence.py @@ -69,3 +69,4 @@ def 
test_unsilence_alert_group( assert alert_group.silenced_at is None assert alert_group.silenced_until is None assert alert_group.silenced_by_user is None + assert alert_group.restarted_at is not None diff --git a/engine/apps/metrics_exporter/__init__.py b/engine/apps/metrics_exporter/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/engine/apps/metrics_exporter/constants.py b/engine/apps/metrics_exporter/constants.py new file mode 100644 index 00000000..5d235987 --- /dev/null +++ b/engine/apps/metrics_exporter/constants.py @@ -0,0 +1,51 @@ +import typing + +from django.utils import timezone + + +class AlertGroupsTotalMetricsDict(typing.TypedDict): + integration_name: str + team_name: str + team_id: int + org_id: int + slug: str + id: int + firing: int + acknowledged: int + silenced: int + resolved: int + + +class AlertGroupsResponseTimeMetricsDict(typing.TypedDict): + integration_name: str + team_name: str + team_id: int + org_id: int + slug: str + id: int + response_time: list + + +class RecalculateMetricsTimer(typing.TypedDict): + recalculate_timeout: int + forced_started: bool + + +class RecalculateOrgMetricsDict(typing.TypedDict): + organization_id: int + force: bool + + +ALERT_GROUPS_TOTAL = "oncall_alert_groups_total" +ALERT_GROUPS_RESPONSE_TIME = "oncall_alert_groups_response_time_seconds" + +METRICS_RESPONSE_TIME_CALCULATION_PERIOD = timezone.timedelta(days=7) + +METRICS_CACHE_LIFETIME = 93600  # 26 hours. Should be higher than METRICS_RECALCULATION_CACHE_TIMEOUT + +METRICS_CACHE_TIMER = "metrics_cache_timer" +METRICS_RECALCULATION_CACHE_TIMEOUT = 86400  # 24 hours. Should be less than METRICS_CACHE_LIFETIME +METRICS_RECALCULATION_CACHE_TIMEOUT_DISPERSE = (0, 3600)  # 1 hour + +METRICS_ORGANIZATIONS_IDS = "metrics_organizations_ids" +METRICS_ORGANIZATIONS_IDS_CACHE_TIMEOUT = 3600  # 1 hour diff --git a/engine/apps/metrics_exporter/helpers.py b/engine/apps/metrics_exporter/helpers.py new file mode 100644 index 00000000..b24c9d37 --- /dev/null +++ b/engine/apps/metrics_exporter/helpers.py @@ -0,0 +1,227 @@ +import random +import typing + +from django.apps import apps +from django.core.cache import cache +from django.utils import timezone + +from apps.alerts.constants import AlertGroupState +from apps.metrics_exporter.constants import ( + ALERT_GROUPS_RESPONSE_TIME, + ALERT_GROUPS_TOTAL, + METRICS_CACHE_LIFETIME, + METRICS_CACHE_TIMER, + METRICS_ORGANIZATIONS_IDS, + METRICS_ORGANIZATIONS_IDS_CACHE_TIMEOUT, + METRICS_RECALCULATION_CACHE_TIMEOUT, + METRICS_RECALCULATION_CACHE_TIMEOUT_DISPERSE, + METRICS_RESPONSE_TIME_CALCULATION_PERIOD, + AlertGroupsResponseTimeMetricsDict, + AlertGroupsTotalMetricsDict, +) + + +def get_organization_ids(): + """Try getting organization ids from cache, otherwise get them from db and save the values in cache""" + Organization = apps.get_model("user_management", "Organization") + organizations_ids = cache.get(METRICS_ORGANIZATIONS_IDS, []) + if not organizations_ids: + organizations_ids = Organization.objects.all().values_list("id", flat=True) + organizations_ids = list(organizations_ids) + cache.set(METRICS_ORGANIZATIONS_IDS, organizations_ids, METRICS_ORGANIZATIONS_IDS_CACHE_TIMEOUT) + return organizations_ids + + +def get_response_time_period(): + """Returns period for response time calculation""" + return timezone.now() - METRICS_RESPONSE_TIME_CALCULATION_PERIOD + + +def get_metrics_recalculation_timeout(): + """ + Returns timeout when metrics should be recalculated.
+ Add some dispersion to avoid starting recalculation tasks for all organizations at the same time. + """ + return METRICS_RECALCULATION_CACHE_TIMEOUT + random.randint(*METRICS_RECALCULATION_CACHE_TIMEOUT_DISPERSE) + + +def get_metrics_cache_timeout(organization_id): + metrics_cache_timer_key = get_metrics_cache_timer_key(organization_id) + metrics_cache_timer = cache.get(metrics_cache_timer_key) + if metrics_cache_timer: + TWO_HOURS = 7200 + metrics_cache_timeout = int(metrics_cache_timer.get("recalculate_timeout")) + TWO_HOURS + else: + metrics_cache_timeout = METRICS_CACHE_LIFETIME + return metrics_cache_timeout + + +def get_metrics_cache_timer_key(organization_id): + return f"{METRICS_CACHE_TIMER}_{organization_id}" + + +def get_metrics_cache_timer_for_organization(organization_id): + key = get_metrics_cache_timer_key(organization_id) + return cache.get(key) + + +def get_metric_alert_groups_total_key(organization_id): + return f"{ALERT_GROUPS_TOTAL}_{organization_id}" + + +def get_metric_alert_groups_response_time_key(organization_id): + return f"{ALERT_GROUPS_RESPONSE_TIME}_{organization_id}" + + +def metrics_update_integration_cache(integration): + """Update integration data in metrics cache""" + metrics_cache_timeout = get_metrics_cache_timeout(integration.organization_id) + metric_alert_groups_total_key = get_metric_alert_groups_total_key(integration.organization_id) + metric_alert_groups_response_time_key = get_metric_alert_groups_response_time_key(integration.organization_id) + + for metric_key in [metric_alert_groups_total_key, metric_alert_groups_response_time_key]: + metric_cache = cache.get(metric_key, {}) + integration_metric_cache = metric_cache.get(integration.id) + if integration_metric_cache: + cache_updated = False + if integration_metric_cache["team_id"] != integration.team_id_or_no_team: + integration_metric_cache["team_id"] = integration.team_id_or_no_team + integration_metric_cache["team_name"] = integration.team_name + cache_updated = True + if integration_metric_cache["integration_name"] != integration.emojized_verbal_name: + integration_metric_cache["integration_name"] = integration.emojized_verbal_name + cache_updated = True + if cache_updated: + cache.set(metric_key, metric_cache, timeout=metrics_cache_timeout) + + +def metrics_remove_deleted_integration_from_cache(integration): + """Remove data related to deleted integration from metrics cache""" + metrics_cache_timeout = get_metrics_cache_timeout(integration.organization_id) + metric_alert_groups_total_key = get_metric_alert_groups_total_key(integration.organization_id) + metric_alert_groups_response_time_key = get_metric_alert_groups_response_time_key(integration.organization_id) + + for metric_key in [metric_alert_groups_total_key, metric_alert_groups_response_time_key]: + metric_cache = cache.get(metric_key) + if metric_cache: + metric_cache.pop(integration.id, None) + cache.set(metric_key, metric_cache, timeout=metrics_cache_timeout) + + +def metrics_add_integration_to_cache(integration): + """Add new integration data to metrics cache""" + metrics_cache_timeout = get_metrics_cache_timeout(integration.organization_id) + metric_alert_groups_total_key = get_metric_alert_groups_total_key(integration.organization_id) + + instance_slug = integration.organization.stack_slug + instance_id = integration.organization.stack_id + grafana_org_id = integration.organization.org_id + metric_alert_groups_total: typing.Dict[int, AlertGroupsTotalMetricsDict] = cache.get( + metric_alert_groups_total_key, {} + ) + 
metric_alert_groups_total.setdefault( + integration.id, + { + "integration_name": integration.emojized_verbal_name, + "team_name": integration.team_name, + "team_id": integration.team_id_or_no_team, + "org_id": grafana_org_id, + "slug": instance_slug, + "id": instance_id, + AlertGroupState.FIRING.value: 0, + AlertGroupState.ACKNOWLEDGED.value: 0, + AlertGroupState.RESOLVED.value: 0, + AlertGroupState.SILENCED.value: 0, + }, + ) + cache.set(metric_alert_groups_total_key, metric_alert_groups_total, timeout=metrics_cache_timeout) + + metric_alert_groups_response_time_key = get_metric_alert_groups_response_time_key(integration.organization_id) + metric_alert_groups_response_time: typing.Dict[int, AlertGroupsResponseTimeMetricsDict] = cache.get( + metric_alert_groups_response_time_key, {} + ) + metric_alert_groups_response_time.setdefault( + integration.id, + { + "integration_name": integration.emojized_verbal_name, + "team_name": integration.team_name, + "team_id": integration.team_id_or_no_team, + "org_id": grafana_org_id, + "slug": instance_slug, + "id": instance_id, + "response_time": [], + }, + ) + cache.set(metric_alert_groups_response_time_key, metric_alert_groups_response_time, timeout=metrics_cache_timeout) + + +def metrics_bulk_update_team_label_cache(teams_updated_data, organization_id): + """Update team related data in metrics cache for each team in `teams_updated_data`""" + if not teams_updated_data: + return + metrics_cache_timeout = get_metrics_cache_timeout(organization_id) + metric_alert_groups_total_key = get_metric_alert_groups_total_key(organization_id) + metric_alert_groups_response_time_key = get_metric_alert_groups_response_time_key(organization_id) + + metric_alert_groups_total = cache.get(metric_alert_groups_total_key, {}) + metric_alert_groups_response_time = cache.get(metric_alert_groups_response_time_key, {}) + for team_id, team_data in teams_updated_data.items(): + for integration_id in metric_alert_groups_total: + if metric_alert_groups_total[integration_id]["team_id"] == team_id: + integration_response_time_metrics = metric_alert_groups_response_time.get(integration_id) + if team_data["deleted"]: + metric_alert_groups_total[integration_id]["team_id"] = "no_team" + metric_alert_groups_total[integration_id]["team_name"] = "No team" + if integration_response_time_metrics: + integration_response_time_metrics["team_id"] = "no_team" + integration_response_time_metrics["team_name"] = "No team" + else: + metric_alert_groups_total[integration_id]["team_name"] = team_data["team_name"] + if integration_response_time_metrics: + integration_response_time_metrics["team_name"] = team_data["team_name"] + + cache.set(metric_alert_groups_total_key, metric_alert_groups_total, timeout=metrics_cache_timeout) + cache.set(metric_alert_groups_response_time_key, metric_alert_groups_response_time, timeout=metrics_cache_timeout) + + +def metrics_update_alert_groups_state_cache(states_diff, organization_id): + """Update alert groups state metric cache for each integration in states_diff dict.""" + if not states_diff: + return + + metrics_cache_timeout = get_metrics_cache_timeout(organization_id) + metric_alert_groups_total_key = get_metric_alert_groups_total_key(organization_id) + metric_alert_groups_total = cache.get(metric_alert_groups_total_key, {}) + if not metric_alert_groups_total: + return + for integration_id, integration_states_diff in states_diff.items(): + integration_alert_groups = metric_alert_groups_total.get(int(integration_id)) + if not integration_alert_groups: + continue + 
for previous_state, counter in integration_states_diff["previous_states"].items(): + if integration_alert_groups[previous_state] - counter > 0: + integration_alert_groups[previous_state] -= counter + else: + integration_alert_groups[previous_state] = 0 + for new_state, counter in integration_states_diff["new_states"].items(): + integration_alert_groups[new_state] += counter + + cache.set(metric_alert_groups_total_key, metric_alert_groups_total, timeout=metrics_cache_timeout) + + +def metrics_update_alert_groups_response_time_cache(integrations_response_time, organization_id): + """Update alert groups response time metric cache for each integration in `integrations_response_time` dict.""" + if not integrations_response_time: + return + + metrics_cache_timeout = get_metrics_cache_timeout(organization_id) + metric_alert_groups_response_time_key = get_metric_alert_groups_response_time_key(organization_id) + metric_alert_groups_response_time = cache.get(metric_alert_groups_response_time_key, {}) + if not metric_alert_groups_response_time: + return + for integration_id, integration_response_time in integrations_response_time.items(): + integration_response_time_metrics = metric_alert_groups_response_time.get(int(integration_id)) + if not integration_response_time_metrics: + continue + integration_response_time_metrics["response_time"].extend(integration_response_time) + cache.set(metric_alert_groups_response_time_key, metric_alert_groups_response_time, timeout=metrics_cache_timeout) diff --git a/engine/apps/metrics_exporter/metrics_cache_manager.py b/engine/apps/metrics_exporter/metrics_cache_manager.py new file mode 100644 index 00000000..1b6f4a0e --- /dev/null +++ b/engine/apps/metrics_exporter/metrics_cache_manager.py @@ -0,0 +1,86 @@ +from apps.alerts.constants import AlertGroupState +from apps.metrics_exporter.helpers import ( + get_response_time_period, + metrics_update_alert_groups_response_time_cache, + metrics_update_alert_groups_state_cache, +) + + +class MetricsCacheManager: + @staticmethod + def get_default_teams_diff_dict(): + default_dict = { + "team_name": None, + "deleted": False, + } + return default_dict + + @staticmethod + def update_team_diff(teams_diff, team_id, new_name=None, deleted=False): + teams_diff.setdefault(team_id, MetricsCacheManager.get_default_teams_diff_dict()) + teams_diff[team_id]["team_name"] = new_name + teams_diff[team_id]["deleted"] = deleted + return teams_diff + + @staticmethod + def get_default_states_diff_dict(): + default_dict = { + "previous_states": {state.value: 0 for state in AlertGroupState}, + "new_states": {state.value: 0 for state in AlertGroupState}, + } + return default_dict + + @staticmethod + def update_integration_states_diff(metrics_dict, integration_id, previous_state=None, new_state=None): + metrics_dict.setdefault(integration_id, MetricsCacheManager.get_default_states_diff_dict()) + if previous_state: + state_value = previous_state.value + metrics_dict[integration_id]["previous_states"][state_value] += 1 + if new_state: + state_value = new_state.value + metrics_dict[integration_id]["new_states"][state_value] += 1 + return metrics_dict + + @staticmethod + def update_integration_response_time_diff(metrics_dict, integration_id, response_time_seconds): + metrics_dict.setdefault(integration_id, []) + metrics_dict[integration_id].append(response_time_seconds) + return metrics_dict + + @staticmethod + def metrics_update_state_cache_for_alert_group(integration_id, organization_id, old_state=None, new_state=None): + """ + Update state metric 
cache for one alert group. + Run the task to update async if organization_id is None due to an additional request to db + """ + metrics_state_diff = MetricsCacheManager.update_integration_states_diff( + {}, integration_id, previous_state=old_state, new_state=new_state + ) + metrics_update_alert_groups_state_cache(metrics_state_diff, organization_id) + + @staticmethod + def metrics_update_response_time_cache_for_alert_group(integration_id, organization_id, response_time_seconds): + """ + Update response time metric cache for one alert group. + Run the task to update async if organization_id is None due to an additional request to db + """ + metrics_response_time = MetricsCacheManager.update_integration_response_time_diff( + {}, integration_id, response_time_seconds + ) + metrics_update_alert_groups_response_time_cache(metrics_response_time, organization_id) + + @staticmethod + def metrics_update_cache_for_alert_group( + integration_id, organization_id, old_state=None, new_state=None, response_time=None, started_at=None + ): + """Call methods to update state and response time metrics cache for one alert group.""" + + if response_time and old_state == AlertGroupState.FIRING and started_at > get_response_time_period(): + response_time_seconds = int(response_time.total_seconds()) + MetricsCacheManager.metrics_update_response_time_cache_for_alert_group( + integration_id, organization_id, response_time_seconds + ) + if old_state or new_state: + MetricsCacheManager.metrics_update_state_cache_for_alert_group( + integration_id, organization_id, old_state, new_state + ) diff --git a/engine/apps/metrics_exporter/metrics_collectors.py b/engine/apps/metrics_exporter/metrics_collectors.py new file mode 100644 index 00000000..67dcf2c2 --- /dev/null +++ b/engine/apps/metrics_exporter/metrics_collectors.py @@ -0,0 +1,122 @@ +import typing + +from django.core.cache import cache +from prometheus_client import CollectorRegistry +from prometheus_client.metrics_core import GaugeMetricFamily, HistogramMetricFamily + +from apps.alerts.constants import AlertGroupState +from apps.metrics_exporter.constants import ( + ALERT_GROUPS_RESPONSE_TIME, + ALERT_GROUPS_TOTAL, + AlertGroupsResponseTimeMetricsDict, + AlertGroupsTotalMetricsDict, + RecalculateOrgMetricsDict, +) +from apps.metrics_exporter.helpers import ( + get_metric_alert_groups_response_time_key, + get_metric_alert_groups_total_key, + get_metrics_cache_timer_for_organization, + get_organization_ids, +) +from apps.metrics_exporter.tasks import start_calculate_and_cache_metrics + +application_metrics_registry = CollectorRegistry() + + +# https://github.com/prometheus/client_python#custom-collectors +class ApplicationMetricsCollector: + def __init__(self): + self._buckets = (60, 300, 600, 3600, "+Inf") + self._labels = [ + "integration", + "team", + "org_id", + "slug", + "id", + ] + self._labels_with_state = self._labels + ["state"] + + def collect(self): + alert_groups_total = GaugeMetricFamily(ALERT_GROUPS_TOTAL, "All alert groups", labels=self._labels_with_state) + alert_groups_response_time_seconds = HistogramMetricFamily( + ALERT_GROUPS_RESPONSE_TIME, "Users response time to alert groups in 7 days (seconds)", labels=self._labels + ) + + metrics_to_recalculate = [] + org_ids = get_organization_ids() + + for organization_id in org_ids: + start_calculation_task = False + # get alert_groups_total metric + alert_groups_total_key = get_metric_alert_groups_total_key(organization_id) + ag_states: typing.Dict[int, AlertGroupsTotalMetricsDict] = 
cache.get(alert_groups_total_key) + + if ag_states is None: + start_calculation_task = True + else: + for integration, integration_data in ag_states.items(): + # Labels values should have the same order as _labels + labels_values = [ + integration_data["integration_name"], # integration + integration_data["team_name"], # team + integration_data["org_id"], # org_id + integration_data["slug"], # grafana instance slug + integration_data["id"], # grafana instance id + ] + + labels_values = list(map(str, labels_values)) + for state in AlertGroupState: + alert_groups_total.add_metric(labels_values + [state.value], integration_data[state.value]) + + # get alert_groups_response_time metric + alert_groups_response_time_key = get_metric_alert_groups_response_time_key(organization_id) + ag_response_time: typing.Dict[int, AlertGroupsResponseTimeMetricsDict] = cache.get( + alert_groups_response_time_key + ) + if ag_response_time is None: + start_calculation_task = True + else: + for integration, integration_data in ag_response_time.items(): + # Labels values should have the same order as _labels + labels_values = [ + integration_data["integration_name"], # integration + integration_data["team_name"], # team + integration_data["org_id"], # org_id + integration_data["slug"], # grafana instance slug + integration_data["id"], # grafana instance id + ] + labels_values = list(map(str, labels_values)) + + response_time_values = integration_data["response_time"] + if not response_time_values: + continue + buckets, sum_value = self.get_buckets_with_sum(response_time_values) + buckets = sorted(list(buckets.items()), key=lambda x: float(x[0])) + alert_groups_response_time_seconds.add_metric(labels_values, buckets=buckets, sum_value=sum_value) + + if start_calculation_task or not get_metrics_cache_timer_for_organization(organization_id): + org_to_recalculate: RecalculateOrgMetricsDict = { + "organization_id": organization_id, + "force": start_calculation_task, + } + metrics_to_recalculate.append(org_to_recalculate) + + if metrics_to_recalculate: + start_calculate_and_cache_metrics.apply_async((metrics_to_recalculate,)) + + yield alert_groups_total + yield alert_groups_response_time_seconds + + def get_buckets_with_sum(self, values): + """Put values in correct buckets and count values sum""" + buckets_values = {str(key): 0 for key in self._buckets} + sum_value = 0 + for value in values: + for bucket in self._buckets: + if value <= float(bucket): + buckets_values[str(bucket)] += 1.0 + sum_value += value + return buckets_values, sum_value + + +application_metrics_registry.register(ApplicationMetricsCollector()) diff --git a/engine/apps/metrics_exporter/tasks.py b/engine/apps/metrics_exporter/tasks.py new file mode 100644 index 00000000..0ff315b3 --- /dev/null +++ b/engine/apps/metrics_exporter/tasks.py @@ -0,0 +1,151 @@ +import typing + +from django.apps import apps +from django.conf import settings +from django.core.cache import cache +from django.db.models import Q + +from apps.alerts.constants import AlertGroupState +from apps.metrics_exporter.constants import ( + METRICS_ORGANIZATIONS_IDS, + METRICS_ORGANIZATIONS_IDS_CACHE_TIMEOUT, + AlertGroupsResponseTimeMetricsDict, + AlertGroupsTotalMetricsDict, + RecalculateMetricsTimer, + RecalculateOrgMetricsDict, +) +from apps.metrics_exporter.helpers import ( + get_metric_alert_groups_response_time_key, + get_metric_alert_groups_total_key, + get_metrics_cache_timer_key, + get_metrics_recalculation_timeout, + get_response_time_period, +) +from apps.user_management.models 
import Organization +from common.custom_celery_tasks import shared_dedicated_queue_retry_task +from common.database import get_random_readonly_database_key_if_present_otherwise_default + + +@shared_dedicated_queue_retry_task( + autoretry_for=(Exception,), retry_backoff=True, max_retries=1 if settings.DEBUG else None +) +def save_organizations_ids_in_cache(): + organizations_ids = Organization.objects.all().values_list("id", flat=True) + organizations_ids = list(organizations_ids) + cache.set(METRICS_ORGANIZATIONS_IDS, organizations_ids, METRICS_ORGANIZATIONS_IDS_CACHE_TIMEOUT) + + +@shared_dedicated_queue_retry_task( + autoretry_for=(Exception,), retry_backoff=True, max_retries=1 if settings.DEBUG else None +) +def start_calculate_and_cache_metrics(metrics_to_recalculate: list[RecalculateOrgMetricsDict]): + """Start metrics calculation for each object in metrics_to_recalculate""" + for counter, recalculation_data in enumerate(metrics_to_recalculate): + # start immediately if recalculation has been forced + countdown = 0 if recalculation_data.get("force") else counter + calculate_and_cache_metrics.apply_async(kwargs=recalculation_data, countdown=countdown) + + +@shared_dedicated_queue_retry_task( + autoretry_for=(Exception,), retry_backoff=True, max_retries=1 if settings.DEBUG else None +) +def calculate_and_cache_metrics(organization_id, force=False): + """ + Calculate metrics for the organization. + Before calculating, check whether a calculation has already been started to avoid too-frequent or parallel launches + """ + AlertGroup = apps.get_model("alerts", "AlertGroup") + AlertReceiveChannel = apps.get_model("alerts", "AlertReceiveChannel") + ONE_HOUR = 3600 + TWO_HOURS = 7200 + + recalculate_timeout = get_metrics_recalculation_timeout() + + # check if recalculation has already been started + metrics_cache_timer_key = get_metrics_cache_timer_key(organization_id) + metrics_cache_timer = cache.get(metrics_cache_timer_key) + if metrics_cache_timer: + if not force or metrics_cache_timer.get("forced_started", False): + return + else: + metrics_cache_timer["forced_started"] = True + else: + metrics_cache_timer: RecalculateMetricsTimer = { + "recalculate_timeout": recalculate_timeout, + "forced_started": force, + } + + metrics_cache_timer["recalculate_timeout"] = recalculate_timeout + cache.set(metrics_cache_timer_key, metrics_cache_timer, timeout=recalculate_timeout) + + integrations = ( + AlertReceiveChannel.objects.using(get_random_readonly_database_key_if_present_otherwise_default()) + .filter( + ~Q(integration=AlertReceiveChannel.INTEGRATION_MAINTENANCE) + & Q(organization__deleted_at__isnull=True) + & Q(organization_id=organization_id) + ) + .select_related("organization", "team") + .prefetch_related("alert_groups") + ) + + response_time_period = get_response_time_period() + + metric_alert_group_total: typing.Dict[int, AlertGroupsTotalMetricsDict] = {} + metric_alert_group_response_time: typing.Dict[int, AlertGroupsResponseTimeMetricsDict] = {} + + states = { + AlertGroupState.FIRING.value: AlertGroup.get_new_state_filter(), + AlertGroupState.SILENCED.value: AlertGroup.get_silenced_state_filter(), + AlertGroupState.ACKNOWLEDGED.value: AlertGroup.get_acknowledged_state_filter(), + AlertGroupState.RESOLVED.value: AlertGroup.get_resolved_state_filter(), + } + + for integration in integrations: + instance_slug = integration.organization.stack_slug + instance_id = integration.organization.stack_id + # calculate states + for state, alert_group_filter in states.items():
metric_alert_group_total.setdefault( + integration.id, + { + "integration_name": integration.emojized_verbal_name, + "team_name": integration.team_name, + "team_id": integration.team_id_or_no_team, + "org_id": integration.organization.org_id, + "slug": instance_slug, + "id": instance_id, + }, + )[state] = integration.alert_groups.filter(alert_group_filter).count() + + # calculate response time + all_response_time = [] + alert_groups = integration.alert_groups.filter(started_at__gte=response_time_period) + for alert_group in alert_groups: + if alert_group.response_time: + all_response_time.append(int(alert_group.response_time.total_seconds())) + elif alert_group.state != AlertGroupState.FIRING: + # get calculated value from current alert group information + response_time = alert_group._get_response_time() + if response_time: + all_response_time.append(int(response_time.total_seconds())) + + metric_alert_group_response_time[integration.id] = { + "integration_name": integration.emojized_verbal_name, + "team_name": integration.team_name, + "team_id": integration.team_id_or_no_team, + "org_id": integration.organization.org_id, + "slug": instance_slug, + "id": instance_id, + "response_time": all_response_time, + } + + metric_alert_groups_total_key = get_metric_alert_groups_total_key(organization_id) + metric_alert_groups_response_time_key = get_metric_alert_groups_response_time_key(organization_id) + + metrics_cache_timeout = recalculate_timeout + TWO_HOURS + cache.set(metric_alert_groups_total_key, metric_alert_group_total, timeout=metrics_cache_timeout) + cache.set(metric_alert_groups_response_time_key, metric_alert_group_response_time, timeout=metrics_cache_timeout) + if metrics_cache_timer["forced_started"]: + metrics_cache_timer["forced_started"] = False + cache.set(metrics_cache_timer_key, metrics_cache_timer, timeout=recalculate_timeout - ONE_HOUR) diff --git a/engine/apps/metrics_exporter/tests/__init__.py b/engine/apps/metrics_exporter/tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/engine/apps/metrics_exporter/tests/conftest.py b/engine/apps/metrics_exporter/tests/conftest.py new file mode 100644 index 00000000..9ebc3a29 --- /dev/null +++ b/engine/apps/metrics_exporter/tests/conftest.py @@ -0,0 +1,100 @@ +import pytest +from django.core.cache import cache + +from apps.metrics_exporter.constants import ALERT_GROUPS_RESPONSE_TIME, ALERT_GROUPS_TOTAL +from apps.metrics_exporter.helpers import get_metric_alert_groups_response_time_key, get_metric_alert_groups_total_key + +METRICS_TEST_INTEGRATION_NAME = "Test integration" +METRICS_TEST_ORG_ID = 123 # random number +METRICS_TEST_INSTANCE_SLUG = "test_instance" +METRICS_TEST_INSTANCE_ID = 292 # random number + + +@pytest.fixture() +def mock_cache_get_metrics_for_collector(monkeypatch): + def _mock_cache_get(key, *args, **kwargs): + if key.startswith(ALERT_GROUPS_TOTAL): + key = ALERT_GROUPS_TOTAL + elif key.startswith(ALERT_GROUPS_RESPONSE_TIME): + key = ALERT_GROUPS_RESPONSE_TIME + test_metrics = { + ALERT_GROUPS_TOTAL: { + 1: { + "integration_name": "Test metrics integration", + "team_name": "Test team", + "team_id": 1, + "org_id": 1, + "slug": "Test stack", + "id": 1, + "firing": 2, + "acknowledged": 3, + "silenced": 4, + "resolved": 5, + } + }, + ALERT_GROUPS_RESPONSE_TIME: { + 1: { + "integration_name": "Test metrics integration", + "team_name": "Test team", + "team_id": 1, + "org_id": 1, + "slug": "Test stack", + "id": 1, + "response_time": [2, 10, 200, 650], + } + }, + } + return test_metrics.get(key) + + 
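# patch Django's cache.get so the collector reads this fixed test data instead of the real cache
+    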
monkeypatch.setattr(cache, "get", _mock_cache_get) + + +@pytest.fixture() +def mock_get_metrics_cache(monkeypatch): + def _mock_cache_get(key, *args, **kwargs): + return {} + + monkeypatch.setattr(cache, "get", _mock_cache_get) + + +@pytest.fixture +def make_metrics_cache_params(monkeypatch): + def _make_cache_params(integration_id, organization_id, team_name=None, team_id=None): + team_name = team_name or "No team" + team_id = team_id or "no_team" + metric_alert_groups_total_key = get_metric_alert_groups_total_key(organization_id) + metric_alert_groups_response_time_key = get_metric_alert_groups_response_time_key(organization_id) + + def cache_get(key, *args, **kwargs): + metrics_data = { + metric_alert_groups_response_time_key: { + integration_id: { + "integration_name": METRICS_TEST_INTEGRATION_NAME, + "team_name": team_name, + "team_id": team_id, + "org_id": METRICS_TEST_ORG_ID, + "slug": METRICS_TEST_INSTANCE_SLUG, + "id": METRICS_TEST_INSTANCE_ID, + "response_time": [], + } + }, + metric_alert_groups_total_key: { + integration_id: { + "integration_name": METRICS_TEST_INTEGRATION_NAME, + "team_name": team_name, + "team_id": team_id, + "org_id": METRICS_TEST_ORG_ID, + "slug": METRICS_TEST_INSTANCE_SLUG, + "id": METRICS_TEST_INSTANCE_ID, + "firing": 0, + "acknowledged": 0, + "silenced": 0, + "resolved": 0, + } + }, + } + return metrics_data.get(key, {}) + + return cache_get + + return _make_cache_params diff --git a/engine/apps/metrics_exporter/tests/test_calculation_metrics.py b/engine/apps/metrics_exporter/tests/test_calculation_metrics.py new file mode 100644 index 00000000..b992a340 --- /dev/null +++ b/engine/apps/metrics_exporter/tests/test_calculation_metrics.py @@ -0,0 +1,115 @@ +from unittest.mock import patch + +import pytest + +from apps.metrics_exporter.helpers import ( + get_metric_alert_groups_response_time_key, + get_metric_alert_groups_total_key, + get_metrics_cache_timer_key, +) +from apps.metrics_exporter.tasks import calculate_and_cache_metrics + + +@patch("apps.alerts.models.alert_group.MetricsCacheManager.metrics_update_state_cache_for_alert_group") +@pytest.mark.django_db +def test_calculate_and_cache_metrics_task( + mocked_update_state_cache, + make_organization, + make_user_for_organization, + make_team, + make_alert_receive_channel, + make_alert_group, + make_alert, +): + METRICS_RESPONSE_TIME_LEN = 3 # 1 for each alert group with changed state (acked, resolved, silenced) + organization = make_organization() + team = make_team(organization) + + alert_receive_channel_1 = make_alert_receive_channel(organization) + alert_receive_channel_2 = make_alert_receive_channel(organization, team=team) + for alert_receive_channel in [alert_receive_channel_1, alert_receive_channel_2]: + for _ in range(2): + alert_group = make_alert_group(alert_receive_channel) + make_alert(alert_group=alert_group, raw_request_data={}) + + alert_group_to_ack = make_alert_group(alert_receive_channel) + make_alert(alert_group=alert_group_to_ack, raw_request_data={}) + alert_group_to_ack.acknowledge() + + alert_group_to_res = make_alert_group(alert_receive_channel) + make_alert(alert_group=alert_group_to_res, raw_request_data={}) + alert_group_to_res.resolve() + + alert_group_to_sil = make_alert_group(alert_receive_channel) + make_alert(alert_group=alert_group_to_sil, raw_request_data={}) + alert_group_to_sil.silence() + + metrics_cache_timer_key = get_metrics_cache_timer_key(organization.id) + metric_alert_groups_total_key = get_metric_alert_groups_total_key(organization.id) + 
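# the task under test is expected to write its per-integration snapshots under these cache keys
+    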
metric_alert_groups_response_time_key = get_metric_alert_groups_response_time_key(organization.id) + + expected_result_metric_alert_groups_total = { + alert_receive_channel_1.id: { + "integration_name": alert_receive_channel_1.verbal_name, + "team_name": "No team", + "team_id": "no_team", + "org_id": organization.org_id, + "slug": organization.stack_slug, + "id": organization.stack_id, + "firing": 2, + "silenced": 1, + "acknowledged": 1, + "resolved": 1, + }, + alert_receive_channel_2.id: { + "integration_name": alert_receive_channel_2.verbal_name, + "team_name": team.name, + "team_id": team.id, + "org_id": organization.org_id, + "slug": organization.stack_slug, + "id": organization.stack_id, + "firing": 2, + "silenced": 1, + "acknowledged": 1, + "resolved": 1, + }, + } + expected_result_metric_alert_groups_response_time = { + alert_receive_channel_1.id: { + "integration_name": alert_receive_channel_1.verbal_name, + "team_name": "No team", + "team_id": "no_team", + "org_id": organization.org_id, + "slug": organization.stack_slug, + "id": organization.stack_id, + "response_time": [], + }, + alert_receive_channel_2.id: { + "integration_name": alert_receive_channel_2.verbal_name, + "team_name": team.name, + "team_id": team.id, + "org_id": organization.org_id, + "slug": organization.stack_slug, + "id": organization.stack_id, + "response_time": [], + }, + } + + with patch("apps.metrics_exporter.tasks.cache.set") as mock_cache_set: + calculate_and_cache_metrics(organization.id) + args = mock_cache_set.call_args_list + assert args[0].args[0] == metrics_cache_timer_key + + # check alert_groups_total metric cache + metric_alert_groups_total_values = args[1].args + assert metric_alert_groups_total_values[0] == metric_alert_groups_total_key + assert metric_alert_groups_total_values[1] == expected_result_metric_alert_groups_total + + # check alert_groups_response_time metric cache + metric_alert_groups_response_time_values = args[2].args + assert metric_alert_groups_response_time_values[0] == metric_alert_groups_response_time_key + for integration_id, values in metric_alert_groups_response_time_values[1].items(): + assert len(values["response_time"]) == METRICS_RESPONSE_TIME_LEN + # set response time to expected result because it is calculated on fly + expected_result_metric_alert_groups_response_time[integration_id]["response_time"] = values["response_time"] + assert metric_alert_groups_response_time_values[1] == expected_result_metric_alert_groups_response_time diff --git a/engine/apps/metrics_exporter/tests/test_metrics_collectors.py b/engine/apps/metrics_exporter/tests/test_metrics_collectors.py new file mode 100644 index 00000000..6f17d015 --- /dev/null +++ b/engine/apps/metrics_exporter/tests/test_metrics_collectors.py @@ -0,0 +1,33 @@ +from unittest.mock import patch + +import pytest +from prometheus_client import CollectorRegistry, generate_latest + +from apps.alerts.constants import AlertGroupState +from apps.metrics_exporter.constants import ALERT_GROUPS_RESPONSE_TIME, ALERT_GROUPS_TOTAL +from apps.metrics_exporter.metrics_collectors import ApplicationMetricsCollector + + +@patch("apps.metrics_exporter.metrics_collectors.get_organization_ids", return_value=[1]) +@patch("apps.metrics_exporter.metrics_collectors.start_calculate_and_cache_metrics.apply_async") +@pytest.mark.django_db +def test_application_metrics_collector( + mocked_org_ids, mocked_start_calculate_and_cache_metrics, mock_cache_get_metrics_for_collector +): + """Test that ApplicationMetricsCollector generates expected metrics 
from cache""" + collector = ApplicationMetricsCollector() + test_metrics_registry = CollectorRegistry() + test_metrics_registry.register(collector) + for metric in test_metrics_registry.collect(): + if metric.name == ALERT_GROUPS_TOTAL: + # integration with labels for each alert group state + assert len(metric.samples) == len(AlertGroupState) + elif metric.name == ALERT_GROUPS_RESPONSE_TIME: + # integration with labels for each value in collector's bucket + _count and _sum histogram values + assert len(metric.samples) == len(collector._buckets) + 2 + result = generate_latest(test_metrics_registry).decode("utf-8") + assert result is not None + assert mocked_org_ids.called + # Since there is no recalculation timer for test org in cache, start_calculate_and_cache_metrics must be called + assert mocked_start_calculate_and_cache_metrics.called + test_metrics_registry.unregister(collector) diff --git a/engine/apps/metrics_exporter/tests/test_update_metics_cache.py b/engine/apps/metrics_exporter/tests/test_update_metics_cache.py new file mode 100644 index 00000000..d9ee2e0a --- /dev/null +++ b/engine/apps/metrics_exporter/tests/test_update_metics_cache.py @@ -0,0 +1,452 @@ +from unittest.mock import patch + +import pytest +from django.core.cache import cache +from django.test import override_settings + +from apps.metrics_exporter.helpers import ( + get_metric_alert_groups_response_time_key, + get_metric_alert_groups_total_key, + metrics_bulk_update_team_label_cache, +) +from apps.metrics_exporter.metrics_cache_manager import MetricsCacheManager +from apps.metrics_exporter.tests.conftest import ( + METRICS_TEST_INSTANCE_ID, + METRICS_TEST_INSTANCE_SLUG, + METRICS_TEST_INTEGRATION_NAME, + METRICS_TEST_ORG_ID, +) + + +@patch("apps.alerts.models.alert_group_log_record.send_update_log_report_signal.apply_async") +@patch("apps.alerts.models.alert_group.alert_group_action_triggered_signal.send") +@pytest.mark.django_db +@override_settings(CELERY_TASK_ALWAYS_EAGER=True) +def test_update_metric_alert_groups_total_cache_on_action( + mocked_send_log_signal, + mocked_action_signal_send, + make_organization, + make_user_for_organization, + make_alert_receive_channel, + make_alert_group, + make_alert, + make_metrics_cache_params, + monkeypatch, +): + organization = make_organization( + org_id=METRICS_TEST_ORG_ID, + stack_slug=METRICS_TEST_INSTANCE_SLUG, + stack_id=METRICS_TEST_INSTANCE_ID, + ) + user = make_user_for_organization(organization) + alert_receive_channel = make_alert_receive_channel(organization, verbal_name=METRICS_TEST_INTEGRATION_NAME) + + metric_alert_groups_total_key = get_metric_alert_groups_total_key(organization.id) + + expected_result_metric_alert_groups_total = { + alert_receive_channel.id: { + "integration_name": alert_receive_channel.verbal_name, + "team_name": "No team", + "team_id": "no_team", + "org_id": organization.org_id, + "slug": organization.stack_slug, + "id": organization.stack_id, + "firing": 0, + "silenced": 0, + "acknowledged": 0, + "resolved": 0, + } + } + + expected_result_firing = { + "firing": 1, + "silenced": 0, + "acknowledged": 0, + "resolved": 0, + } + + expected_result_acked = { + "firing": 0, + "silenced": 0, + "acknowledged": 1, + "resolved": 0, + } + + expected_result_resolved = { + "firing": 0, + "silenced": 0, + "acknowledged": 0, + "resolved": 1, + } + + expected_result_silenced = { + "firing": 0, + "silenced": 1, + "acknowledged": 0, + "resolved": 0, + } + + metrics_cache = make_metrics_cache_params(alert_receive_channel.id, organization.id) + 
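# seed cache.get with zeroed counters and an empty response_time list for this integration,
+    # so the assertions below only see the changes made by the alert group actions
+    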
monkeypatch.setattr(cache, "get", metrics_cache) + + def get_called_arg_index_and_compare_results(update_expected_result): + """find index for the metric argument, that was set in cache""" + for idx, called_arg in enumerate(mock_cache_set_called_args): + if idx >= arg_idx and called_arg.args[0] == metric_alert_groups_total_key: + expected_result_metric_alert_groups_total[alert_receive_channel.id].update(update_expected_result) + assert called_arg.args[1] == expected_result_metric_alert_groups_total + return idx + 1 + raise AssertionError + + with patch("apps.metrics_exporter.tasks.cache.set") as mock_cache_set: + arg_idx = 0 + alert_group = make_alert_group(alert_receive_channel) + make_alert(alert_group=alert_group, raw_request_data={}) + + # check alert_groups_total metric cache, get called args + mock_cache_set_called_args = mock_cache_set.call_args_list + arg_idx = get_called_arg_index_and_compare_results(expected_result_firing) + + alert_group.acknowledge_by_user(user) + arg_idx = get_called_arg_index_and_compare_results(expected_result_acked) + + alert_group.un_acknowledge_by_user(user) + arg_idx = get_called_arg_index_and_compare_results(expected_result_firing) + + alert_group.resolve_by_user(user) + arg_idx = get_called_arg_index_and_compare_results(expected_result_resolved) + + alert_group.un_resolve_by_user(user) + arg_idx = get_called_arg_index_and_compare_results(expected_result_firing) + + alert_group.silence_by_user(user, silence_delay=None) + arg_idx = get_called_arg_index_and_compare_results(expected_result_silenced) + + alert_group.un_silence_by_user(user) + get_called_arg_index_and_compare_results(expected_result_firing) + + +@patch("apps.alerts.models.alert_group_log_record.send_update_log_report_signal.apply_async") +@patch("apps.alerts.models.alert_group.alert_group_action_triggered_signal.send") +@pytest.mark.django_db +@override_settings(CELERY_TASK_ALWAYS_EAGER=True) +def test_update_metric_alert_groups_response_time_cache_on_action( + mocked_send_log_signal, + mocked_action_signal_send, + make_organization, + make_user_for_organization, + make_alert_receive_channel, + make_alert_group, + make_alert, + monkeypatch, + make_metrics_cache_params, +): + organization = make_organization( + org_id=METRICS_TEST_ORG_ID, + stack_slug=METRICS_TEST_INSTANCE_SLUG, + stack_id=METRICS_TEST_INSTANCE_ID, + ) + user = make_user_for_organization(organization) + alert_receive_channel = make_alert_receive_channel(organization, verbal_name=METRICS_TEST_INTEGRATION_NAME) + + metric_alert_groups_response_time_key = get_metric_alert_groups_response_time_key(organization.id) + + expected_result_metric_alert_groups_response_time = { + alert_receive_channel.id: { + "integration_name": alert_receive_channel.verbal_name, + "team_name": "No team", + "team_id": "no_team", + "org_id": organization.org_id, + "slug": organization.stack_slug, + "id": organization.stack_id, + "response_time": [], + } + } + + metrics_cache = make_metrics_cache_params(alert_receive_channel.id, organization.id) + monkeypatch.setattr(cache, "get", metrics_cache) + + def get_called_arg_index_and_compare_results(): + """find index for related to the metric argument, that was set in cache""" + for idx, called_arg in enumerate(mock_cache_set_called_args): + if idx >= arg_idx and called_arg.args[0] == metric_alert_groups_response_time_key: + response_time_values = called_arg.args[1][alert_receive_channel.id]["response_time"] + expected_result_metric_alert_groups_response_time[alert_receive_channel.id].update( + 
{"response_time": response_time_values} + ) + # response time values len always will be 1 here since cache is mocked and refreshed on every call + assert len(response_time_values) == 1 + assert called_arg.args[1] == expected_result_metric_alert_groups_response_time + return idx + 1 + raise AssertionError + + def assert_cache_was_not_changed_by_response_time_metric(): + for idx, called_arg in enumerate(mock_cache_set_called_args): + if idx >= arg_idx and called_arg.args[0] == metric_alert_groups_response_time_key: + raise AssertionError + + with patch("apps.metrics_exporter.tasks.cache.set") as mock_cache_set: + arg_idx = 0 + alert_group_1, alert_group_2, alert_group_3 = [make_alert_group(alert_receive_channel) for _ in range(3)] + for alert_group in [alert_group_1, alert_group_2, alert_group_3]: + make_alert(alert_group=alert_group, raw_request_data={}) + + # check alert_groups_response_time metric cache, get called args + mock_cache_set_called_args = mock_cache_set.call_args_list + # alert_groups_response_time cache shouldn't be updated on create alert group + assert_cache_was_not_changed_by_response_time_metric() + + alert_group_1.acknowledge_by_user(user) + arg_idx = get_called_arg_index_and_compare_results() + + # assert that only the first action counts + alert_group_1.un_acknowledge_by_user(user) + assert_cache_was_not_changed_by_response_time_metric() + + alert_group_1.resolve_by_user(user) + assert_cache_was_not_changed_by_response_time_metric() + + alert_group_1.un_resolve_by_user(user) + assert_cache_was_not_changed_by_response_time_metric() + + alert_group_1.silence_by_user(user, silence_delay=None) + assert_cache_was_not_changed_by_response_time_metric() + + alert_group_1.un_silence_by_user(user) + assert_cache_was_not_changed_by_response_time_metric() + + # check that response_time cache updates on other actions with other alert groups + alert_group_2.resolve_by_user(user) + arg_idx = get_called_arg_index_and_compare_results() + + alert_group_3.silence_by_user(user, silence_delay=None) + get_called_arg_index_and_compare_results() + + +@pytest.mark.django_db +def test_update_metrics_cache_on_update_integration( + make_organization, + make_user_for_organization, + make_alert_receive_channel_with_post_save_signal, + make_team, + make_metrics_cache_params, + monkeypatch, + mock_get_metrics_cache, +): + organization = make_organization( + org_id=METRICS_TEST_ORG_ID, + stack_slug=METRICS_TEST_INSTANCE_SLUG, + stack_id=METRICS_TEST_INSTANCE_ID, + ) + team = make_team(organization) + + metric_alert_groups_total_key = get_metric_alert_groups_total_key(organization.id) + metric_alert_groups_response_time_key = get_metric_alert_groups_response_time_key(organization.id) + + expected_result_updated_team = { + "team_name": team.name, + "team_id": team.id, + } + + expected_result_updated_name = {"integration_name": "Renamed test integration"} + + def get_called_arg_index_and_compare_results(): + """find index for related to the metric argument, that was set in cache""" + is_set_metric_alert_groups_total_cache = False + is_set_metric_alert_groups_response_time_cache = False + for idx, called_arg in enumerate(mock_cache_set_called_args): + if idx >= arg_idx and called_arg.args[0] == metric_alert_groups_total_key: + assert called_arg.args[1] == expected_result_metric_alert_groups_total + is_set_metric_alert_groups_total_cache = True + elif idx >= arg_idx and called_arg.args[0] == metric_alert_groups_response_time_key: + assert called_arg.args[1] == 
expected_result_metric_alert_groups_response_time
+                is_set_metric_alert_groups_response_time_cache = True
+            if is_set_metric_alert_groups_total_cache and is_set_metric_alert_groups_response_time_cache:
+                return idx + 1
+        raise AssertionError
+
+    with patch("apps.metrics_exporter.tasks.cache.set") as mock_cache_set:
+        arg_idx = 0
+        # check cache update on create integration
+        alert_receive_channel = make_alert_receive_channel_with_post_save_signal(
+            organization, verbal_name=METRICS_TEST_INTEGRATION_NAME
+        )
+
+        expected_result_metric_alert_groups_total = {
+            alert_receive_channel.id: {
+                "integration_name": METRICS_TEST_INTEGRATION_NAME,
+                "team_name": "No team",
+                "team_id": "no_team",
+                "org_id": organization.org_id,
+                "slug": organization.stack_slug,
+                "id": organization.stack_id,
+                "firing": 0,
+                "silenced": 0,
+                "acknowledged": 0,
+                "resolved": 0,
+            }
+        }
+        expected_result_metric_alert_groups_response_time = {
+            alert_receive_channel.id: {
+                "integration_name": METRICS_TEST_INTEGRATION_NAME,
+                "team_name": "No team",
+                "team_id": "no_team",
+                "org_id": organization.org_id,
+                "slug": organization.stack_slug,
+                "id": organization.stack_id,
+                "response_time": [],
+            }
+        }
+
+        mock_cache_set_called_args = mock_cache_set.call_args_list
+        arg_idx = get_called_arg_index_and_compare_results()
+
+        metrics_cache = make_metrics_cache_params(alert_receive_channel.id, organization.id)
+        monkeypatch.setattr(cache, "get", metrics_cache)
+
+        # check cache update on update integration's team
+        alert_receive_channel.team = team
+        # clear cached_property
+        del alert_receive_channel.team_name
+        del alert_receive_channel.team_id_or_no_team
+
+        alert_receive_channel.save()
+        for expected_result in [
+            expected_result_metric_alert_groups_total,
+            expected_result_metric_alert_groups_response_time,
+        ]:
+            expected_result[alert_receive_channel.id].update(expected_result_updated_team)
+        arg_idx = get_called_arg_index_and_compare_results()
+
+        # check cache update on update integration's name
+        alert_receive_channel.refresh_from_db()
+        alert_receive_channel.verbal_name = expected_result_updated_name["integration_name"]
+        # clear cached_property
+        del alert_receive_channel.emojized_verbal_name
+        alert_receive_channel.save()
+
+        for expected_result in [
+            expected_result_metric_alert_groups_total,
+            expected_result_metric_alert_groups_response_time,
+        ]:
+            expected_result[alert_receive_channel.id].update(expected_result_updated_name)
+        arg_idx = get_called_arg_index_and_compare_results()
+
+        # check cache update on delete integration
+        alert_receive_channel.refresh_from_db()
+        alert_receive_channel.delete()
+
+        for expected_result in [
+            expected_result_metric_alert_groups_total,
+            expected_result_metric_alert_groups_response_time,
+        ]:
+            expected_result.pop(alert_receive_channel.id)
+        get_called_arg_index_and_compare_results()
+
+
+@pytest.mark.django_db
+def test_update_metrics_cache_on_update_team(
+    make_organization,
+    make_user_for_organization,
+    
make_alert_receive_channel, + make_team, + make_metrics_cache_params, + monkeypatch, + mock_get_metrics_cache, +): + organization = make_organization( + org_id=METRICS_TEST_ORG_ID, + stack_slug=METRICS_TEST_INSTANCE_SLUG, + stack_id=METRICS_TEST_INSTANCE_ID, + ) + team = make_team(organization) + alert_receive_channel = make_alert_receive_channel( + organization, verbal_name=METRICS_TEST_INTEGRATION_NAME, team=team + ) + metrics_cache = make_metrics_cache_params(alert_receive_channel.id, organization.id, team.name, team.id) + monkeypatch.setattr(cache, "get", metrics_cache) + + metric_alert_groups_total_key = get_metric_alert_groups_total_key(organization.id) + metric_alert_groups_response_time_key = get_metric_alert_groups_response_time_key(organization.id) + + new_team_name = "Test team renamed" + + expected_result_metric_alert_groups_total = { + alert_receive_channel.id: { + "integration_name": METRICS_TEST_INTEGRATION_NAME, + "team_name": new_team_name, + "team_id": team.id, + "org_id": organization.org_id, + "slug": organization.stack_slug, + "id": organization.stack_id, + "firing": 0, + "silenced": 0, + "acknowledged": 0, + "resolved": 0, + } + } + expected_result_metric_alert_groups_response_time = { + alert_receive_channel.id: { + "integration_name": METRICS_TEST_INTEGRATION_NAME, + "team_name": new_team_name, + "team_id": team.id, + "org_id": organization.org_id, + "slug": organization.stack_slug, + "id": organization.stack_id, + "response_time": [], + } + } + + expected_result_delete_team = { + "team_name": "No team", + "team_id": "no_team", + } + + def get_called_arg_index_and_compare_results(): + """find index for related to the metric argument, that was set in cache""" + is_set_metric_alert_groups_total_cache = False + is_set_metric_alert_groups_response_time_cache = False + for idx, called_arg in enumerate(mock_cache_set_called_args): + if idx >= arg_idx and called_arg.args[0] == metric_alert_groups_total_key: + assert called_arg.args[1] == expected_result_metric_alert_groups_total + is_set_metric_alert_groups_total_cache = True + elif idx >= arg_idx and called_arg.args[0] == metric_alert_groups_response_time_key: + assert called_arg.args[1] == expected_result_metric_alert_groups_response_time + is_set_metric_alert_groups_response_time_cache = True + if is_set_metric_alert_groups_total_cache and is_set_metric_alert_groups_response_time_cache: + return idx + 1 + raise AssertionError + + team.name = new_team_name + team.save() + + with patch("apps.metrics_exporter.tasks.cache.set") as mock_cache_set: + arg_idx = 0 + mock_cache_set_called_args = mock_cache_set.call_args_list + + metrics_team_to_update = MetricsCacheManager.update_team_diff({}, team.id, new_name=new_team_name) + metrics_bulk_update_team_label_cache(metrics_team_to_update, organization.id) + arg_idx = get_called_arg_index_and_compare_results() + + metrics_team_to_update = MetricsCacheManager.update_team_diff({}, team.id, deleted=True) + metrics_bulk_update_team_label_cache(metrics_team_to_update, organization.id) + for expected_result in [ + expected_result_metric_alert_groups_total, + expected_result_metric_alert_groups_response_time, + ]: + expected_result[alert_receive_channel.id].update(expected_result_delete_team) + get_called_arg_index_and_compare_results() diff --git a/engine/apps/metrics_exporter/urls.py b/engine/apps/metrics_exporter/urls.py new file mode 100644 index 00000000..4768cd79 --- /dev/null +++ b/engine/apps/metrics_exporter/urls.py @@ -0,0 +1,7 @@ +from django.urls import path + +from .views 
import MetricsExporterView + +urlpatterns = [ + path("", MetricsExporterView.as_view()), +] diff --git a/engine/apps/metrics_exporter/views.py b/engine/apps/metrics_exporter/views.py new file mode 100644 index 00000000..7a03de78 --- /dev/null +++ b/engine/apps/metrics_exporter/views.py @@ -0,0 +1,11 @@ +from django.http import HttpResponse +from prometheus_client import generate_latest +from rest_framework.views import APIView + +from .metrics_collectors import application_metrics_registry + + +class MetricsExporterView(APIView): + def get(self, request): + result = generate_latest(application_metrics_registry).decode("utf-8") + return HttpResponse(result, content_type="text/plain; version=0.0.4; charset=utf-8") diff --git a/engine/apps/user_management/models/team.py b/engine/apps/user_management/models/team.py index 46f98ad7..769c5baa 100644 --- a/engine/apps/user_management/models/team.py +++ b/engine/apps/user_management/models/team.py @@ -2,6 +2,8 @@ from django.conf import settings from django.core.validators import MinLengthValidator from django.db import models +from apps.metrics_exporter.helpers import metrics_bulk_update_team_label_cache +from apps.metrics_exporter.metrics_cache_manager import MetricsCacheManager from common.public_primary_keys import generate_public_primary_key, increase_public_primary_key_length @@ -43,6 +45,13 @@ class TeamManager(models.Manager): team_ids_to_delete = existing_team_ids - grafana_teams.keys() organization.teams.filter(team_id__in=team_ids_to_delete).delete() + # collect teams diffs to update metrics cache + metrics_teams_to_update = {} + for team_id in team_ids_to_delete: + metrics_teams_to_update = MetricsCacheManager.update_team_diff( + metrics_teams_to_update, team_id, deleted=True + ) + # update existing teams if any fields have changed teams_to_update = [] for team in organization.teams.filter(team_id__in=existing_team_ids): @@ -52,12 +61,19 @@ class TeamManager(models.Manager): or team.email != grafana_team["email"] or team.avatar_url != grafana_team["avatarUrl"] ): + if team.name != grafana_team["name"]: + # collect teams diffs to update metrics cache + metrics_teams_to_update = MetricsCacheManager.update_team_diff( + metrics_teams_to_update, team.id, new_name=grafana_team["name"] + ) team.name = grafana_team["name"] team.email = grafana_team["email"] team.avatar_url = grafana_team["avatarUrl"] teams_to_update.append(team) organization.teams.bulk_update(teams_to_update, ["name", "email", "avatar_url"], batch_size=5000) + metrics_bulk_update_team_label_cache(metrics_teams_to_update, organization.id) + class Team(models.Model): public_primary_key = models.CharField( diff --git a/engine/conftest.py b/engine/conftest.py index c12fbec5..ea523f88 100644 --- a/engine/conftest.py +++ b/engine/conftest.py @@ -440,6 +440,17 @@ def make_alert_receive_channel(): return _make_alert_receive_channel +@pytest.fixture +def make_alert_receive_channel_with_post_save_signal(): + def _make_alert_receive_channel(organization, **kwargs): + if "integration" not in kwargs: + kwargs["integration"] = AlertReceiveChannel.INTEGRATION_GRAFANA + alert_receive_channel = AlertReceiveChannelFactory(organization=organization, **kwargs) + return alert_receive_channel + + return _make_alert_receive_channel + + @pytest.fixture def make_channel_filter(): def _make_channel_filter(alert_receive_channel, filtering_term=None, **kwargs): diff --git a/engine/requirements.txt b/engine/requirements.txt index 1270f5da..75a1eb8c 100644 --- a/engine/requirements.txt +++ 
b/engine/requirements.txt @@ -55,3 +55,4 @@ django-deprecate-fields==0.1.1 pymdown-extensions==10.0 requests==2.31.0 urllib3==1.26.15 +prometheus_client==0.16.0 diff --git a/engine/settings/base.py b/engine/settings/base.py index aa4e6d67..60005f0a 100644 --- a/engine/settings/base.py +++ b/engine/settings/base.py @@ -220,6 +220,7 @@ INSTALLED_APPS = [ "apps.public_api", "apps.grafana_plugin", "apps.webhooks", + "apps.metrics_exporter", "corsheaders", "debug_toolbar", "social_django", @@ -482,6 +483,10 @@ CELERY_BEAT_SCHEDULE = { "conditionally_send_going_oncall_push_notifications_for_all_schedules": { "task": "apps.mobile_app.tasks.conditionally_send_going_oncall_push_notifications_for_all_schedules", "schedule": 10 * 60, + }, + "save_organizations_ids_in_cache": { + "task": "apps.metrics_exporter.tasks.save_organizations_ids_in_cache", + "schedule": 60 * 30, "args": (), }, } diff --git a/engine/settings/prod_without_db.py b/engine/settings/prod_without_db.py index 6ad5d692..7c33383d 100644 --- a/engine/settings/prod_without_db.py +++ b/engine/settings/prod_without_db.py @@ -57,6 +57,8 @@ CELERY_TASK_ROUTES = { "apps.heartbeat.tasks.integration_heartbeat_checkup": {"queue": "default"}, "apps.heartbeat.tasks.process_heartbeat_task": {"queue": "default"}, "apps.heartbeat.tasks.restore_heartbeat_tasks": {"queue": "default"}, + "apps.metrics_exporter.tasks.start_calculate_and_cache_metrics": {"queue": "default"}, + "apps.metrics_exporter.tasks.save_organizations_ids_in_cache": {"queue": "default"}, "apps.schedules.tasks.refresh_ical_files.refresh_ical_file": {"queue": "default"}, "apps.schedules.tasks.refresh_ical_files.start_refresh_ical_files": {"queue": "default"}, "apps.schedules.tasks.notify_about_gaps_in_schedule.check_empty_shifts_in_schedule": {"queue": "default"}, @@ -116,6 +118,7 @@ CELERY_TASK_ROUTES = { "apps.grafana_plugin.tasks.sync.start_cleanup_deleted_organizations": {"queue": "long"}, "apps.grafana_plugin.tasks.sync.start_sync_organizations": {"queue": "long"}, "apps.grafana_plugin.tasks.sync.sync_organization_async": {"queue": "long"}, + "apps.metrics_exporter.tasks.calculate_and_cache_metrics": {"queue": "long"}, # SLACK "apps.integrations.tasks.notify_about_integration_ratelimit_in_slack": {"queue": "slack"}, "apps.slack.helpers.alert_group_representative.on_alert_group_action_triggered_async": {"queue": "slack"},