2023-08-10 10:25:00 +08:00
|
|
|
import datetime
|
2023-11-29 09:01:30 -03:00
|
|
|
from functools import partial
|
2022-06-03 08:09:47 -06:00
|
|
|
|
|
|
|
|
from celery.utils.log import get_task_logger
|
2023-08-10 10:25:00 +08:00
|
|
|
from django.conf import settings
|
2022-06-03 08:09:47 -06:00
|
|
|
from django.db import transaction
|
2023-08-10 10:25:00 +08:00
|
|
|
from django.db.models import DateTimeField, DurationField, ExpressionWrapper, F
|
|
|
|
|
from django.db.models.functions import Cast
|
2022-06-03 08:09:47 -06:00
|
|
|
from django.utils import timezone
|
|
|
|
|
|
2023-08-10 10:25:00 +08:00
|
|
|
from apps.heartbeat.models import IntegrationHeartBeat
|
|
|
|
|
from apps.integrations.tasks import create_alert
|
2022-06-03 08:09:47 -06:00
|
|
|
from common.custom_celery_tasks import shared_dedicated_queue_retry_task
|
2023-08-10 10:25:00 +08:00
|
|
|
from settings.base import DatabaseTypes
|
2022-06-03 08:09:47 -06:00
|
|
|
|
|
|
|
|
# Module-level logger obtained through Celery's get_task_logger, named after this module.
logger = get_task_logger(__name__)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _schedule_heartbeat_alert(alert_receive_channel, title, message, raw_request_data, received_at: str) -> None:
    """Queue a ``create_alert`` task for an integration, to be sent after the current transaction commits.

    Deferring through ``transaction.on_commit`` ties the alert to the heartbeat state change made in
    the same transaction: if that transaction rolls back, the alert is never dispatched.

    :param alert_receive_channel: integration the heartbeat belongs to; only its ``pk`` is read here.
    :param title: alert title.
    :param message: alert message.
    :param raw_request_data: payload passed to ``create_alert`` as ``raw_request_data``.
    :param received_at: timestamp string (``isoformat()`` output) passed as ``received_at``.
    """
    transaction.on_commit(
        partial(
            create_alert.apply_async,
            kwargs={
                "title": title,
                "message": message,
                "image_url": None,
                "link_to_upstream_details": None,
                "alert_receive_channel_pk": alert_receive_channel.pk,
                "integration_unique_data": {},
                "raw_request_data": raw_request_data,
                "received_at": received_at,
            },
        )
    )


