# oncall-engine/engine/apps/heartbeat/tasks.py
import datetime
from functools import partial
from celery.utils.log import get_task_logger
from django.conf import settings
from django.db import transaction
from django.db.models import DateTimeField, DurationField, ExpressionWrapper, F
from django.db.models.functions import Cast
from django.utils import timezone
from apps.heartbeat.models import IntegrationHeartBeat
from apps.integrations.tasks import create_alert
from common.custom_celery_tasks import shared_dedicated_queue_retry_task
from settings.base import DatabaseTypes
logger = get_task_logger(__name__)
@shared_dedicated_queue_retry_task()
def check_heartbeats() -> str:
    """
    Periodic task to check heartbeat status changes and create alerts (or auto-resolve alerts) if needed.

    Returns:
        A short human-readable summary: number of expired and restored heartbeats found.
    """

    def _get_timeout_expression() -> ExpressionWrapper:
        # Build a DB-side duration expression from ``timeout_seconds`` so that
        # ``now() - timeout`` can be computed inside the query itself.
        if settings.DATABASES["default"]["ENGINE"] == f"django.db.backends.{DatabaseTypes.POSTGRESQL}":
            # DurationField: when used on PostgreSQL, the data type used is an interval
            # https://docs.djangoproject.com/en/3.2/ref/models/fields/#durationfield
            return ExpressionWrapper(datetime.timedelta(seconds=1) * F("timeout_seconds"), output_field=DurationField())
        else:
            # DurationField: ...Otherwise a bigint of microseconds is used...
            # microseconds = seconds * 10**6
            # https://docs.djangoproject.com/en/3.2/ref/models/fields/#durationfield
            return ExpressionWrapper(F("timeout_seconds") * 10**6, output_field=DurationField())

    # Heartbeat is considered enabled if it
    # * has timeout_seconds set to non-zero (non-default) value,
    # * received at least one checkup (last_heartbeat_time set to non-null value)
    enabled_heartbeats = (
        IntegrationHeartBeat.objects.filter(last_heartbeat_time__isnull=False)
        .exclude(timeout_seconds=0)
        .annotate(period_start=(Cast(timezone.now() - _get_timeout_expression(), DateTimeField())))
    )

    with transaction.atomic():
        # Heartbeat is considered expired if it
        # * is enabled,
        # * is not already expired,
        # * last check in was before the timeout period start
        expired_heartbeats = (
            enabled_heartbeats.select_for_update()
            .filter(
                last_heartbeat_time__lte=F("period_start"),
                previous_alerted_state_was_life=True,
                alert_receive_channel__organization__deleted_at__isnull=True,
            )
            .select_related("alert_receive_channel")
        )

        # Schedule alert creation for each expired heartbeat after transaction commit
        timestamp = timezone.now().isoformat()
        for heartbeat in expired_heartbeats:
            # partial() binds kwargs now; the task is only enqueued once the
            # surrounding transaction commits, so the row lock is released first.
            transaction.on_commit(
                partial(
                    create_alert.apply_async,
                    kwargs={
                        "title": heartbeat.alert_receive_channel.heartbeat_expired_title,
                        "message": heartbeat.alert_receive_channel.heartbeat_expired_message,
                        "image_url": None,
                        "link_to_upstream_details": None,
                        "alert_receive_channel_pk": heartbeat.alert_receive_channel.pk,
                        "integration_unique_data": {},
                        "raw_request_data": heartbeat.alert_receive_channel.heartbeat_expired_payload,
                        "received_at": timestamp,
                    },
                )
            )

        # Update previous_alerted_state_was_life to False
        expired_count = expired_heartbeats.update(previous_alerted_state_was_life=False)

    with transaction.atomic():
        # Heartbeat is considered restored if it
        # * is enabled,
        # * last check in was after the timeout period start,
        # * was in alerted state (previous_alerted_state_was_life is False), i.e. was expired
        restored_heartbeats = enabled_heartbeats.select_for_update().filter(
            last_heartbeat_time__gte=F("period_start"), previous_alerted_state_was_life=False
        )

        # Schedule auto-resolve alert creation for each restored heartbeat after transaction commit
        timestamp = timezone.now().isoformat()
        for heartbeat in restored_heartbeats:
            transaction.on_commit(
                partial(
                    create_alert.apply_async,
                    kwargs={
                        "title": heartbeat.alert_receive_channel.heartbeat_restored_title,
                        "message": heartbeat.alert_receive_channel.heartbeat_restored_message,
                        "image_url": None,
                        "link_to_upstream_details": None,
                        "alert_receive_channel_pk": heartbeat.alert_receive_channel.pk,
                        "integration_unique_data": {},
                        "raw_request_data": heartbeat.alert_receive_channel.heartbeat_restored_payload,
                        "received_at": timestamp,
                    },
                )
            )

        restored_count = restored_heartbeats.update(previous_alerted_state_was_life=True)

    return f"Found {expired_count} expired and {restored_count} restored heartbeats"
@shared_dedicated_queue_retry_task()
def process_heartbeat_task(alert_receive_channel_pk):
    """Record a heartbeat check-in: stamp ``last_heartbeat_time`` with the current time
    for every IntegrationHeartBeat attached to the given alert receive channel."""
    matching_heartbeats = IntegrationHeartBeat.objects.filter(
        alert_receive_channel__pk=alert_receive_channel_pk,
    )
    matching_heartbeats.update(last_heartbeat_time=timezone.now())