Refactor metrics exporter to use cache.get_many (#2212)

Eventually we could also process orgs in chunks (see the sketch below).
Matias Bordese 2023-06-14 09:14:29 -03:00 committed by GitHub
parent a9245e3855
commit f21bd8d0b5
2 changed files with 76 additions and 58 deletions
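On the commit message's follow-up idea: a minimal sketch of what chunked processing could look like, assuming a hypothetical chunked() helper and an arbitrary example chunk size of 500 (neither is part of this commit):

import itertools
import typing

from django.core.cache import cache

from apps.metrics_exporter.helpers import get_metric_alert_groups_total_key, get_organization_ids


def chunked(iterable: typing.Iterable[int], size: int) -> typing.Iterator[typing.List[int]]:
    # Illustrative helper: yield successive lists of at most `size` items.
    it = iter(iterable)
    while chunk := list(itertools.islice(it, size)):
        yield chunk


def iter_cached_ag_states(chunk_size: int = 500):
    for org_chunk in chunked(get_organization_ids(), chunk_size):
        keys = [get_metric_alert_groups_total_key(org_id) for org_id in org_chunk]
        # One get_many per chunk bounds the size of each cache round-trip.
        yield from cache.get_many(keys).items()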


@@ -1,3 +1,4 @@
+import re
 import typing
 
 from django.core.cache import cache
@@ -15,7 +16,7 @@ from apps.metrics_exporter.constants import (
 from apps.metrics_exporter.helpers import (
     get_metric_alert_groups_response_time_key,
     get_metric_alert_groups_total_key,
-    get_metrics_cache_timer_for_organization,
+    get_metrics_cache_timer_key,
     get_organization_ids,
 )
 from apps.metrics_exporter.tasks import start_calculate_and_cache_metrics
@@ -23,6 +24,10 @@ from apps.metrics_exporter.tasks import start_calculate_and_cache_metrics
 
 application_metrics_registry = CollectorRegistry()
 
+RE_ALERT_GROUPS_TOTAL = re.compile(r"{}_(\d+)".format(ALERT_GROUPS_TOTAL))
+RE_ALERT_GROUPS_RESPONSE_TIME = re.compile(r"{}_(\d+)".format(ALERT_GROUPS_RESPONSE_TIME))
+
+
 # https://github.com/prometheus/client_python#custom-collectors
 class ApplicationMetricsCollector:
     def __init__(self):
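These patterns exist so the collector can map each cache key returned by get_many back to its organization id. A standalone illustration; "alert_groups_total" is a stand-in for the real constant value, which lives in apps.metrics_exporter.constants:

import re

ALERT_GROUPS_TOTAL = "alert_groups_total"  # stand-in; the real value comes from constants.py
RE_ALERT_GROUPS_TOTAL = re.compile(r"{}_(\d+)".format(ALERT_GROUPS_TOTAL))

org_key = "alert_groups_total_42"  # key shape implied by the regex: f"{ALERT_GROUPS_TOTAL}_{org_id}"
org_id = int(RE_ALERT_GROUPS_TOTAL.match(org_key).groups()[0])
assert org_id == 42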
@@ -42,67 +47,72 @@ class ApplicationMetricsCollector:
             ALERT_GROUPS_RESPONSE_TIME, "Users response time to alert groups in 7 days (seconds)", labels=self._labels
         )
 
-        metrics_to_recalculate = []
-        org_ids = get_organization_ids()
-        for organization_id in org_ids:
-            start_calculation_task = False
-
-            # get alert_groups_total metric
-            alert_groups_total_key = get_metric_alert_groups_total_key(organization_id)
-            ag_states: typing.Dict[int, AlertGroupsTotalMetricsDict] = cache.get(alert_groups_total_key)
-            if ag_states is None:
-                start_calculation_task = True
-            else:
-                for integration, integration_data in ag_states.items():
-                    # Labels values should have the same order as _labels
-                    labels_values = [
-                        integration_data["integration_name"],  # integration
-                        integration_data["team_name"],  # team
-                        integration_data["org_id"],  # org_id
-                        integration_data["slug"],  # grafana instance slug
-                        integration_data["id"],  # grafana instance id
-                    ]
-                    labels_values = list(map(str, labels_values))
-                    for state in AlertGroupState:
-                        alert_groups_total.add_metric(labels_values + [state.value], integration_data[state.value])
-
-            # get alert_groups_response_time metric
-            alert_groups_response_time_key = get_metric_alert_groups_response_time_key(organization_id)
-            ag_response_time: typing.Dict[int, AlertGroupsResponseTimeMetricsDict] = cache.get(
-                alert_groups_response_time_key
-            )
-            if ag_response_time is None:
-                start_calculation_task = True
-            else:
-                for integration, integration_data in ag_response_time.items():
-                    # Labels values should have the same order as _labels
-                    labels_values = [
-                        integration_data["integration_name"],  # integration
-                        integration_data["team_name"],  # team
-                        integration_data["org_id"],  # org_id
-                        integration_data["slug"],  # grafana instance slug
-                        integration_data["id"],  # grafana instance id
-                    ]
-                    labels_values = list(map(str, labels_values))
-                    response_time_values = integration_data["response_time"]
-                    if not response_time_values:
-                        continue
-                    buckets, sum_value = self.get_buckets_with_sum(response_time_values)
-                    buckets = sorted(list(buckets.items()), key=lambda x: float(x[0]))
-                    alert_groups_response_time_seconds.add_metric(labels_values, buckets=buckets, sum_value=sum_value)
-
-            if start_calculation_task or not get_metrics_cache_timer_for_organization(organization_id):
-                org_to_recalculate: RecalculateOrgMetricsDict = {
-                    "organization_id": organization_id,
-                    "force": start_calculation_task,
-                }
-                metrics_to_recalculate.append(org_to_recalculate)
-
-        if metrics_to_recalculate:
-            start_calculate_and_cache_metrics.apply_async((metrics_to_recalculate,))
+        org_ids = set(get_organization_ids())
+
+        # alert groups total metrics
+        processed_org_ids = set()
+        alert_groups_total_keys = [get_metric_alert_groups_total_key(org_id) for org_id in org_ids]
+        org_ag_states: typing.Dict[str, typing.Dict[int, AlertGroupsTotalMetricsDict]] = cache.get_many(
+            alert_groups_total_keys
+        )
+        for org_key, ag_states in org_ag_states.items():
+            for integration, integration_data in ag_states.items():
+                # Labels values should have the same order as _labels
+                labels_values = [
+                    integration_data["integration_name"],  # integration
+                    integration_data["team_name"],  # team
+                    integration_data["org_id"],  # org_id
+                    integration_data["slug"],  # grafana instance slug
+                    integration_data["id"],  # grafana instance id
+                ]
+                labels_values = list(map(str, labels_values))
+                for state in AlertGroupState:
+                    alert_groups_total.add_metric(labels_values + [state.value], integration_data[state.value])
+            org_id_from_key = RE_ALERT_GROUPS_TOTAL.match(org_key).groups()[0]
+            processed_org_ids.add(int(org_id_from_key))
+        # get missing orgs
+        missing_org_ids_1 = org_ids - processed_org_ids
+
+        # alert groups response time metrics
+        processed_org_ids = set()
+        alert_groups_response_time_keys = [get_metric_alert_groups_response_time_key(org_id) for org_id in org_ids]
+        org_ag_response_times: typing.Dict[str, typing.Dict[int, AlertGroupsResponseTimeMetricsDict]] = cache.get_many(
+            alert_groups_response_time_keys
+        )
+        for org_key, ag_response_time in org_ag_response_times.items():
+            for integration, integration_data in ag_response_time.items():
+                # Labels values should have the same order as _labels
+                labels_values = [
+                    integration_data["integration_name"],  # integration
+                    integration_data["team_name"],  # team
+                    integration_data["org_id"],  # org_id
+                    integration_data["slug"],  # grafana instance slug
+                    integration_data["id"],  # grafana instance id
+                ]
+                labels_values = list(map(str, labels_values))
+                response_time_values = integration_data["response_time"]
+                if not response_time_values:
+                    continue
+                buckets, sum_value = self.get_buckets_with_sum(response_time_values)
+                buckets = sorted(list(buckets.items()), key=lambda x: float(x[0]))
+                alert_groups_response_time_seconds.add_metric(labels_values, buckets=buckets, sum_value=sum_value)
+            org_id_from_key = RE_ALERT_GROUPS_RESPONSE_TIME.match(org_key).groups()[0]
+            processed_org_ids.add(int(org_id_from_key))
+        # get missing orgs
+        missing_org_ids_2 = org_ids - processed_org_ids
+
+        # check for orgs missing any of the metrics or needing a refresh
+        missing_org_ids = missing_org_ids_1 | missing_org_ids_2
+        cache_timer_for_org_keys = [get_metrics_cache_timer_key(org_id) for org_id in org_ids]
+        cache_timers_for_org = cache.get_many(cache_timer_for_org_keys)
+        recalculate_orgs: typing.List[RecalculateOrgMetricsDict] = []
+        for org_id in org_ids:
+            force_task = org_id in missing_org_ids
+            if force_task or not cache_timers_for_org.get(get_metrics_cache_timer_key(org_id)):
+                recalculate_orgs.append({"organization_id": org_id, "force": force_task})
+        if recalculate_orgs:
+            start_calculate_and_cache_metrics.apply_async((recalculate_orgs,))
 
         yield alert_groups_total
         yield alert_groups_response_time_seconds
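Why the collector now tracks missing org ids: Django's cache.get_many returns a dict containing only the keys that were actually found (and not expired), so orgs with no cached metrics must be recovered by set difference. Roughly, with illustrative key names:

from django.core.cache import cache

cache.set("alert_groups_total_1", {"some": "metrics"})
# "alert_groups_total_2" was never cached, or has expired

found = cache.get_many(["alert_groups_total_1", "alert_groups_total_2"])
# found == {"alert_groups_total_1": {"some": "metrics"}}; the missing key is simply absent

requested_org_ids = {1, 2}
processed_org_ids = {1}  # parsed back out of found's keys via the RE_* patterns
missing_org_ids = requested_org_ids - processed_org_ids  # {2}: these orgs are recalculated with force=True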


@@ -46,7 +46,11 @@ def mock_cache_get_metrics_for_collector(monkeypatch):
         }
         return test_metrics.get(key)
 
+    def _mock_cache_get_many(keys, *args, **kwargs):
+        return {key: _mock_cache_get(key) for key in keys if _mock_cache_get(key)}
+
     monkeypatch.setattr(cache, "get", _mock_cache_get)
+    monkeypatch.setattr(cache, "get_many", _mock_cache_get_many)
 
 
 @pytest.fixture()
@@ -54,7 +58,11 @@ def mock_get_metrics_cache(monkeypatch):
     def _mock_cache_get(key, *args, **kwargs):
         return {}
 
+    def _mock_cache_get_many(keys, *args, **kwargs):
+        return {}
+
     monkeypatch.setattr(cache, "get", _mock_cache_get)
+    monkeypatch.setattr(cache, "get_many", _mock_cache_get_many)
 
 
 @pytest.fixture
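Both fixtures patch cache.get_many alongside cache.get because the collector no longer calls get for these keys; a test that stubbed only get would hit the real cache. The first fixture's mock also reproduces get_many's contract of omitting keys that resolve to nothing. A standalone sketch of that behavior (key names are illustrative):

test_metrics = {"known_key": {"some": "data"}}

def _mock_cache_get(key, *args, **kwargs):
    return test_metrics.get(key)

def _mock_cache_get_many(keys, *args, **kwargs):
    # Like Django's cache.get_many: only keys with a (truthy) cached value appear in the result.
    return {key: _mock_cache_get(key) for key in keys if _mock_cache_get(key)}

assert _mock_cache_get_many(["known_key", "unknown_key"]) == {"known_key": {"some": "data"}}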