diff --git a/engine/apps/metrics_exporter/metrics_collectors.py b/engine/apps/metrics_exporter/metrics_collectors.py
index 720cdea9..db1ecc14 100644
--- a/engine/apps/metrics_exporter/metrics_collectors.py
+++ b/engine/apps/metrics_exporter/metrics_collectors.py
@@ -2,9 +2,10 @@
 import logging
 import re
 import typing
+from django.conf import settings
 from django.core.cache import cache
 from prometheus_client import CollectorRegistry
-from prometheus_client.metrics_core import CounterMetricFamily, GaugeMetricFamily, HistogramMetricFamily
+from prometheus_client.metrics_core import CounterMetricFamily, GaugeMetricFamily, HistogramMetricFamily, Metric
 
 from apps.alerts.constants import AlertGroupState
 from apps.metrics_exporter.constants import (
@@ -26,6 +27,11 @@ from apps.metrics_exporter.helpers import (
     get_organization_ids,
 )
 from apps.metrics_exporter.tasks import start_calculate_and_cache_metrics, start_recalculation_for_new_metric
+from settings.base import (
+    METRIC_ALERT_GROUPS_RESPONSE_TIME_NAME,
+    METRIC_ALERT_GROUPS_TOTAL_NAME,
+    METRIC_USER_WAS_NOTIFIED_OF_ALERT_GROUPS_NAME,
+)
 
 
 application_metrics_registry = CollectorRegistry()
@@ -42,6 +48,8 @@ RE_USER_WAS_NOTIFIED_OF_ALERT_GROUPS = re.compile(_RE_BASE_PATTERN.format(USER_W
 
 # https://github.com/prometheus/client_python#custom-collectors
 class ApplicationMetricsCollector:
+    GetMetricFunc = typing.Callable[[set], typing.Tuple[Metric, set]]
+
     def __init__(self):
         self._buckets = (60, 300, 600, 3600, "+Inf")
         self._stack_labels = [
@@ -61,29 +69,33 @@ class ApplicationMetricsCollector:
         self._user_labels = ["username"] + self._stack_labels
 
     def collect(self):
+        """
+        Collects metrics listed in METRICS_TO_COLLECT settings var
+        """
+        metrics_map: typing.Dict[str, ApplicationMetricsCollector.GetMetricFunc] = {
+            METRIC_ALERT_GROUPS_TOTAL_NAME: self._get_alert_groups_total_metric,
+            METRIC_ALERT_GROUPS_RESPONSE_TIME_NAME: self._get_response_time_metric,
+            METRIC_USER_WAS_NOTIFIED_OF_ALERT_GROUPS_NAME: self._get_user_was_notified_of_alert_groups_metric,
+        }
         org_ids = set(get_organization_ids())
+        metrics: typing.List[Metric] = []
+        missing_org_ids: typing.Set[int] = set()
 
-        # alert groups total metric: gauge
-        alert_groups_total, missing_org_ids_1 = self._get_alert_groups_total_metric(org_ids)
-        # alert groups response time metrics: histogram
-        alert_groups_response_time_seconds, missing_org_ids_2 = self._get_response_time_metric(org_ids)
-        # user was notified of alert groups metrics: counter
-        user_was_notified, missing_org_ids_3 = self._get_user_was_notified_of_alert_groups_metric(org_ids)
-
-        # This part is used for releasing new metrics to avoid recalculation for every metric.
-        # Uncomment with metric name when needed.
-        # # update new metric gradually
-        # missing_org_ids_3 = self._update_new_metric(USER_WAS_NOTIFIED_OF_ALERT_GROUPS, org_ids, missing_org_ids_3)
+        for metric_name in settings.METRICS_TO_COLLECT:
+            if metric_name not in metrics_map:
+                logger.error(f"Invalid metric name {metric_name} in `METRICS_TO_COLLECT` var")
+                continue
+            metric, missing_org_ids_temp = metrics_map[metric_name](org_ids)
+            metrics.append(metric)
+            missing_org_ids |= missing_org_ids_temp
 
         # check for orgs missing any of the metrics or needing a refresh, start recalculation task for missing org ids
-        missing_org_ids = missing_org_ids_1 | missing_org_ids_2 | missing_org_ids_3
         self.recalculate_cache_for_missing_org_ids(org_ids, missing_org_ids)
 
-        yield alert_groups_total
-        yield alert_groups_response_time_seconds
-        yield user_was_notified
+        for metric in metrics:
+            yield metric
 
-    def _get_alert_groups_total_metric(self, org_ids):
+    def _get_alert_groups_total_metric(self, org_ids: set[int]) -> typing.Tuple[Metric, set[int]]:
         alert_groups_total = GaugeMetricFamily(
             ALERT_GROUPS_TOTAL, "All alert groups", labels=self._integration_labels_with_state
         )
@@ -98,15 +110,7 @@ class ApplicationMetricsCollector:
                     logger.warning(f"Deleting stale metrics cache for {org_key}")
                     cache.delete(org_key)
                     break
-                # Labels values should have the same order as _integration_labels_with_state
-                labels_values = [
-                    integration_data["integration_name"],  # integration
-                    integration_data["team_name"],  # team
-                    integration_data["org_id"],  # grafana org_id
-                    integration_data["slug"],  # grafana instance slug
-                    integration_data["id"],  # grafana instance id
-                ]
-                labels_values = list(map(str, labels_values))
+                labels_values: typing.List[str] = self._get_labels_from_integration_data(integration_data)
                 for service_name in integration_data["services"]:
                     for state in AlertGroupState:
                         alert_groups_total.add_metric(
@@ -118,7 +122,25 @@ class ApplicationMetricsCollector:
         missing_org_ids = org_ids - processed_org_ids
         return alert_groups_total, missing_org_ids
 
-    def _get_response_time_metric(self, org_ids):
+    def _get_user_was_notified_of_alert_groups_metric(self, org_ids: set[int]) -> typing.Tuple[Metric, set[int]]:
+        user_was_notified = CounterMetricFamily(
+            USER_WAS_NOTIFIED_OF_ALERT_GROUPS, "Number of alert groups user was notified of", labels=self._user_labels
+        )
+        processed_org_ids = set()
+        user_was_notified_keys = [get_metric_user_was_notified_of_alert_groups_key(org_id) for org_id in org_ids]
+        org_users: typing.Dict[str, typing.Dict[int, UserWasNotifiedOfAlertGroupsMetricsDict]] = cache.get_many(
+            user_was_notified_keys
+        )
+        for org_key, users in org_users.items():
+            for _, user_data in users.items():
+                labels_values: typing.List[str] = self._get_labels_from_user_data(user_data)
+                user_was_notified.add_metric(labels_values, user_data["counter"])
+            org_id_from_key = RE_USER_WAS_NOTIFIED_OF_ALERT_GROUPS.match(org_key).groups()[0]
+            processed_org_ids.add(int(org_id_from_key))
+        missing_org_ids = org_ids - processed_org_ids
+        return user_was_notified, missing_org_ids
+
+    def _get_response_time_metric(self, org_ids: set[int]) -> typing.Tuple[Metric, set[int]]:
         alert_groups_response_time_seconds = HistogramMetricFamily(
             ALERT_GROUPS_RESPONSE_TIME,
             "Users response time to alert groups in 7 days (seconds)",
@@ -135,21 +157,12 @@ class ApplicationMetricsCollector:
                     logger.warning(f"Deleting stale metrics cache for {org_key}")
                     cache.delete(org_key)
                     break
-                # Labels values should have the same order as _integration_labels
-                labels_values = [
-                    integration_data["integration_name"],  # integration
-                    integration_data["team_name"],  # team
-                    integration_data["org_id"],  # grafana org_id
-                    integration_data["slug"],  # grafana instance slug
-                    integration_data["id"],  # grafana instance id
-                ]
-                labels_values = list(map(str, labels_values))
-
+                labels_values: typing.List[str] = self._get_labels_from_integration_data(integration_data)
                 for service_name, response_time in integration_data["services"].items():
                     if not response_time:
                         continue
-                    buckets, sum_value = self.get_buckets_with_sum(response_time)
-                    buckets = sorted(list(buckets.items()), key=lambda x: float(x[0]))
+                    buckets_values, sum_value = self._get_buckets_with_sum(response_time)
+                    buckets: list = sorted(list(buckets_values.items()), key=lambda x: float(x[0]))
                     alert_groups_response_time_seconds.add_metric(
                         labels_values + [service_name],
                         buckets=buckets,
@@ -160,55 +173,7 @@ class ApplicationMetricsCollector:
         missing_org_ids = org_ids - processed_org_ids
         return alert_groups_response_time_seconds, missing_org_ids
 
-    def _get_user_was_notified_of_alert_groups_metric(self, org_ids):
-        user_was_notified = CounterMetricFamily(
-            USER_WAS_NOTIFIED_OF_ALERT_GROUPS, "Number of alert groups user was notified of", labels=self._user_labels
-        )
-        processed_org_ids = set()
-        user_was_notified_keys = [get_metric_user_was_notified_of_alert_groups_key(org_id) for org_id in org_ids]
-        org_users: typing.Dict[str, typing.Dict[int, UserWasNotifiedOfAlertGroupsMetricsDict]] = cache.get_many(
-            user_was_notified_keys
-        )
-        for org_key, users in org_users.items():
-            for _, user_data in users.items():
-                # Labels values should have the same order as _user_labels
-                labels_values = [
-                    user_data["user_username"],  # username
-                    user_data["org_id"],  # grafana org_id
-                    user_data["slug"],  # grafana instance slug
-                    user_data["id"],  # grafana instance id
-                ]
-                labels_values = list(map(str, labels_values))
-                user_was_notified.add_metric(labels_values, user_data["counter"])
-            org_id_from_key = RE_USER_WAS_NOTIFIED_OF_ALERT_GROUPS.match(org_key).groups()[0]
-            processed_org_ids.add(int(org_id_from_key))
-        missing_org_ids = org_ids - processed_org_ids
-        return user_was_notified, missing_org_ids
-
-    def _update_new_metric(self, metric_name, org_ids, missing_org_ids):
-        """
-        This method is used for new metrics to calculate metrics gradually and avoid force recalculation for all orgs
-        """
-        calculation_started_key = get_metric_calculation_started_key(metric_name)
-        is_calculation_started = cache.get(calculation_started_key)
-        if len(missing_org_ids) == len(org_ids) or is_calculation_started:
-            missing_org_ids = set()
-            if not is_calculation_started:
-                start_recalculation_for_new_metric.apply_async((metric_name,))
-        return missing_org_ids
-
-    def recalculate_cache_for_missing_org_ids(self, org_ids, missing_org_ids):
-        cache_timer_for_org_keys = [get_metrics_cache_timer_key(org_id) for org_id in org_ids]
-        cache_timers_for_org = cache.get_many(cache_timer_for_org_keys)
-        recalculate_orgs: typing.List[RecalculateOrgMetricsDict] = []
-        for org_id in org_ids:
-            force_task = org_id in missing_org_ids
-            if force_task or not cache_timers_for_org.get(get_metrics_cache_timer_key(org_id)):
-                recalculate_orgs.append({"organization_id": org_id, "force": force_task})
-        if recalculate_orgs:
-            start_calculate_and_cache_metrics.apply_async((recalculate_orgs,))
-
-    def get_buckets_with_sum(self, values):
+    def _get_buckets_with_sum(self, values: typing.List[int]) -> typing.Tuple[typing.Dict[str, int], int]:
         """Put values in correct buckets and count values sum"""
         buckets_values = {str(key): 0 for key in self._buckets}
         sum_value = 0
@@ -219,5 +184,51 @@ class ApplicationMetricsCollector:
             sum_value += value
         return buckets_values, sum_value
 
+    def _get_labels_from_integration_data(
+        self, integration_data: AlertGroupsTotalMetricsDict | AlertGroupsResponseTimeMetricsDict
+    ) -> typing.List[str]:
+        # Labels values should have the same order as _integration_labels_with_state
+        labels_values = [
+            integration_data["integration_name"],  # integration
+            integration_data["team_name"],  # team
+            integration_data["org_id"],  # grafana org_id
+            integration_data["slug"],  # grafana instance slug
+            integration_data["id"],  # grafana instance id
+        ]
+        return list(map(str, labels_values))
 
-application_metrics_registry.register(ApplicationMetricsCollector())
+    def _get_labels_from_user_data(self, user_data: UserWasNotifiedOfAlertGroupsMetricsDict) -> typing.List[str]:
+        # Labels values should have the same order as _user_labels
+        labels_values = [
+            user_data["user_username"],  # username
+            user_data["org_id"],  # grafana org_id
+            user_data["slug"],  # grafana instance slug
+            user_data["id"],  # grafana instance id
+        ]
+        return list(map(str, labels_values))
+
+    def _update_new_metric(self, metric_name: str, org_ids: set[int], missing_org_ids: set[int]) -> set[int]:
+        """
+        This method is used for new metrics, to calculate them gradually and avoid forced recalculation for all orgs.
+        When releasing a new metric, add the following code to the collect() method, substituting the metric name:
+            # update new metric gradually
+            missing_org_ids_X = self._update_new_metric(<metric_name>, org_ids, missing_org_ids_X)
+        """
+        calculation_started_key = get_metric_calculation_started_key(metric_name)
+        is_calculation_started = cache.get(calculation_started_key)
+        if len(missing_org_ids) == len(org_ids) or is_calculation_started:
+            missing_org_ids = set()
+            if not is_calculation_started:
+                start_recalculation_for_new_metric.apply_async((metric_name,))
+        return missing_org_ids
+
+    def recalculate_cache_for_missing_org_ids(self, org_ids: set[int], missing_org_ids: set[int]) -> None:
+        cache_timer_for_org_keys = [get_metrics_cache_timer_key(org_id) for org_id in org_ids]
+        cache_timers_for_org = cache.get_many(cache_timer_for_org_keys)
+        recalculate_orgs: typing.List[RecalculateOrgMetricsDict] = []
+        for org_id in org_ids:
+            force_task = org_id in missing_org_ids
+            if force_task or not cache_timers_for_org.get(get_metrics_cache_timer_key(org_id)):
+                recalculate_orgs.append({"organization_id": org_id, "force": force_task})
+        if recalculate_orgs:
+            start_calculate_and_cache_metrics.apply_async((recalculate_orgs,))
diff --git a/engine/apps/metrics_exporter/tests/test_metrics_collectors.py b/engine/apps/metrics_exporter/tests/test_metrics_collectors.py
index 640ac57b..4dcd1a32 100644
--- a/engine/apps/metrics_exporter/tests/test_metrics_collectors.py
+++ b/engine/apps/metrics_exporter/tests/test_metrics_collectors.py
@@ -15,16 +15,44 @@ from apps.metrics_exporter.constants import (
 from apps.metrics_exporter.helpers import get_metric_alert_groups_response_time_key, get_metric_alert_groups_total_key
 from apps.metrics_exporter.metrics_collectors import ApplicationMetricsCollector
 from apps.metrics_exporter.tests.conftest import METRICS_TEST_SERVICE_NAME
+from settings.base import (
+    METRIC_ALERT_GROUPS_RESPONSE_TIME_NAME,
+    METRIC_ALERT_GROUPS_TOTAL_NAME,
+    METRIC_USER_WAS_NOTIFIED_OF_ALERT_GROUPS_NAME,
+)
 
 
 # redis cluster usage modifies the cache keys for some operations, so we need to test both cases
 # see common.cache.ensure_cache_key_allocates_to_the_same_hash_slot for more details
 @pytest.mark.parametrize("use_redis_cluster", [True, False])
+@pytest.mark.parametrize(
+    "metric_base_names_and_metric_names",
+    [
+        [
+            [METRIC_ALERT_GROUPS_TOTAL_NAME, METRIC_USER_WAS_NOTIFIED_OF_ALERT_GROUPS_NAME],
+            [ALERT_GROUPS_TOTAL, USER_WAS_NOTIFIED_OF_ALERT_GROUPS],
+        ],
+        [[METRIC_ALERT_GROUPS_RESPONSE_TIME_NAME], [ALERT_GROUPS_RESPONSE_TIME]],
+        [
+            [
+                METRIC_ALERT_GROUPS_TOTAL_NAME,
+                METRIC_ALERT_GROUPS_RESPONSE_TIME_NAME,
+                METRIC_USER_WAS_NOTIFIED_OF_ALERT_GROUPS_NAME,
+            ],
+            [ALERT_GROUPS_TOTAL, USER_WAS_NOTIFIED_OF_ALERT_GROUPS, ALERT_GROUPS_RESPONSE_TIME],
+        ],
+    ],
+)
 @patch("apps.metrics_exporter.metrics_collectors.get_organization_ids", return_value=[1])
 @patch("apps.metrics_exporter.metrics_collectors.start_calculate_and_cache_metrics.apply_async")
 @pytest.mark.django_db
-def test_application_metrics_collector(
-    mocked_org_ids, mocked_start_calculate_and_cache_metrics, mock_cache_get_metrics_for_collector, use_redis_cluster
+def test_application_metrics_collectors(
+    mocked_org_ids,
+    mocked_start_calculate_and_cache_metrics,
+    mock_cache_get_metrics_for_collector,
+    use_redis_cluster,
+    metric_base_names_and_metric_names,
+    settings,
 ):
     """Test that ApplicationMetricsCollector generates expected metrics from cache"""
 
@@ -41,10 +69,16 @@ def test_application_metrics_collector(
         return labels
 
     with override_settings(USE_REDIS_CLUSTER=use_redis_cluster):
+        settings.METRICS_TO_COLLECT = metric_base_names_and_metric_names[0]
         collector = ApplicationMetricsCollector()
         test_metrics_registry = CollectorRegistry()
         test_metrics_registry.register(collector)
-        for metric in test_metrics_registry.collect():
+
+        metrics = [i for i in test_metrics_registry.collect()]
+        assert len(metrics) == len(metric_base_names_and_metric_names[1])
+
+        for metric in metrics:
+            assert metric.name in metric_base_names_and_metric_names[1]
             if metric.name == ALERT_GROUPS_TOTAL:
                 # 2 integrations with labels for each alert group state per service
                 assert len(metric.samples) == len(AlertGroupState) * 3  # 2 from 1st integration and 1 from 2nd
@@ -71,6 +105,8 @@ def test_application_metrics_collector(
             elif metric.name == USER_WAS_NOTIFIED_OF_ALERT_GROUPS:
                 # metric with labels for each notified user
                 assert len(metric.samples) == 1
+            else:
+                raise AssertionError(f"Unexpected metric {metric.name}")
         result = generate_latest(test_metrics_registry).decode("utf-8")
         assert result is not None
         assert mocked_org_ids.called
@@ -91,7 +127,9 @@ def test_application_metrics_collector_with_old_metrics_without_services(
         collector = ApplicationMetricsCollector()
         test_metrics_registry = CollectorRegistry()
         test_metrics_registry.register(collector)
-        for metric in test_metrics_registry.collect():
+        metrics = [i for i in test_metrics_registry.collect()]
+        assert len(metrics) == 3
+        for metric in metrics:
             if metric.name == ALERT_GROUPS_TOTAL:
                 alert_groups_total_metrics_cache = cache.get(get_metric_alert_groups_total_key(org_id))
                 assert alert_groups_total_metrics_cache and "services" not in alert_groups_total_metrics_cache[1]
@@ -106,6 +144,8 @@ def test_application_metrics_collector_with_old_metrics_without_services(
             elif metric.name == USER_WAS_NOTIFIED_OF_ALERT_GROUPS:
                 # metric with labels for each notified user
                 assert len(metric.samples) == 1
+            else:
+                raise AssertionError(f"Unexpected metric {metric.name}")
         result = generate_latest(test_metrics_registry).decode("utf-8")
         assert result is not None
         assert mocked_org_ids.called
diff --git a/engine/settings/base.py b/engine/settings/base.py
index 5c77c080..9aace2df 100644
--- a/engine/settings/base.py
+++ b/engine/settings/base.py
@@ -107,6 +107,17 @@ CHATOPS_SIGNING_SECRET = os.environ.get("CHATOPS_SIGNING_SECRET", None)
 
 # Prometheus exporter metrics endpoint auth
 PROMETHEUS_EXPORTER_SECRET = os.environ.get("PROMETHEUS_EXPORTER_SECRET")
 
+# Application metric names without prefixes
+METRIC_ALERT_GROUPS_TOTAL_NAME = "alert_groups_total"
+METRIC_ALERT_GROUPS_RESPONSE_TIME_NAME = "alert_groups_response_time"
+METRIC_USER_WAS_NOTIFIED_OF_ALERT_GROUPS_NAME = "user_was_notified_of_alert_groups"
+METRICS_ALL = [
+    METRIC_ALERT_GROUPS_TOTAL_NAME,
+    METRIC_ALERT_GROUPS_RESPONSE_TIME_NAME,
+    METRIC_USER_WAS_NOTIFIED_OF_ALERT_GROUPS_NAME,
+]
+# List of metrics to collect (comma-separated in the env var). Collect all available application metrics by default
+METRICS_TO_COLLECT = os.environ["METRICS_TO_COLLECT"].split(",") if "METRICS_TO_COLLECT" in os.environ else METRICS_ALL
 
 # Database
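
Usage note (editor's addition, not part of the patch): with this change, a deployment can limit which application metrics the exporter computes by exporting a comma-separated METRICS_TO_COLLECT before starting the engine, e.g. METRICS_TO_COLLECT=alert_groups_total,alert_groups_response_time. Below is a minimal sketch of exercising the collector under such a setting. It assumes a fully configured engine environment (database, cache, celery broker); the DJANGO_SETTINGS_MODULE value shown is illustrative, not prescribed by the patch. Metric values come from the cache populated by the celery tasks, so an empty cache simply yields empty metric families and schedules recalculation.

    import os

    # Limit collection before Django settings are loaded; the settings module
    # path below is hypothetical -- use whatever the deployment normally sets.
    os.environ.setdefault("METRICS_TO_COLLECT", "alert_groups_total")
    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "settings.dev")

    import django

    django.setup()

    from prometheus_client import CollectorRegistry, generate_latest

    from apps.metrics_exporter.metrics_collectors import ApplicationMetricsCollector

    # Register the collector on a fresh registry (mirroring what the module does
    # at import time) and render the exposition format; only the metrics named
    # in METRICS_TO_COLLECT are computed and yielded by collect().
    registry = CollectorRegistry()
    registry.register(ApplicationMetricsCollector())
    print(generate_latest(registry).decode("utf-8"))

The tests above rely on the same mechanism via pytest-django's settings fixture (settings.METRICS_TO_COLLECT = [...]), which overrides the value per test without touching the environment.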