From f21bd8d0b582b02f2f1d702c15875ddfeda57d1c Mon Sep 17 00:00:00 2001 From: Matias Bordese Date: Wed, 14 Jun 2023 09:14:29 -0300 Subject: [PATCH] Refactor metrics exporter to use `cache.get_many` (#2212) Eventually we could also process orgs by chunks. --- .../metrics_exporter/metrics_collectors.py | 126 ++++++++++-------- .../apps/metrics_exporter/tests/conftest.py | 8 ++ 2 files changed, 76 insertions(+), 58 deletions(-) diff --git a/engine/apps/metrics_exporter/metrics_collectors.py b/engine/apps/metrics_exporter/metrics_collectors.py index 67dcf2c2..a659b739 100644 --- a/engine/apps/metrics_exporter/metrics_collectors.py +++ b/engine/apps/metrics_exporter/metrics_collectors.py @@ -1,3 +1,4 @@ +import re import typing from django.core.cache import cache @@ -15,7 +16,7 @@ from apps.metrics_exporter.constants import ( from apps.metrics_exporter.helpers import ( get_metric_alert_groups_response_time_key, get_metric_alert_groups_total_key, - get_metrics_cache_timer_for_organization, + get_metrics_cache_timer_key, get_organization_ids, ) from apps.metrics_exporter.tasks import start_calculate_and_cache_metrics @@ -23,6 +24,10 @@ from apps.metrics_exporter.tasks import start_calculate_and_cache_metrics application_metrics_registry = CollectorRegistry() +RE_ALERT_GROUPS_TOTAL = re.compile(r"{}_(\d+)".format(ALERT_GROUPS_TOTAL)) +RE_ALERT_GROUPS_RESPONSE_TIME = re.compile(r"{}_(\d+)".format(ALERT_GROUPS_RESPONSE_TIME)) + + # https://github.com/prometheus/client_python#custom-collectors class ApplicationMetricsCollector: def __init__(self): @@ -42,67 +47,72 @@ class ApplicationMetricsCollector: ALERT_GROUPS_RESPONSE_TIME, "Users response time to alert groups in 7 days (seconds)", labels=self._labels ) - metrics_to_recalculate = [] - org_ids = get_organization_ids() + org_ids = set(get_organization_ids()) - for organization_id in org_ids: - start_calculation_task = False - # get alert_groups_total metric - alert_groups_total_key = get_metric_alert_groups_total_key(organization_id) - ag_states: typing.Dict[int, AlertGroupsTotalMetricsDict] = cache.get(alert_groups_total_key) + # alert groups total metrics + processed_org_ids = set() + alert_groups_total_keys = [get_metric_alert_groups_total_key(org_id) for org_id in org_ids] + org_ag_states: typing.Dict[str, typing.Dict[int, AlertGroupsTotalMetricsDict]] = cache.get_many( + alert_groups_total_keys + ) + for org_key, ag_states in org_ag_states.items(): + for integration, integration_data in ag_states.items(): + # Labels values should have the same order as _labels + labels_values = [ + integration_data["integration_name"], # integration + integration_data["team_name"], # team + integration_data["org_id"], # org_id + integration_data["slug"], # grafana instance slug + integration_data["id"], # grafana instance id + ] + labels_values = list(map(str, labels_values)) + for state in AlertGroupState: + alert_groups_total.add_metric(labels_values + [state.value], integration_data[state.value]) + org_id_from_key = RE_ALERT_GROUPS_TOTAL.match(org_key).groups()[0] + processed_org_ids.add(int(org_id_from_key)) + # get missing orgs + missing_org_ids_1 = org_ids - processed_org_ids - if ag_states is None: - start_calculation_task = True - else: - for integration, integration_data in ag_states.items(): - # Labels values should have the same order as _labels - labels_values = [ - integration_data["integration_name"], # integration - integration_data["team_name"], # team - integration_data["org_id"], # org_id - integration_data["slug"], # grafana instance slug - integration_data["id"], # grafana instance id - ] + # alert groups response time metrics + processed_org_ids = set() + alert_groups_response_time_keys = [get_metric_alert_groups_response_time_key(org_id) for org_id in org_ids] + org_ag_response_times: typing.Dict[str, typing.Dict[int, AlertGroupsResponseTimeMetricsDict]] = cache.get_many( + alert_groups_response_time_keys + ) + for org_key, ag_response_time in org_ag_response_times.items(): + for integration, integration_data in ag_response_time.items(): + # Labels values should have the same order as _labels + labels_values = [ + integration_data["integration_name"], # integration + integration_data["team_name"], # team + integration_data["org_id"], # org_id + integration_data["slug"], # grafana instance slug + integration_data["id"], # grafana instance id + ] + labels_values = list(map(str, labels_values)) - labels_values = list(map(str, labels_values)) - for state in AlertGroupState: - alert_groups_total.add_metric(labels_values + [state.value], integration_data[state.value]) + response_time_values = integration_data["response_time"] + if not response_time_values: + continue + buckets, sum_value = self.get_buckets_with_sum(response_time_values) + buckets = sorted(list(buckets.items()), key=lambda x: float(x[0])) + alert_groups_response_time_seconds.add_metric(labels_values, buckets=buckets, sum_value=sum_value) + org_id_from_key = RE_ALERT_GROUPS_RESPONSE_TIME.match(org_key).groups()[0] + processed_org_ids.add(int(org_id_from_key)) + # get missing orgs + missing_org_ids_2 = org_ids - processed_org_ids - # get alert_groups_response_time metric - alert_groups_response_time_key = get_metric_alert_groups_response_time_key(organization_id) - ag_response_time: typing.Dict[int, AlertGroupsResponseTimeMetricsDict] = cache.get( - alert_groups_response_time_key - ) - if ag_response_time is None: - start_calculation_task = True - else: - for integration, integration_data in ag_response_time.items(): - # Labels values should have the same order as _labels - labels_values = [ - integration_data["integration_name"], # integration - integration_data["team_name"], # team - integration_data["org_id"], # org_id - integration_data["slug"], # grafana instance slug - integration_data["id"], # grafana instance id - ] - labels_values = list(map(str, labels_values)) - - response_time_values = integration_data["response_time"] - if not response_time_values: - continue - buckets, sum_value = self.get_buckets_with_sum(response_time_values) - buckets = sorted(list(buckets.items()), key=lambda x: float(x[0])) - alert_groups_response_time_seconds.add_metric(labels_values, buckets=buckets, sum_value=sum_value) - - if start_calculation_task or not get_metrics_cache_timer_for_organization(organization_id): - org_to_recalculate: RecalculateOrgMetricsDict = { - "organization_id": organization_id, - "force": start_calculation_task, - } - metrics_to_recalculate.append(org_to_recalculate) - - if metrics_to_recalculate: - start_calculate_and_cache_metrics.apply_async((metrics_to_recalculate,)) + # check for orgs missing any of the metrics or needing a refresh + missing_org_ids = missing_org_ids_1 | missing_org_ids_2 + cache_timer_for_org_keys = [get_metrics_cache_timer_key(org_id) for org_id in org_ids] + cache_timers_for_org = cache.get_many(cache_timer_for_org_keys) + recalculate_orgs: typing.List[RecalculateOrgMetricsDict] = [] + for org_id in org_ids: + force_task = org_id in missing_org_ids + if force_task or not cache_timers_for_org.get(get_metrics_cache_timer_key(org_id)): + recalculate_orgs.append({"organization_id": org_id, "force": force_task}) + if recalculate_orgs: + start_calculate_and_cache_metrics.apply_async((recalculate_orgs,)) yield alert_groups_total yield alert_groups_response_time_seconds diff --git a/engine/apps/metrics_exporter/tests/conftest.py b/engine/apps/metrics_exporter/tests/conftest.py index 9ebc3a29..19b45cdf 100644 --- a/engine/apps/metrics_exporter/tests/conftest.py +++ b/engine/apps/metrics_exporter/tests/conftest.py @@ -46,7 +46,11 @@ def mock_cache_get_metrics_for_collector(monkeypatch): } return test_metrics.get(key) + def _mock_cache_get_many(keys, *args, **kwargs): + return {key: _mock_cache_get(key) for key in keys if _mock_cache_get(key)} + monkeypatch.setattr(cache, "get", _mock_cache_get) + monkeypatch.setattr(cache, "get_many", _mock_cache_get_many) @pytest.fixture() @@ -54,7 +58,11 @@ def mock_get_metrics_cache(monkeypatch): def _mock_cache_get(key, *args, **kwargs): return {} + def _mock_cache_get_many(keys, *args, **kwargs): + return {} + monkeypatch.setattr(cache, "get", _mock_cache_get) + monkeypatch.setattr(cache, "get_many", _mock_cache_get_many) @pytest.fixture