import logging
import os
import time

import celery
from celery import Celery
from celery.app.log import TaskFormatter
from celery.utils.debug import memdump, sample_mem
from celery.utils.log import get_task_logger
from django.conf import settings

# Fall back to the production settings module unless one is already set.
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "settings.prod")

logger = get_task_logger(__name__)
logger.setLevel(logging.DEBUG)

app = Celery("proj")

# The configuration is referenced by its import path (a string), so the
# worker does not have to serialize the configuration object to child
# processes.
# - namespace="CELERY" means all celery-related configuration keys
#   must carry a `CELERY_` prefix.
app.config_from_object("django.conf:settings", namespace="CELERY")

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()


# This task is required for tests with celery, see:
# https://stackoverflow.com/questions/46530784/make-django-test-case-database-visible-to-celery
@app.task(name="celery.ping")
def ping() -> str:
    """Return the constant string ``"pong"``."""
    return "pong"


@celery.signals.after_setup_logger.connect
@celery.signals.after_setup_task_logger.connect
def on_after_setup_logger(logger, **kwargs):
    """Attach a key=value ``TaskFormatter`` to every handler of ``logger``.

    Connected to both ``after_setup_logger`` and ``after_setup_task_logger``.
    A separate formatter instance is created for each handler.
    """
    log_format = "%(asctime)s source=engine:celery worker=%(processName)s task_id=%(task_id)s task_name=%(task_name)s name=%(name)s level=%(levelname)s %(message)s"
    for log_handler in logger.handlers:
        log_handler.setFormatter(TaskFormatter(log_format))


@celery.signals.worker_ready.connect
def on_worker_ready(*args, **kwargs):
    """Queue the Telegram webhook registration task when the worker is ready.

    Nothing is queued when ``settings.FEATURE_TELEGRAM_LONG_POLLING_ENABLED``
    is truthy.
    """
    # Imported at call time rather than at module import time
    # (presumably to avoid an import cycle - confirm before moving it).
    from apps.telegram.tasks import register_telegram_webhook

    if settings.FEATURE_TELEGRAM_LONG_POLLING_ENABLED:
        return
    register_telegram_webhook.delay()


# Temporary disable celery auto instrumentation.
# I'm working on propagating trace_id to logs and trying to keep setup as easy as possible
# if settings.OTEL_TRACING_ENABLED and settings.OTEL_EXPORTER_OTLP_ENDPOINT and False:
#
#     @celery.signals.worker_process_init.connect(weak=False)
#     def init_celery_tracing(*args, **kwargs):
#         trace.set_tracer_provider(TracerProvider())
#         span_processor = BatchSpanProcessor(OTLPSpanExporter())
#         trace.get_tracer_provider().add_span_processor(span_processor)
#         PyMySQLInstrumentor().instrument()
#         CeleryInstrumentor().instrument()

if settings.DEBUG_CELERY_TASKS_PROFILING:
    # NOTE(review): the value logged as ``cpu=`` comes from
    # time.perf_counter(), which is a monotonic performance counter and not
    # process CPU time. The label is left unchanged so the emitted log lines
    # stay identical.

    @celery.signals.task_prerun.connect
    def start_task_timer(task_id=None, task=None, *a, **kw):
        """Log the task id, task name and current clocks before a task runs.

        Also calls ``sample_mem()`` after logging.
        """
        # Lazy %-style arguments: the message is only rendered if a handler
        # actually emits the record; the rendered text is unchanged.
        logger.info(
            "started: %s of %s with cpu=%s at %s",
            task_id,
            task.name,
            time.perf_counter(),
            time.time(),
        )
        sample_mem()

    @celery.signals.task_postrun.connect
    def finish_task_timer(task_id=None, task=None, *a, **kw):
        """Log the task id, task name and current clocks after a task runs.

        Also calls ``sample_mem()`` and then ``memdump()`` after logging.
        """
        logger.info(
            "ended: %s of %s with cpu=%s at %s",
            task_id,
            task.name,
            time.perf_counter(),
            time.time(),
        )
        sample_mem()
        memdump()


if settings.PYROSCOPE_PROFILER_ENABLED:

    # weak=False makes the signal hold a strong reference to the receiver.
    @celery.signals.worker_process_init.connect(weak=False)
    def init_pyroscope(*args, **kwargs):
        """Configure the Pyroscope profiler from Django settings.

        The ``celery_worker`` tag is read from the ``CELERY_WORKER_QUEUE``
        environment variable and falls back to ``"no_queue_specified"``.
        """
        # Imported here so pyroscope is only required when profiling is on.
        import pyroscope

        pyroscope.configure(
            application_name=settings.PYROSCOPE_APPLICATION_NAME,
            server_address=settings.PYROSCOPE_SERVER_ADDRESS,
            auth_token=settings.PYROSCOPE_AUTH_TOKEN,
            detect_subprocesses=True,  # detect subprocesses started by the main process; default is False
            tags={
                "type": "celery",
                "celery_worker": os.environ.get("CELERY_WORKER_QUEUE", "no_queue_specified"),
            },
        )


if settings.LOG_CELERY_TASK_ARGUMENTS:
    # Note: Task ID and name are already provided in TaskFormatter prefix,
    # arguments get listed in message.
    # NOTE(review): only positional ``args`` are logged; ``kwargs`` are
    # received by both handlers but are not part of the message.

    @celery.signals.task_prerun.connect
    def log_started_task_arguments(sender=None, task_id=None, task=None, args=None, kwargs=None, **extras):
        """Log the positional arguments of a task before it runs."""
        logger.info("task started args=%s", args)

    @celery.signals.task_postrun.connect
    def log_finished_task_arguments(sender=None, task_id=None, task=None, args=None, kwargs=None, **extras):
        """Log the positional arguments of a task after it has run."""
        logger.info("task finished args=%s", args)