oncall-engine/engine/apps/metrics_exporter/metrics_collectors.py
Yulya Artyukhina 29bd42c0b1
Fix collecting metrics (#4822)
# What this PR does
Reverts the accidental removal of the ApplicationMetricsCollector from
the metrics registry

## Which issue(s) this PR closes

Related to [issue link here]

<!--
*Note*: If you want the issue to be auto-closed once the PR is merged,
change "Related to" to "Closes" in the line above.
If you have more than one GitHub issue that this PR closes, be sure to
preface
each issue link with a [closing
keyword](https://docs.github.com/en/get-started/writing-on-github/working-with-advanced-formatting/using-keywords-in-issues-and-pull-requests#linking-a-pull-request-to-an-issue).
This ensures that the issue(s) are auto-closed once the PR has been
merged.
-->

## Checklist

- [ ] Unit, integration, and e2e (if applicable) tests updated
- [x] Documentation added (or `pr:no public docs` PR label added if not
required)
- [x] Added the relevant release notes label (see labels prefixed w/
`release:`). These labels dictate how your PR will
    show up in the autogenerated release notes.
2024-08-14 13:53:43 +00:00

237 lines
11 KiB
Python

import logging
import re
import typing
from django.conf import settings
from django.core.cache import cache
from prometheus_client import CollectorRegistry
from prometheus_client.metrics_core import CounterMetricFamily, GaugeMetricFamily, HistogramMetricFamily, Metric
from apps.alerts.constants import AlertGroupState
from apps.metrics_exporter.constants import (
ALERT_GROUPS_RESPONSE_TIME,
ALERT_GROUPS_TOTAL,
SERVICE_LABEL,
USER_WAS_NOTIFIED_OF_ALERT_GROUPS,
AlertGroupsResponseTimeMetricsDict,
AlertGroupsTotalMetricsDict,
RecalculateOrgMetricsDict,
UserWasNotifiedOfAlertGroupsMetricsDict,
)
from apps.metrics_exporter.helpers import (
get_metric_alert_groups_response_time_key,
get_metric_alert_groups_total_key,
get_metric_calculation_started_key,
get_metric_user_was_notified_of_alert_groups_key,
get_metrics_cache_timer_key,
get_organization_ids,
)
from apps.metrics_exporter.tasks import start_calculate_and_cache_metrics, start_recalculation_for_new_metric
from settings.base import (
METRIC_ALERT_GROUPS_RESPONSE_TIME_NAME,
METRIC_ALERT_GROUPS_TOTAL_NAME,
METRIC_USER_WAS_NOTIFIED_OF_ALERT_GROUPS_NAME,
)
# Dedicated registry that the metrics endpoint scrapes; the ApplicationMetricsCollector
# defined below is registered into it at module import time (see bottom of file).
application_metrics_registry = CollectorRegistry()
logger = logging.getLogger(__name__)
# _RE_BASE_PATTERN allows for optional curly-brackets around the metric name as in some cases this may occur
# see common.cache.ensure_cache_key_allocates_to_the_same_hash_slot for more details regarding this
# The trailing capture group extracts the organization id suffix from a cache key.
_RE_BASE_PATTERN = r"{{?{}}}?_(\d+)"
RE_ALERT_GROUPS_TOTAL = re.compile(_RE_BASE_PATTERN.format(ALERT_GROUPS_TOTAL))
RE_ALERT_GROUPS_RESPONSE_TIME = re.compile(_RE_BASE_PATTERN.format(ALERT_GROUPS_RESPONSE_TIME))
RE_USER_WAS_NOTIFIED_OF_ALERT_GROUPS = re.compile(_RE_BASE_PATTERN.format(USER_WAS_NOTIFIED_OF_ALERT_GROUPS))
# https://github.com/prometheus/client_python#custom-collectors
class ApplicationMetricsCollector:
    """Custom Prometheus collector exposing pre-calculated, cached application metrics.

    Metric values are computed asynchronously elsewhere (see
    ``apps.metrics_exporter.tasks``) and stored in the Django cache, one entry
    per organization. :meth:`collect` only reads those cache entries and turns
    them into Prometheus metric families; organizations whose cache entries are
    missing get a recalculation task scheduled.
    """

    # Shared signature of the per-metric getter methods: take the full set of
    # org ids, return the built metric plus the subset of org ids that had no
    # cached data (and therefore need recalculation).
    GetMetricFunc = typing.Callable[[set], typing.Tuple[Metric, set]]

    def __init__(self):
        # Histogram bucket upper bounds (seconds) for the response-time metric.
        self._buckets = (60, 300, 600, 3600, "+Inf")
        # Labels identifying the Grafana stack. Order matters: label values are
        # passed positionally to add_metric and must mirror the label lists below.
        self._stack_labels = [
            "org_id",
            "slug",
            "id",
        ]
        self._integration_labels = (
            [
                "integration",
                "team",
            ]
            + self._stack_labels
            + [SERVICE_LABEL]
        )
        self._integration_labels_with_state = self._integration_labels + ["state"]
        self._user_labels = ["username"] + self._stack_labels

    def collect(self):
        """
        Collects metrics listed in METRICS_TO_COLLECT settings var

        Yields one metric family per configured metric name, skipping (and
        logging) unknown names. After building all metrics, schedules
        recalculation for any org that was missing cached data for at least
        one metric.
        """
        # Dispatch table: configured metric name -> getter method.
        metrics_map: typing.Dict[str, ApplicationMetricsCollector.GetMetricFunc] = {
            METRIC_ALERT_GROUPS_TOTAL_NAME: self._get_alert_groups_total_metric,
            METRIC_ALERT_GROUPS_RESPONSE_TIME_NAME: self._get_response_time_metric,
            METRIC_USER_WAS_NOTIFIED_OF_ALERT_GROUPS_NAME: self._get_user_was_notified_of_alert_groups_metric,
        }
        org_ids = set(get_organization_ids())
        metrics: typing.List[Metric] = []
        # Union of org ids missing cached data across all collected metrics.
        missing_org_ids: typing.Set[int] = set()
        for metric_name in settings.METRICS_TO_COLLECT:
            if metric_name not in metrics_map:
                logger.error(f"Invalid metric name {metric_name} in `METRICS_TO_COLLECT` var")
                continue
            metric, missing_org_ids_temp = metrics_map[metric_name](org_ids)
            metrics.append(metric)
            missing_org_ids |= missing_org_ids_temp
        # check for orgs missing any of the metrics or needing a refresh, start recalculation task for missing org ids
        self.recalculate_cache_for_missing_org_ids(org_ids, missing_org_ids)
        for metric in metrics:
            yield metric

    def _get_alert_groups_total_metric(self, org_ids: set[int]) -> typing.Tuple[Metric, set[int]]:
        """Build the alert-groups-total gauge from per-org cache entries.

        Returns the metric family and the set of org ids with no cache entry.
        """
        alert_groups_total = GaugeMetricFamily(
            ALERT_GROUPS_TOTAL, "All alert groups", labels=self._integration_labels_with_state
        )
        processed_org_ids = set()
        alert_groups_total_keys = [get_metric_alert_groups_total_key(org_id) for org_id in org_ids]
        # Fetch all per-org cache entries in one round trip; absent keys are
        # simply not present in the result dict.
        org_ag_states: typing.Dict[str, typing.Dict[int, AlertGroupsTotalMetricsDict]] = cache.get_many(
            alert_groups_total_keys
        )
        for org_key, ag_states in org_ag_states.items():
            for _, integration_data in ag_states.items():
                if "services" not in integration_data:
                    # Entry predates the per-service data format: drop the whole
                    # org entry and stop processing it. NOTE(review): the org is
                    # still added to processed_org_ids below, so the rebuild
                    # happens via the cache-timer path rather than a forced
                    # recalculation — presumably intentional; confirm.
                    logger.warning(f"Deleting stale metrics cache for {org_key}")
                    cache.delete(org_key)
                    break
                labels_values: typing.List[str] = self._get_labels_from_integration_data(integration_data)
                # One sample per (service, alert group state) pair.
                for service_name in integration_data["services"]:
                    for state in AlertGroupState:
                        alert_groups_total.add_metric(
                            labels_values + [service_name, state.value],
                            integration_data["services"][service_name][state.value],
                        )
            # Recover the org id from the cache key (see _RE_BASE_PATTERN).
            org_id_from_key = RE_ALERT_GROUPS_TOTAL.match(org_key).groups()[0]
            processed_org_ids.add(int(org_id_from_key))
        missing_org_ids = org_ids - processed_org_ids
        return alert_groups_total, missing_org_ids

    def _get_user_was_notified_of_alert_groups_metric(self, org_ids: set[int]) -> typing.Tuple[Metric, set[int]]:
        """Build the per-user notification counter from per-org cache entries.

        Returns the metric family and the set of org ids with no cache entry.
        """
        user_was_notified = CounterMetricFamily(
            USER_WAS_NOTIFIED_OF_ALERT_GROUPS, "Number of alert groups user was notified of", labels=self._user_labels
        )
        processed_org_ids = set()
        user_was_notified_keys = [get_metric_user_was_notified_of_alert_groups_key(org_id) for org_id in org_ids]
        org_users: typing.Dict[str, typing.Dict[int, UserWasNotifiedOfAlertGroupsMetricsDict]] = cache.get_many(
            user_was_notified_keys
        )
        for org_key, users in org_users.items():
            for _, user_data in users.items():
                labels_values: typing.List[str] = self._get_labels_from_user_data(user_data)
                user_was_notified.add_metric(labels_values, user_data["counter"])
            # Recover the org id from the cache key (see _RE_BASE_PATTERN).
            org_id_from_key = RE_USER_WAS_NOTIFIED_OF_ALERT_GROUPS.match(org_key).groups()[0]
            processed_org_ids.add(int(org_id_from_key))
        missing_org_ids = org_ids - processed_org_ids
        return user_was_notified, missing_org_ids

    def _get_response_time_metric(self, org_ids: set[int]) -> typing.Tuple[Metric, set[int]]:
        """Build the response-time histogram from per-org cache entries.

        Returns the metric family and the set of org ids with no cache entry.
        """
        alert_groups_response_time_seconds = HistogramMetricFamily(
            ALERT_GROUPS_RESPONSE_TIME,
            "Users response time to alert groups in 7 days (seconds)",
            labels=self._integration_labels,
        )
        processed_org_ids = set()
        alert_groups_response_time_keys = [get_metric_alert_groups_response_time_key(org_id) for org_id in org_ids]
        org_ag_response_times: typing.Dict[str, typing.Dict[int, AlertGroupsResponseTimeMetricsDict]] = cache.get_many(
            alert_groups_response_time_keys
        )
        for org_key, ag_response_time in org_ag_response_times.items():
            for _, integration_data in ag_response_time.items():
                if "services" not in integration_data:
                    # Stale pre-services cache entry: drop it and stop processing
                    # this org (same handling as in _get_alert_groups_total_metric).
                    logger.warning(f"Deleting stale metrics cache for {org_key}")
                    cache.delete(org_key)
                    break
                labels_values: typing.List[str] = self._get_labels_from_integration_data(integration_data)
                for service_name, response_time in integration_data["services"].items():
                    # Skip services with no recorded response times.
                    if not response_time:
                        continue
                    buckets_values, sum_value = self._get_buckets_with_sum(response_time)
                    # Prometheus requires buckets sorted by upper bound
                    # (float("+Inf") sorts last).
                    buckets: list = sorted(list(buckets_values.items()), key=lambda x: float(x[0]))
                    alert_groups_response_time_seconds.add_metric(
                        labels_values + [service_name],
                        buckets=buckets,
                        sum_value=sum_value,
                    )
            # Recover the org id from the cache key (see _RE_BASE_PATTERN).
            org_id_from_key = RE_ALERT_GROUPS_RESPONSE_TIME.match(org_key).groups()[0]
            processed_org_ids.add(int(org_id_from_key))
        missing_org_ids = org_ids - processed_org_ids
        return alert_groups_response_time_seconds, missing_org_ids

    def _get_buckets_with_sum(self, values: typing.List[int]) -> typing.Tuple[typing.Dict[str, float], int]:
        """Put values in correct buckets and count values sum"""
        buckets_values = {str(key): 0 for key in self._buckets}
        sum_value = 0
        for value in values:
            # Buckets are cumulative: a value increments every bucket whose
            # upper bound it fits under (including "+Inf").
            for bucket in self._buckets:
                if value <= float(bucket):
                    buckets_values[str(bucket)] += 1.0
            sum_value += value
        return buckets_values, sum_value

    def _get_labels_from_integration_data(
        self, integration_data: AlertGroupsTotalMetricsDict | AlertGroupsResponseTimeMetricsDict
    ) -> typing.List[str]:
        # Labels values should have the same order as _integration_labels_with_state
        labels_values = [
            integration_data["integration_name"],  # integration
            integration_data["team_name"],  # team
            integration_data["org_id"],  # grafana org_id
            integration_data["slug"],  # grafana instance slug
            integration_data["id"],  # grafana instance id
        ]
        # Prometheus label values must be strings.
        return list(map(str, labels_values))

    def _get_labels_from_user_data(self, user_data: UserWasNotifiedOfAlertGroupsMetricsDict) -> typing.List[str]:
        # Labels values should have the same order as _user_labels
        labels_values = [
            user_data["user_username"],  # username
            user_data["org_id"],  # grafana org_id
            user_data["slug"],  # grafana instance slug
            user_data["id"],  # grafana instance id
        ]
        # Prometheus label values must be strings.
        return list(map(str, labels_values))

    def _update_new_metric(self, metric_name: str, org_ids: set[int], missing_org_ids: set[int]) -> set[int]:
        """
        This method is used for new metrics to calculate metrics gradually and avoid force recalculation for all orgs
        Add to collect() method the following code with metric name when needed:
            # update new metric gradually
            missing_org_ids_X = self._update_new_metric(<NEW_METRIC_NAME>, org_ids, missing_org_ids_X)
        """
        calculation_started_key = get_metric_calculation_started_key(metric_name)
        is_calculation_started = cache.get(calculation_started_key)
        # If no org has the metric yet (brand-new metric) or a gradual
        # recalculation is already underway, report no missing orgs so collect()
        # does not force-recalculate everything at once.
        if len(missing_org_ids) == len(org_ids) or is_calculation_started:
            missing_org_ids = set()
            if not is_calculation_started:
                # Kick off the gradual recalculation exactly once.
                start_recalculation_for_new_metric.apply_async((metric_name,))
        return missing_org_ids

    def recalculate_cache_for_missing_org_ids(self, org_ids: set[int], missing_org_ids: set[int]) -> None:
        """Schedule metric recalculation for orgs with missing or expired cache.

        Orgs in ``missing_org_ids`` are recalculated with ``force=True``; orgs
        whose cache timer key has expired are refreshed without forcing.
        """
        cache_timer_for_org_keys = [get_metrics_cache_timer_key(org_id) for org_id in org_ids]
        cache_timers_for_org = cache.get_many(cache_timer_for_org_keys)
        recalculate_orgs: typing.List[RecalculateOrgMetricsDict] = []
        for org_id in org_ids:
            force_task = org_id in missing_org_ids
            if force_task or not cache_timers_for_org.get(get_metrics_cache_timer_key(org_id)):
                recalculate_orgs.append({"organization_id": org_id, "force": force_task})
        if recalculate_orgs:
            start_calculate_and_cache_metrics.apply_async((recalculate_orgs,))
application_metrics_registry.register(ApplicationMetricsCollector())