oncall-engine/engine/apps/schedules/tasks/drop_cached_ical.py

33 lines
1.4 KiB
Python

from celery.utils.log import get_task_logger
from common.custom_celery_tasks import shared_dedicated_queue_retry_task
task_logger = get_task_logger(__name__)
@shared_dedicated_queue_retry_task(autoretry_for=(Exception,), retry_backoff=True, max_retries=1)
def drop_cached_ical_task(schedule_pk):
from apps.schedules.models import OnCallSchedule
from apps.schedules.tasks import refresh_ical_final_schedule
task_logger.info(f"Start drop_cached_ical_task for schedule {schedule_pk}")
try:
schedule = OnCallSchedule.objects.get(pk=schedule_pk)
schedule.drop_cached_ical()
except OnCallSchedule.DoesNotExist:
task_logger.info(f"Tried to drop_cached_ical_task for non-existing schedule {schedule_pk}")
else:
# queue a refresh for final schedule
refresh_ical_final_schedule.apply_async((schedule_pk,))
task_logger.info(f"Finish drop_cached_ical_task for schedule {schedule_pk}")
@shared_dedicated_queue_retry_task(autoretry_for=(Exception,), retry_backoff=True, max_retries=1)
def drop_cached_ical_for_custom_events_for_organization(organization_id):
from apps.schedules.models import OnCallScheduleCalendar
for schedule in OnCallScheduleCalendar.objects.filter(organization_id=organization_id):
drop_cached_ical_task.apply_async(
(schedule.pk,),
)
task_logger.info(f"drop cached ica for org_id {organization_id}")