# What this PR does

Limits the organizations that a metrics exporter is responsible for. As more organizations are added, it becomes more difficult for the exporter to deliver metrics within the scrape timeout. This lets us use the `METRICS_EXPORTER_ORGANIZATION_GROUP_ID` and `METRICS_EXPORTER_TOTAL_ORGANIZATION_GROUPS` settings to divide the organizations between multiple exporters.

## Which issue(s) this PR closes

Related to [issue link here]

<!--
*Note*: If you want the issue to be auto-closed once the PR is merged, change "Related to" to "Closes" in the line above.
If you have more than one GitHub issue that this PR closes, be sure to preface each issue link with a [closing keyword](https://docs.github.com/en/get-started/writing-on-github/working-with-advanced-formatting/using-keywords-in-issues-and-pull-requests#linking-a-pull-request-to-an-issue). This ensures that the issue(s) are auto-closed once the PR has been merged.
-->

## Checklist

- [x] Unit, integration, and e2e (if applicable) tests updated
- [x] Documentation added (or `pr:no public docs` PR label added if not required)
- [x] Added the relevant release notes label (see labels prefixed w/ `release:`). These labels dictate how your PR will show up in the autogenerated release notes.
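Here is a minimal sketch of the partitioning scheme, with illustrative organization ids (the setting names come from the module below): each exporter keeps only the organizations whose id falls into its group, so the groups are disjoint and together cover every organization.

```python
# Illustrative values: two exporters, each configured with the same total group
# count (METRICS_EXPORTER_TOTAL_ORGANIZATION_GROUPS) but a distinct group id
# (METRICS_EXPORTER_ORGANIZATION_GROUP_ID).
organization_ids = [1, 2, 3, 4, 5, 6, 7, 8]
group_count = 2  # METRICS_EXPORTER_TOTAL_ORGANIZATION_GROUPS

for group_id in range(group_count):  # one exporter per group id
    shard = [i for i in organization_ids if i % group_count == group_id]
    print(group_id, shard)
# 0 [2, 4, 6, 8]
# 1 [1, 3, 5, 7]
```

The full helper module follows.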
```python
import datetime
import random
import typing

from django.conf import settings
from django.core.cache import cache
from django.utils import timezone

from apps.alerts.constants import AlertGroupState
from apps.metrics_exporter.constants import (
    ALERT_GROUPS_RESPONSE_TIME,
    ALERT_GROUPS_TOTAL,
    METRICS_CACHE_LIFETIME,
    METRICS_CACHE_TIMER,
    METRICS_ORGANIZATIONS_IDS,
    METRICS_ORGANIZATIONS_IDS_CACHE_TIMEOUT,
    METRICS_RECALCULATION_CACHE_TIMEOUT,
    METRICS_RECALCULATION_CACHE_TIMEOUT_DISPERSE,
    METRICS_RESPONSE_TIME_CALCULATION_PERIOD,
    NO_SERVICE_VALUE,
    USER_WAS_NOTIFIED_OF_ALERT_GROUPS,
    AlertGroupsResponseTimeMetricsDict,
    AlertGroupStateDict,
    AlertGroupsTotalMetricsDict,
    RecalculateMetricsTimer,
    UserWasNotifiedOfAlertGroupsMetricsDict,
)
from common.cache import ensure_cache_key_allocates_to_the_same_hash_slot

if typing.TYPE_CHECKING:
    from apps.alerts.models import AlertReceiveChannel
    from apps.user_management.models import Organization


def get_organization_ids_from_db():
    from apps.alerts.models import AlertReceiveChannel

    # get only not deleted organizations that have integrations
    organizations_ids = (
        AlertReceiveChannel.objects.filter(organization__deleted_at__isnull=True)
        .values_list("organization_id", flat=True)
        .distinct()
    )
    organizations_ids = list(organizations_ids)
    return organizations_ids


def get_organization_ids():
    """
    Try to get organization ids from cache; otherwise read them from the db and cache the result.
    Only ids belonging to this exporter's organization group are returned.
    """
    organizations_ids = cache.get(METRICS_ORGANIZATIONS_IDS, [])
    if not organizations_ids:
        organizations_ids = get_organization_ids_from_db()
        cache.set(METRICS_ORGANIZATIONS_IDS, organizations_ids, METRICS_ORGANIZATIONS_IDS_CACHE_TIMEOUT)

    # keep only the organizations assigned to this exporter's group
    group_id = settings.METRICS_EXPORTER_ORGANIZATION_GROUP_ID
    group_count = settings.METRICS_EXPORTER_TOTAL_ORGANIZATION_GROUPS
    return [i for i in organizations_ids if i % group_count == group_id]


def is_allowed_to_start_metrics_calculation(organization_id, force=False) -> bool:
    """
    Return True if metrics calculation may start: either no recalculation timer exists yet,
    or this is a forced run taking over a run that was not itself forced.
    """
    recalculate_timeout = get_metrics_recalculation_timeout()
    metrics_cache_timer_key = get_metrics_cache_timer_key(organization_id)
    metrics_cache_timer: typing.Optional[RecalculateMetricsTimer]
    metrics_cache_timer = cache.get(metrics_cache_timer_key)

    if metrics_cache_timer:
        if not force or metrics_cache_timer.get("forced_started", False):
            return False
        else:
            metrics_cache_timer["forced_started"] = True
    else:
        metrics_cache_timer = {
            "recalculate_timeout": recalculate_timeout,
            "forced_started": force,
        }

    metrics_cache_timer["recalculate_timeout"] = recalculate_timeout
    cache.set(metrics_cache_timer_key, metrics_cache_timer, timeout=recalculate_timeout)
    return True
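
# Usage sketch (illustrative): a recalculation task can treat this as a soft lock and
# bail out when another run already holds the timer for the organization, e.g.:
#   if not is_allowed_to_start_metrics_calculation(organization_id):
#       return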


def get_response_time_period() -> datetime.datetime:
    """Return the start of the period used for response time calculation"""
    return timezone.now() - METRICS_RESPONSE_TIME_CALCULATION_PERIOD


def get_metrics_recalculation_timeout() -> int:
    """
    Return the timeout after which metrics should be recalculated.
    Some dispersion is added to avoid starting recalculation tasks for all organizations at the same time.
    """
    return METRICS_RECALCULATION_CACHE_TIMEOUT + random.randint(*METRICS_RECALCULATION_CACHE_TIMEOUT_DISPERSE)
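
# Illustration (hypothetical constant values): with METRICS_RECALCULATION_CACHE_TIMEOUT = 3600
# and METRICS_RECALCULATION_CACHE_TIMEOUT_DISPERSE = (0, 600), each organization's next
# recalculation lands 3600-4200 seconds out, spreading the tasks over a ten-minute window.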


def get_metrics_cache_timeout(organization_id):
    """Return the cache timeout for metrics, based on the organization's recalculation timer if one exists."""
    metrics_cache_timer_key = get_metrics_cache_timer_key(organization_id)
    metrics_cache_timer = cache.get(metrics_cache_timer_key)
    if metrics_cache_timer:
        # extend the lifetime two hours past the recalculation timeout as a safety buffer
        TWO_HOURS = 7200
        metrics_cache_timeout = int(metrics_cache_timer.get("recalculate_timeout")) + TWO_HOURS
    else:
        metrics_cache_timeout = METRICS_CACHE_LIFETIME
    return metrics_cache_timeout
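
# The key helpers below route each per-organization key to the same Redis Cluster hash
# slot as its metric-name prefix (presumably via a hash tag applied inside
# ensure_cache_key_allocates_to_the_same_hash_slot), so related keys stay co-located.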


def get_metrics_cache_timer_key(organization_id) -> str:
    return ensure_cache_key_allocates_to_the_same_hash_slot(
        f"{METRICS_CACHE_TIMER}_{organization_id}", METRICS_CACHE_TIMER
    )


def get_metric_alert_groups_total_key(organization_id) -> str:
    return ensure_cache_key_allocates_to_the_same_hash_slot(
        f"{ALERT_GROUPS_TOTAL}_{organization_id}", ALERT_GROUPS_TOTAL
    )


def get_metric_alert_groups_response_time_key(organization_id) -> str:
    return ensure_cache_key_allocates_to_the_same_hash_slot(
        f"{ALERT_GROUPS_RESPONSE_TIME}_{organization_id}", ALERT_GROUPS_RESPONSE_TIME
    )


def get_metric_user_was_notified_of_alert_groups_key(organization_id) -> str:
    return ensure_cache_key_allocates_to_the_same_hash_slot(
        f"{USER_WAS_NOTIFIED_OF_ALERT_GROUPS}_{organization_id}", USER_WAS_NOTIFIED_OF_ALERT_GROUPS
    )


def get_metric_calculation_started_key(metric_name) -> str:
    return f"calculation_started_for_{metric_name}"


def get_default_states_dict() -> AlertGroupStateDict:
    return {
        AlertGroupState.FIRING.value: 0,
        AlertGroupState.ACKNOWLEDGED.value: 0,
        AlertGroupState.RESOLVED.value: 0,
        AlertGroupState.SILENCED.value: 0,
    }


def metrics_update_integration_cache(integration: "AlertReceiveChannel") -> None:
    """Update integration data in the metrics cache"""
    metrics_cache_timeout = get_metrics_cache_timeout(integration.organization_id)
    metric_alert_groups_total_key = get_metric_alert_groups_total_key(integration.organization_id)
    metric_alert_groups_response_time_key = get_metric_alert_groups_response_time_key(integration.organization_id)

    for metric_key in [metric_alert_groups_total_key, metric_alert_groups_response_time_key]:
        metric_cache = cache.get(metric_key, {})
        integration_metric_cache = metric_cache.get(integration.id)
        if integration_metric_cache:
            cache_updated = False
            if integration_metric_cache["team_id"] != integration.team_id_or_no_team:
                integration_metric_cache["team_id"] = integration.team_id_or_no_team
                integration_metric_cache["team_name"] = integration.team_name
                cache_updated = True
            if integration_metric_cache["integration_name"] != integration.emojized_verbal_name:
                integration_metric_cache["integration_name"] = integration.emojized_verbal_name
                cache_updated = True
            if cache_updated:
                cache.set(metric_key, metric_cache, timeout=metrics_cache_timeout)


def metrics_remove_deleted_integration_from_cache(integration: "AlertReceiveChannel"):
    """Remove data related to a deleted integration from the metrics cache"""
    metrics_cache_timeout = get_metrics_cache_timeout(integration.organization_id)
    metric_alert_groups_total_key = get_metric_alert_groups_total_key(integration.organization_id)
    metric_alert_groups_response_time_key = get_metric_alert_groups_response_time_key(integration.organization_id)

    for metric_key in [metric_alert_groups_total_key, metric_alert_groups_response_time_key]:
        metric_cache = cache.get(metric_key)
        if metric_cache:
            metric_cache.pop(integration.id, None)
            cache.set(metric_key, metric_cache, timeout=metrics_cache_timeout)


def metrics_add_integrations_to_cache(integrations: list["AlertReceiveChannel"], organization: "Organization"):
    """
    Bulk add new integration data to the metrics cache.
    This method is safe to call multiple times on the same integrations.
    """
    metrics_cache_timeout = get_metrics_cache_timeout(organization.id)
    metric_alert_groups_total_key = get_metric_alert_groups_total_key(organization.id)

    instance_slug = organization.stack_slug
    instance_id = organization.stack_id
    grafana_org_id = organization.org_id
    metric_alert_groups_total: typing.Dict[int, AlertGroupsTotalMetricsDict] = cache.get(
        metric_alert_groups_total_key, {}
    )

    for integration in integrations:
        metric_alert_groups_total.setdefault(
            integration.id,
            {
                "integration_name": integration.emojized_verbal_name,
                "team_name": integration.team_name,
                "team_id": integration.team_id_or_no_team,
                "org_id": grafana_org_id,
                "slug": instance_slug,
                "id": instance_id,
                "services": {NO_SERVICE_VALUE: get_default_states_dict()},
            },
        )
    cache.set(metric_alert_groups_total_key, metric_alert_groups_total, timeout=metrics_cache_timeout)

    metric_alert_groups_response_time_key = get_metric_alert_groups_response_time_key(organization.id)
    metric_alert_groups_response_time: typing.Dict[int, AlertGroupsResponseTimeMetricsDict] = cache.get(
        metric_alert_groups_response_time_key, {}
    )

    for integration in integrations:
        metric_alert_groups_response_time.setdefault(
            integration.id,
            {
                "integration_name": integration.emojized_verbal_name,
                "team_name": integration.team_name,
                "team_id": integration.team_id_or_no_team,
                "org_id": grafana_org_id,
                "slug": instance_slug,
                "id": instance_id,
                "services": {NO_SERVICE_VALUE: []},
            },
        )
    cache.set(metric_alert_groups_response_time_key, metric_alert_groups_response_time, timeout=metrics_cache_timeout)
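
# Entries above are created with setdefault(), so re-adding an integration that is already
# cached leaves its accumulated state and response time data untouched; this is what makes
# repeated calls on the same integrations safe.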


def metrics_bulk_update_team_label_cache(teams_updated_data: dict, organization_id: int):
    """Update team related data in the metrics cache for each team in `teams_updated_data`"""
    if not teams_updated_data:
        return
    metrics_cache_timeout = get_metrics_cache_timeout(organization_id)
    metric_alert_groups_total_key = get_metric_alert_groups_total_key(organization_id)
    metric_alert_groups_response_time_key = get_metric_alert_groups_response_time_key(organization_id)

    metric_alert_groups_total = cache.get(metric_alert_groups_total_key, {})
    metric_alert_groups_response_time = cache.get(metric_alert_groups_response_time_key, {})
    for team_id, team_data in teams_updated_data.items():
        for integration_id in metric_alert_groups_total:
            if metric_alert_groups_total[integration_id]["team_id"] == team_id:
                integration_response_time_metrics = metric_alert_groups_response_time.get(integration_id)
                if team_data["deleted"]:
                    metric_alert_groups_total[integration_id]["team_id"] = "no_team"
                    metric_alert_groups_total[integration_id]["team_name"] = "No team"
                    if integration_response_time_metrics:
                        integration_response_time_metrics["team_id"] = "no_team"
                        integration_response_time_metrics["team_name"] = "No team"
                else:
                    metric_alert_groups_total[integration_id]["team_name"] = team_data["team_name"]
                    if integration_response_time_metrics:
                        integration_response_time_metrics["team_name"] = team_data["team_name"]

    cache.set(metric_alert_groups_total_key, metric_alert_groups_total, timeout=metrics_cache_timeout)
    cache.set(metric_alert_groups_response_time_key, metric_alert_groups_response_time, timeout=metrics_cache_timeout)
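
# Note that when a team is deleted, its integrations are relabeled under the reserved
# "no_team" / "No team" values rather than being dropped from the metrics.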


def metrics_update_alert_groups_state_cache(states_diff: dict, organization_id: int):
    """
    Update the alert groups state metric cache for each integration in the `states_diff` dict.
    `states_diff` example:
    {
        <integration_id>: {
            <service name>: {
                "previous_states": {
                    firing: 1,
                    acknowledged: 0,
                    resolved: 0,
                    silenced: 0,
                },
                "new_states": {
                    firing: 0,
                    acknowledged: 1,
                    resolved: 0,
                    silenced: 0,
                }
            }
        }
    }
    """
    if not states_diff:
        return

    metrics_cache_timeout = get_metrics_cache_timeout(organization_id)
    metric_alert_groups_total_key = get_metric_alert_groups_total_key(organization_id)
    metric_alert_groups_total = cache.get(metric_alert_groups_total_key, {})
    if not metric_alert_groups_total:
        return
    for integration_id, service_data in states_diff.items():
        integration_alert_groups = metric_alert_groups_total.get(int(integration_id))
        if not integration_alert_groups:
            continue
        for service_name, service_state_diff in service_data.items():
            states_to_update = integration_alert_groups["services"].setdefault(service_name, get_default_states_dict())
            for previous_state, counter in service_state_diff["previous_states"].items():
                if states_to_update[previous_state] - counter > 0:
                    states_to_update[previous_state] -= counter
                else:
                    states_to_update[previous_state] = 0
            for new_state, counter in service_state_diff["new_states"].items():
                states_to_update[new_state] += counter

    cache.set(metric_alert_groups_total_key, metric_alert_groups_total, timeout=metrics_cache_timeout)
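
# Example: one alert group moving from "firing" to "acknowledged" for a service arrives as
# previous_states={"firing": 1, ...} / new_states={"acknowledged": 1, ...}; the firing
# counter is decremented (clamped at zero so it can never go negative) and the
# acknowledged counter is incremented.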


def metrics_update_alert_groups_response_time_cache(integrations_response_time: dict, organization_id: int):
    """
    Update the alert groups response time metric cache for each integration in the `integrations_response_time` dict.
    `integrations_response_time` example:
    {
        <integration_id>: {
            <service name>: [10],
        }
    }
    """
    if not integrations_response_time:
        return

    metrics_cache_timeout = get_metrics_cache_timeout(organization_id)
    metric_alert_groups_response_time_key = get_metric_alert_groups_response_time_key(organization_id)
    metric_alert_groups_response_time = cache.get(metric_alert_groups_response_time_key, {})
    if not metric_alert_groups_response_time:
        return
    for integration_id, service_data in integrations_response_time.items():
        integration_response_time_metrics = metric_alert_groups_response_time.get(int(integration_id))
        if not integration_response_time_metrics:
            continue
        for service_name, response_time_values in service_data.items():
            integration_response_time_metrics["services"].setdefault(service_name, [])
            integration_response_time_metrics["services"][service_name].extend(response_time_values)
    cache.set(metric_alert_groups_response_time_key, metric_alert_groups_response_time, timeout=metrics_cache_timeout)


def metrics_update_user_cache(user, counter=1):
    """
    Increase the "user_was_notified_of_alert_groups" metric cache by `counter`,
    which is the number of alert groups the user was notified of.
    """
    metrics_cache_timeout = get_metrics_cache_timeout(user.organization_id)
    metric_user_was_notified_key = get_metric_user_was_notified_of_alert_groups_key(user.organization_id)
    metric_user_was_notified: typing.Dict[int, UserWasNotifiedOfAlertGroupsMetricsDict] = cache.get(
        metric_user_was_notified_key, {}
    )

    metric_user_was_notified.setdefault(
        user.id,
        {
            "user_username": user.username,
            "org_id": user.organization.org_id,
            "slug": user.organization.stack_slug,
            "id": user.organization.stack_id,
            "counter": 0,
        },
    )["counter"] += counter
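
    # setdefault() returns the (possibly pre-existing) entry, so the increment above
    # applies whether the user was already cached or was just initialized with counter=0.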
    cache.set(metric_user_was_notified_key, metric_user_was_notified, timeout=metrics_cache_timeout)
```