from celery import shared_task
from celery.utils.log import get_task_logger

from common.custom_celery_tasks.log_exception_on_failure_task import LogExceptionOnFailureTask

# Name of the queue that every retried task is routed to.
RETRY_QUEUE = "retry"

logger = get_task_logger(__name__)


class DedicatedQueueRetryTask(LogExceptionOnFailureTask):
    """
    Custom task that sends all retried tasks to the dedicated retry queue.

    This keeps the regular (high, medium, low) queues from being overloaded
    with retried tasks.
    """

    def retry(
        self, args=None, kwargs=None, exc=None, throw=True, eta=None, countdown=None, max_retries=None, **options
    ):
        """
        Log the retry and delegate to the parent ``retry`` with the queue forced to ``RETRY_QUEUE``.

        All named parameters are forwarded unchanged to the parent implementation.
        Any ``queue`` value supplied through ``options`` is replaced by
        ``RETRY_QUEUE``, so a retry can never be routed elsewhere.

        :param exc: exception that triggered the retry (may be ``None``); it is
            attached to the warning log record via ``exc_info``.
        :param options: extra keyword options forwarded to the parent ``retry``.
        :return: whatever the parent ``retry`` returns.
        """
        logger.warning("Retrying celery task", exc_info=exc)

        # Set the queue inside ``options`` instead of passing ``queue=`` next to
        # ``**options``: a caller-supplied ``queue`` would otherwise raise
        # ``TypeError`` (duplicate keyword argument) instead of retrying.
        options["queue"] = RETRY_QUEUE

        return super().retry(
            args=args,
            kwargs=kwargs,
            exc=exc,
            throw=throw,
            eta=eta,
            countdown=countdown,
            max_retries=max_retries,
            **options,
        )


def shared_dedicated_queue_retry_task(*args, **kwargs):
    """
    Wrap ``shared_task`` so that the created task uses ``DedicatedQueueRetryTask`` as its base.

    Positional and keyword arguments are forwarded to ``shared_task`` as-is.
    ``base`` must not be passed by the caller: it is always set here, and
    supplying it again raises ``TypeError`` (duplicate keyword argument).
    """
    return shared_task(*args, base=DedicatedQueueRetryTask, **kwargs)