Move new alert group metric creation into async task (#3451)
# What this PR does Moving metrics creation into separate task to make alert ingestion more robust ## Which issue(s) this PR fixes ## Checklist - [ ] Unit, integration, and e2e (if applicable) tests updated - [ ] Documentation added (or `pr:no public docs` PR label added if not required) - [ ] `CHANGELOG.md` updated (or `pr:no changelog` PR label added if not required) --------- Co-authored-by: Julia <ferril.darkdiver@gmail.com>
This commit is contained in:
parent
455f74560c
commit
2fdd885abe
9 changed files with 78 additions and 35 deletions
|
|
@ -11,8 +11,6 @@ from django.conf import settings
|
|||
from django.core.validators import MinLengthValidator
|
||||
from django.db import IntegrityError, models, transaction
|
||||
from django.db.models import JSONField, Q, QuerySet
|
||||
from django.db.models.signals import post_save
|
||||
from django.dispatch import receiver
|
||||
from django.utils import timezone
|
||||
from django.utils.functional import cached_property
|
||||
|
||||
|
|
@ -24,7 +22,7 @@ from apps.alerts.incident_appearance.renderers.slack_renderer import AlertGroupS
|
|||
from apps.alerts.incident_log_builder import IncidentLogBuilder
|
||||
from apps.alerts.signals import alert_group_action_triggered_signal, alert_group_created_signal
|
||||
from apps.alerts.tasks import acknowledge_reminder_task, send_alert_group_signal, unsilence_task
|
||||
from apps.metrics_exporter.metrics_cache_manager import MetricsCacheManager
|
||||
from apps.metrics_exporter.tasks import update_metrics_for_alert_group
|
||||
from apps.slack.slack_formatter import SlackFormatter
|
||||
from apps.user_management.models import User
|
||||
from common.public_primary_keys import generate_public_primary_key, increase_public_primary_key_length
|
||||
|
|
@ -598,18 +596,7 @@ class AlertGroup(AlertGroupSlackRenderingMixin, EscalationSnapshotMixin, models.
|
|||
|
||||
def _update_metrics(self, organization_id, previous_state, state):
|
||||
"""Update metrics cache for response time and state as needed."""
|
||||
updated_response_time = self.response_time
|
||||
if previous_state != AlertGroupState.FIRING or self.restarted_at:
|
||||
# only consider response time from the first action
|
||||
updated_response_time = None
|
||||
MetricsCacheManager.metrics_update_cache_for_alert_group(
|
||||
self.channel_id,
|
||||
organization_id=organization_id,
|
||||
old_state=previous_state,
|
||||
new_state=state,
|
||||
response_time=updated_response_time,
|
||||
started_at=self.started_at,
|
||||
)
|
||||
update_metrics_for_alert_group.apply_async((self.id, organization_id, previous_state, state))
|
||||
|
||||
def acknowledge_by_user(self, user: User, action_source: typing.Optional[ActionSource] = None) -> None:
|
||||
from apps.alerts.models import AlertGroupLogRecord
|
||||
|
|
@ -1935,15 +1922,3 @@ class AlertGroup(AlertGroupSlackRenderingMixin, EscalationSnapshotMixin, models.
|
|||
"""
|
||||
count = self.alerts.all()[: max_alerts + 1].count()
|
||||
return count > max_alerts
|
||||
|
||||
|
||||
@receiver(post_save, sender=AlertGroup)
|
||||
def listen_for_alertgroup_model_save(sender, instance, created, *args, **kwargs):
|
||||
if created and not instance.is_maintenance_incident:
|
||||
# Update alert group state and response time metrics cache
|
||||
instance._update_metrics(
|
||||
organization_id=instance.channel.organization_id, previous_state=None, state=AlertGroupState.FIRING
|
||||
)
|
||||
|
||||
|
||||
post_save.connect(listen_for_alertgroup_model_save, AlertGroup)
|
||||
|
|
|
|||
|
|
@ -9,7 +9,7 @@ from kombu.utils.uuid import uuid as celery_uuid
|
|||
from apps.alerts.constants import NEXT_ESCALATION_DELAY
|
||||
from apps.alerts.signals import user_notification_action_triggered_signal
|
||||
from apps.base.messaging import get_messaging_backend_from_id
|
||||
from apps.metrics_exporter.helpers import metrics_update_user_cache
|
||||
from apps.metrics_exporter.tasks import update_metrics_for_user
|
||||
from apps.phone_notifications.phone_backend import PhoneBackend
|
||||
from common.custom_celery_tasks import shared_dedicated_queue_retry_task
|
||||
|
||||
|
|
@ -188,7 +188,7 @@ def notify_user_task(
|
|||
alert_group_id=alert_group_pk,
|
||||
).exists()
|
||||
):
|
||||
metrics_update_user_cache(user)
|
||||
update_metrics_for_user.apply_async((user.id,))
|
||||
|
||||
log_record.save()
|
||||
if notify_user_task.request.retries == 0:
|
||||
|
|
|
|||
8
engine/apps/metrics_exporter/apps.py
Normal file
8
engine/apps/metrics_exporter/apps.py
Normal file
|
|
@ -0,0 +1,8 @@
|
|||
from django.apps import AppConfig
|
||||
|
||||
|
||||
class MetricsExporterConfig(AppConfig):
|
||||
name = "apps.metrics_exporter"
|
||||
|
||||
def ready(self):
|
||||
from . import signals # noqa: F401
|
||||
|
|
@ -43,10 +43,10 @@ class MetricsCacheManager:
|
|||
def update_integration_states_diff(metrics_dict, integration_id, previous_state=None, new_state=None):
|
||||
metrics_dict.setdefault(integration_id, MetricsCacheManager.get_default_states_diff_dict())
|
||||
if previous_state:
|
||||
state_value = previous_state.value
|
||||
state_value = previous_state
|
||||
metrics_dict[integration_id]["previous_states"][state_value] += 1
|
||||
if new_state:
|
||||
state_value = new_state.value
|
||||
state_value = new_state
|
||||
metrics_dict[integration_id]["new_states"][state_value] += 1
|
||||
return metrics_dict
|
||||
|
||||
|
|
|
|||
14
engine/apps/metrics_exporter/signals.py
Normal file
14
engine/apps/metrics_exporter/signals.py
Normal file
|
|
@ -0,0 +1,14 @@
|
|||
from apps.alerts.constants import AlertGroupState
|
||||
from apps.alerts.signals import alert_group_created_signal
|
||||
|
||||
|
||||
def on_alert_group_created(**kwargs):
|
||||
alert_group = kwargs["alert_group"]
|
||||
if alert_group.is_maintenance_incident is True:
|
||||
return
|
||||
alert_group._update_metrics(
|
||||
organization_id=alert_group.channel.organization_id, previous_state=None, state=AlertGroupState.FIRING
|
||||
)
|
||||
|
||||
|
||||
alert_group_created_signal.connect(on_alert_group_created)
|
||||
|
|
@ -24,8 +24,9 @@ from apps.metrics_exporter.helpers import (
|
|||
get_organization_ids_from_db,
|
||||
get_response_time_period,
|
||||
is_allowed_to_start_metrics_calculation,
|
||||
metrics_update_user_cache,
|
||||
)
|
||||
from apps.user_management.models import User
|
||||
from apps.metrics_exporter.metrics_cache_manager import MetricsCacheManager
|
||||
from common.custom_celery_tasks import shared_dedicated_queue_retry_task
|
||||
from common.database import get_random_readonly_database_key_if_present_otherwise_default
|
||||
|
||||
|
|
@ -164,7 +165,7 @@ def calculate_and_cache_user_was_notified_metric(organization_id):
|
|||
Calculate metric "user_was_notified_of_alert_groups" for organization.
|
||||
"""
|
||||
from apps.base.models import UserNotificationPolicyLogRecord
|
||||
from apps.user_management.models import Organization
|
||||
from apps.user_management.models import Organization, User
|
||||
|
||||
TWO_HOURS = 7200
|
||||
|
||||
|
|
@ -209,3 +210,34 @@ def calculate_and_cache_user_was_notified_metric(organization_id):
|
|||
recalculate_timeout = get_metrics_recalculation_timeout()
|
||||
metrics_cache_timeout = recalculate_timeout + TWO_HOURS
|
||||
cache.set(metric_user_was_notified_key, metric_user_was_notified, timeout=metrics_cache_timeout)
|
||||
|
||||
|
||||
@shared_dedicated_queue_retry_task(
|
||||
autoretry_for=(Exception,), retry_backoff=True, max_retries=1 if settings.DEBUG else 10
|
||||
)
|
||||
def update_metrics_for_alert_group(alert_group_id, organization_id, previous_state, new_state):
|
||||
from apps.alerts.models import AlertGroup
|
||||
|
||||
alert_group = AlertGroup.objects.get(pk=alert_group_id)
|
||||
updated_response_time = alert_group.response_time
|
||||
if previous_state != AlertGroupState.FIRING or alert_group.restarted_at:
|
||||
# only consider response time from the first action
|
||||
updated_response_time = None
|
||||
MetricsCacheManager.metrics_update_cache_for_alert_group(
|
||||
integration_id=alert_group.channel_id,
|
||||
organization_id=organization_id,
|
||||
old_state=previous_state,
|
||||
new_state=new_state,
|
||||
response_time=updated_response_time,
|
||||
started_at=alert_group.started_at,
|
||||
)
|
||||
|
||||
|
||||
@shared_dedicated_queue_retry_task(
|
||||
autoretry_for=(Exception,), retry_backoff=True, max_retries=1 if settings.DEBUG else 10
|
||||
)
|
||||
def update_metrics_for_user(user_id):
|
||||
from apps.user_management.models import User
|
||||
|
||||
user = User.objects.get(id=user_id)
|
||||
metrics_update_user_cache(user)
|
||||
|
|
|
|||
|
|
@ -11,7 +11,7 @@ from apps.metrics_exporter.helpers import (
|
|||
from apps.metrics_exporter.tasks import calculate_and_cache_metrics, calculate_and_cache_user_was_notified_metric
|
||||
|
||||
|
||||
@patch("apps.alerts.models.alert_group.MetricsCacheManager.metrics_update_state_cache_for_alert_group")
|
||||
@patch("apps.alerts.models.alert_group.update_metrics_for_alert_group.apply_async")
|
||||
@pytest.mark.django_db
|
||||
def test_calculate_and_cache_metrics_task(
|
||||
mocked_update_state_cache,
|
||||
|
|
@ -114,7 +114,7 @@ def test_calculate_and_cache_metrics_task(
|
|||
assert metric_alert_groups_response_time_values[1] == expected_result_metric_alert_groups_response_time
|
||||
|
||||
|
||||
@patch("apps.alerts.models.alert_group.MetricsCacheManager.metrics_update_state_cache_for_alert_group")
|
||||
@patch("apps.alerts.models.alert_group.update_metrics_for_alert_group.apply_async")
|
||||
@pytest.mark.django_db
|
||||
def test_calculate_and_cache_user_was_notified_metric_task(
|
||||
mocked_update_state_cache,
|
||||
|
|
|
|||
|
|
@ -4,6 +4,7 @@ import pytest
|
|||
from django.core.cache import cache
|
||||
from django.test import override_settings
|
||||
|
||||
from apps.alerts.signals import alert_group_created_signal
|
||||
from apps.alerts.tasks import notify_user_task
|
||||
from apps.base.models import UserNotificationPolicy, UserNotificationPolicyLogRecord
|
||||
from apps.metrics_exporter.helpers import (
|
||||
|
|
@ -22,6 +23,12 @@ from apps.metrics_exporter.tests.conftest import (
|
|||
)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_apply_async(monkeypatch):
|
||||
"""Override 'mock_apply_async' fixture"""
|
||||
return
|
||||
|
||||
|
||||
@patch("apps.alerts.models.alert_group_log_record.tasks.send_update_log_report_signal.apply_async")
|
||||
@patch("apps.alerts.models.alert_group.alert_group_action_triggered_signal.send")
|
||||
@pytest.mark.django_db
|
||||
|
|
@ -29,6 +36,7 @@ from apps.metrics_exporter.tests.conftest import (
|
|||
def test_update_metric_alert_groups_total_cache_on_action(
|
||||
mocked_send_log_signal,
|
||||
mocked_action_signal_send,
|
||||
mock_apply_async,
|
||||
make_organization,
|
||||
make_user_for_organization,
|
||||
make_alert_receive_channel,
|
||||
|
|
@ -106,6 +114,8 @@ def test_update_metric_alert_groups_total_cache_on_action(
|
|||
arg_idx = 0
|
||||
alert_group = make_alert_group(alert_receive_channel)
|
||||
make_alert(alert_group=alert_group, raw_request_data={})
|
||||
# this signal is normally called in get_or_create_grouping on create alert
|
||||
alert_group_created_signal.send(sender=alert_group.__class__, alert_group=alert_group)
|
||||
|
||||
# check alert_groups_total metric cache, get called args
|
||||
mock_cache_set_called_args = mock_cache_set.call_args_list
|
||||
|
|
@ -137,6 +147,7 @@ def test_update_metric_alert_groups_total_cache_on_action(
|
|||
def test_update_metric_alert_groups_response_time_cache_on_action(
|
||||
mocked_send_log_signal,
|
||||
mocked_action_signal_send,
|
||||
mock_apply_async,
|
||||
make_organization,
|
||||
make_user_for_organization,
|
||||
make_alert_receive_channel,
|
||||
|
|
@ -461,6 +472,7 @@ def test_update_metrics_cache_on_update_team(
|
|||
@pytest.mark.django_db
|
||||
def test_update_metrics_cache_on_user_notification(
|
||||
mocked_perform_notification_task,
|
||||
mock_apply_async,
|
||||
make_organization,
|
||||
make_user_for_organization,
|
||||
make_alert_receive_channel,
|
||||
|
|
|
|||
|
|
@ -16,6 +16,8 @@ CELERY_TASK_ROUTES = {
|
|||
"apps.labels.tasks.update_labels_cache": {"queue": "default"},
|
||||
"apps.labels.tasks.update_instances_labels_cache": {"queue": "default"},
|
||||
"apps.metrics_exporter.tasks.start_calculate_and_cache_metrics": {"queue": "default"},
|
||||
"apps.metrics_exporter.tasks.update_metrics_for_alert_group": {"queue": "default"},
|
||||
"apps.metrics_exporter.tasks.update_metrics_for_user": {"queue": "default"},
|
||||
"apps.metrics_exporter.tasks.start_recalculation_for_new_metric": {"queue": "default"},
|
||||
"apps.metrics_exporter.tasks.save_organizations_ids_in_cache": {"queue": "default"},
|
||||
"apps.mobile_app.tasks.new_shift_swap_request.notify_shift_swap_requests": {"queue": "default"},
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue