import time from django.core.cache import cache from django.http import HttpResponse from django.views.generic import View from apps.integrations.mixins import AlertChannelDefiningMixin from common.custom_celery_tasks import shared_dedicated_queue_retry_task @shared_dedicated_queue_retry_task(ignore_result=True) def health_check_task(): return "Ok" class HealthCheckView(View): """ This view is used in k8s liveness probe. k8s periodically makes requests to this view and if the requests fail the container will be restarted """ dangerously_bypass_middlewares = True def get(self, request): return HttpResponse("Ok") class ReadinessCheckView(View): """ This view is used in k8s readiness probe. k8s periodically makes requests to this view and if the requests fail the container will stop getting the traffic. """ dangerously_bypass_middlewares = True def get(self, request): return HttpResponse("Ok") class StartupProbeView(View): """ This view is used in k8s startup probe. k8s makes requests to this view on the startup and if the requests fail the container will be restarted Caching AlertReceive channels if they are not cached. Also checking initial database connection. """ dangerously_bypass_middlewares = True def get(self, request): if cache.get(AlertChannelDefiningMixin.CACHE_KEY_DB_FALLBACK) is None: AlertChannelDefiningMixin().update_alert_receive_channel_cache() cache.set("healthcheck", "healthcheck", 30) # Checking cache connectivity assert cache.get("healthcheck") == "healthcheck" return HttpResponse("Ok") class SlowView(View): def get(self, request): time.sleep(1.5) return HttpResponse("Slept well.") class ExceptionView(View): def get(self, request): raise Exception("Trying exception!")