import copy import datetime import itertools import re import typing from collections import defaultdict from enum import Enum import icalendar import pytz from django.conf import settings from django.core.validators import MinLengthValidator from django.db import models from django.db.utils import DatabaseError from django.utils import timezone from django.utils.functional import cached_property from polymorphic.managers import PolymorphicManager from polymorphic.models import PolymorphicModel from polymorphic.query import PolymorphicQuerySet from apps.grafana_plugin.ui_url_builder import UIURLBuilder from apps.schedules.constants import ( EXPORT_WINDOW_DAYS_AFTER, EXPORT_WINDOW_DAYS_BEFORE, ICAL_COMPONENT_VEVENT, ICAL_DATETIME_END, ICAL_DATETIME_STAMP, ICAL_DATETIME_START, ICAL_LAST_MODIFIED, ICAL_PRIORITY, ICAL_STATUS, ICAL_STATUS_CANCELLED, ICAL_SUMMARY, ICAL_UID, PREFETCHED_SHIFT_SWAPS, SCHEDULE_CHECK_NEXT_DAYS, ) from apps.schedules.ical_utils import ( EmptyShifts, create_base_icalendar, fetch_ical_file_or_get_error, get_oncall_users_for_multiple_schedules, list_of_empty_shifts_in_schedule, list_of_oncall_shifts_from_ical, ) from apps.schedules.models import CustomOnCallShift from apps.user_management.models import User from common.database import NON_POLYMORPHIC_CASCADE, NON_POLYMORPHIC_SET_NULL from common.public_primary_keys import generate_public_primary_key, increase_public_primary_key_length if typing.TYPE_CHECKING: from django.db.models.manager import RelatedManager from apps.alerts.models import EscalationPolicy from apps.auth_token.models import ScheduleExportAuthToken from apps.schedules.models import ShiftSwapRequest from apps.slack.models import SlackChannel, SlackUserGroup from apps.user_management.models import Organization, Team RE_ICAL_SEARCH_USERNAME = r"SUMMARY:(\[L[0-9]+\] )?{}" RE_ICAL_FETCH_USERNAME = re.compile(r"SUMMARY:(?:\[L[0-9]+\] )?([^\s]+)") # Utility classes for schedule quality report class QualityReportCommentType(str, Enum): INFO = "info" WARNING = "warning" class QualityReportComment(typing.TypedDict): type: QualityReportCommentType text: str class QualityReportOverloadedUser(typing.TypedDict): id: str username: str score: int QualityReportOverloadedUsers = typing.List[QualityReportOverloadedUser] QualityReportComments = typing.List[QualityReportComment] class QualityReport(typing.TypedDict): total_score: int comments: QualityReportComments overloaded_users: QualityReportOverloadedUsers class ScheduleEventUser(typing.TypedDict): display_name: str pk: str email: str avatar_full: str class SwapRequest(typing.TypedDict): pk: str user: typing.Optional[ScheduleEventUser] class MaybeSwappedScheduleEventUser(ScheduleEventUser): swap_request: typing.Optional[SwapRequest] class ScheduleEventShift(typing.TypedDict): pk: str class ScheduleEvent(typing.TypedDict): all_day: bool start: datetime.datetime end: datetime.datetime users: typing.List[MaybeSwappedScheduleEventUser] missing_users: typing.List[str] priority_level: typing.Optional[int] source: typing.Optional[str] calendar_type: typing.Optional[int] is_empty: bool is_gap: bool is_override: bool shift: ScheduleEventShift class ScheduleFinalShift(typing.TypedDict): user_pk: str user_email: str user_username: str shift_start: str shift_end: str ScheduleEvents = typing.List[ScheduleEvent] ScheduleEventIntervals = typing.List[typing.List[datetime.datetime]] ScheduleFinalShifts = typing.List[ScheduleFinalShift] DurationMap = typing.Dict[str, datetime.timedelta] def generate_public_primary_key_for_oncall_schedule_channel(): prefix = "S" new_public_primary_key = generate_public_primary_key(prefix) failure_counter = 0 while OnCallSchedule.objects.filter(public_primary_key=new_public_primary_key).exists(): new_public_primary_key = increase_public_primary_key_length( failure_counter=failure_counter, prefix=prefix, model_name="OnCallSchedule" ) failure_counter += 1 return new_public_primary_key class OnCallScheduleQuerySet(PolymorphicQuerySet): def get_oncall_users(self, events_datetime=None): return get_oncall_users_for_multiple_schedules(self.all(), events_datetime) def related_to_user(self, user): username_regex = RE_ICAL_SEARCH_USERNAME.format(user.username) return self.filter( cached_ical_final_schedule__regex=username_regex, organization=user.organization, ) class OnCallSchedule(PolymorphicModel): custom_shifts: "RelatedManager['CustomOnCallShift']" organization: "Organization" shift_swap_requests: "RelatedManager['ShiftSwapRequest']" slack_channel: typing.Optional["SlackChannel"] team: typing.Optional["Team"] user_group: typing.Optional["SlackUserGroup"] objects: models.Manager["OnCallSchedule"] = PolymorphicManager.from_queryset(OnCallScheduleQuerySet)() # type of calendars in schedule TYPE_ICAL_PRIMARY, TYPE_ICAL_OVERRIDES, TYPE_CALENDAR = range( 3 ) # todo: discuss do we need the third type (this types used for frontend) PRIMARY, OVERRIDES = range(2) CALENDAR_TYPE_VERBAL = {PRIMARY: "primary", OVERRIDES: "overrides"} public_primary_key = models.CharField( max_length=20, validators=[MinLengthValidator(settings.PUBLIC_PRIMARY_KEY_MIN_LENGTH + 1)], unique=True, default=generate_public_primary_key_for_oncall_schedule_channel, ) cached_ical_file_primary = models.TextField(null=True, default=None) prev_ical_file_primary = models.TextField(null=True, default=None) cached_ical_file_overrides = models.TextField(null=True, default=None) prev_ical_file_overrides = models.TextField(null=True, default=None) cached_ical_final_schedule = models.TextField(null=True, default=None) organization = models.ForeignKey( "user_management.Organization", on_delete=NON_POLYMORPHIC_CASCADE, related_name="oncall_schedules" ) team = models.ForeignKey( "user_management.Team", on_delete=NON_POLYMORPHIC_SET_NULL, related_name="oncall_schedules", null=True, default=None, ) name = models.CharField(max_length=200) slack_channel = models.ForeignKey( "slack.SlackChannel", null=True, default=None, on_delete=models.SET_NULL, related_name="+", ) # Slack user group to be updated when on-call users change for this schedule user_group = models.ForeignKey( to="slack.SlackUserGroup", null=True, on_delete=NON_POLYMORPHIC_SET_NULL, related_name="oncall_schedules" ) # schedule reminder related fields class NotifyOnCallShiftFreq(models.IntegerChoices): NEVER = 0, "Never" EACH_SHIFT = 1, "Each shift" class NotifyEmptyOnCall(models.IntegerChoices): ALL = 0, "Notify all people in the channel" PREV = 1, "Mention person from the previous slot" NO_ONE = 2, "Inform about empty slot" current_shifts = models.TextField(null=False, default="{}") # Used to not drop current_shifts to use them when "Mention person from the previous slot" empty_oncall = models.BooleanField(default=True) notify_oncall_shift_freq = models.IntegerField( null=False, choices=NotifyOnCallShiftFreq.choices, default=NotifyOnCallShiftFreq.EACH_SHIFT, ) mention_oncall_start = models.BooleanField(null=False, default=True) mention_oncall_next = models.BooleanField(null=False, default=False) notify_empty_oncall = models.IntegerField( null=False, choices=NotifyEmptyOnCall.choices, default=NotifyEmptyOnCall.ALL ) # Gaps-checker related fields has_gaps = models.BooleanField(default=False) gaps_report_sent_at = models.DateField(null=True, default=None) # empty shifts checker related fields has_empty_shifts = models.BooleanField(default=False) empty_shifts_report_sent_at = models.DateField(null=True, default=None) @property def web_page_link(self) -> str: return UIURLBuilder(self.organization).schedules() @property def web_detail_page_link(self) -> str: return UIURLBuilder(self.organization).schedule_detail(self.public_primary_key) @property def slack_url(self) -> str: return f"<{self.web_detail_page_link}|{self.name}>" @property def slack_channel_slack_id(self) -> typing.Optional[str]: return self.slack_channel.slack_id if self.slack_channel else None def get_icalendars(self) -> typing.Tuple[typing.Optional[icalendar.Calendar], typing.Optional[icalendar.Calendar]]: """Returns list of calendars. Primary calendar should always be the first""" # if self._ical_file_(primary|overrides) is None -> no cache, will trigger a refresh # if self._ical_file_(primary|overrides) == "" -> cached value for an empty schedule if self._ical_file_primary: calendar_primary: icalendar.Calendar = icalendar.Calendar.from_ical(self._ical_file_primary) else: calendar_primary = None if self._ical_file_overrides: calendar_overrides: icalendar.Calendar = icalendar.Calendar.from_ical(self._ical_file_overrides) else: calendar_overrides = None return calendar_primary, calendar_overrides def get_prev_and_current_ical_files(self): """Returns list of tuples with prev and current iCal files for each calendar""" return [ (self.prev_ical_file_primary, self.cached_ical_file_primary), (self.prev_ical_file_overrides, self.cached_ical_file_overrides), ] def check_gaps_and_empty_shifts_for_next_days(self, days=SCHEDULE_CHECK_NEXT_DAYS) -> None: datetime_start = timezone.now() datetime_end = datetime_start + datetime.timedelta(days=days) # get empty shifts from all events and gaps from final events events = self.filter_events( datetime_start, datetime_end, with_empty=True, with_gap=True, all_day_datetime=True, ) has_empty_shifts = len([event for event in events if event["is_empty"]]) != 0 final_events = self._resolve_schedule(events, datetime_start, datetime_end) has_gaps = len([final_event for final_event in final_events if final_event["is_gap"]]) != 0 if has_gaps != self.has_gaps or has_empty_shifts != self.has_empty_shifts: self.has_gaps = has_gaps self.has_empty_shifts = has_empty_shifts self.save(update_fields=["has_gaps", "has_empty_shifts"]) def get_gaps_for_next_days(self, days=SCHEDULE_CHECK_NEXT_DAYS) -> ScheduleEvents: today = timezone.now() events = self.final_events(today, today + datetime.timedelta(days=days)) return [event for event in events if event["is_gap"]] def get_empty_shifts_for_next_days(self, days=SCHEDULE_CHECK_NEXT_DAYS) -> EmptyShifts: today = timezone.now().date() return list_of_empty_shifts_in_schedule(self, today, today + datetime.timedelta(days=days)) def drop_cached_ical(self): self._drop_primary_ical_file() self._drop_overrides_ical_file() def refresh_ical_file(self): self._refresh_primary_ical_file() self._refresh_overrides_ical_file() @property def _ical_file_primary(self): raise NotImplementedError @property def _ical_file_overrides(self): raise NotImplementedError def _refresh_primary_ical_file(self): raise NotImplementedError def _refresh_overrides_ical_file(self): raise NotImplementedError def _drop_primary_ical_file(self): self.prev_ical_file_primary = self.cached_ical_file_primary self.cached_ical_file_primary = None self.save(update_fields=["cached_ical_file_primary", "prev_ical_file_primary"]) def _drop_overrides_ical_file(self): self.prev_ical_file_overrides = self.cached_ical_file_overrides self.cached_ical_file_overrides = None self.save(update_fields=["cached_ical_file_overrides", "prev_ical_file_overrides"]) def related_users(self): """Return users referenced in the schedule.""" usernames = [] if self.cached_ical_final_schedule: usernames += RE_ICAL_FETCH_USERNAME.findall(self.cached_ical_final_schedule) return self.organization.users.filter(username__in=usernames) def filter_events( self, datetime_start: datetime.datetime, datetime_end: datetime.datetime, with_empty: bool = False, with_gap: bool = False, filter_by: str | None = None, all_day_datetime: bool = False, ignore_untaken_swaps: bool = False, from_cached_final: bool = False, include_shift_info: bool = False, ) -> ScheduleEvents: """Return filtered events from schedule.""" try: shifts = ( list_of_oncall_shifts_from_ical( self, datetime_start, datetime_end, with_empty, with_gap, filter_by=filter_by, from_cached_final=from_cached_final, ) or [] ) except ValueError: # raised when filtering events on a non-saved/deleted schedule return [] shifts_data = {} if include_shift_info: pks = set(shift["shift_pk"] for shift in shifts) shifts_from_db = CustomOnCallShift.objects.filter( organization=self.organization, public_primary_key__in=pks ) shifts_data = {s.public_primary_key: {"name": s.name, "type": s.type} for s in shifts_from_db} events: ScheduleEvents = [] for shift in shifts: start = shift["start"] all_day = type(start) is datetime.date # fix confusing end date for all-day event end = shift["end"] - datetime.timedelta(days=1) if all_day else shift["end"] if all_day and all_day_datetime: start = datetime.datetime.combine(start, datetime.datetime.min.time(), tzinfo=pytz.UTC) end = datetime.datetime.combine(end, datetime.datetime.max.time(), tzinfo=pytz.UTC).replace( microsecond=0 ) is_gap = shift.get("is_gap", False) shift_json: ScheduleEvent = { "all_day": all_day, "start": start, "end": end, "users": [ { "display_name": user.username, "email": user.email, "pk": user.public_primary_key, "avatar_full": user.avatar_full_url(self.organization), } for user in shift["users"] ], "missing_users": shift["missing_users"], "priority_level": shift["priority"] or 0, "source": shift["source"], "calendar_type": shift["calendar_type"], "is_empty": len(shift["users"]) == 0 and not is_gap, "is_gap": is_gap, "is_override": shift["calendar_type"] == OnCallSchedule.TYPE_ICAL_OVERRIDES, "shift": { "pk": shift["shift_pk"], }, } if include_shift_info and not is_gap: no_data = { "name": None, "type": CustomOnCallShift.TYPE_OVERRIDE if shift["calendar_type"] == OnCallSchedule.TYPE_ICAL_OVERRIDES else None, } shift_data = shifts_data.get(shift["shift_pk"], no_data) shift_json["shift"].update(shift_data) events.append(shift_json) # combine multiple-users same-shift events into one events = self._merge_events(events) # annotate events with swap request details swapping users as needed events = self._apply_swap_requests( events, datetime_start, datetime_end, ignore_untaken_swaps=ignore_untaken_swaps ) return events def final_events( self, datetime_start: datetime.datetime, datetime_end: datetime.datetime, with_empty: bool = True, with_gap: bool = True, ignore_untaken_swaps: bool = False, include_shift_info: bool = False, ) -> ScheduleEvents: """Return schedule final events, after resolving shifts and overrides.""" events = self.filter_events( datetime_start, datetime_end, with_empty=with_empty, with_gap=with_gap, all_day_datetime=True, ignore_untaken_swaps=ignore_untaken_swaps, include_shift_info=include_shift_info, ) events = self._resolve_schedule(events, datetime_start, datetime_end) return events def filter_swap_requests( self, datetime_start: datetime.datetime, datetime_end: datetime.datetime ) -> "RelatedManager['ShiftSwapRequest']": swap_requests = self.shift_swap_requests.filter( # starting before but ongoing swap_start__lt=datetime_start, swap_end__gte=datetime_start ).union( self.shift_swap_requests.filter( # starting after but before end swap_start__gte=datetime_start, swap_start__lte=datetime_end ) ) swap_requests = swap_requests.order_by("created_at") return swap_requests def refresh_ical_final_schedule(self): now = timezone.now() # window to consider: from now, -15 days + 6 months delta = EXPORT_WINDOW_DAYS_BEFORE days = EXPORT_WINDOW_DAYS_AFTER + delta datetime_start = now.replace(hour=0, minute=0, second=0, microsecond=0) - datetime.timedelta(days=delta) datetime_end = datetime_start + datetime.timedelta(days=days - 1, hours=23, minutes=59, seconds=59) # setup calendar with final schedule shift events calendar = create_base_icalendar(self.name) events = self.final_events(datetime_start, datetime_end, ignore_untaken_swaps=True) updated_ids = set() for e in events: for u in e["users"]: event = icalendar.Event() event.add(ICAL_SUMMARY, u["display_name"]) event.add(ICAL_DATETIME_START, e["start"]) event.add(ICAL_DATETIME_END, e["end"]) event.add(ICAL_DATETIME_STAMP, now) event.add(ICAL_LAST_MODIFIED, now) # set priority based on primary/overrides # 0: undefined priority, 1: high priority event.add(ICAL_PRIORITY, e["calendar_type"]) event_uid = "{}-{}-{}".format(e["shift"]["pk"], e["start"].strftime("%Y%m%d%H%S"), u["pk"]) event[ICAL_UID] = event_uid calendar.add_component(event) updated_ids.add(event_uid) # check previously cached final schedule for potentially cancelled events if self.cached_ical_final_schedule: previous = icalendar.Calendar.from_ical(self.cached_ical_final_schedule) for component in previous.walk(): if component.name == ICAL_COMPONENT_VEVENT and component[ICAL_UID] not in updated_ids: # check if event was ended or cancelled, update ical dtend = component.get(ICAL_DATETIME_END) dtend_datetime = dtend.dt if dtend else None if dtend_datetime and type(dtend_datetime) is datetime.date: # shift or overrides coming from ical calendars can be all day events, change to datetime dtend_datetime = datetime.datetime.combine( dtend.dt, datetime.datetime.min.time(), tzinfo=pytz.UTC ) if dtend_datetime and dtend_datetime < datetime_start: # event ended before window start continue is_cancelled = component.get(ICAL_STATUS) last_modified = component.get(ICAL_LAST_MODIFIED) if is_cancelled and last_modified and last_modified.dt < datetime_start: # drop already ended events older than the window we consider continue elif is_cancelled and not last_modified: # set last_modified if it was missing (e.g. from previous export ical implementation) component[ICAL_LAST_MODIFIED] = icalendar.vDatetime(now).to_ical() elif not is_cancelled: # set the event as cancelled component[ICAL_DATETIME_END] = component[ICAL_DATETIME_START] component[ICAL_STATUS] = ICAL_STATUS_CANCELLED component[ICAL_LAST_MODIFIED] = icalendar.vDatetime(now).to_ical() # include just cancelled events as well as those that were cancelled during the time window calendar.add_component(component) ical_data = calendar.to_ical().decode() self.cached_ical_final_schedule = ical_data self.save(update_fields=["cached_ical_final_schedule"]) def shifts_for_user( self, user: User, datetime_start: datetime.datetime, datetime_end: typing.Optional[datetime.datetime] = None, days: typing.Optional[int] = None, ) -> typing.Tuple[ScheduleEvents, ScheduleEvents, ScheduleEvents]: """ NOTE: must specify at least `datetime_end` or `days` """ if not datetime_end and not days: raise ValueError("Must specify at least `datetime_end` or `days`") now = timezone.now() if days is not None: datetime_end = datetime_start + datetime.timedelta(days=days) passed_shifts: ScheduleEvents = [] current_shifts: ScheduleEvents = [] upcoming_shifts: ScheduleEvents = [] if self.cached_ical_final_schedule is None: # no final schedule info available return passed_shifts, current_shifts, upcoming_shifts events = self.filter_events( datetime_start, datetime_end, all_day_datetime=True, from_cached_final=True, include_shift_info=True ) events.sort(key=lambda e: e["start"]) for event in events: users = {u["pk"] for u in event["users"]} if user.public_primary_key in users: if event["end"] <= now: passed_shifts.append(event) elif event["start"] <= now < event["end"]: current_shifts.append(event) else: upcoming_shifts.append(event) return passed_shifts, current_shifts, upcoming_shifts def quality_report(self, date: typing.Optional[datetime.datetime], days: typing.Optional[int]) -> QualityReport: """ Return schedule quality report to be used by the web UI. TODO: Add scores on "inside working hours" and "balance outside working hours" when TODO: working hours editor is implemented in the web UI. """ # get events to consider for calculation if date is None: today = timezone.now() date = today - datetime.timedelta(days=7 - today.weekday()) # start of next week in UTC if days is None: days = 52 * 7 # consider next 52 weeks (~1 year) datetime_end = date + datetime.timedelta(days=days - 1, hours=23, minutes=59, seconds=59) events = self.final_events(date, datetime_end) # an event is “good” if it's not a gap and not empty good_events: ScheduleEvents = [event for event in events if not event["is_gap"] and not event["is_empty"]] if not good_events: return { "total_score": 0, "comments": [{"type": QualityReportCommentType.WARNING, "text": "Schedule is empty"}], "overloaded_users": [], } def event_duration(ev: ScheduleEvent) -> datetime.timedelta: return ev["end"] - ev["start"] def timedelta_sum(deltas: typing.Iterable[datetime.timedelta]) -> datetime.timedelta: return sum(deltas, start=datetime.timedelta()) def score_to_percent(value: float) -> int: return round(value * 100) def get_duration_map(evs: ScheduleEvents) -> DurationMap: """Return a map of user PKs to total duration of events they are in.""" result: DurationMap = defaultdict(datetime.timedelta) for ev in evs: for user in ev["users"]: user_pk = user["pk"] result[user_pk] += event_duration(ev) return result def get_balance_score_by_duration_map(dur_map: DurationMap) -> float: """ Return a score between 0 and 1, based on how balanced the durations are in the duration map. The formula is taken from https://github.com/grafana/oncall/issues/118#issuecomment-1161787854. """ if len(dur_map) <= 1: return 1 result = 0.0 for key_1, key_2 in itertools.combinations(dur_map, 2): duration_1 = dur_map[key_1] duration_2 = dur_map[key_2] result += min(duration_1, duration_2) / max(duration_1, duration_2) number_of_pairs = len(dur_map) * (len(dur_map) - 1) // 2 return result / number_of_pairs # calculate good event score good_events_duration = timedelta_sum(event_duration(event) for event in good_events) good_event_score = min(good_events_duration / datetime.timedelta(days=days), 1) good_event_score = score_to_percent(good_event_score) # calculate balance score duration_map = get_duration_map(good_events) balance_score = get_balance_score_by_duration_map(duration_map) balance_score = score_to_percent(balance_score) # calculate overloaded users overloaded_users: QualityReportOverloadedUsers = [] if balance_score >= 95: # tolerate minor imbalance balance_score = 100 else: average_duration = timedelta_sum(duration_map.values()) / len(duration_map) overloaded_user_pks = [ user_pk for user_pk, duration in duration_map.items() if score_to_percent(duration / average_duration) > 100 ] usernames = { u.public_primary_key: u.username for u in User.objects.filter(public_primary_key__in=overloaded_user_pks).only( "public_primary_key", "username" ) } for user_pk in overloaded_user_pks: score = score_to_percent(duration_map[user_pk] / average_duration) - 100 username = usernames.get(user_pk) or "unknown" # fallback to "unknown" if user is not found overloaded_users.append({"id": user_pk, "username": username, "score": score}) # show most overloaded users first overloaded_users.sort(key=lambda u: (-u["score"], u["username"])) # generate comments regarding gaps comments: QualityReportComments = [] if good_event_score == 100: comments.append({"type": QualityReportCommentType.INFO, "text": "Schedule has no gaps"}) else: not_covered = 100 - good_event_score comments.append( {"type": QualityReportCommentType.WARNING, "text": f"Schedule has gaps ({not_covered}% not covered)"} ) # generate comments regarding balance if balance_score == 100: comments.append({"type": QualityReportCommentType.INFO, "text": "Schedule is perfectly balanced"}) else: comments.append( {"type": QualityReportCommentType.WARNING, "text": "Schedule has balance issues (see overloaded users)"} ) # calculate total score (weighted sum of good event score and balance score) total_score = round((good_event_score + balance_score) / 2) return { "total_score": total_score, "comments": comments, "overloaded_users": overloaded_users, } def _apply_swap_requests( self, events: ScheduleEvents, datetime_start: datetime.datetime, datetime_end: datetime.datetime, ignore_untaken_swaps: bool = False, ) -> ScheduleEvents: """Apply swap requests details to schedule events.""" # get swaps requests affecting this schedule / time range prefetched_swaps = getattr(self, PREFETCHED_SHIFT_SWAPS, None) swaps = ( prefetched_swaps if prefetched_swaps is not None else self.filter_swap_requests(datetime_start, datetime_end) ) def _insert_event(index: int, event: ScheduleEvent) -> int: # add event, if any, to events list in the specified index # return incremented index if the event was added if event is None: return index events.insert(index, event) return index + 1 # apply swaps sequentially for swap in swaps: if swap.is_past_due or (ignore_untaken_swaps and not swap.is_taken): # ignore expired requests, or untaken if specified continue i = 0 while i < len(events): event = events.pop(i) if event["start"] >= swap.swap_end or event["end"] <= swap.swap_start: # event outside the swap period, keep as it is and continue i = _insert_event(i, event) continue users = set(u["pk"] for u in event["users"]) if swap.beneficiary.public_primary_key in users: # swap request affects current event split_before = None if event["start"] < swap.swap_start: # partially included start -> split split_before = copy.deepcopy(event) split_before["end"] = swap.swap_start # update event to swap event["start"] = swap.swap_start split_after = None if event["end"] > swap.swap_end: # partially included end -> split split_after = copy.deepcopy(event) split_after["start"] = swap.swap_end # update event to swap event["end"] = swap.swap_end # identify user to swap user_to_swap = None for u in event["users"]: if u["pk"] == swap.beneficiary.public_primary_key: user_to_swap = u break # apply swap changes to event user swap_details = {"pk": swap.public_primary_key} if swap.benefactor: # swap is taken, update user in shift user_to_swap["pk"] = swap.benefactor.public_primary_key user_to_swap["display_name"] = swap.benefactor.username user_to_swap["email"] = swap.benefactor.email user_to_swap["avatar_full"] = swap.benefactor.avatar_full_url(self.organization) # add beneficiary user to details swap_details["user"] = { "display_name": swap.beneficiary.username, "email": swap.beneficiary.email, "pk": swap.beneficiary.public_primary_key, "avatar_full": swap.beneficiary.avatar_full_url(self.organization), } user_to_swap["swap_request"] = swap_details # update events list # keep first split event in its original index i = _insert_event(i, split_before) # insert updated swap-related event i = _insert_event(i, event) # keep second split event after swap i = _insert_event(i, split_after) else: # event for different user(s), keep as it is and continue i = _insert_event(i, event) return events def _resolve_schedule( self, events: ScheduleEvents, datetime_start: datetime.datetime, datetime_end: datetime.datetime ) -> ScheduleEvents: """Calculate final schedule shifts considering rotations and overrides. Exclude events that after split/update are out of the requested (datetime_start, datetime_end) range. """ if not events: return [] def event_start_cmp_key(e: ScheduleEvent) -> datetime.datetime: return e["start"] def event_cmp_key(e: ScheduleEvent) -> typing.Tuple[int, int, datetime.datetime]: """Sorting key criteria for events.""" start = event_start_cmp_key(e) return ( -e["calendar_type"] if e["calendar_type"] else 0, # overrides: 1, shifts: 0, gaps: None -e["priority_level"] if e["priority_level"] else 0, start, ) def insort_event(eventlist: ScheduleEvents, e: ScheduleEvent) -> None: """Insert event keeping ordering criteria into already sorted event list.""" idx = 0 for i in eventlist: if event_cmp_key(e) > event_cmp_key(i): idx += 1 else: break eventlist.insert(idx, e) def _merge_intervals(evs: ScheduleEvents) -> ScheduleEventIntervals: """Keep track of scheduled intervals.""" if not evs: return [] intervals = [[e["start"], e["end"]] for e in evs] result = [intervals[0]] for interval in intervals[1:]: previous_interval = result[-1] if previous_interval[0] <= interval[0] <= previous_interval[1]: previous_interval[1] = max(previous_interval[1], interval[1]) else: result.append(interval) return result # sort schedule events by (type desc, priority desc, start timestamp asc) events.sort(key=event_cmp_key) # iterate over events, reserving schedule slots based on their priority # if the expected slot was already scheduled for a higher priority event, # split the event, or fix start/end timestamps accordingly intervals = [] resolved: ScheduleEvents = [] pending: ScheduleEvents = events current_interval_idx = 0 # current scheduled interval being checked current_type: typing.Optional[int] = OnCallSchedule.TYPE_ICAL_OVERRIDES # current calendar type current_priority: typing.Optional[int] = None # current priority level being resolved while pending: ev = pending.pop(0) if ev["is_empty"]: # exclude events without active users continue if ev["start"] >= datetime_end or ev["end"] <= datetime_start: # avoid including split events which now are outside the requested time range continue # api/terraform shifts could be missing a priority; assume None means 0 priority = ev["priority_level"] or 0 if priority != current_priority or current_type != ev["calendar_type"]: # update scheduled intervals on priority change # and start from the beginning for the new priority level # also for calendar event type (overrides first, then apply regular shifts) resolved.sort(key=event_start_cmp_key) intervals = _merge_intervals(resolved) current_interval_idx = 0 current_priority = priority current_type = ev["calendar_type"] if current_interval_idx >= len(intervals): # event outside scheduled intervals, add to resolved # only if still starts before datetime_end if ev["start"] < datetime_end: resolved.append(ev) elif ev["start"] < intervals[current_interval_idx][0] and ev["end"] <= intervals[current_interval_idx][0]: # event starts and ends outside an already scheduled interval, add to resolved resolved.append(ev) elif ev["start"] < intervals[current_interval_idx][0] and ev["end"] > intervals[current_interval_idx][0]: # event starts outside interval but overlaps with an already scheduled interval # 1. add a split event copy to schedule the time before the already scheduled interval to_add = ev.copy() to_add["end"] = intervals[current_interval_idx][0] if to_add["end"] >= datetime_start: # only include if updated event ends inside the requested time range resolved.append(to_add) # 2. check if there is still time to be scheduled after the current scheduled interval ends if ev["end"] > intervals[current_interval_idx][1]: # event ends after current interval, update event start timestamp to match the interval end # and process the updated event as any other event ev["start"] = intervals[current_interval_idx][1] if ev["start"] < datetime_end: # only include event if it is still inside the requested time range # reorder pending events after updating current event start date # (ie. insert the event where it should be to keep the order criteria) # TODO: switch to bisect insert on python 3.10 (or consider heapq) insort_event(pending, ev) # done, go to next event elif ev["start"] >= intervals[current_interval_idx][0] and ev["end"] <= intervals[current_interval_idx][1]: # event inside an already scheduled interval, ignore (go to next) continue elif ( ev["start"] >= intervals[current_interval_idx][0] and ev["start"] < intervals[current_interval_idx][1] and ev["end"] > intervals[current_interval_idx][1] ): # event starts inside a scheduled interval but ends out of it # update the event start timestamp to match the interval end ev["start"] = intervals[current_interval_idx][1] # unresolved, re-add to pending # TODO: switch to bisect insert on python 3.10 (or consider heapq) insort_event(pending, ev) elif ev["start"] >= intervals[current_interval_idx][1]: # event starts after the current interval, move to next interval and go through it current_interval_idx += 1 # unresolved, re-add to pending # TODO: switch to bisect insert on python 3.10 (or consider heapq) insort_event(pending, ev) resolved.sort(key=lambda e: (event_start_cmp_key(e), e["shift"]["pk"] or "")) return resolved def _merge_events(self, events: ScheduleEvents) -> ScheduleEvents: """Merge user groups same-shift events.""" if events: merged = [events[0]] current = merged[0] for next_event in events[1:]: if ( current["start"] == next_event["start"] and current["shift"]["pk"] is not None and current["shift"]["pk"] == next_event["shift"]["pk"] ): current["users"] += [u for u in next_event["users"] if u not in current["users"]] current["missing_users"] += [ u for u in next_event["missing_users"] if u not in current["missing_users"] ] else: merged.append(next_event) current = next_event events = merged return events def _generate_ical_file_from_shifts(self, qs, extra_shifts=None, allow_empty_users=False): """Generate iCal events file from custom on-call shifts.""" # default to empty string since it is not possible to have a no-events ical file ical = "" if qs.exists() or extra_shifts is not None: if extra_shifts is None: extra_shifts = [] end_line = "END:VCALENDAR" calendar = icalendar.Calendar() calendar.add("prodid", "-//web schedule//oncall//") calendar.add("version", "2.0") calendar.add("method", "PUBLISH") ical_file = calendar.to_ical().decode() ical = ical_file.replace(end_line, "").strip() ical = f"{ical}\r\n" for event in itertools.chain(qs.all(), extra_shifts): ical += event.convert_to_ical(allow_empty_users=allow_empty_users) ical += f"{end_line}\r\n" return ical def preview_shift(self, custom_shift, datetime_start, datetime_end, updated_shift_pk=None): """Return unsaved rotation and final schedule preview events.""" if custom_shift.type == CustomOnCallShift.TYPE_OVERRIDE: qs = self.custom_shifts.filter(type=CustomOnCallShift.TYPE_OVERRIDE) ical_attr = "cached_ical_file_overrides" ical_property = "_ical_file_overrides" elif custom_shift.type == CustomOnCallShift.TYPE_ROLLING_USERS_EVENT: qs = self.custom_shifts.exclude(type=CustomOnCallShift.TYPE_OVERRIDE) ical_attr = "cached_ical_file_primary" ical_property = "_ical_file_primary" else: raise ValueError("Invalid shift type") def _invalidate_cache(schedule, prop_name): """Invalidate cached property cache""" try: delattr(schedule, prop_name) except AttributeError: pass extra_shifts = [custom_shift] if updated_shift_pk is not None: # only reuse PK for preview when updating a rotation that won't be started after the update custom_shift.public_primary_key = updated_shift_pk qs = qs.exclude(public_primary_key=updated_shift_pk) ical_file = self._generate_ical_file_from_shifts(qs, extra_shifts=extra_shifts, allow_empty_users=True) original_value = getattr(self, ical_attr) _invalidate_cache(self, ical_property) setattr(self, ical_attr, ical_file) # filter events using a temporal overriden calendar including the not-yet-saved shift events = self.filter_events(datetime_start, datetime_end, with_empty=True, with_gap=True) # return preview events for affected shifts updated_shift_pks = {s.public_primary_key for s in extra_shifts} shift_events = [e.copy() for e in events if e["shift"]["pk"] in updated_shift_pks] final_events = self._resolve_schedule(events, datetime_start, datetime_end) _invalidate_cache(self, ical_property) setattr(self, ical_attr, original_value) return shift_events, final_events # Insight logs @property def insight_logs_verbal(self): return self.name @property def insight_logs_serialized(self): result = { "name": self.name, } if self.team: result["team"] = self.team.name result["team_id"] = self.team.public_primary_key else: result["team"] = "General" if self.organization.slack_team_identity: if self.slack_channel is not None: result["slack_channel"] = self.slack_channel.name if self.user_group is not None: result["user_group"] = self.user_group.handle result["notification_frequency"] = self.get_notify_oncall_shift_freq_display() result["current_shift_notification"] = self.mention_oncall_start result["next_shift_notification"] = self.mention_oncall_next result["notify_empty_oncall"] = self.get_notify_empty_oncall_display() return result @property def insight_logs_metadata(self): result = {} if self.team: result["team"] = self.team.name result["team_id"] = self.team.public_primary_key else: result["team"] = "General" return result class OnCallScheduleICal(OnCallSchedule): escalation_policies: "RelatedManager['EscalationPolicy']" objects: models.Manager["OnCallScheduleICal"] schedule_export_token: "RelatedManager['ScheduleExportAuthToken']" # For the ical schedule both primary and overrides icals are imported via ical url ical_url_primary = models.CharField(max_length=500, null=True, default=None) ical_file_error_primary = models.CharField(max_length=200, null=True, default=None) ical_url_overrides = models.CharField(max_length=500, null=True, default=None) ical_file_error_overrides = models.CharField(max_length=200, null=True, default=None) @cached_property def _ical_file_primary(self): """ Download iCal file imported from calendar """ cached_ical_file = self.cached_ical_file_primary if self.ical_url_primary is not None and self.cached_ical_file_primary is None: self.cached_ical_file_primary, self.ical_file_error_primary = fetch_ical_file_or_get_error( self.ical_url_primary ) self.save(update_fields=["cached_ical_file_primary", "ical_file_error_primary"]) cached_ical_file = self.cached_ical_file_primary return cached_ical_file @cached_property def _ical_file_overrides(self): """ Download iCal file imported from calendar """ cached_ical_file = self.cached_ical_file_overrides if self.ical_url_overrides is not None and self.cached_ical_file_overrides is None: self.cached_ical_file_overrides, self.ical_file_error_overrides = fetch_ical_file_or_get_error( self.ical_url_overrides ) self.save(update_fields=["cached_ical_file_overrides", "ical_file_error_overrides"]) cached_ical_file = self.cached_ical_file_overrides return cached_ical_file def _refresh_primary_ical_file(self): self.prev_ical_file_primary = self.cached_ical_file_primary if self.ical_url_primary is not None: self.cached_ical_file_primary, self.ical_file_error_primary = fetch_ical_file_or_get_error( self.ical_url_primary, ) self.save(update_fields=["cached_ical_file_primary", "prev_ical_file_primary", "ical_file_error_primary"]) def _refresh_overrides_ical_file(self): self.prev_ical_file_overrides = self.cached_ical_file_overrides if self.ical_url_overrides is not None: self.cached_ical_file_overrides, self.ical_file_error_overrides = fetch_ical_file_or_get_error( self.ical_url_overrides, ) self.save(update_fields=["cached_ical_file_overrides", "prev_ical_file_overrides", "ical_file_error_overrides"]) # Insight logs @property def insight_logs_serialized(self): res = super().insight_logs_serialized res["primary_calendar_url"] = self.ical_url_primary res["overrides_calendar_url"] = self.ical_url_overrides return res @property def insight_logs_type_verbal(self): return "ical_schedule" class OnCallScheduleCalendar(OnCallSchedule): escalation_policies: "RelatedManager['EscalationPolicy']" objects: models.Manager["OnCallScheduleCalendar"] schedule_export_token: "RelatedManager['ScheduleExportAuthToken']" # For the calendar schedule only overrides ical is imported via ical url. ical_url_overrides = models.CharField(max_length=500, null=True, default=None) ical_file_error_overrides = models.CharField(max_length=200, null=True, default=None) # Primary ical is generated from custom_on_call_shifts. time_zone = models.CharField(max_length=100, default="UTC") custom_on_call_shifts = models.ManyToManyField("schedules.CustomOnCallShift", related_name="schedules") enable_web_overrides = models.BooleanField(default=False, null=True) @cached_property def _ical_file_primary(self): """ Return cached ical file with iCal events from custom on-call shifts """ if self.cached_ical_file_primary is None: self.cached_ical_file_primary = self._generate_ical_file_primary() self.save(update_fields=["cached_ical_file_primary"]) return self.cached_ical_file_primary @cached_property def _ical_file_overrides(self): """ Download iCal file imported from calendar """ if self.cached_ical_file_overrides is not None: return self.cached_ical_file_overrides self._refresh_overrides_ical_file() return self.cached_ical_file_overrides def _refresh_primary_ical_file(self): self.prev_ical_file_primary = self.cached_ical_file_primary self.cached_ical_file_primary = self._generate_ical_file_primary() self.save( update_fields=[ "cached_ical_file_primary", "prev_ical_file_primary", ] ) def _refresh_overrides_ical_file(self): self.prev_ical_file_overrides = self.cached_ical_file_overrides if self.enable_web_overrides: # web overrides qs = self.custom_shifts.filter(type=CustomOnCallShift.TYPE_OVERRIDE) self.cached_ical_file_overrides = self._generate_ical_file_from_shifts(qs) elif self.ical_url_overrides is not None: self.cached_ical_file_overrides, self.ical_file_error_overrides = fetch_ical_file_or_get_error( self.ical_url_overrides, ) self.save(update_fields=["cached_ical_file_overrides", "prev_ical_file_overrides", "ical_file_error_overrides"]) def _generate_ical_file_primary(self): """ Generate iCal events file from custom on-call shifts (created via API) """ # default to empty string since it is not possible to have a no-events ical file ical = "" if self.custom_on_call_shifts.exists(): end_line = "END:VCALENDAR" calendar = icalendar.Calendar() calendar.add("prodid", "-//My calendar product//amixr//") calendar.add("version", "2.0") calendar.add("method", "PUBLISH") ical_file = calendar.to_ical().decode() ical = ical_file.replace(end_line, "").strip() ical = f"{ical}\r\n" for event in self.custom_on_call_shifts.all(): ical += event.convert_to_ical(self.time_zone) ical += f"{end_line}\r\n" return ical def preview_shift(self, custom_shift, datetime_start, datetime_end, updated_shift_pk=None): """Return unsaved rotation and final schedule preview events.""" if custom_shift.type != CustomOnCallShift.TYPE_OVERRIDE: raise ValueError("Invalid shift type") return super().preview_shift(custom_shift, datetime_start, datetime_end, updated_shift_pk=updated_shift_pk) @property def insight_logs_type_verbal(self): return "calendar_schedule" @property def insight_logs_serialized(self): res = super().insight_logs_serialized res["overrides_calendar_url"] = self.ical_url_overrides return res class OnCallScheduleWeb(OnCallSchedule): escalation_policies: "RelatedManager['EscalationPolicy']" objects: models.Manager["OnCallScheduleWeb"] schedule_export_token: "RelatedManager['ScheduleExportAuthToken']" time_zone = models.CharField(max_length=100, default="UTC") def _generate_ical_file_primary(self): qs = self.custom_shifts.exclude(type=CustomOnCallShift.TYPE_OVERRIDE) return self._generate_ical_file_from_shifts(qs) def _generate_ical_file_overrides(self): qs = self.custom_shifts.filter(type=CustomOnCallShift.TYPE_OVERRIDE) return self._generate_ical_file_from_shifts(qs) @cached_property def _ical_file_primary(self): """Return cached ical file with iCal events from custom on-call shifts.""" if self.cached_ical_file_primary is None: self.cached_ical_file_primary = self._generate_ical_file_primary() try: self.save(update_fields=["cached_ical_file_primary"]) except DatabaseError: # schedule may have been deleted from db return return self.cached_ical_file_primary def _refresh_primary_ical_file(self): self.prev_ical_file_primary = self.cached_ical_file_primary self.cached_ical_file_primary = self._generate_ical_file_primary() self.save(update_fields=["cached_ical_file_primary", "prev_ical_file_primary"]) @cached_property def _ical_file_overrides(self): """Return cached ical file with iCal events from custom on-call overrides shifts.""" if self.cached_ical_file_overrides is None: self.cached_ical_file_overrides = self._generate_ical_file_overrides() try: self.save(update_fields=["cached_ical_file_overrides"]) except DatabaseError: # schedule may have been deleted from db return return self.cached_ical_file_overrides def _refresh_overrides_ical_file(self): self.prev_ical_file_overrides = self.cached_ical_file_overrides self.cached_ical_file_overrides = self._generate_ical_file_overrides() self.save(update_fields=["cached_ical_file_overrides", "prev_ical_file_overrides"]) # Insight logs @property def insight_logs_type_verbal(self): return "web_schedule" @property def insight_logs_serialized(self): res = super().insight_logs_serialized res["time_zone"] = self.time_zone return res