Add settings var to choose application metrics to collect (#4781)
# What this PR does
Adds a settings variable `METRICS_TO_COLLECT` to choose which metrics are
collected by `ApplicationMetricsCollector`.
This makes it possible to collect different application metrics with
different exporters.
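For example, a deployment that only needs the alert group metrics could restrict collection to them. A minimal sketch of such a settings override, using the metric name constants this PR adds to `settings.base` (where exactly the override lives depends on the deployment):

```python
# Hypothetical settings override: collect only the alert group metrics and
# skip the per-user notification counter.
METRICS_TO_COLLECT = [
    METRIC_ALERT_GROUPS_TOTAL_NAME,  # "alert_groups_total"
    METRIC_ALERT_GROUPS_RESPONSE_TIME_NAME,  # "alert_groups_response_time"
]
```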
## Which issue(s) this PR closes
## Checklist
- [x] Unit, integration, and e2e (if applicable) tests updated
- [x] Documentation added (or `pr:no public docs` PR label added if not required)
- [x] Added the relevant release notes label (see labels prefixed w/ `release:`). These labels dictate how your PR will show up in the autogenerated release notes.
parent `e2bc9d784b` · commit `503939783f`
3 changed files with 155 additions and 93 deletions
@@ -2,9 +2,10 @@ import logging
import re
import typing

from django.conf import settings
from django.core.cache import cache
from prometheus_client import CollectorRegistry
from prometheus_client.metrics_core import CounterMetricFamily, GaugeMetricFamily, HistogramMetricFamily
from prometheus_client.metrics_core import CounterMetricFamily, GaugeMetricFamily, HistogramMetricFamily, Metric

from apps.alerts.constants import AlertGroupState
from apps.metrics_exporter.constants import (
@@ -26,6 +27,11 @@ from apps.metrics_exporter.helpers import (
    get_organization_ids,
)
from apps.metrics_exporter.tasks import start_calculate_and_cache_metrics, start_recalculation_for_new_metric
from settings.base import (
    METRIC_ALERT_GROUPS_RESPONSE_TIME_NAME,
    METRIC_ALERT_GROUPS_TOTAL_NAME,
    METRIC_USER_WAS_NOTIFIED_OF_ALERT_GROUPS_NAME,
)

application_metrics_registry = CollectorRegistry()

@@ -42,6 +48,8 @@ RE_USER_WAS_NOTIFIED_OF_ALERT_GROUPS = re.compile(_RE_BASE_PATTERN.format(USER_W

# https://github.com/prometheus/client_python#custom-collectors
class ApplicationMetricsCollector:
    GetMetricFunc = typing.Callable[[set], typing.Tuple[Metric, set]]

    def __init__(self):
        self._buckets = (60, 300, 600, 3600, "+Inf")
        self._stack_labels = [
@@ -61,29 +69,33 @@ class ApplicationMetricsCollector:
        self._user_labels = ["username"] + self._stack_labels

    def collect(self):
        """
        Collects metrics listed in METRICS_TO_COLLECT settings var
        """
        metrics_map: typing.Dict[str, ApplicationMetricsCollector.GetMetricFunc] = {
            METRIC_ALERT_GROUPS_TOTAL_NAME: self._get_alert_groups_total_metric,
            METRIC_ALERT_GROUPS_RESPONSE_TIME_NAME: self._get_response_time_metric,
            METRIC_USER_WAS_NOTIFIED_OF_ALERT_GROUPS_NAME: self._get_user_was_notified_of_alert_groups_metric,
        }
        org_ids = set(get_organization_ids())
        metrics: typing.List[Metric] = []
        missing_org_ids: typing.Set[int] = set()

        # alert groups total metric: gauge
        alert_groups_total, missing_org_ids_1 = self._get_alert_groups_total_metric(org_ids)
        # alert groups response time metrics: histogram
        alert_groups_response_time_seconds, missing_org_ids_2 = self._get_response_time_metric(org_ids)
        # user was notified of alert groups metrics: counter
        user_was_notified, missing_org_ids_3 = self._get_user_was_notified_of_alert_groups_metric(org_ids)

        # This part is used for releasing new metrics to avoid recalculation for every metric.
        # Uncomment with metric name when needed.
        # # update new metric gradually
        # missing_org_ids_3 = self._update_new_metric(USER_WAS_NOTIFIED_OF_ALERT_GROUPS, org_ids, missing_org_ids_3)
        for metric_name in settings.METRICS_TO_COLLECT:
            if metric_name not in metrics_map:
                logger.error(f"Invalid metric name {metric_name} in `METRICS_TO_COLLECT` var")
                continue
            metric, missing_org_ids_temp = metrics_map[metric_name](org_ids)
            metrics.append(metric)
            missing_org_ids |= missing_org_ids_temp

        # check for orgs missing any of the metrics or needing a refresh, start recalculation task for missing org ids
        missing_org_ids = missing_org_ids_1 | missing_org_ids_2 | missing_org_ids_3
        self.recalculate_cache_for_missing_org_ids(org_ids, missing_org_ids)

        yield alert_groups_total
        yield alert_groups_response_time_seconds
        yield user_was_notified
        for metric in metrics:
            yield metric

    def _get_alert_groups_total_metric(self, org_ids):
    def _get_alert_groups_total_metric(self, org_ids: set[int]) -> typing.Tuple[Metric, set[int]]:
        alert_groups_total = GaugeMetricFamily(
            ALERT_GROUPS_TOTAL, "All alert groups", labels=self._integration_labels_with_state
        )
@@ -98,15 +110,7 @@ class ApplicationMetricsCollector:
                    logger.warning(f"Deleting stale metrics cache for {org_key}")
                    cache.delete(org_key)
                    break
                # Labels values should have the same order as _integration_labels_with_state
                labels_values = [
                    integration_data["integration_name"],  # integration
                    integration_data["team_name"],  # team
                    integration_data["org_id"],  # grafana org_id
                    integration_data["slug"],  # grafana instance slug
                    integration_data["id"],  # grafana instance id
                ]
                labels_values = list(map(str, labels_values))
                labels_values: typing.List[str] = self._get_labels_from_integration_data(integration_data)
                for service_name in integration_data["services"]:
                    for state in AlertGroupState:
                        alert_groups_total.add_metric(
@@ -118,7 +122,25 @@ class ApplicationMetricsCollector:
        missing_org_ids = org_ids - processed_org_ids
        return alert_groups_total, missing_org_ids

    def _get_response_time_metric(self, org_ids):
    def _get_user_was_notified_of_alert_groups_metric(self, org_ids: set[int]) -> typing.Tuple[Metric, set[int]]:
        user_was_notified = CounterMetricFamily(
            USER_WAS_NOTIFIED_OF_ALERT_GROUPS, "Number of alert groups user was notified of", labels=self._user_labels
        )
        processed_org_ids = set()
        user_was_notified_keys = [get_metric_user_was_notified_of_alert_groups_key(org_id) for org_id in org_ids]
        org_users: typing.Dict[str, typing.Dict[int, UserWasNotifiedOfAlertGroupsMetricsDict]] = cache.get_many(
            user_was_notified_keys
        )
        for org_key, users in org_users.items():
            for _, user_data in users.items():
                labels_values: typing.List[str] = self._get_labels_from_user_data(user_data)
                user_was_notified.add_metric(labels_values, user_data["counter"])
            org_id_from_key = RE_USER_WAS_NOTIFIED_OF_ALERT_GROUPS.match(org_key).groups()[0]
            processed_org_ids.add(int(org_id_from_key))
        missing_org_ids = org_ids - processed_org_ids
        return user_was_notified, missing_org_ids

    def _get_response_time_metric(self, org_ids: set[int]) -> typing.Tuple[Metric, set[int]]:
        alert_groups_response_time_seconds = HistogramMetricFamily(
            ALERT_GROUPS_RESPONSE_TIME,
            "Users response time to alert groups in 7 days (seconds)",
@@ -135,21 +157,12 @@ class ApplicationMetricsCollector:
                    logger.warning(f"Deleting stale metrics cache for {org_key}")
                    cache.delete(org_key)
                    break
                # Labels values should have the same order as _integration_labels
                labels_values = [
                    integration_data["integration_name"],  # integration
                    integration_data["team_name"],  # team
                    integration_data["org_id"],  # grafana org_id
                    integration_data["slug"],  # grafana instance slug
                    integration_data["id"],  # grafana instance id
                ]
                labels_values = list(map(str, labels_values))

                labels_values: typing.List[str] = self._get_labels_from_integration_data(integration_data)
                for service_name, response_time in integration_data["services"].items():
                    if not response_time:
                        continue
                    buckets, sum_value = self.get_buckets_with_sum(response_time)
                    buckets = sorted(list(buckets.items()), key=lambda x: float(x[0]))
                    buckets_values, sum_value = self._get_buckets_with_sum(response_time)
                    buckets: list = sorted(list(buckets_values.items()), key=lambda x: float(x[0]))
                    alert_groups_response_time_seconds.add_metric(
                        labels_values + [service_name],
                        buckets=buckets,
@@ -160,55 +173,7 @@ class ApplicationMetricsCollector:
        missing_org_ids = org_ids - processed_org_ids
        return alert_groups_response_time_seconds, missing_org_ids

    def _get_user_was_notified_of_alert_groups_metric(self, org_ids):
        user_was_notified = CounterMetricFamily(
            USER_WAS_NOTIFIED_OF_ALERT_GROUPS, "Number of alert groups user was notified of", labels=self._user_labels
        )
        processed_org_ids = set()
        user_was_notified_keys = [get_metric_user_was_notified_of_alert_groups_key(org_id) for org_id in org_ids]
        org_users: typing.Dict[str, typing.Dict[int, UserWasNotifiedOfAlertGroupsMetricsDict]] = cache.get_many(
            user_was_notified_keys
        )
        for org_key, users in org_users.items():
            for _, user_data in users.items():
                # Labels values should have the same order as _user_labels
                labels_values = [
                    user_data["user_username"],  # username
                    user_data["org_id"],  # grafana org_id
                    user_data["slug"],  # grafana instance slug
                    user_data["id"],  # grafana instance id
                ]
                labels_values = list(map(str, labels_values))
                user_was_notified.add_metric(labels_values, user_data["counter"])
            org_id_from_key = RE_USER_WAS_NOTIFIED_OF_ALERT_GROUPS.match(org_key).groups()[0]
            processed_org_ids.add(int(org_id_from_key))
        missing_org_ids = org_ids - processed_org_ids
        return user_was_notified, missing_org_ids

    def _update_new_metric(self, metric_name, org_ids, missing_org_ids):
        """
        This method is used for new metrics to calculate metrics gradually and avoid force recalculation for all orgs
        """
        calculation_started_key = get_metric_calculation_started_key(metric_name)
        is_calculation_started = cache.get(calculation_started_key)
        if len(missing_org_ids) == len(org_ids) or is_calculation_started:
            missing_org_ids = set()
            if not is_calculation_started:
                start_recalculation_for_new_metric.apply_async((metric_name,))
        return missing_org_ids

    def recalculate_cache_for_missing_org_ids(self, org_ids, missing_org_ids):
        cache_timer_for_org_keys = [get_metrics_cache_timer_key(org_id) for org_id in org_ids]
        cache_timers_for_org = cache.get_many(cache_timer_for_org_keys)
        recalculate_orgs: typing.List[RecalculateOrgMetricsDict] = []
        for org_id in org_ids:
            force_task = org_id in missing_org_ids
            if force_task or not cache_timers_for_org.get(get_metrics_cache_timer_key(org_id)):
                recalculate_orgs.append({"organization_id": org_id, "force": force_task})
        if recalculate_orgs:
            start_calculate_and_cache_metrics.apply_async((recalculate_orgs,))

    def get_buckets_with_sum(self, values):
    def _get_buckets_with_sum(self, values: typing.List[int]) -> typing.Tuple[typing.Dict[str, float], int]:
        """Put values in correct buckets and count values sum"""
        buckets_values = {str(key): 0 for key in self._buckets}
        sum_value = 0
@@ -219,5 +184,51 @@ class ApplicationMetricsCollector:
            sum_value += value
        return buckets_values, sum_value

    def _get_labels_from_integration_data(
        self, integration_data: AlertGroupsTotalMetricsDict | AlertGroupsResponseTimeMetricsDict
    ) -> typing.List[str]:
        # Labels values should have the same order as _integration_labels_with_state
        labels_values = [
            integration_data["integration_name"],  # integration
            integration_data["team_name"],  # team
            integration_data["org_id"],  # grafana org_id
            integration_data["slug"],  # grafana instance slug
            integration_data["id"],  # grafana instance id
        ]
        return list(map(str, labels_values))

application_metrics_registry.register(ApplicationMetricsCollector())
    def _get_labels_from_user_data(self, user_data: UserWasNotifiedOfAlertGroupsMetricsDict) -> typing.List[str]:
        # Labels values should have the same order as _user_labels
        labels_values = [
            user_data["user_username"],  # username
            user_data["org_id"],  # grafana org_id
            user_data["slug"],  # grafana instance slug
            user_data["id"],  # grafana instance id
        ]
        return list(map(str, labels_values))

    def _update_new_metric(self, metric_name: str, org_ids: set[int], missing_org_ids: set[int]) -> set[int]:
        """
        This method is used for new metrics to calculate metrics gradually and avoid force recalculation for all orgs
        Add to collect() method the following code with metric name when needed:
            # update new metric gradually
            missing_org_ids_X = self._update_new_metric(<NEW_METRIC_NAME>, org_ids, missing_org_ids_X)
        """
        calculation_started_key = get_metric_calculation_started_key(metric_name)
        is_calculation_started = cache.get(calculation_started_key)
        if len(missing_org_ids) == len(org_ids) or is_calculation_started:
            missing_org_ids = set()
            if not is_calculation_started:
                start_recalculation_for_new_metric.apply_async((metric_name,))
        return missing_org_ids

    def recalculate_cache_for_missing_org_ids(self, org_ids: set[int], missing_org_ids: set[int]) -> None:
        cache_timer_for_org_keys = [get_metrics_cache_timer_key(org_id) for org_id in org_ids]
        cache_timers_for_org = cache.get_many(cache_timer_for_org_keys)
        recalculate_orgs: typing.List[RecalculateOrgMetricsDict] = []
        for org_id in org_ids:
            force_task = org_id in missing_org_ids
            if force_task or not cache_timers_for_org.get(get_metrics_cache_timer_key(org_id)):
                recalculate_orgs.append({"organization_id": org_id, "force": force_task})
        if recalculate_orgs:
            start_calculate_and_cache_metrics.apply_async((recalculate_orgs,))
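As a quick sanity check, the collector can be registered on a dedicated registry and scraped in isolation, mirroring what the updated test below does. A minimal sketch (assumes Django settings are already configured for the engine):

```python
from django.conf import settings
from prometheus_client import CollectorRegistry, generate_latest

from apps.metrics_exporter.metrics_collectors import ApplicationMetricsCollector
from settings.base import METRIC_ALERT_GROUPS_TOTAL_NAME

# Only the alert groups total gauge will be collected; other metric names are skipped.
settings.METRICS_TO_COLLECT = [METRIC_ALERT_GROUPS_TOTAL_NAME]

registry = CollectorRegistry()
registry.register(ApplicationMetricsCollector())

# collect() reads cached metric values and yields only the metrics listed in METRICS_TO_COLLECT.
print(generate_latest(registry).decode("utf-8"))
```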
@@ -15,16 +15,44 @@ from apps.metrics_exporter.constants import (
from apps.metrics_exporter.helpers import get_metric_alert_groups_response_time_key, get_metric_alert_groups_total_key
from apps.metrics_exporter.metrics_collectors import ApplicationMetricsCollector
from apps.metrics_exporter.tests.conftest import METRICS_TEST_SERVICE_NAME
from settings.base import (
    METRIC_ALERT_GROUPS_RESPONSE_TIME_NAME,
    METRIC_ALERT_GROUPS_TOTAL_NAME,
    METRIC_USER_WAS_NOTIFIED_OF_ALERT_GROUPS_NAME,
)


# redis cluster usage modifies the cache keys for some operations, so we need to test both cases
# see common.cache.ensure_cache_key_allocates_to_the_same_hash_slot for more details
@pytest.mark.parametrize("use_redis_cluster", [True, False])
@pytest.mark.parametrize(
    "metric_base_names_and_metric_names",
    [
        [
            [METRIC_ALERT_GROUPS_TOTAL_NAME, METRIC_USER_WAS_NOTIFIED_OF_ALERT_GROUPS_NAME],
            [ALERT_GROUPS_TOTAL, USER_WAS_NOTIFIED_OF_ALERT_GROUPS],
        ],
        [[METRIC_ALERT_GROUPS_RESPONSE_TIME_NAME], [ALERT_GROUPS_RESPONSE_TIME]],
        [
            [
                METRIC_ALERT_GROUPS_TOTAL_NAME,
                METRIC_ALERT_GROUPS_RESPONSE_TIME_NAME,
                METRIC_USER_WAS_NOTIFIED_OF_ALERT_GROUPS_NAME,
            ],
            [ALERT_GROUPS_TOTAL, USER_WAS_NOTIFIED_OF_ALERT_GROUPS, ALERT_GROUPS_RESPONSE_TIME],
        ],
    ],
)
@patch("apps.metrics_exporter.metrics_collectors.get_organization_ids", return_value=[1])
@patch("apps.metrics_exporter.metrics_collectors.start_calculate_and_cache_metrics.apply_async")
@pytest.mark.django_db
def test_application_metrics_collector(
    mocked_org_ids, mocked_start_calculate_and_cache_metrics, mock_cache_get_metrics_for_collector, use_redis_cluster
def test_application_metrics_collectors(
    mocked_org_ids,
    mocked_start_calculate_and_cache_metrics,
    mock_cache_get_metrics_for_collector,
    use_redis_cluster,
    metric_base_names_and_metric_names,
    settings,
):
    """Test that ApplicationMetricsCollector generates expected metrics from cache"""

@@ -41,10 +69,16 @@ def test_application_metrics_collector(
        return labels

    with override_settings(USE_REDIS_CLUSTER=use_redis_cluster):
        settings.METRICS_TO_COLLECT = metric_base_names_and_metric_names[0]
        collector = ApplicationMetricsCollector()
        test_metrics_registry = CollectorRegistry()
        test_metrics_registry.register(collector)
        for metric in test_metrics_registry.collect():

        metrics = [i for i in test_metrics_registry.collect()]
        assert len(metrics) == len(metric_base_names_and_metric_names[1])

        for metric in metrics:
            assert metric.name in metric_base_names_and_metric_names[1]
            if metric.name == ALERT_GROUPS_TOTAL:
                # 2 integrations with labels for each alert group state per service
                assert len(metric.samples) == len(AlertGroupState) * 3  # 2 from 1st integration and 1 from 2nd
@@ -71,6 +105,8 @@ def test_application_metrics_collector(
            elif metric.name == USER_WAS_NOTIFIED_OF_ALERT_GROUPS:
                # metric with labels for each notified user
                assert len(metric.samples) == 1
            else:
                raise AssertionError
        result = generate_latest(test_metrics_registry).decode("utf-8")
        assert result is not None
        assert mocked_org_ids.called
@@ -91,7 +127,9 @@ def test_application_metrics_collector_with_old_metrics_without_services(
        collector = ApplicationMetricsCollector()
        test_metrics_registry = CollectorRegistry()
        test_metrics_registry.register(collector)
        for metric in test_metrics_registry.collect():
        metrics = [i for i in test_metrics_registry.collect()]
        assert len(metrics) == 3
        for metric in metrics:
            if metric.name == ALERT_GROUPS_TOTAL:
                alert_groups_total_metrics_cache = cache.get(get_metric_alert_groups_total_key(org_id))
                assert alert_groups_total_metrics_cache and "services" not in alert_groups_total_metrics_cache[1]
@@ -106,6 +144,8 @@ def test_application_metrics_collector_with_old_metrics_without_services(
            elif metric.name == USER_WAS_NOTIFIED_OF_ALERT_GROUPS:
                # metric with labels for each notified user
                assert len(metric.samples) == 1
            else:
                raise AssertionError
        result = generate_latest(test_metrics_registry).decode("utf-8")
        assert result is not None
        assert mocked_org_ids.called
@@ -107,6 +107,17 @@ CHATOPS_SIGNING_SECRET = os.environ.get("CHATOPS_SIGNING_SECRET", None)

# Prometheus exporter metrics endpoint auth
PROMETHEUS_EXPORTER_SECRET = os.environ.get("PROMETHEUS_EXPORTER_SECRET")
# Application metric names without prefixes
METRIC_ALERT_GROUPS_TOTAL_NAME = "alert_groups_total"
METRIC_ALERT_GROUPS_RESPONSE_TIME_NAME = "alert_groups_response_time"
METRIC_USER_WAS_NOTIFIED_OF_ALERT_GROUPS_NAME = "user_was_notified_of_alert_groups"
METRICS_ALL = [
    METRIC_ALERT_GROUPS_TOTAL_NAME,
    METRIC_ALERT_GROUPS_RESPONSE_TIME_NAME,
    METRIC_USER_WAS_NOTIFIED_OF_ALERT_GROUPS_NAME,
]
# List of metrics to collect. Collect all available application metrics by default
METRICS_TO_COLLECT = os.environ.get("METRICS_TO_COLLECT", METRICS_ALL)


# Database
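Note that `os.environ.get` returns a plain string when the variable is set in the environment, so a deployment overriding the default that way would typically split a comma-separated value into a list first. A hedged sketch of such parsing (not part of this PR):

```python
# Hypothetical environment-based override, e.g.
#   METRICS_TO_COLLECT="alert_groups_total,alert_groups_response_time"
_raw_metrics_to_collect = os.environ.get("METRICS_TO_COLLECT")
METRICS_TO_COLLECT = (
    [name.strip() for name in _raw_metrics_to_collect.split(",") if name.strip()]
    if _raw_metrics_to_collect
    else METRICS_ALL
)
```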