oncall-engine/engine/common/tests/test_task_queue_assignment.py
Joey Orlando f85cc6d33b
add more logging on celery task retry (#3695)
# What this PR does

This is a follow up to https://github.com/grafana/oncall/pull/3677.

It appears that when a task uses the [`autoretry_for`
kwarg](https://docs.celeryq.dev/en/stable/userguide/tasks.html#automatic-retry-for-known-exceptions)
in the task decorator, it doesn't log the exception in `on_failure` as
would be expected. Now when retrying, we log out a message + any
exception/stack trace information.

## Checklist

- [x] Unit, integration, and e2e (if applicable) tests updated
- [x] Documentation added (or `pr:no public docs` PR label added if not
required)
- [x] `CHANGELOG.md` updated (or `pr:no changelog` PR label added if not
required)
2024-01-16 07:13:16 -05:00

35 lines
1.4 KiB
Python

from celery import current_app
from settings.celery_task_routes import CELERY_TASK_ROUTES
"""
If a task has a legitimate reason to not have a queue assignment it can
be added here (In development, in process of deprecation, etc.) if possible
we should avoid @shared_dedicated_queue_retry_task or @shared_task and
remove entirely if it is not needed.
"""
COMMON_IGNORED_TASKS = {
"common.custom_celery_tasks.tests.test_dedicated_queue_retry_task.my_task",
"common.custom_celery_tasks.tests.test_log_exception_on_failure_task.my_task",
"common.custom_celery_tasks.tests.test_log_exception_on_failure_task.my_task_two",
}
def check_celery_task_route_mapping(task_ids, ignored_prefixes, additional_ignored_tasks=None):
tasks = set(k for k in current_app.tasks.keys() if not k.startswith(ignored_prefixes))
tasks -= set(COMMON_IGNORED_TASKS)
if additional_ignored_tasks:
tasks -= set(additional_ignored_tasks)
tasks -= set(task_ids)
if tasks:
print(f"Unassigned queue for celery task {tasks}")
assert len(tasks) == 0
def test_celery_task_route_mapping():
"""
If this test does not pass make sure you have added any newly added
@shared_dedicated_queue_retry_task or @shared_task to CELERY_TASK_ROUTES
in engine/settings/celery_task_routes.py
"""
check_celery_task_route_mapping(CELERY_TASK_ROUTES.keys(), ("extensions", "celery"))