oncall-engine/engine/apps/integrations/tasks.py
Innokentii Konstantinov c733d8b9f2
Cleanup ScenarioStep (#1213)
# What this PR does
This PR cleans up ScenarioStep. This is needed to simplify moving Slack to
the messaging backends in the future.

1. Introduce AlertGroupSlackService to move logic out of ScenarioStep.
This also makes it possible to stop importing ScenarioSteps in code that is
not related to processing Slack callbacks.
2. Remove tags from ScenarioSteps, they are unused.
3. Remove the ScenarioStep.dispatch method. It was just calling
ScenarioStep.process_scenario.
4. Remove "action" param from process_scenario, it was unused.
5. Remove creation of SlackActionRecord when handling SlackEvents. We are
not using it, but it generates an INSERT query on most user-Slack
interactions.
6. Remove "random_prefix_for_routing" from ScenarioStep, it was unused.
## Which issue(s) this PR fixes

## Checklist

- [ ] Tests updated
- [ ] Documentation added
- [ ] `CHANGELOG.md` updated

---------

Co-authored-by: Joey Orlando <joey.orlando@grafana.com>
2023-02-21 20:22:11 +01:00

167 lines
6.3 KiB
Python

import logging
import random
from celery import shared_task
from celery.utils.log import get_task_logger
from django.apps import apps
from django.conf import settings
from django.core.cache import cache
from apps.alerts.models.alert_group_counter import ConcurrentUpdateError
from apps.alerts.tasks import resolve_alert_group_by_source_if_needed
from apps.slack.slack_client import SlackClientWithErrorHandling
from apps.slack.slack_client.exceptions import SlackAPIException
from common.custom_celery_tasks import shared_dedicated_queue_retry_task
from common.custom_celery_tasks.create_alert_base_task import CreateAlertBaseTask
# Module-level Celery task logger, shared by every task in this file.
# The level is forced to DEBUG so the logger.debug() calls below are not filtered out by this logger.
logger = get_task_logger(__name__)
logger.setLevel(logging.DEBUG)
@shared_task(
    base=CreateAlertBaseTask,
    autoretry_for=(Exception,),
    retry_backoff=True,
    max_retries=1 if settings.DEBUG else None,
)
def create_alertmanager_alerts(alert_receive_channel_pk, alert, is_demo=False, force_route_id=None):
    """
    Create an Alert from a raw payload for the given alert receive channel.

    The payload is stored as the alert's ``raw_request_data``; title, message,
    image and upstream link are left empty and autoresolve is disabled.

    :param alert_receive_channel_pk: primary key of the AlertReceiveChannel
        (soft-deleted channels are looked up too, then ignored).
    :param alert: raw alert payload passed to ``Alert.create`` as ``raw_request_data``.
    :param is_demo: forwarded to ``Alert.create``.
    :param force_route_id: forwarded to ``Alert.create``.
    :return: None.
    """
    AlertReceiveChannel = apps.get_model("alerts", "AlertReceiveChannel")
    Alert = apps.get_model("alerts", "Alert")

    alert_receive_channel = AlertReceiveChannel.objects_with_deleted.get(pk=alert_receive_channel_pk)
    if (
        alert_receive_channel.deleted_at is not None
        or alert_receive_channel.integration == AlertReceiveChannel.INTEGRATION_MAINTENANCE
    ):
        logger.info("AlertReceiveChannel alert ignored if deleted/maintenance")
        return

    try:
        # Bound to a new name so the raw payload in `alert` stays available for the retry below.
        created_alert = Alert.create(
            title=None,
            message=None,
            image_url=None,
            link_to_upstream_details=None,
            alert_receive_channel=alert_receive_channel,
            integration_unique_data=None,
            raw_request_data=alert,
            enable_autoresolve=False,
            is_demo=is_demo,
            force_route_id=force_route_id,
        )
    except ConcurrentUpdateError:
        # This error is raised when there are concurrent updates on AlertGroupCounter due to optimistic lock on it.
        # The idea is to not block the worker with a database lock and retry the task in case of concurrent updates.
        countdown = random.randint(1, 10)
        # Forward is_demo and force_route_id as well, otherwise the retried task
        # would silently fall back to the defaults and behave differently from the original call.
        create_alertmanager_alerts.apply_async(
            (alert_receive_channel_pk, alert),
            kwargs={"is_demo": is_demo, "force_route_id": force_route_id},
            countdown=countdown,
        )
        logger.warning(f"Retrying the task gracefully in {countdown} seconds due to ConcurrentUpdateError")
        return

    if alert_receive_channel.allow_source_based_resolving:
        task = resolve_alert_group_by_source_if_needed.apply_async((created_alert.group.pk,), countdown=5)
        # Remember the id of the scheduled resolve task on the alert group.
        created_alert.group.active_resolve_calculation_id = task.id
        created_alert.group.save(update_fields=["active_resolve_calculation_id"])

    logger.info(f"Created alert {created_alert.pk} for alert group {created_alert.group.pk}")
@shared_task(
    base=CreateAlertBaseTask,
    autoretry_for=(Exception,),
    retry_backoff=True,
    max_retries=1 if settings.DEBUG else None,
)
def create_alert(
    title,
    message,
    image_url,
    link_to_upstream_details,
    alert_receive_channel_pk,
    integration_unique_data,
    raw_request_data,
    is_demo=False,
    force_route_id=None,
):
    """
    Create an Alert for the given alert receive channel.

    Does nothing if the channel cannot be found through the default manager.
    On ``ConcurrentUpdateError`` the task re-enqueues itself with the same
    arguments after a random 1-10 second delay instead of failing.

    :param title: forwarded to ``Alert.create``.
    :param message: forwarded to ``Alert.create``.
    :param image_url: forwarded to ``Alert.create``; when not None it is
        converted to ``str`` and truncated to 299 characters first.
    :param link_to_upstream_details: forwarded to ``Alert.create``.
    :param alert_receive_channel_pk: primary key of the AlertReceiveChannel.
    :param integration_unique_data: forwarded to ``Alert.create``.
    :param raw_request_data: forwarded to ``Alert.create``.
    :param is_demo: forwarded to ``Alert.create``.
    :param force_route_id: forwarded to ``Alert.create``.
    :return: None.
    """
    AlertReceiveChannel = apps.get_model("alerts", "AlertReceiveChannel")
    Alert = apps.get_model("alerts", "Alert")

    try:
        alert_receive_channel = AlertReceiveChannel.objects.get(pk=alert_receive_channel_pk)
    except AlertReceiveChannel.DoesNotExist:
        # Leave a trace instead of dropping the alert silently.
        logger.info(f"AlertReceiveChannel {alert_receive_channel_pk} does not exist, alert ignored")
        return

    if image_url is not None:
        # Truncate to 299 characters (presumably a column length limit — TODO confirm against the Alert model).
        image_url = str(image_url)[:299]

    try:
        alert = Alert.create(
            title=title,
            message=message,
            image_url=image_url,
            link_to_upstream_details=link_to_upstream_details,
            alert_receive_channel=alert_receive_channel,
            integration_unique_data=integration_unique_data,
            raw_request_data=raw_request_data,
            force_route_id=force_route_id,
            is_demo=is_demo,
        )
    except ConcurrentUpdateError:
        # This error is raised when there are concurrent updates on AlertGroupCounter due to optimistic lock on it.
        # The idea is to not block the worker with a database lock and retry the task in case of concurrent updates.
        countdown = random.randint(1, 10)
        # Forward is_demo and force_route_id as well, otherwise the retried task
        # would silently fall back to the defaults and behave differently from the original call.
        create_alert.apply_async(
            (
                title,
                message,
                image_url,
                link_to_upstream_details,
                alert_receive_channel_pk,
                integration_unique_data,
                raw_request_data,
            ),
            kwargs={"is_demo": is_demo, "force_route_id": force_route_id},
            countdown=countdown,
        )
        logger.warning(f"Retrying the task gracefully in {countdown} seconds due to ConcurrentUpdateError")
        return

    logger.info(f"Created alert {alert.pk} for alert group {alert.group.pk}")
@shared_dedicated_queue_retry_task()
def start_notify_about_integration_ratelimit(team_id, text, **kwargs):
    """
    Enqueue ``notify_about_integration_ratelimit_in_slack`` with a five-minute expiry.

    :param team_id: passed through as the first positional argument of the
        Slack notification task, which names that parameter ``organization_id``.
    :param text: message text, passed through unchanged.
    :param kwargs: extra keyword arguments, passed through unchanged.
    :return: None.
    """
    # Expiry for the enqueued task, in seconds.
    expiry_seconds = 60 * 5
    positional_args = (team_id, text)
    notify_about_integration_ratelimit_in_slack.apply_async(
        args=positional_args,
        kwargs=kwargs,
        expires=expiry_seconds,
    )
@shared_dedicated_queue_retry_task(
    autoretry_for=(Exception,), retry_backoff=True, max_retries=1 if settings.DEBUG else 5
)
def notify_about_integration_ratelimit_in_slack(organization_id, text, **kwargs):
    """
    Post a rate-limit notification to the organization's general log channel in Slack.

    At most one message per organization is attempted within a 15-minute window;
    the window is tracked with a cache key. Nothing is sent when the organization
    does not exist or has no Slack team identity. ``SlackAPIException`` is logged
    and swallowed.

    :param organization_id: primary key of the Organization.
    :param text: message text to post.
    :param kwargs: accepted for compatibility with callers; not used here.
    :return: None.
    """
    # TODO: Review ratelimits
    Organization = apps.get_model("user_management", "Organization")

    try:
        organization = Organization.objects.get(pk=organization_id)
    except Organization.DoesNotExist:
        logger.warning(f"Organization {organization_id} does not exist")
        return

    cache_key = f"notify_about_integration_ratelimit_in_slack_{organization.pk}"
    # Set cache before sending message to make sure we don't ratelimit slack.
    # cache.add() stores the key only if it is absent and reports whether it did, so the
    # "was it sent recently?" check and the marking happen in one step. A separate get()
    # followed by set() lets concurrent tasks both pass the check and post duplicates.
    if not cache.add(cache_key, True, 60 * 15):
        logger.debug(f"Message was sent recently for organization {organization_id}")
        return

    slack_team_identity = organization.slack_team_identity
    if slack_team_identity is None:
        return

    try:
        sc = SlackClientWithErrorHandling(slack_team_identity.bot_access_token)
        sc.api_call(
            "chat.postMessage", channel=organization.general_log_channel_id, text=text, team=slack_team_identity
        )
    except SlackAPIException as e:
        logger.warning(f"Slack exception {e} while sending message for organization {organization_id}")