Prepare insight metrics structure for adding service_name label (#4227)

# What this PR does
Prepare insight metrics for adding `service_name` label.
This PR updates metrics cache structure, supporting both old and new
version of cache.
The `service_name` label can then be added in a follow-up PR once all metric
caches are updated.

## Which issue(s) this PR closes
https://github.com/grafana/oncall-private/issues/2610

## Checklist

- [x] Unit, integration, and e2e (if applicable) tests updated
- [x] Documentation added (or `pr:no public docs` PR label added if not
required)
- [x] Added the relevant release notes label (see labels prefixed w/
`release:`). These labels dictate how your PR will
    show up in the autogenerated release notes.
This commit is contained in:
Yulya Artyukhina 2024-04-29 11:45:23 +02:00 committed by GitHub
parent 6ed7a1e3b8
commit d1085b718c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 699 additions and 124 deletions

View file

@ -4,6 +4,13 @@ import typing
from django.conf import settings
class AlertGroupStateDict(typing.TypedDict):
firing: int
acknowledged: int
silenced: int
resolved: int
class AlertGroupsTotalMetricsDict(typing.TypedDict):
integration_name: str
team_name: str
@ -11,10 +18,7 @@ class AlertGroupsTotalMetricsDict(typing.TypedDict):
org_id: int
slug: str
id: int
firing: int
acknowledged: int
silenced: int
resolved: int
services: typing.Dict[str, AlertGroupStateDict]
class AlertGroupsResponseTimeMetricsDict(typing.TypedDict):
@ -24,7 +28,7 @@ class AlertGroupsResponseTimeMetricsDict(typing.TypedDict):
org_id: int
slug: str
id: int
response_time: list
services: typing.Dict[str, list]
class UserWasNotifiedOfAlertGroupsMetricsDict(typing.TypedDict):
@ -61,3 +65,6 @@ METRICS_RECALCULATION_CACHE_TIMEOUT_DISPERSE = (0, 3600) # 1 hour
METRICS_ORGANIZATIONS_IDS = "metrics_organizations_ids"
METRICS_ORGANIZATIONS_IDS_CACHE_TIMEOUT = 3600 # 1 hour
SERVICE_LABEL = "service_name"
NO_SERVICE_VALUE = "No service"

View file

@ -16,8 +16,10 @@ from apps.metrics_exporter.constants import (
METRICS_RECALCULATION_CACHE_TIMEOUT,
METRICS_RECALCULATION_CACHE_TIMEOUT_DISPERSE,
METRICS_RESPONSE_TIME_CALCULATION_PERIOD,
NO_SERVICE_VALUE,
USER_WAS_NOTIFIED_OF_ALERT_GROUPS,
AlertGroupsResponseTimeMetricsDict,
AlertGroupStateDict,
AlertGroupsTotalMetricsDict,
RecalculateMetricsTimer,
UserWasNotifiedOfAlertGroupsMetricsDict,
@ -126,6 +128,15 @@ def get_metric_calculation_started_key(metric_name) -> str:
return f"calculation_started_for_{metric_name}"
def get_default_states_dict() -> AlertGroupStateDict:
    """Return a new counter dict mapping each alert group state value to 0.

    Used as the initial per-service entry of the alert_groups_total metrics
    cache (e.g. for the "No service" bucket), so every call must return a
    fresh dict — callers mutate the result in place.
    """
    return {
        AlertGroupState.FIRING.value: 0,
        AlertGroupState.ACKNOWLEDGED.value: 0,
        AlertGroupState.RESOLVED.value: 0,
        AlertGroupState.SILENCED.value: 0,
    }
def metrics_update_integration_cache(integration: "AlertReceiveChannel") -> None:
"""Update integration data in metrics cache"""
metrics_cache_timeout = get_metrics_cache_timeout(integration.organization_id)
@ -185,10 +196,7 @@ def metrics_add_integrations_to_cache(integrations: list["AlertReceiveChannel"],
"org_id": grafana_org_id,
"slug": instance_slug,
"id": instance_id,
AlertGroupState.FIRING.value: 0,
AlertGroupState.ACKNOWLEDGED.value: 0,
AlertGroupState.RESOLVED.value: 0,
AlertGroupState.SILENCED.value: 0,
"services": {NO_SERVICE_VALUE: get_default_states_dict()},
},
)
cache.set(metric_alert_groups_total_key, metric_alert_groups_total, timeout=metrics_cache_timeout)
@ -208,13 +216,13 @@ def metrics_add_integrations_to_cache(integrations: list["AlertReceiveChannel"],
"org_id": grafana_org_id,
"slug": instance_slug,
"id": instance_id,
"response_time": [],
"services": {NO_SERVICE_VALUE: []},
},
)
cache.set(metric_alert_groups_response_time_key, metric_alert_groups_response_time, timeout=metrics_cache_timeout)
def metrics_bulk_update_team_label_cache(teams_updated_data, organization_id):
def metrics_bulk_update_team_label_cache(teams_updated_data: dict, organization_id: int):
"""Update team related data in metrics cache for each team in `teams_updated_data`"""
if not teams_updated_data:
return
@ -243,8 +251,29 @@ def metrics_bulk_update_team_label_cache(teams_updated_data, organization_id):
cache.set(metric_alert_groups_response_time_key, metric_alert_groups_response_time, timeout=metrics_cache_timeout)
def metrics_update_alert_groups_state_cache(states_diff, organization_id):
"""Update alert groups state metric cache for each integration in states_diff dict."""
def metrics_update_alert_groups_state_cache(states_diff: dict, organization_id: int):
"""
Update alert groups state metric cache for each integration in states_diff dict.
states_diff example:
{
<integration_id>: {
<service name>: {
"previous_states": {
firing: 1,
acknowledged: 0,
resolved: 0,
silenced: 0,
},
"new_states": {
firing: 0,
acknowledged: 1,
resolved: 0,
silenced: 0,
}
}
}
}
"""
if not states_diff:
return
@ -253,23 +282,40 @@ def metrics_update_alert_groups_state_cache(states_diff, organization_id):
metric_alert_groups_total = cache.get(metric_alert_groups_total_key, {})
if not metric_alert_groups_total:
return
for integration_id, integration_states_diff in states_diff.items():
for integration_id, service_data in states_diff.items():
integration_alert_groups = metric_alert_groups_total.get(int(integration_id))
if not integration_alert_groups:
continue
for previous_state, counter in integration_states_diff["previous_states"].items():
if integration_alert_groups[previous_state] - counter > 0:
integration_alert_groups[previous_state] -= counter
for service_name, service_state_diff in service_data.items():
if "services" in integration_alert_groups:
states_to_update = integration_alert_groups["services"].setdefault(
service_name, get_default_states_dict()
)
else:
integration_alert_groups[previous_state] = 0
for new_state, counter in integration_states_diff["new_states"].items():
integration_alert_groups[new_state] += counter
# support version of metrics cache without service name. This clause can be removed when all metrics
# cache is updated on prod (~2 days after release)
states_to_update = integration_alert_groups
for previous_state, counter in service_state_diff["previous_states"].items():
if states_to_update[previous_state] - counter > 0:
states_to_update[previous_state] -= counter
else:
states_to_update[previous_state] = 0
for new_state, counter in service_state_diff["new_states"].items():
states_to_update[new_state] += counter
cache.set(metric_alert_groups_total_key, metric_alert_groups_total, timeout=metrics_cache_timeout)
def metrics_update_alert_groups_response_time_cache(integrations_response_time, organization_id):
"""Update alert groups response time metric cache for each integration in `integrations_response_time` dict."""
def metrics_update_alert_groups_response_time_cache(integrations_response_time: dict, organization_id: int):
"""
Update alert groups response time metric cache for each integration in `integrations_response_time` dict.
integrations_response_time dict example:
{
<integration_id>: {
<service name>: [10],
}
}
"""
if not integrations_response_time:
return
@ -278,11 +324,18 @@ def metrics_update_alert_groups_response_time_cache(integrations_response_time,
metric_alert_groups_response_time = cache.get(metric_alert_groups_response_time_key, {})
if not metric_alert_groups_response_time:
return
for integration_id, integration_response_time in integrations_response_time.items():
for integration_id, service_data in integrations_response_time.items():
integration_response_time_metrics = metric_alert_groups_response_time.get(int(integration_id))
if not integration_response_time_metrics:
continue
integration_response_time_metrics["response_time"].extend(integration_response_time)
for service_name, response_time_values in service_data.items():
if "services" in integration_response_time_metrics:
integration_response_time_metrics["services"].setdefault(service_name, [])
integration_response_time_metrics["services"][service_name].extend(response_time_values)
else:
# support version of metrics cache without service name. This clause can be removed when all metrics
# cache is updated on prod (~2 days after release)
integration_response_time_metrics["response_time"].extend(response_time_values)
cache.set(metric_alert_groups_response_time_key, metric_alert_groups_response_time, timeout=metrics_cache_timeout)

View file

@ -40,56 +40,60 @@ class MetricsCacheManager:
return default_dict
@staticmethod
def update_integration_states_diff(metrics_dict, integration_id, previous_state=None, new_state=None):
metrics_dict.setdefault(integration_id, MetricsCacheManager.get_default_states_diff_dict())
def update_integration_states_diff(metrics_dict, integration_id, service_name, previous_state=None, new_state=None):
state_per_service = metrics_dict.setdefault(
integration_id, {service_name: MetricsCacheManager.get_default_states_diff_dict()}
)
if previous_state:
state_value = previous_state
metrics_dict[integration_id]["previous_states"][state_value] += 1
state_per_service[service_name]["previous_states"][state_value] += 1
if new_state:
state_value = new_state
metrics_dict[integration_id]["new_states"][state_value] += 1
state_per_service[service_name]["new_states"][state_value] += 1
return metrics_dict
@staticmethod
def update_integration_response_time_diff(metrics_dict, integration_id, response_time_seconds):
    """Record one response time value for the given integration.

    Appends ``response_time_seconds`` to the list stored under
    ``integration_id`` in ``metrics_dict``, creating an empty list first if
    the integration has no entry yet. Returns the same (mutated) dict.
    """
    metrics_dict.setdefault(integration_id, []).append(response_time_seconds)
    return metrics_dict
@staticmethod
def metrics_update_state_cache_for_alert_group(integration_id, organization_id, old_state=None, new_state=None):
def metrics_update_state_cache_for_alert_group(
integration_id, organization_id, service_name, old_state=None, new_state=None
):
"""
Update state metric cache for one alert group.
Run the task to update async if organization_id is None due to an additional request to db
"""
metrics_state_diff = MetricsCacheManager.update_integration_states_diff(
{}, integration_id, previous_state=old_state, new_state=new_state
{}, integration_id, service_name, previous_state=old_state, new_state=new_state
)
metrics_update_alert_groups_state_cache(metrics_state_diff, organization_id)
@staticmethod
def metrics_update_response_time_cache_for_alert_group(integration_id, organization_id, response_time_seconds):
def metrics_update_response_time_cache_for_alert_group(
integration_id, organization_id, response_time_seconds, service_name
):
"""
Update response time metric cache for one alert group.
Run the task to update async if organization_id is None due to an additional request to db
"""
metrics_response_time = MetricsCacheManager.update_integration_response_time_diff(
{}, integration_id, response_time_seconds
)
metrics_response_time: typing.Dict[int, typing.Dict[str, typing.List[int]]] = {
integration_id: {service_name: [response_time_seconds]}
}
metrics_update_alert_groups_response_time_cache(metrics_response_time, organization_id)
@staticmethod
def metrics_update_cache_for_alert_group(
integration_id, organization_id, old_state=None, new_state=None, response_time=None, started_at=None
integration_id,
organization_id,
old_state=None,
new_state=None,
response_time=None,
started_at=None,
service_name=None,
):
"""Call methods to update state and response time metrics cache for one alert group."""
if response_time and old_state == AlertGroupState.FIRING and started_at > get_response_time_period():
response_time_seconds = int(response_time.total_seconds())
MetricsCacheManager.metrics_update_response_time_cache_for_alert_group(
integration_id, organization_id, response_time_seconds
integration_id, organization_id, response_time_seconds, service_name
)
if old_state or new_state:
MetricsCacheManager.metrics_update_state_cache_for_alert_group(
integration_id, organization_id, old_state, new_state
integration_id, organization_id, service_name, old_state, new_state
)

View file

@ -46,10 +46,14 @@ class ApplicationMetricsCollector:
"slug",
"id",
]
self._integration_labels = [
"integration",
"team",
] + self._stack_labels
self._integration_labels = (
[
"integration",
"team",
]
+ self._stack_labels
# + [SERVICE_LABEL] # todo:metrics: uncomment when all metric cache is updated (~2 days after release)
)
self._integration_labels_with_state = self._integration_labels + ["state"]
self._user_labels = ["username"] + self._stack_labels
@ -96,8 +100,19 @@ class ApplicationMetricsCollector:
integration_data["id"], # grafana instance id
]
labels_values = list(map(str, labels_values))
for state in AlertGroupState:
alert_groups_total.add_metric(labels_values + [state.value], integration_data[state.value])
# clause below is needed for compatibility with old metric cache during rollout metrics with services
if "services" in integration_data:
for service_name in integration_data["services"]:
for state in AlertGroupState:
alert_groups_total.add_metric(
labels_values + [state.value],
# todo:metrics: replace [state.value] when all metric cache is updated
# + [service_name, state.value],
integration_data["services"][service_name][state.value],
)
else:
for state in AlertGroupState:
alert_groups_total.add_metric(labels_values + [state.value], integration_data[state.value])
org_id_from_key = RE_ALERT_GROUPS_TOTAL.match(org_key).groups()[0]
processed_org_ids.add(int(org_id_from_key))
missing_org_ids = org_ids - processed_org_ids
@ -126,12 +141,26 @@ class ApplicationMetricsCollector:
]
labels_values = list(map(str, labels_values))
response_time_values = integration_data["response_time"]
if not response_time_values:
continue
buckets, sum_value = self.get_buckets_with_sum(response_time_values)
buckets = sorted(list(buckets.items()), key=lambda x: float(x[0]))
alert_groups_response_time_seconds.add_metric(labels_values, buckets=buckets, sum_value=sum_value)
# clause below is needed for compatibility with old metric cache during rollout metrics with services
if "services" in integration_data:
# todo:metrics: unpack as `for service_name, response_time` when the service label is added to the metric
for _, response_time in integration_data["services"].items():
if not response_time:
continue
buckets, sum_value = self.get_buckets_with_sum(response_time)
buckets = sorted(list(buckets.items()), key=lambda x: float(x[0]))
alert_groups_response_time_seconds.add_metric(
labels_values, # + [service_name] todo:metrics: uncomment when all metric cache is updated
buckets=buckets,
sum_value=sum_value,
)
else:
response_time_values = integration_data["response_time"]
if not response_time_values:
continue
buckets, sum_value = self.get_buckets_with_sum(response_time_values)
buckets = sorted(list(buckets.items()), key=lambda x: float(x[0]))
alert_groups_response_time_seconds.add_metric(labels_values, buckets=buckets, sum_value=sum_value)
org_id_from_key = RE_ALERT_GROUPS_RESPONSE_TIME.match(org_key).groups()[0]
processed_org_ids.add(int(org_id_from_key))
missing_org_ids = org_ids - processed_org_ids