@shared_dedicated_queue_retry_task()
def check_heartbeats() -> str:
    """
    Periodic task to check heartbeats status change and create alerts (or auto-resolve alerts) if needed.

    Two independent transactions are run:

    1. expired heartbeats (alive -> expired): a "heartbeat expired" alert is scheduled and
       ``previous_alerted_state_was_life`` is set to False;
    2. restored heartbeats (expired -> alive): a "heartbeat restored" alert is scheduled and
       ``previous_alerted_state_was_life`` is set back to True.

    Alerts are dispatched via ``transaction.on_commit``, so they are only sent when the matching
    state change has been committed.

    :return: summary string with the number of expired and restored heartbeats.
    """

    def _get_timeout_expression() -> ExpressionWrapper:
        """Return ``timeout_seconds`` as a DurationField expression suited to the configured DB backend."""
        if settings.DATABASES["default"]["ENGINE"] == f"django.db.backends.{DatabaseTypes.POSTGRESQL}":
            # DurationField: When used on PostgreSQL, the data type used is an interval
            # https://docs.djangoproject.com/en/3.2/ref/models/fields/#durationfield
            return ExpressionWrapper(datetime.timedelta(seconds=1) * F("timeout_seconds"), output_field=DurationField())
        # DurationField: ...Otherwise a bigint of microseconds is used...
        # microseconds = seconds * 10**6
        # https://docs.djangoproject.com/en/3.2/ref/models/fields/#durationfield
        return ExpressionWrapper(F("timeout_seconds") * 10**6, output_field=DurationField())

    # Heartbeat is considered enabled if it
    # * has timeout_seconds set to non-zero (non-default) value,
    # * received at least one checkup (last_heartbeat_time set to non-null value).
    # period_start = now - timeout_seconds: a check-in at or before it means the timeout was missed.
    enabled_heartbeats = (
        IntegrationHeartBeat.objects.filter(last_heartbeat_time__isnull=False)
        .exclude(timeout_seconds=0)
        .annotate(period_start=(Cast(timezone.now() - _get_timeout_expression(), DateTimeField())))
    )

    with transaction.atomic():
        # Heartbeat is considered expired if it
        # * is enabled,
        # * is not already expired,
        # * belongs to an organization that is not deleted,
        # * last check in was before the timeout period start
        expired_heartbeats = (
            enabled_heartbeats.select_for_update()
            .filter(
                last_heartbeat_time__lte=F("period_start"),
                previous_alerted_state_was_life=True,
                alert_receive_channel__organization__deleted_at__isnull=True,
            )
            .select_related("alert_receive_channel")
        )
        # Schedule alert creation for each expired heartbeat after transaction commit
        timestamp = timezone.now().isoformat()
        expired_pks = []
        for heartbeat in expired_heartbeats:
            channel = heartbeat.alert_receive_channel
            _schedule_heartbeat_alert(
                channel,
                channel.heartbeat_expired_title,
                channel.heartbeat_expired_message,
                channel.heartbeat_expired_payload,
                timestamp,
            )
            expired_pks.append(heartbeat.pk)
        # Flip the state only for rows locked and alerted on above. Re-running the filter in the
        # UPDATE could also match rows that started matching after the SELECT (e.g. a concurrent
        # change to timeout_seconds); those would change state without any alert being sent.
        expired_count = IntegrationHeartBeat.objects.filter(pk__in=expired_pks).update(
            previous_alerted_state_was_life=False
        )

    with transaction.atomic():
        # Heartbeat is considered restored if it
        # * is enabled,
        # * belongs to an organization that is not deleted (same rule as for expiration),
        # * last check in was after the timeout period start,
        # * was in alerted state (previous_alerted_state_was_life is False), i.e. was expired
        restored_heartbeats = (
            enabled_heartbeats.select_for_update()
            .filter(
                last_heartbeat_time__gte=F("period_start"),
                previous_alerted_state_was_life=False,
                alert_receive_channel__organization__deleted_at__isnull=True,
            )
            # Fetch the integration in the same query instead of one extra query per heartbeat.
            .select_related("alert_receive_channel")
        )
        # Schedule auto-resolve alert creation for each restored heartbeat after transaction commit
        timestamp = timezone.now().isoformat()
        restored_pks = []
        for heartbeat in restored_heartbeats:
            channel = heartbeat.alert_receive_channel
            _schedule_heartbeat_alert(
                channel,
                channel.heartbeat_restored_title,
                channel.heartbeat_restored_message,
                channel.heartbeat_restored_payload,
                timestamp,
            )
            restored_pks.append(heartbeat.pk)
        # As above: only flip rows that were locked and alerted on in this transaction.
        restored_count = IntegrationHeartBeat.objects.filter(pk__in=restored_pks).update(
            previous_alerted_state_was_life=True
        )

    return f"Found {expired_count} expired and {restored_count} restored heartbeats"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@shared_dedicated_queue_retry_task()
def process_heartbeat_task(alert_receive_channel_pk):
    """Record a heartbeat check-in for an integration.

    Sets ``last_heartbeat_time`` to the current time on every ``IntegrationHeartBeat`` row
    whose ``alert_receive_channel`` has primary key ``alert_receive_channel_pk``.

    :param alert_receive_channel_pk: primary key of the alert receive channel that checked in.
    """
    matching_heartbeats = IntegrationHeartBeat.objects.filter(
        alert_receive_channel__pk=alert_receive_channel_pk,
    )
    checked_in_at = timezone.now()
    matching_heartbeats.update(last_heartbeat_time=checked_in_at)
|