oncall-engine/engine/engine/celery.py
Michael Derynck 6588f82a67
Remove kwargs from celery logging (#4316)
# What this PR does
Follow-up to #4314. Remove kwargs from celery logging as it included
more information than needed.

## Which issue(s) this PR closes

<!--
*Note*: if you have more than one GitHub issue that this PR closes, be
sure to preface
each issue link with a [closing
keyword](https://docs.github.com/en/get-started/writing-on-github/working-with-advanced-formatting/using-keywords-in-issues-and-pull-requests#linking-a-pull-request-to-an-issue).
This ensures that the issue(s) are auto-closed once the PR has been
merged.
-->

## Checklist

- [ ] Unit, integration, and e2e (if applicable) tests updated
- [x] Documentation added (or `pr:no public docs` PR label added if not
required)
- [x] Added the relevant release notes label (see labels prefixed w/
`release:`). These labels dictate how your PR will
    show up in the autogenerated release notes.
2024-05-07 14:29:49 +00:00

111 lines
4 KiB
Python

import logging
import os
import time
import celery
from celery import Celery
from celery.app.log import TaskFormatter
from celery.utils.debug import memdump, sample_mem
from celery.utils.log import get_task_logger
from django.conf import settings
# Must be set before the Celery app reads Django settings; only a default,
# so an explicit DJANGO_SETTINGS_MODULE from the environment wins.
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "settings.prod")
# Celery's per-task logger; DEBUG so no records are filtered at this level
# (handlers attached elsewhere still decide what is emitted).
logger = get_task_logger(__name__)
logger.setLevel(logging.DEBUG)
app = Celery("proj")
# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object("django.conf:settings", namespace="CELERY")
# Load task modules from all registered Django app configs.
app.autodiscover_tasks()
# Celery's test helpers ping workers through a task registered under the
# well-known name "celery.ping"; registering it here makes tests that run a
# real worker able to verify readiness. See:
# https://stackoverflow.com/questions/46530784/make-django-test-case-database-visible-to-celery
@app.task(name="celery.ping")
def ping():
    """Health-check task; always answers 'pong'."""
    return "pong"
@celery.signals.after_setup_logger.connect
@celery.signals.after_setup_task_logger.connect
def on_after_setup_logger(logger, **kwargs):
    """Install a logfmt-style TaskFormatter on every handler of celery's loggers.

    Connected to both the root-logger and task-logger setup signals so all
    celery log records carry the same source/worker/task prefix.
    """
    formatter = TaskFormatter(
        "%(asctime)s source=engine:celery worker=%(processName)s task_id=%(task_id)s task_name=%(task_name)s name=%(name)s level=%(levelname)s %(message)s"
    )
    for handler in logger.handlers:
        handler.setFormatter(formatter)
@celery.signals.worker_ready.connect
def on_worker_ready(*args, **kwargs):
    """On worker startup, schedule (re)registration of the Telegram webhook.

    Skipped entirely when long polling is enabled, since a webhook and long
    polling are mutually exclusive ways of receiving Telegram updates.
    """
    from apps.telegram.tasks import register_telegram_webhook

    if settings.FEATURE_TELEGRAM_LONG_POLLING_ENABLED:
        return
    register_telegram_webhook.delay()
# Temporarily disable celery auto-instrumentation.
# We're working on propagating trace_id to logs and trying to keep the setup as easy as possible.
# if settings.OTEL_TRACING_ENABLED and settings.OTEL_EXPORTER_OTLP_ENDPOINT and False:
#
# @celery.signals.worker_process_init.connect(weak=False)
# def init_celery_tracing(*args, **kwargs):
# trace.set_tracer_provider(TracerProvider())
# span_processor = BatchSpanProcessor(OTLPSpanExporter())
# trace.get_tracer_provider().add_span_processor(span_processor)
# PyMySQLInstrumentor().instrument()
# CeleryInstrumentor().instrument()
if settings.DEBUG_CELERY_TASKS_PROFILING:

    @celery.signals.task_prerun.connect
    def start_task_timer(task_id=None, task=None, *args, **kwargs):
        """Log CPU and wall-clock timestamps and take a memory sample as a task starts."""
        message = "started: {} of {} with cpu={} at {}".format(
            task_id, task.name, time.perf_counter(), time.time()
        )
        logger.info(message)
        sample_mem()

    @celery.signals.task_postrun.connect
    def finish_task_timer(task_id=None, task=None, *args, **kwargs):
        """Log timestamps as a task ends, sample memory, then dump the collected samples."""
        message = "ended: {} of {} with cpu={} at {}".format(
            task_id, task.name, time.perf_counter(), time.time()
        )
        logger.info(message)
        sample_mem()
        memdump()
if settings.PYROSCOPE_PROFILER_ENABLED:

    # weak=False keeps the handler alive even though it is defined inside a
    # conditional block.
    @celery.signals.worker_process_init.connect(weak=False)
    def init_pyroscope(*args, **kwargs):
        """Attach the Pyroscope continuous profiler to each worker process."""
        import pyroscope

        worker_queue = os.environ.get("CELERY_WORKER_QUEUE", "no_queue_specified")
        pyroscope.configure(
            application_name=settings.PYROSCOPE_APPLICATION_NAME,
            server_address=settings.PYROSCOPE_SERVER_ADDRESS,
            auth_token=settings.PYROSCOPE_AUTH_TOKEN,
            # detect subprocesses started by the main process; default is False
            detect_subprocesses=True,
            tags={"type": "celery", "celery_worker": worker_queue},
        )
if settings.LOG_CELERY_TASK_ARGUMENTS:
    # Task ID and name are already carried by the TaskFormatter prefix, so the
    # message body only needs to list the positional arguments.

    @celery.signals.task_prerun.connect
    def log_started_task_arguments(sender=None, task_id=None, task=None, args=None, kwargs=None, **extras):
        """Log a task's positional arguments right before it runs."""
        logger.info(f"task started args={args}")

    @celery.signals.task_postrun.connect
    def log_finished_task_arguments(sender=None, task_id=None, task=None, args=None, kwargs=None, **extras):
        """Log a task's positional arguments right after it finishes."""
        logger.info(f"task finished args={args}")