Refactor create/update contact points for Alerting integration (#872)

**What this PR does**:
- Keep grafana version on create/update contact points to avoid multiple
requests to alerting
- Add retry limit on create contact point async
- Fix bugs related on create contact point
- Update logs on create/update contact point, make them more clear
- Avoid unnecessary requests to Grafana Alerting
This commit is contained in:
Yulya Artyukhina 2023-01-25 09:42:42 +01:00 committed by GitHub
parent b61a98c5b1
commit de5d876d27
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 242 additions and 132 deletions

View file

@ -5,6 +5,16 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## Unreleased
### Changed
- Improve logging for creating contact point for Grafana Alerting integration
### Fixed
- Fix bugs related to creating contact point for Grafana Alerting integration
## v1.1.18 (2023-01-25)
### Added

View file

@ -1,6 +1,6 @@
import copy
import logging
from typing import Optional
from typing import Optional, Tuple
from django.apps import apps
from rest_framework import status
@ -19,6 +19,7 @@ class GrafanaAlertingSyncManager:
GRAFANA_CONTACT_POINT = "grafana"
ALERTING_DATASOURCE = "alertmanager"
IS_GRAFANA_VERSION_GRE_9 = None
def __init__(self, alert_receive_channel):
self.alert_receive_channel = alert_receive_channel
@ -26,6 +27,7 @@ class GrafanaAlertingSyncManager:
api_url=self.alert_receive_channel.organization.grafana_url,
api_token=self.alert_receive_channel.organization.api_token,
)
self.receiver_name = self.alert_receive_channel.emojized_verbal_name
@classmethod
def check_for_connection_errors(cls, organization) -> Optional[str]:
@ -35,7 +37,8 @@ class GrafanaAlertingSyncManager:
config, response_info = client.get_alerting_config(recipient)
if config is None:
logger.warning(
f"Failed to connect to contact point (GET): Is unified alerting enabled on instance? {response_info}"
f"GrafanaAlertingSyncManager: Failed to connect to contact point (GET): Is unified alerting enabled "
f"on instance? {response_info}"
)
return (
"Failed to create the integration with current Grafana Alerting. "
@ -45,8 +48,8 @@ class GrafanaAlertingSyncManager:
datasource_list, response_info = client.get_datasources()
if datasource_list is None:
logger.warning(
f"Failed to connect to alerting datasource (GET): Is unified alerting enabled "
f"on instance? {response_info}"
f"GrafanaAlertingSyncManager: Failed to connect to alerting datasource (GET): "
f"Is unified alerting enabled on instance? {response_info}"
)
return (
"Failed to create the integration with current Grafana Alerting. "
@ -62,6 +65,10 @@ class GrafanaAlertingSyncManager:
if is_grafana_datasource:
datasource_attr = GrafanaAlertingSyncManager.GRAFANA_CONTACT_POINT
config, response_info = client_method(datasource_attr, *args)
elif self.IS_GRAFANA_VERSION_GRE_9:
# Get config by datasource uid for Grafana version >= 9
datasource_attr = datasource_uid
config, response_info = client_method(datasource_attr, *args)
else:
# Get config by datasource id for Grafana version < 9
datasource_attr = datasource_id
@ -71,11 +78,18 @@ class GrafanaAlertingSyncManager:
# Get config by datasource uid for Grafana version >= 9
datasource_attr = datasource_uid
config, response_info = client_method(datasource_attr, *args)
if response_info["status_code"] in (
status.HTTP_200_OK,
status.HTTP_201_CREATED,
status.HTTP_202_ACCEPTED,
status.HTTP_204_NO_CONTENT,
):
self.IS_GRAFANA_VERSION_GRE_9 = True
if config is None:
logger.warning(
f"Got config None in alerting_config_with_respect_to_grafana_version with method "
f"{client_method.__name__} for is_grafana_datasource {is_grafana_datasource} for integration "
f"{self.alert_receive_channel.pk}; response: {response_info}"
f"GrafanaAlertingSyncManager: Got config None in alerting_config_with_respect_to_grafana_version "
f"with method {client_method.__name__} for is_grafana_datasource {is_grafana_datasource} "
f"for integration {self.alert_receive_channel.pk}; response: {response_info}"
)
return config, response_info
@ -92,8 +106,9 @@ class GrafanaAlertingSyncManager:
datasources, response_info = self.client.get_datasources()
if datasources is None:
logger.warning(
f"Failed to get datasource list for organization {self.alert_receive_channel.organization.stack_slug}, "
f"{response_info}"
f"GrafanaAlertingSyncManager: Failed to get datasource list for organization "
f"{self.alert_receive_channel.organization.stack_slug} "
f"for integration {self.alert_receive_channel.pk}, {response_info}"
)
return
@ -102,7 +117,7 @@ class GrafanaAlertingSyncManager:
# sync other datasource
for datasource in datasources:
if datasource["type"] == GrafanaAlertingSyncManager.ALERTING_DATASOURCE:
contact_point = self.create_contact_point(datasource)
contact_point, _ = self.create_contact_point(datasource)
if contact_point is None:
# Failed to create contact point duo to getting wrong alerting config. It is expected behaviour.
# Add datasource to list and retry to create contact point for it async
@ -110,8 +125,8 @@ class GrafanaAlertingSyncManager:
if datasources_to_create:
logger.warning(
f"Some contact points were not created for integration {self.alert_receive_channel.pk}, "
f"trying to create async"
f"GrafanaAlertingSyncManager: Some contact points were not created for integration "
f"{self.alert_receive_channel.pk}, trying to create async"
)
# create other contact points async
schedule_create_contact_points_for_datasource(self.alert_receive_channel.pk, datasources_to_create)
@ -119,18 +134,52 @@ class GrafanaAlertingSyncManager:
self.alert_receive_channel.is_finished_alerting_setup = True
self.alert_receive_channel.save(update_fields=["is_finished_alerting_setup"])
def create_contact_point(self, datasource=None) -> Optional["apps.alerts.models.GrafanaAlertingContactPoint"]:
def create_contact_point(
self, datasource=None
) -> Tuple[Optional["apps.alerts.models.GrafanaAlertingContactPoint"], dict]:
"""
Try to create a contact point for datasource.
Return None if contact point was not created otherwise return contact point object
Update datasource config in Grafana Alerting and create OnCall contact point
"""
if datasource is None:
datasource = {}
if datasource is None: # Means that we create contact point for default datasource
datasource = {
"type": GrafanaAlertingSyncManager.GRAFANA_CONTACT_POINT,
}
datasource_type = datasource.get("type") or GrafanaAlertingSyncManager.GRAFANA_CONTACT_POINT
is_grafana_datasource = datasource.get("id") is None
# get alerting config
config, response_info, config_to_update = self.get_alerting_config_for_datasource(
datasource, is_grafana_datasource
)
if not config_to_update: # failed to get alerting config for this datasource
return None, response_info
updated_config_from_alerting, response_info = self.add_contact_point_to_grafana_alerting(
datasource, is_grafana_datasource, config, config_to_update
)
if not updated_config_from_alerting:
return None, response_info
contact_point = self._create_contact_point_from_payload(updated_config_from_alerting, datasource)
logger.info(
f"Create contact point for {datasource_type} datasource, integration {self.alert_receive_channel.pk}"
f"GrafanaAlertingSyncManager: Contact point was created for datasource {datasource.get('type')} "
f"for integration {self.alert_receive_channel.pk}."
)
return contact_point, response_info
def get_alerting_config_for_datasource(
self, datasource, is_grafana_datasource
) -> Tuple[Optional[dict], dict, Optional[dict]]:
"""
Get datasource config from Grafana Alerting. If it doesn't exist, get default config for this datasource
"""
grafana_version = ">= 9" if self.IS_GRAFANA_VERSION_GRE_9 else "< 9 or unknown"
datasource_type = datasource.get("type")
logger.info(
f"GrafanaAlertingSyncManager: Get config for datasource {datasource_type} to create contact point "
f"for integration {self.alert_receive_channel.pk}, Grafana version is {grafana_version}"
)
config, response_info = self.alerting_config_with_respect_to_grafana_version(
is_grafana_datasource, datasource.get("id"), datasource.get("uid"), self.client.get_alerting_config
@ -138,14 +187,14 @@ class GrafanaAlertingSyncManager:
if config is None:
logger.warning(
f"Failed to create contact point (GET) for integration {self.alert_receive_channel.pk}: "
f"Is unified alerting enabled on instance? {response_info}"
f"GrafanaAlertingSyncManager: Got config None for datasource {datasource_type} "
f"for integration {self.alert_receive_channel.pk} (GET). "
f"Response: {response_info}. Is unified alerting enabled on instance? Trying to get default config. "
)
return
updated_config = copy.deepcopy(config)
config_to_update = copy.deepcopy(config)
if config["alertmanager_config"] is None:
if config is None or config.get("alertmanager_config") is None:
default_config, response_info = self.alerting_config_with_respect_to_grafana_version(
is_grafana_datasource,
datasource.get("id"),
@ -154,31 +203,50 @@ class GrafanaAlertingSyncManager:
)
if default_config is None:
logger.warning(
f"Failed to create contact point (alertmanager_config is None) for integration "
f"{self.alert_receive_channel.pk}, {response_info}"
f"GrafanaAlertingSyncManager: Got default config None (alertmanager_config is None) "
f"for datasource {datasource_type}. "
f"Failed to create contact point for integration {self.alert_receive_channel.pk}. "
f"Response: {response_info}"
)
return
updated_config = {"alertmanager_config": copy.deepcopy(default_config["config"])}
return config, response_info, None
receiver_name = self.alert_receive_channel.emojized_verbal_name
routes = updated_config["alertmanager_config"]["route"].get("routes", [])
new_route = GrafanaAlertingSyncManager._get_continue_route_config_for_datasource(
is_grafana_datasource,
receiver_name,
config_to_update = {"alertmanager_config": copy.deepcopy(default_config["config"])}
logger.debug(
f"GrafanaAlertingSyncManager: Successfully got config for datasource {datasource_type} "
f"for integration {self.alert_receive_channel.pk}."
)
# Append the new route to the beginning of the list
# It must have `continue=True` parameter otherwise it will intercept all the alerts
updated_config["alertmanager_config"]["route"]["routes"] = [new_route] + routes
return config, response_info, config_to_update
receivers = updated_config["alertmanager_config"]["receivers"]
new_receiver = GrafanaAlertingSyncManager._get_receiver_config_for_datasource(
def add_contact_point_to_grafana_alerting(
self, datasource, is_grafana_datasource, config, config_to_update
) -> Tuple[Optional[dict], dict]:
"""
Update datasource config with OnCall route and receiver and POST updated config to Grafana Alerting
"""
updated_config = self._add_contact_point_to_config(is_grafana_datasource, config_to_update)
updated_config_from_alerting, response_info = self.post_updated_config_and_get_the_result(
datasource,
is_grafana_datasource,
receiver_name,
self.alert_receive_channel.integration_url,
config,
updated_config,
)
updated_config["alertmanager_config"]["receivers"] = receivers + [new_receiver]
return updated_config_from_alerting, response_info
def post_updated_config_and_get_the_result(
self,
datasource,
is_grafana_datasource,
config,
updated_config,
) -> Tuple[Optional[dict], Optional[dict]]:
"""
POST updated datasource config to Grafana Alerting and GET the new alerting config
"""
datasource_type = datasource.get("type")
logger.info(
f"GrafanaAlertingSyncManager: Post updated config for datasource {datasource_type} to create contact point "
f"for integration {self.alert_receive_channel.pk}"
)
response, response_info = self.alerting_config_with_respect_to_grafana_version(
is_grafana_datasource,
datasource.get("id"),
@ -188,30 +256,55 @@ class GrafanaAlertingSyncManager:
)
if response is None:
logger.warning(
f"Failed to create contact point for integration {self.alert_receive_channel.pk} (POST): {response_info}"
f"GrafanaAlertingSyncManager: Failed to create contact point for integration "
f"{self.alert_receive_channel.pk} (POST). Response: {response_info}"
)
if response_info.get("status_code") == status.HTTP_400_BAD_REQUEST:
logger.warning(f"Config: {config}\nUpdated config: {updated_config}")
return
logger.warning(f"GrafanaAlertingSyncManager: Config: {config}, Updated config: {updated_config}")
return None, response_info
config, response_info = self.alerting_config_with_respect_to_grafana_version(
logger.info(
f"GrafanaAlertingSyncManager: Get updated config for datasource {datasource_type} to create contact point "
f"for integration {self.alert_receive_channel.pk}"
)
new_config, response_info = self.alerting_config_with_respect_to_grafana_version(
is_grafana_datasource, datasource.get("id"), datasource.get("uid"), self.client.get_alerting_config
)
contact_point = self._create_contact_point_from_payload(config, receiver_name, datasource)
contact_point_created_text = "created" if contact_point else "not created, creation will be retried"
logger.info(
f"Finished creating contact point for {datasource_type} datasource, "
f"integration {self.alert_receive_channel.pk}, contact point was {contact_point_created_text}"
)
return contact_point
if new_config:
logger.info(
f"GrafanaAlertingSyncManager: Alerting config for {datasource_type} datasource was successfully "
f"updated with contact point for integration {self.alert_receive_channel.pk}"
)
else:
logger.warning(
f"GrafanaAlertingSyncManager: Failed to get updated config to create contact point for integration "
f"{self.alert_receive_channel.pk} (GET). Response: {response_info}"
)
return new_config, response_info
@staticmethod
def _get_continue_route_config_for_datasource(is_grafana_datasource, receiver_name) -> dict:
def _add_contact_point_to_config(self, is_grafana_datasource, config_to_update) -> dict:
routes = config_to_update["alertmanager_config"]["route"].get("routes", [])
new_route = self._get_continue_route_config_for_datasource(
is_grafana_datasource,
)
# Append the new route to the beginning of the list
# It must have `continue=True` parameter otherwise it will intercept all the alerts
config_to_update["alertmanager_config"]["route"]["routes"] = [new_route] + routes
receivers = config_to_update["alertmanager_config"]["receivers"]
new_receiver = self._get_receiver_config_for_datasource(
is_grafana_datasource,
self.alert_receive_channel.integration_url,
)
config_to_update["alertmanager_config"]["receivers"] = receivers + [new_receiver]
return config_to_update
def _get_continue_route_config_for_datasource(self, is_grafana_datasource) -> dict:
"""Return route config, related on type of datasource"""
if is_grafana_datasource:
route = {
"receiver": receiver_name,
"receiver": self.receiver_name,
"continue": True,
}
else:
@ -219,21 +312,20 @@ class GrafanaAlertingSyncManager:
"continue": True,
"group_by": [],
"matchers": [],
"receiver": receiver_name,
"receiver": self.receiver_name,
"routes": [],
}
return route
@staticmethod
def _get_receiver_config_for_datasource(is_grafana_datasource, receiver_name, webhook_url) -> dict:
def _get_receiver_config_for_datasource(self, is_grafana_datasource, webhook_url) -> dict:
"""Return receiver config, related on type of datasource"""
if is_grafana_datasource:
receiver = {
"name": receiver_name,
"name": self.receiver_name,
"grafana_managed_receiver_configs": [
{
"name": receiver_name,
"name": self.receiver_name,
"type": "webhook",
"disableResolveMessage": False,
"settings": {
@ -246,7 +338,7 @@ class GrafanaAlertingSyncManager:
}
else:
receiver = {
"name": receiver_name,
"name": self.receiver_name,
"webhook_configs": [
{
"send_resolved": True,
@ -259,14 +351,13 @@ class GrafanaAlertingSyncManager:
def _create_contact_point_from_payload(
self,
payload,
receiver_name,
datasource,
) -> "apps.alerts.models.GrafanaAlertingContactPoint":
"""Get receiver data from payload and create contact point"""
is_grafana_datasource = datasource.get("id") is None
receiver_config = self._get_receiver_config(receiver_name, is_grafana_datasource, payload)
receiver_config = self._get_receiver_config(is_grafana_datasource, payload)
GrafanaAlertingContactPoint = apps.get_model("alerts", "GrafanaAlertingContactPoint")
contact_point = GrafanaAlertingContactPoint(
@ -280,14 +371,14 @@ class GrafanaAlertingSyncManager:
contact_point.save()
return contact_point
def _get_receiver_config(self, receiver_name, is_grafana_datasource, payload):
def _get_receiver_config(self, is_grafana_datasource, payload):
receiver_config = {}
receivers = payload["alertmanager_config"]["receivers"]
alerting_receiver = GrafanaAlertingSyncManager._get_receiver_by_name(receiver_name, receivers)
alerting_receiver = self._get_receiver_by_name(receivers)
if is_grafana_datasource: # means that datasource is Grafana
for config in alerting_receiver["grafana_managed_receiver_configs"]:
if config["name"] == receiver_name:
if config["name"] == self.receiver_name:
receiver_config = config
break
else: # other datasource
@ -297,15 +388,17 @@ class GrafanaAlertingSyncManager:
break
return receiver_config
@staticmethod
def _get_receiver_by_name(receiver_name, receivers):
def _get_receiver_by_name(self, receivers):
for alerting_receiver in receivers:
if alerting_receiver["name"] == receiver_name:
if alerting_receiver["name"] == self.receiver_name:
return alerting_receiver
def sync_each_contact_point(self) -> None:
"""Sync all channels contact points"""
logger.info(f"Starting to sync contact point for integration {self.alert_receive_channel.pk}")
logger.info(
f"GrafanaAlertingSyncManager: Starting to sync contact point for integration "
f"{self.alert_receive_channel.pk}"
)
contact_points = self.alert_receive_channel.contact_points.all()
for contact_point in contact_points:
self.sync_contact_point(contact_point)
@ -319,8 +412,8 @@ class GrafanaAlertingSyncManager:
)
is_grafana_datasource = datasource_type == GrafanaAlertingSyncManager.GRAFANA_CONTACT_POINT
logger.info(
f"Sync contact point for {datasource_type} (name: {contact_point.datasource_name}) datasource, integration "
f"{self.alert_receive_channel.pk}"
f"GrafanaAlertingSyncManager: Sync contact point for {datasource_type} "
f"(name: {contact_point.datasource_name}) datasource, for integration {self.alert_receive_channel.pk}"
)
config, response_info = self.alerting_config_with_respect_to_grafana_version(
@ -331,8 +424,9 @@ class GrafanaAlertingSyncManager:
)
if config is None:
logger.warning(
f"Failed to update contact point (GET) for integration {self.alert_receive_channel.pk}: Is unified "
f"alerting enabled on instance? {response_info}"
f"GrafanaAlertingSyncManager: Got config None for datasource {datasource_type}. "
f"Failed to update contact point for integration {self.alert_receive_channel.pk} (GET). "
f"Response: {response_info}. Is unified alerting enabled on instance? "
)
return
@ -342,17 +436,14 @@ class GrafanaAlertingSyncManager:
is_grafana_datasource,
receivers,
)
updated_config = copy.deepcopy(config)
# if integration exists, update name for contact point and related routes
if self.alert_receive_channel.deleted_at is None:
new_name = self.alert_receive_channel.emojized_verbal_name
updated_config = GrafanaAlertingSyncManager._update_contact_point_name_in_config(
updated_config = self._update_contact_point_name_in_config(
updated_config,
name_in_alerting,
new_name,
)
contact_point.name = new_name
contact_point.name = self.receiver_name
if not is_grafana_datasource:
datasource_name = self.get_datasource_name(contact_point)
contact_point.datasource_name = datasource_name
@ -372,45 +463,45 @@ class GrafanaAlertingSyncManager:
)
if response is None:
logger.warning(
f"Failed to update contact point for integration {self.alert_receive_channel.pk} "
f"(POST): {response_info}"
f"GrafanaAlertingSyncManager: Failed to update contact point for integration "
f"{self.alert_receive_channel.pk} (POST). Response: {response_info}"
)
if response_info.get("status_code") == status.HTTP_400_BAD_REQUEST:
logger.warning(f"GrafanaAlertingSyncManager: Config: {config}, Updated config: {updated_config}")
return
if self.alert_receive_channel.deleted_at:
contact_point.delete()
logger.info(
f"Finish to sync contact point for {datasource_type} (name: {contact_point.datasource_name}) datasource, "
f"integration {self.alert_receive_channel.pk}"
f"GrafanaAlertingSyncManager: Finish to sync contact point for {datasource_type} "
f"(name: {contact_point.datasource_name}) datasource, integration {self.alert_receive_channel.pk}"
)
@classmethod
def _update_contact_point_name_in_config(cls, config, name_in_alerting, new_name) -> dict:
def _update_contact_point_name_in_config(self, config, name_in_alerting) -> dict:
receivers = config["alertmanager_config"]["receivers"]
route = config["alertmanager_config"]["route"]
config["alertmanager_config"]["route"] = cls._recursive_rename_routes(route, name_in_alerting, new_name)
config["alertmanager_config"]["route"] = self._recursive_rename_routes(route, name_in_alerting)
for receiver in receivers:
if receiver["name"] == name_in_alerting:
receiver["name"] = new_name
receiver["name"] = self.receiver_name
receiver_configs = receiver.get("grafana_managed_receiver_configs", [])
for receiver_config in receiver_configs:
if receiver_config["name"] == name_in_alerting:
receiver_config["name"] = new_name
receiver_config["name"] = self.receiver_name
return config
@classmethod
def _recursive_rename_routes(cls, alerting_route, name_in_alerting, new_name) -> dict:
def _recursive_rename_routes(self, alerting_route, name_in_alerting) -> dict:
routes = alerting_route.get("routes", [])
for route in routes:
if route["receiver"] == name_in_alerting:
route["receiver"] = new_name
route["receiver"] = self.receiver_name
for idx, nested_route in enumerate(routes):
if nested_route.get("routes"):
alerting_route["routes"][idx] = cls._recursive_rename_routes(nested_route, name_in_alerting, new_name)
alerting_route["routes"][idx] = self._recursive_rename_routes(nested_route, name_in_alerting)
return alerting_route

View file

@ -5,7 +5,6 @@ from django.apps import apps
from django.core.cache import cache
from rest_framework import status
from apps.grafana_plugin.helpers import GrafanaAPIClient
from common.custom_celery_tasks import shared_dedicated_queue_retry_task
logger = get_task_logger(__name__)
@ -17,18 +16,22 @@ def get_cache_key_create_contact_points_for_datasource(alert_receive_channel_id)
return f"{CACHE_KEY_PREFIX}_{alert_receive_channel_id}"
def set_cache_key_create_contact_points_for_datasource(alert_receive_channel_id, task_id):
CACHE_LIFETIME = 600
cache_key = get_cache_key_create_contact_points_for_datasource(alert_receive_channel_id)
cache.set(cache_key, task_id, timeout=CACHE_LIFETIME)
@shared_dedicated_queue_retry_task
def schedule_create_contact_points_for_datasource(alert_receive_channel_id, datasource_list):
CACHE_LIFETIME = 600
START_TASK_DELAY = 3
task = create_contact_points_for_datasource.apply_async(
args=[alert_receive_channel_id, datasource_list], countdown=START_TASK_DELAY
)
cache_key = get_cache_key_create_contact_points_for_datasource(alert_receive_channel_id)
cache.set(cache_key, task.id, timeout=CACHE_LIFETIME)
set_cache_key_create_contact_points_for_datasource(alert_receive_channel_id, task.id)
@shared_dedicated_queue_retry_task(autoretry_for=(Exception,), retry_backoff=True, max_retries=10)
@shared_dedicated_queue_retry_task(autoretry_for=(Exception,), retry_backoff=True, max_retries=20)
def create_contact_points_for_datasource(alert_receive_channel_id, datasource_list):
"""
Try to create contact points for other datasource.
@ -45,57 +48,63 @@ def create_contact_points_for_datasource(alert_receive_channel_id, datasource_li
alert_receive_channel = AlertReceiveChannel.objects.filter(pk=alert_receive_channel_id).first()
if not alert_receive_channel:
logger.debug(
f"Cannot create contact point for integration {alert_receive_channel_id}: integration does not exist"
f"Create CP task: Cannot create contact point for integration {alert_receive_channel_id}: "
f"integration does not exist"
)
return
grafana_alerting_sync_manager = alert_receive_channel.grafana_alerting_sync_manager
client = GrafanaAPIClient(
api_url=alert_receive_channel.organization.grafana_url,
api_token=alert_receive_channel.organization.api_token,
logger.debug(
f"Create CP task: Create contact points for integration {alert_receive_channel_id}, "
f"retry counter: {create_contact_points_for_datasource.request.retries}, datasource list {len(datasource_list)}"
)
# list of datasource for which contact point creation was failed
datasources_to_create = []
for datasource in datasource_list:
contact_point = None
is_grafana_datasource = not (datasource.get("id") or datasource.get("uid"))
config, response_info = grafana_alerting_sync_manager.alerting_config_with_respect_to_grafana_version(
is_grafana_datasource, datasource.get("id"), datasource.get("uid"), client.get_alerting_config
datasource_type = datasource.get("type")
logger.debug(
f"Create CP task: Create contact point for datasource {datasource_type} "
f"for integration {alert_receive_channel_id}"
)
if config is None:
logger.debug(
f"Got config None for is_grafana_datasource {is_grafana_datasource} "
f"for integration {alert_receive_channel_id}; response: {response_info}"
)
if response_info.get("status_code") == status.HTTP_404_NOT_FOUND:
grafana_alerting_sync_manager.alerting_config_with_respect_to_grafana_version(
is_grafana_datasource,
datasource.get("id"),
datasource.get("uid"),
client.get_alertmanager_status_with_config,
)
contact_point = grafana_alerting_sync_manager.create_contact_point(datasource)
elif response_info.get("status_code") == status.HTTP_400_BAD_REQUEST:
contact_point, response_info = grafana_alerting_sync_manager.create_contact_point(datasource)
if contact_point is None:
if response_info.get("status_code") == status.HTTP_400_BAD_REQUEST:
logger.warning(
f"Failed to create contact point for integration {alert_receive_channel_id}, "
f"datasource info: {datasource}; response: {response_info}"
f"Create CP task: Failed to create contact point for integration {alert_receive_channel_id}, "
f"datasource info: {datasource}; response: {response_info}. "
f"Got 400 Bad Request, exclude from retry list."
)
continue
else:
contact_point = grafana_alerting_sync_manager.create_contact_point(datasource)
if contact_point is None:
logger.warning(
f"Failed to create contact point for integration {alert_receive_channel_id} due to getting wrong "
f"config, datasource info: {datasource}; response: {response_info}. Retrying"
f"Create CP task: Failed to create contact point for integration {alert_receive_channel_id}, "
f"datasource info: {datasource}; response: {response_info}. Retrying"
)
# Failed to create contact point due to getting wrong alerting config.
# Add datasource to list and retry to create contact point for it again
# Failed to create contact point. Add datasource to list and retry to create contact point for it again
datasources_to_create.append(datasource)
# if some contact points were not created, restart task for them
if datasources_to_create:
schedule_create_contact_points_for_datasource(alert_receive_channel_id, datasources_to_create)
if (
datasources_to_create
and create_contact_points_for_datasource.request.retries < create_contact_points_for_datasource.max_retries
):
logger.debug(
f"Create CP task: Retry to create contact points for integration {alert_receive_channel_id}, "
f"retry counter: {create_contact_points_for_datasource.request.retries}, "
f"datasource list {len(datasources_to_create)}"
)
# Save task id in cache and restart the task
set_cache_key_create_contact_points_for_datasource(alert_receive_channel_id, current_task_id)
create_contact_points_for_datasource.retry(args=(alert_receive_channel_id, datasources_to_create), countdown=3)
else:
alert_receive_channel.is_finished_alerting_setup = True
alert_receive_channel.save(update_fields=["is_finished_alerting_setup"])
logger.debug(
f"Create CP task: Alerting setup for integration {alert_receive_channel_id} is finished, "
f"retry counter: {create_contact_points_for_datasource.request.retries}, "
f"datasource list {len(datasource_list)}"
)
logger.debug(
f"Create CP task: Finished task to create contact points for integration {alert_receive_channel_id}, "
f"datasource list {len(datasource_list)}"
)