View file

@ -8,12 +8,15 @@ from apps.alerts.constants import AlertGroupState
from apps.metrics_exporter.constants import (
METRICS_ORGANIZATIONS_IDS,
METRICS_ORGANIZATIONS_IDS_CACHE_TIMEOUT,
NO_SERVICE_VALUE,
SERVICE_LABEL,
AlertGroupsResponseTimeMetricsDict,
AlertGroupsTotalMetricsDict,
RecalculateOrgMetricsDict,
UserWasNotifiedOfAlertGroupsMetricsDict,
)
from apps.metrics_exporter.helpers import (
get_default_states_dict,
get_metric_alert_groups_response_time_key,
get_metric_alert_groups_total_key,
get_metric_calculation_started_key,
@ -111,37 +114,80 @@ def calculate_and_cache_metrics(organization_id, force=False):
}
for integration in integrations:
# calculate states
for state, alert_group_filter in states.items():
metric_alert_group_total.setdefault(
integration.id,
{
"integration_name": integration.emojized_verbal_name,
"team_name": integration.team_name,
"team_id": integration.team_id_or_no_team,
"org_id": instance_org_id,
"slug": instance_slug,
"id": instance_id,
},
)[state] = integration.alert_groups.filter(alert_group_filter).count()
# get response time
all_response_time = integration.alert_groups.filter(
started_at__gte=response_time_period,
response_time__isnull=False,
).values_list("response_time", flat=True)
all_response_time_seconds = [int(response_time.total_seconds()) for response_time in all_response_time]
metric_alert_group_response_time[integration.id] = {
metric_alert_group_total_data = {
"integration_name": integration.emojized_verbal_name,
"team_name": integration.team_name,
"team_id": integration.team_id_or_no_team,
"org_id": instance_org_id,
"slug": instance_slug,
"id": instance_id,
"response_time": all_response_time_seconds,
"services": {
NO_SERVICE_VALUE: get_default_states_dict(),
},
}
# calculate states
for state, alert_group_filter in states.items():
# count alert groups with `service_name` label group by label value
alert_group_count_by_service = (
integration.alert_groups.filter(
alert_group_filter,
labels__organization=organization,
labels__key_name=SERVICE_LABEL,
)
.values("labels__value_name")
.annotate(count=Count("id"))
)
for value in alert_group_count_by_service:
metric_alert_group_total_data["services"].setdefault(
value["labels__value_name"],
get_default_states_dict(),
)[state] += value["count"]
# count alert groups without `service_name` label
alert_groups_count_without_service = integration.alert_groups.filter(
alert_group_filter,
~Q(labels__key_name=SERVICE_LABEL),
).count()
metric_alert_group_total_data["services"][NO_SERVICE_VALUE][state] += alert_groups_count_without_service
metric_alert_group_total[integration.id] = metric_alert_group_total_data
# calculate response time metric
metric_response_time_data = {
"integration_name": integration.emojized_verbal_name,
"team_name": integration.team_name,
"team_id": integration.team_id_or_no_team,
"org_id": instance_org_id,
"slug": instance_slug,
"id": instance_id,
"services": {NO_SERVICE_VALUE: []},
}
# filter response time by services
response_time_by_service = integration.alert_groups.filter(
started_at__gte=response_time_period,
response_time__isnull=False,
labels__organization=organization,
labels__key_name=SERVICE_LABEL,
).values_list("id", "labels__value_name", "response_time")
for _, service_name, response_time in response_time_by_service:
metric_response_time_data["services"].setdefault(service_name, [])
metric_response_time_data["services"][service_name].append(response_time.total_seconds())
no_service_response_time = (
integration.alert_groups.filter(
started_at__gte=response_time_period,
response_time__isnull=False,
)
.exclude(id__in=[i[0] for i in response_time_by_service])
.values_list("response_time", flat=True)
)
no_service_response_time_seconds = [
int(response_time.total_seconds()) for response_time in no_service_response_time
]
metric_response_time_data["services"][NO_SERVICE_VALUE] = no_service_response_time_seconds
metric_alert_group_response_time[integration.id] = metric_response_time_data
metric_alert_groups_total_key = get_metric_alert_groups_total_key(organization_id)
metric_alert_groups_response_time_key = get_metric_alert_groups_response_time_key(organization_id)
@ -223,6 +269,8 @@ def update_metrics_for_alert_group(alert_group_id, organization_id, previous_sta
if previous_state != AlertGroupState.FIRING or alert_group.restarted_at:
# only consider response time from the first action
updated_response_time = None
service_label = alert_group.labels.filter(key_name=SERVICE_LABEL).first()
service_name = service_label.value_name if service_label else NO_SERVICE_VALUE
MetricsCacheManager.metrics_update_cache_for_alert_group(
integration_id=alert_group.channel_id,
organization_id=organization_id,
@ -230,6 +278,7 @@ def update_metrics_for_alert_group(alert_group_id, organization_id, previous_sta
new_state=new_state,
response_time=updated_response_time,
started_at=alert_group.started_at,
service_name=service_name,
)

View file

@ -4,6 +4,7 @@ from django.core.cache import cache
from apps.metrics_exporter.constants import (
ALERT_GROUPS_RESPONSE_TIME,
ALERT_GROUPS_TOTAL,
NO_SERVICE_VALUE,
USER_WAS_NOTIFIED_OF_ALERT_GROUPS,
)
from apps.metrics_exporter.helpers import (
@ -21,6 +22,67 @@ METRICS_TEST_USER_USERNAME = "Alex"
@pytest.fixture()
def mock_cache_get_metrics_for_collector(monkeypatch):
def _mock_cache_get(key, *args, **kwargs):
if ALERT_GROUPS_TOTAL in key:
key = ALERT_GROUPS_TOTAL
elif ALERT_GROUPS_RESPONSE_TIME in key:
key = ALERT_GROUPS_RESPONSE_TIME
elif USER_WAS_NOTIFIED_OF_ALERT_GROUPS in key:
key = USER_WAS_NOTIFIED_OF_ALERT_GROUPS
test_metrics = {
ALERT_GROUPS_TOTAL: {
1: {
"integration_name": "Test metrics integration",
"team_name": "Test team",
"team_id": 1,
"org_id": 1,
"slug": "Test stack",
"id": 1,
"services": {
NO_SERVICE_VALUE: {
"firing": 2,
"silenced": 4,
"acknowledged": 3,
"resolved": 5,
},
},
},
},
ALERT_GROUPS_RESPONSE_TIME: {
1: {
"integration_name": "Test metrics integration",
"team_name": "Test team",
"team_id": 1,
"org_id": 1,
"slug": "Test stack",
"id": 1,
"services": {
NO_SERVICE_VALUE: [2, 10, 200, 650],
},
}
},
USER_WAS_NOTIFIED_OF_ALERT_GROUPS: {
1: {
"org_id": 1,
"slug": "Test stack",
"id": 1,
"user_username": "Alex",
"counter": 4,
}
},
}
return test_metrics.get(key)
def _mock_cache_get_many(keys, *args, **kwargs):
return {key: _mock_cache_get(key) for key in keys if _mock_cache_get(key)}
monkeypatch.setattr(cache, "get", _mock_cache_get)
monkeypatch.setattr(cache, "get_many", _mock_cache_get_many)
# todo:metrics: remove later when all cache is updated
@pytest.fixture() # used for test backwards compatibility with old version of metrics
def mock_cache_get_metrics_for_collector_mixed_versions(monkeypatch):
def _mock_cache_get(key, *args, **kwargs):
if ALERT_GROUPS_TOTAL in key:
key = ALERT_GROUPS_TOTAL
@ -41,7 +103,23 @@ def mock_cache_get_metrics_for_collector(monkeypatch):
"acknowledged": 3,
"silenced": 4,
"resolved": 5,
}
},
2: {
"integration_name": "Test metrics integration 2",
"team_name": "Test team",
"team_id": 1,
"org_id": 1,
"slug": "Test stack",
"id": 1,
"services": {
NO_SERVICE_VALUE: {
"firing": 2,
"silenced": 4,
"acknowledged": 3,
"resolved": 5,
},
},
},
},
ALERT_GROUPS_RESPONSE_TIME: {
1: {
@ -52,7 +130,18 @@ def mock_cache_get_metrics_for_collector(monkeypatch):
"slug": "Test stack",
"id": 1,
"response_time": [2, 10, 200, 650],
}
},
2: {
"integration_name": "Test metrics integration 2",
"team_name": "Test team",
"team_id": 1,
"org_id": 1,
"slug": "Test stack",
"id": 1,
"services": {
NO_SERVICE_VALUE: [2, 10, 200, 650],
},
},
},
USER_WAS_NOTIFIED_OF_ALERT_GROUPS: {
1: {
@ -87,6 +176,56 @@ def mock_get_metrics_cache(monkeypatch):
@pytest.fixture
def make_metrics_cache_params(monkeypatch):
def _make_cache_params(integration_id, organization_id, team_name=None, team_id=None):
team_name = team_name or "No team"
team_id = team_id or "no_team"
metric_alert_groups_total_key = get_metric_alert_groups_total_key(organization_id)
metric_alert_groups_response_time_key = get_metric_alert_groups_response_time_key(organization_id)
def cache_get(key, *args, **kwargs):
metrics_data = {
metric_alert_groups_response_time_key: {
integration_id: {
"integration_name": METRICS_TEST_INTEGRATION_NAME,
"team_name": team_name,
"team_id": team_id,
"org_id": METRICS_TEST_ORG_ID,
"slug": METRICS_TEST_INSTANCE_SLUG,
"id": METRICS_TEST_INSTANCE_ID,
"services": {
NO_SERVICE_VALUE: [],
},
}
},
metric_alert_groups_total_key: {
integration_id: {
"integration_name": METRICS_TEST_INTEGRATION_NAME,
"team_name": team_name,
"team_id": team_id,
"org_id": METRICS_TEST_ORG_ID,
"slug": METRICS_TEST_INSTANCE_SLUG,
"id": METRICS_TEST_INSTANCE_ID,
"services": {
NO_SERVICE_VALUE: {
"firing": 0,
"silenced": 0,
"acknowledged": 0,
"resolved": 0,
},
},
}
},
}
return metrics_data.get(key, {})
return cache_get
return _make_cache_params
# todo:metrics: remove later when all cache is updated
@pytest.fixture
def make_metrics_cache_params_old_version(monkeypatch):
def _make_cache_params(integration_id, organization_id, team_name=None, team_id=None):
team_name = team_name or "No team"
team_id = team_id or "no_team"

View file

@ -3,6 +3,7 @@ from unittest.mock import patch
import pytest
from apps.base.models import UserNotificationPolicyLogRecord
from apps.metrics_exporter.constants import NO_SERVICE_VALUE, SERVICE_LABEL
from apps.metrics_exporter.helpers import (
get_metric_alert_groups_response_time_key,
get_metric_alert_groups_total_key,
@ -21,6 +22,7 @@ def test_calculate_and_cache_metrics_task(
make_alert_receive_channel,
make_alert_group,
make_alert,
make_alert_group_label_association,
):
METRICS_RESPONSE_TIME_LEN = 3 # 1 for each alert group with changed state (acked, resolved, silenced)
organization = make_organization()
@ -45,6 +47,13 @@ def test_calculate_and_cache_metrics_task(
make_alert(alert_group=alert_group_to_sil, raw_request_data={})
alert_group_to_sil.silence()
alert_group_to_ack_with_service = make_alert_group(alert_receive_channel)
make_alert(alert_group=alert_group_to_ack, raw_request_data={})
make_alert_group_label_association(
organization, alert_group_to_ack_with_service, key_name=SERVICE_LABEL, value_name="test"
)
alert_group_to_ack_with_service.acknowledge()
metric_alert_groups_total_key = get_metric_alert_groups_total_key(organization.id)
metric_alert_groups_response_time_key = get_metric_alert_groups_response_time_key(organization.id)
@ -56,10 +65,20 @@ def test_calculate_and_cache_metrics_task(
"org_id": organization.org_id,
"slug": organization.stack_slug,
"id": organization.stack_id,
"firing": 2,
"silenced": 1,
"acknowledged": 1,
"resolved": 1,
"services": {
NO_SERVICE_VALUE: {
"firing": 2,
"silenced": 1,
"acknowledged": 1,
"resolved": 1,
},
"test": {
"firing": 0,
"silenced": 0,
"acknowledged": 1,
"resolved": 0,
},
},
},
alert_receive_channel_2.id: {
"integration_name": alert_receive_channel_2.verbal_name,
@ -68,10 +87,20 @@ def test_calculate_and_cache_metrics_task(
"org_id": organization.org_id,
"slug": organization.stack_slug,
"id": organization.stack_id,
"firing": 2,
"silenced": 1,
"acknowledged": 1,
"resolved": 1,
"services": {
NO_SERVICE_VALUE: {
"firing": 2,
"silenced": 1,
"acknowledged": 1,
"resolved": 1,
},
"test": {
"firing": 0,
"silenced": 0,
"acknowledged": 1,
"resolved": 0,
},
},
},
}
expected_result_metric_alert_groups_response_time = {
@ -82,7 +111,7 @@ def test_calculate_and_cache_metrics_task(
"org_id": organization.org_id,
"slug": organization.stack_slug,
"id": organization.stack_id,
"response_time": [],
"services": {NO_SERVICE_VALUE: [], "test": []},
},
alert_receive_channel_2.id: {
"integration_name": alert_receive_channel_2.verbal_name,
@ -91,7 +120,7 @@ def test_calculate_and_cache_metrics_task(
"org_id": organization.org_id,
"slug": organization.stack_slug,
"id": organization.stack_id,
"response_time": [],
"services": {NO_SERVICE_VALUE: [], "test": []},
},
}
@ -108,9 +137,14 @@ def test_calculate_and_cache_metrics_task(
metric_alert_groups_response_time_values = args[1].args
assert metric_alert_groups_response_time_values[0] == metric_alert_groups_response_time_key
for integration_id, values in metric_alert_groups_response_time_values[1].items():
assert len(values["response_time"]) == METRICS_RESPONSE_TIME_LEN
assert len(values["services"][NO_SERVICE_VALUE]) == METRICS_RESPONSE_TIME_LEN
# set response time to expected result because it is calculated on fly
expected_result_metric_alert_groups_response_time[integration_id]["response_time"] = values["response_time"]
expected_result_metric_alert_groups_response_time[integration_id]["services"][NO_SERVICE_VALUE] = values[
"services"
][NO_SERVICE_VALUE]
expected_result_metric_alert_groups_response_time[integration_id]["services"]["test"] = values["services"][
"test"
]
assert metric_alert_groups_response_time_values[1] == expected_result_metric_alert_groups_response_time

View file

@ -44,3 +44,33 @@ def test_application_metrics_collector(
# Since there is no recalculation timer for test org in cache, start_calculate_and_cache_metrics must be called
assert mocked_start_calculate_and_cache_metrics.called
test_metrics_registry.unregister(collector)
# todo:metrics: remove later when all cache is updated
@patch("apps.metrics_exporter.metrics_collectors.get_organization_ids", return_value=[1])
@patch("apps.metrics_exporter.metrics_collectors.start_calculate_and_cache_metrics.apply_async")
@pytest.mark.django_db
def test_application_metrics_collector_mixed_cache(
mocked_org_ids, mocked_start_calculate_and_cache_metrics, mock_cache_get_metrics_for_collector_mixed_versions
):
"""Test that ApplicationMetricsCollector generates expected metrics from previous and new versions of cache"""
collector = ApplicationMetricsCollector()
test_metrics_registry = CollectorRegistry()
test_metrics_registry.register(collector)
for metric in test_metrics_registry.collect():
if metric.name == ALERT_GROUPS_TOTAL:
# integration with labels for each alert group state
assert len(metric.samples) == len(AlertGroupState) * 2
elif metric.name == ALERT_GROUPS_RESPONSE_TIME:
# integration with labels for each value in collector's bucket + _count and _sum histogram values
assert len(metric.samples) == (len(collector._buckets) + 2) * 2
elif metric.name == USER_WAS_NOTIFIED_OF_ALERT_GROUPS:
# metric with labels for each notified user
assert len(metric.samples) == 1
result = generate_latest(test_metrics_registry).decode("utf-8")
assert result is not None
assert mocked_org_ids.called
# Since there is no recalculation timer for test org in cache, start_calculate_and_cache_metrics must be called
assert mocked_start_calculate_and_cache_metrics.called
test_metrics_registry.unregister(collector)

View file

@ -7,6 +7,7 @@ from django.test import override_settings
from apps.alerts.signals import alert_group_created_signal
from apps.alerts.tasks import notify_user_task
from apps.base.models import UserNotificationPolicy, UserNotificationPolicyLogRecord
from apps.metrics_exporter.constants import NO_SERVICE_VALUE, SERVICE_LABEL
from apps.metrics_exporter.helpers import (
get_metric_alert_groups_response_time_key,
get_metric_alert_groups_total_key,
@ -23,6 +24,8 @@ from apps.metrics_exporter.tests.conftest import (
METRICS_TEST_USER_USERNAME,
)
TEST_SERVICE_VALUE = "Test_service"
@pytest.fixture
def mock_apply_async(monkeypatch):
@ -44,6 +47,7 @@ def test_update_metric_alert_groups_total_cache_on_action(
make_alert_group,
make_alert,
make_metrics_cache_params,
make_alert_group_label_association,
monkeypatch,
):
organization = make_organization(
@ -64,13 +68,24 @@ def test_update_metric_alert_groups_total_cache_on_action(
"org_id": organization.org_id,
"slug": organization.stack_slug,
"id": organization.stack_id,
"firing": 0,
"silenced": 0,
"acknowledged": 0,
"resolved": 0,
"services": {
NO_SERVICE_VALUE: {
"firing": 0,
"silenced": 0,
"acknowledged": 0,
"resolved": 0,
},
},
}
}
default_state = {
"firing": 0,
"silenced": 0,
"acknowledged": 0,
"resolved": 0,
}
expected_result_firing = {
"firing": 1,
"silenced": 0,
@ -102,17 +117,21 @@ def test_update_metric_alert_groups_total_cache_on_action(
metrics_cache = make_metrics_cache_params(alert_receive_channel.id, organization.id)
monkeypatch.setattr(cache, "get", metrics_cache)
def get_called_arg_index_and_compare_results(update_expected_result):
def get_called_arg_index_and_compare_results(update_expected_result, service_name=NO_SERVICE_VALUE):
"""find index for the metric argument, that was set in cache"""
for idx, called_arg in enumerate(mock_cache_set_called_args):
if idx >= arg_idx and called_arg.args[0] == metric_alert_groups_total_key:
expected_result_metric_alert_groups_total[alert_receive_channel.id].update(update_expected_result)
expected_result_metric_alert_groups_total[alert_receive_channel.id]["services"].setdefault(
service_name, {}
).update(update_expected_result)
assert called_arg.args[1] == expected_result_metric_alert_groups_total
return idx + 1
raise AssertionError
with patch("apps.metrics_exporter.tasks.cache.set") as mock_cache_set:
arg_idx = 0
# create alert group without service label
alert_group = make_alert_group(alert_receive_channel)
make_alert(alert_group=alert_group, raw_request_data={})
# this signal is normally called in get_or_create_grouping on create alert
@ -138,7 +157,25 @@ def test_update_metric_alert_groups_total_cache_on_action(
arg_idx = get_called_arg_index_and_compare_results(expected_result_silenced)
alert_group.un_silence_by_user_or_backsync(user)
get_called_arg_index_and_compare_results(expected_result_firing)
arg_idx = get_called_arg_index_and_compare_results(expected_result_firing)
# create alert group with service label and check metric cache is updated properly
expected_result_metric_alert_groups_total[alert_receive_channel.id]["services"][NO_SERVICE_VALUE].update(
default_state
)
alert_group_with_service = make_alert_group(alert_receive_channel)
make_alert(alert_group=alert_group_with_service, raw_request_data={})
make_alert_group_label_association(
organization, alert_group_with_service, key_name=SERVICE_LABEL, value_name=TEST_SERVICE_VALUE
)
alert_group_created_signal.send(sender=alert_group_with_service.__class__, alert_group=alert_group_with_service)
# check alert_groups_total metric cache, get called args
arg_idx = get_called_arg_index_and_compare_results(expected_result_firing, TEST_SERVICE_VALUE)
alert_group_with_service.resolve_by_user_or_backsync(user)
get_called_arg_index_and_compare_results(expected_result_resolved, TEST_SERVICE_VALUE)
@patch("apps.alerts.models.alert_group_log_record.tasks.send_update_log_report_signal.apply_async")
@ -156,6 +193,7 @@ def test_update_metric_alert_groups_response_time_cache_on_action(
make_alert,
monkeypatch,
make_metrics_cache_params,
make_alert_group_label_association,
):
organization = make_organization(
org_id=METRICS_TEST_ORG_ID,
@ -175,21 +213,21 @@ def test_update_metric_alert_groups_response_time_cache_on_action(
"org_id": organization.org_id,
"slug": organization.stack_slug,
"id": organization.stack_id,
"response_time": [],
"services": {NO_SERVICE_VALUE: []},
}
}
metrics_cache = make_metrics_cache_params(alert_receive_channel.id, organization.id)
monkeypatch.setattr(cache, "get", metrics_cache)
def get_called_arg_index_and_compare_results():
def get_called_arg_index_and_compare_results(service_name=NO_SERVICE_VALUE):
"""find index for related to the metric argument, that was set in cache"""
for idx, called_arg in enumerate(mock_cache_set_called_args):
if idx >= arg_idx and called_arg.args[0] == metric_alert_groups_response_time_key:
response_time_values = called_arg.args[1][alert_receive_channel.id]["response_time"]
expected_result_metric_alert_groups_response_time[alert_receive_channel.id].update(
{"response_time": response_time_values}
)
response_time_values = called_arg.args[1][alert_receive_channel.id]["services"][service_name]
expected_result_metric_alert_groups_response_time[alert_receive_channel.id]["services"][
service_name
] = response_time_values
# response time values len always will be 1 here since cache is mocked and refreshed on every call
assert len(response_time_values) == 1
assert called_arg.args[1] == expected_result_metric_alert_groups_response_time
@ -236,7 +274,19 @@ def test_update_metric_alert_groups_response_time_cache_on_action(
arg_idx = get_called_arg_index_and_compare_results()
alert_group_3.silence_by_user_or_backsync(user, silence_delay=None)
get_called_arg_index_and_compare_results()
arg_idx = get_called_arg_index_and_compare_results()
# create alert group with service label and check metric cache is updated properly
expected_result_metric_alert_groups_response_time[alert_receive_channel.id]["services"][NO_SERVICE_VALUE] = []
alert_group_with_service = make_alert_group(alert_receive_channel)
make_alert(alert_group=alert_group_with_service, raw_request_data={})
make_alert_group_label_association(
organization, alert_group_with_service, key_name=SERVICE_LABEL, value_name=TEST_SERVICE_VALUE
)
assert_cache_was_not_changed_by_response_time_metric()
alert_group_with_service.acknowledge_by_user_or_backsync(user)
get_called_arg_index_and_compare_results(TEST_SERVICE_VALUE)
@pytest.mark.django_db
@ -296,10 +346,14 @@ def test_update_metrics_cache_on_update_integration(
"org_id": organization.org_id,
"slug": organization.stack_slug,
"id": organization.stack_id,
"firing": 0,
"silenced": 0,
"acknowledged": 0,
"resolved": 0,
"services": {
NO_SERVICE_VALUE: {
"firing": 0,
"silenced": 0,
"acknowledged": 0,
"resolved": 0,
},
},
}
}
expected_result_metric_alert_groups_response_time = {
@ -310,7 +364,7 @@ def test_update_metrics_cache_on_update_integration(
"org_id": organization.org_id,
"slug": organization.stack_slug,
"id": organization.stack_id,
"response_time": [],
"services": {NO_SERVICE_VALUE: []},
}
}
@ -409,10 +463,14 @@ def test_update_metrics_cache_on_update_team(
"org_id": organization.org_id,
"slug": organization.stack_slug,
"id": organization.stack_id,
"firing": 0,
"silenced": 0,
"acknowledged": 0,
"resolved": 0,
"services": {
NO_SERVICE_VALUE: {
"firing": 0,
"silenced": 0,
"acknowledged": 0,
"resolved": 0,
},
},
}
}
expected_result_metric_alert_groups_response_time = {
@ -423,7 +481,7 @@ def test_update_metrics_cache_on_update_team(
"org_id": organization.org_id,
"slug": organization.stack_slug,
"id": organization.stack_id,
"response_time": [],
"services": {NO_SERVICE_VALUE: []},
}
}
@ -568,10 +626,14 @@ def test_metrics_add_integrations_to_cache(make_organization, make_alert_receive
"org_id": organization.org_id,
"slug": organization.stack_slug,
"id": organization.stack_id,
"firing": firing,
"silenced": 0,
"acknowledged": 0,
"resolved": 0,
"services": {
NO_SERVICE_VALUE: {
"firing": firing,
"silenced": 0,
"acknowledged": 0,
"resolved": 0,
},
},
}
def _expected_alert_groups_response_time(alert_receive_channel, response_time=None):
@ -585,7 +647,9 @@ def test_metrics_add_integrations_to_cache(make_organization, make_alert_receive
"org_id": organization.org_id,
"slug": organization.stack_slug,
"id": organization.stack_id,
"response_time": response_time,
"services": {
NO_SERVICE_VALUE: response_time,
},
}
# clear cache, add some data
@ -612,3 +676,169 @@ def test_metrics_add_integrations_to_cache(make_organization, make_alert_receive
alert_receive_channel1.id: _expected_alert_groups_response_time(alert_receive_channel1),
alert_receive_channel2.id: _expected_alert_groups_response_time(alert_receive_channel2, response_time=[12]),
}
# todo:metrics: remove later when all cache is updated
@patch("apps.alerts.models.alert_group_log_record.tasks.send_update_log_report_signal.apply_async")
@patch("apps.alerts.tasks.send_alert_group_signal.alert_group_action_triggered_signal.send")
@pytest.mark.django_db
@override_settings(CELERY_TASK_ALWAYS_EAGER=True)
def test_update_metric_alert_groups_total_cache_on_action_backward_compatability(
    mocked_send_log_signal,
    mocked_action_signal_send,
    mock_apply_async,
    make_organization,
    make_user_for_organization,
    make_alert_receive_channel,
    make_alert_group,
    make_alert,
    make_metrics_cache_params_old_version,
    monkeypatch,
):
    """Test update metric cache works properly with previous version of cache"""
    organization = make_organization(
        org_id=METRICS_TEST_ORG_ID,
        stack_slug=METRICS_TEST_INSTANCE_SLUG,
        stack_id=METRICS_TEST_INSTANCE_ID,
    )
    user = make_user_for_organization(organization)
    alert_receive_channel = make_alert_receive_channel(organization, verbal_name=METRICS_TEST_INTEGRATION_NAME)
    metric_alert_groups_total_key = get_metric_alert_groups_total_key(organization.id)

    def _states(firing=0, silenced=0, acknowledged=0, resolved=0):
        # Old-style (pre-service_name) flat state counters kept at the top level of the metric entry.
        return {"firing": firing, "silenced": silenced, "acknowledged": acknowledged, "resolved": resolved}

    expected_totals = {
        alert_receive_channel.id: {
            "integration_name": alert_receive_channel.verbal_name,
            "team_name": "No team",
            "team_id": "no_team",
            "org_id": organization.org_id,
            "slug": organization.stack_slug,
            "id": organization.stack_id,
            **_states(),
        }
    }
    state_firing = _states(firing=1)
    state_acknowledged = _states(acknowledged=1)
    state_resolved = _states(resolved=1)

    metrics_cache = make_metrics_cache_params_old_version(alert_receive_channel.id, organization.id)
    monkeypatch.setattr(cache, "get", metrics_cache)

    def assert_totals_cache_updated(state):
        """Find the next cache.set call for the totals key, check its payload, return the next search index."""
        for call_index, set_call in enumerate(cache_set_calls):
            if call_index >= search_from and set_call.args[0] == metric_alert_groups_total_key:
                expected_totals[alert_receive_channel.id].update(state)
                assert set_call.args[1] == expected_totals
                return call_index + 1
        raise AssertionError

    with patch("apps.metrics_exporter.tasks.cache.set") as mock_cache_set:
        search_from = 0
        alert_group = make_alert_group(alert_receive_channel)
        make_alert(alert_group=alert_group, raw_request_data={})
        # this signal is normally called in get_or_create_grouping on create alert
        alert_group_created_signal.send(sender=alert_group.__class__, alert_group=alert_group)

        # check alert_groups_total metric cache, get called args
        cache_set_calls = mock_cache_set.call_args_list
        search_from = assert_totals_cache_updated(state_firing)

        alert_group.acknowledge_by_user_or_backsync(user)
        search_from = assert_totals_cache_updated(state_acknowledged)

        alert_group.resolve_by_user_or_backsync(user)
        search_from = assert_totals_cache_updated(state_resolved)

        alert_group.un_resolve_by_user_or_backsync(user)
        search_from = assert_totals_cache_updated(state_firing)
# todo:metrics: remove later when all cache is updated
@patch("apps.alerts.models.alert_group_log_record.tasks.send_update_log_report_signal.apply_async")
@patch("apps.alerts.tasks.send_alert_group_signal.alert_group_action_triggered_signal.send")
@pytest.mark.django_db
@override_settings(CELERY_TASK_ALWAYS_EAGER=True)
def test_update_metric_alert_groups_response_time_cache_on_action_backward_compatability(
    mocked_send_log_signal,
    mocked_action_signal_send,
    mock_apply_async,
    make_organization,
    make_user_for_organization,
    make_alert_receive_channel,
    make_alert_group,
    make_alert,
    monkeypatch,
    make_metrics_cache_params_old_version,
):
    """Test update metric cache works properly with previous version of cache"""
    organization = make_organization(
        org_id=METRICS_TEST_ORG_ID,
        stack_slug=METRICS_TEST_INSTANCE_SLUG,
        stack_id=METRICS_TEST_INSTANCE_ID,
    )
    user = make_user_for_organization(organization)
    alert_receive_channel = make_alert_receive_channel(organization, verbal_name=METRICS_TEST_INTEGRATION_NAME)
    metric_alert_groups_response_time_key = get_metric_alert_groups_response_time_key(organization.id)

    expected_response_time = {
        alert_receive_channel.id: {
            "integration_name": alert_receive_channel.verbal_name,
            "team_name": "No team",
            "team_id": "no_team",
            "org_id": organization.org_id,
            "slug": organization.stack_slug,
            "id": organization.stack_id,
            # Old-style (pre-service_name) flat response_time list.
            "response_time": [],
        }
    }

    metrics_cache = make_metrics_cache_params_old_version(alert_receive_channel.id, organization.id)
    monkeypatch.setattr(cache, "get", metrics_cache)

    def assert_response_time_cache_updated():
        """Find the next cache.set call for the response-time key, check its payload, return the next search index."""
        for call_index, set_call in enumerate(cache_set_calls):
            if call_index >= search_from and set_call.args[0] == metric_alert_groups_response_time_key:
                recorded_values = set_call.args[1][alert_receive_channel.id]["response_time"]
                expected_response_time[alert_receive_channel.id]["response_time"] = recorded_values
                # response time values len always will be 1 here since cache is mocked and refreshed on every call
                assert len(recorded_values) == 1
                assert set_call.args[1] == expected_response_time
                return call_index + 1
        raise AssertionError

    with patch("apps.metrics_exporter.tasks.cache.set") as mock_cache_set:
        search_from = 0
        alert_group = make_alert_group(alert_receive_channel)
        make_alert(alert_group=alert_group, raw_request_data={})

        # check alert_groups_response_time metric cache, get called args
        cache_set_calls = mock_cache_set.call_args_list
        alert_group.acknowledge_by_user_or_backsync(user)
        search_from = assert_response_time_cache_updated()