From 4a35d2522adea60cebca92392120867fe41b5da0 Mon Sep 17 00:00:00 2001 From: Matias Bordese Date: Thu, 4 Aug 2022 17:00:09 -0300 Subject: [PATCH] Move schedule helpers (filter events, final) to model --- engine/apps/api/views/schedule.py | 156 +-------- engine/apps/schedules/ical_utils.py | 11 +- .../apps/schedules/models/on_call_schedule.py | 145 ++++++++ .../schedules/tests/test_on_call_schedule.py | 322 ++++++++++++++++++ 4 files changed, 486 insertions(+), 148 deletions(-) create mode 100644 engine/apps/schedules/tests/test_on_call_schedule.py diff --git a/engine/apps/api/views/schedule.py b/engine/apps/api/views/schedule.py index 25c35f10..4af90abf 100644 --- a/engine/apps/api/views/schedule.py +++ b/engine/apps/api/views/schedule.py @@ -24,7 +24,6 @@ from apps.api.serializers.schedule_polymorphic import ( from apps.auth_token.auth import PluginAuthentication from apps.auth_token.constants import SCHEDULE_EXPORT_TOKEN_NAME from apps.auth_token.models import ScheduleExportAuthToken -from apps.schedules.ical_utils import list_of_oncall_shifts_from_ical from apps.schedules.models import OnCallSchedule from apps.slack.models import SlackChannel from apps.slack.tasks import update_slack_user_group_for_schedules @@ -195,43 +194,6 @@ class ScheduleView( return user_tz, date - def _filter_events(self, schedule, user_timezone, starting_date, days, with_empty, with_gap): - shifts = ( - list_of_oncall_shifts_from_ical(schedule, starting_date, user_timezone, with_empty, with_gap, days=days) - or [] - ) - events = [] - # for start, end, users, priority_level, source in shifts: - for shift in shifts: - all_day = type(shift["start"]) == datetime.date - is_gap = shift.get("is_gap", False) - shift_json = { - "all_day": all_day, - "start": shift["start"], - # fix confusing end date for all-day event - "end": shift["end"] - timezone.timedelta(days=1) if all_day else shift["end"], - "users": [ - { - "display_name": user.username, - "pk": user.public_primary_key, - } - for user in shift["users"] - ], - "missing_users": shift["missing_users"], - "priority_level": shift["priority"] if shift["priority"] != 0 else None, - "source": shift["source"], - "calendar_type": shift["calendar_type"], - "is_empty": len(shift["users"]) == 0 and not is_gap, - "is_gap": is_gap, - "is_override": shift["calendar_type"] == OnCallSchedule.TYPE_ICAL_OVERRIDES, - "shift": { - "pk": shift["shift_pk"], - }, - } - events.append(shift_json) - - return events - @action(detail=True, methods=["get"]) def events(self, request, pk): user_tz, date = self.get_request_timezone() @@ -239,7 +201,7 @@ class ScheduleView( with_gap = self.request.query_params.get("with_gap", False) == "true" schedule = self.original_get_object() - events = self._filter_events(schedule, user_tz, date, days=1, with_empty=with_empty, with_gap=with_gap) + events = schedule.filter_events(user_tz, date, days=1, with_empty=with_empty, with_gap=with_gap) slack_channel = ( { @@ -281,16 +243,14 @@ class ScheduleView( raise BadRequest(detail="Invalid days format") schedule = self.original_get_object() - events = self._filter_events( - schedule, user_tz, starting_date, days=days, with_empty=True, with_gap=resolve_schedule - ) - if filter_by == EVENTS_FILTER_BY_OVERRIDE: - events = [e for e in events if e["calendar_type"] == OnCallSchedule.OVERRIDES] - elif filter_by == EVENTS_FILTER_BY_ROTATION: - events = [e for e in events if e["calendar_type"] == OnCallSchedule.PRIMARY] - else: # resolve_schedule - events = self._resolve_schedule(events) + if filter_by is not None: + filter_by = OnCallSchedule.PRIMARY if filter_by == EVENTS_FILTER_BY_ROTATION else OnCallSchedule.OVERRIDES + events = schedule.filter_events( + user_tz, starting_date, days=days, with_empty=True, with_gap=resolve_schedule, filter_by=filter_by + ) + else: # return final schedule + events = schedule.final_events(user_tz, starting_date, days) result = { "id": schedule.public_primary_key, @@ -300,103 +260,6 @@ class ScheduleView( } return Response(result, status=status.HTTP_200_OK) - def _resolve_schedule(self, events): - """Calculate final schedule shifts considering rotations and overrides.""" - if not events: - return [] - - # sort schedule events by (type desc, priority desc, start timestamp asc) - events.sort( - key=lambda e: ( - -e["calendar_type"] if e["calendar_type"] else 0, # overrides: 1, shifts: 0, gaps: None - -e["priority_level"] if e["priority_level"] else 0, - e["start"], - ) - ) - - def _merge_intervals(evs): - """Keep track of scheduled intervals.""" - if not evs: - return [] - intervals = [[e["start"], e["end"]] for e in evs] - result = [intervals[0]] - for interval in intervals[1:]: - previous_interval = result[-1] - if previous_interval[0] <= interval[0] <= previous_interval[1]: - previous_interval[1] = max(previous_interval[1], interval[1]) - else: - result.append(interval) - return result - - # iterate over events, reserving schedule slots based on their priority - # if the expected slot was already scheduled for a higher priority event, - # split the event, or fix start/end timestamps accordingly - - # include overrides from start - resolved = [e for e in events if e["calendar_type"] == OnCallSchedule.TYPE_ICAL_OVERRIDES] - intervals = _merge_intervals(resolved) - - pending = events[len(resolved) :] - if not pending: - return resolved - - current_event_idx = 0 # current event to resolve - current_interval_idx = 0 # current scheduled interval being checked - current_priority = pending[0]["priority_level"] # current priority level being resolved - - while current_event_idx < len(pending): - ev = pending[current_event_idx] - - if ev["priority_level"] != current_priority: - # update scheduled intervals on priority change - # and start from the beginning for the new priority level - resolved.sort(key=lambda e: e["start"]) - intervals = _merge_intervals(resolved) - current_interval_idx = 0 - current_priority = ev["priority_level"] - - if current_interval_idx >= len(intervals): - # event outside scheduled intervals, add to resolved - resolved.append(ev) - current_event_idx += 1 - elif ev["start"] < intervals[current_interval_idx][0] and ev["end"] <= intervals[current_interval_idx][0]: - # event starts and ends outside an already scheduled interval, add to resolved - resolved.append(ev) - current_event_idx += 1 - elif ev["start"] < intervals[current_interval_idx][0] and ev["end"] > intervals[current_interval_idx][0]: - # event starts outside interval but overlaps with an already scheduled interval - # 1. add a split event copy to schedule the time before the already scheduled interval - to_add = ev.copy() - to_add["end"] = intervals[current_interval_idx][0] - resolved.append(to_add) - # 2. check if there is still time to be scheduled after the current scheduled interval ends - if ev["end"] > intervals[current_interval_idx][1]: - # event ends after current interval, update event start timestamp to match the interval end - # and process the updated event as any other event - ev["start"] = intervals[current_interval_idx][1] - else: - # done, go to next event - current_event_idx += 1 - elif ev["start"] >= intervals[current_interval_idx][0] and ev["end"] <= intervals[current_interval_idx][1]: - # event inside an already scheduled interval, ignore (go to next) - current_event_idx += 1 - elif ( - ev["start"] >= intervals[current_interval_idx][0] - and ev["start"] < intervals[current_interval_idx][1] - and ev["end"] > intervals[current_interval_idx][1] - ): - # event starts inside a scheduled interval but ends out of it - # update the event start timestamp to match the interval end - ev["start"] = intervals[current_interval_idx][1] - # move to next interval and process the updated event as any other event - current_interval_idx += 1 - elif ev["start"] >= intervals[current_interval_idx][1]: - # event starts after the current interval, move to next interval and go through it - current_interval_idx += 1 - - resolved.sort(key=lambda e: e["start"]) - return resolved - @action(detail=True, methods=["get"]) def next_shifts_per_user(self, request, pk): """Return next shift for users in schedule.""" @@ -404,8 +267,7 @@ class ScheduleView( now = timezone.now() starting_date = now.date() schedule = self.original_get_object() - shift_events = self._filter_events(schedule, user_tz, starting_date, days=30, with_empty=False, with_gap=False) - events = self._resolve_schedule(shift_events) + events = schedule.final_events(user_tz, starting_date, days=30) users = {} for e in events: diff --git a/engine/apps/schedules/ical_utils.py b/engine/apps/schedules/ical_utils.py index a22ddee4..93092cc3 100644 --- a/engine/apps/schedules/ical_utils.py +++ b/engine/apps/schedules/ical_utils.py @@ -83,7 +83,13 @@ logger.setLevel(logging.DEBUG) # used for display schedule events on web def list_of_oncall_shifts_from_ical( - schedule, date, user_timezone="UTC", with_empty_shifts=False, with_gaps=False, days=1 + schedule, + date, + user_timezone="UTC", + with_empty_shifts=False, + with_gaps=False, + days=1, + filter_by=None, ): """ Parse the ical file and return list of events with users @@ -122,6 +128,9 @@ def list_of_oncall_shifts_from_ical( else: calendar_type = OnCallSchedule.OVERRIDES + if filter_by is not None and filter_by != calendar_type: + continue + tmp_result_datetime, tmp_result_date = get_shifts_dict( calendar, calendar_type, schedule, datetime_start, datetime_end, date, with_empty_shifts ) diff --git a/engine/apps/schedules/models/on_call_schedule.py b/engine/apps/schedules/models/on_call_schedule.py index 05493789..1589757b 100644 --- a/engine/apps/schedules/models/on_call_schedule.py +++ b/engine/apps/schedules/models/on_call_schedule.py @@ -1,3 +1,5 @@ +import datetime + import icalendar from django.apps import apps from django.conf import settings @@ -14,6 +16,7 @@ from apps.schedules.ical_utils import ( fetch_ical_file_or_get_error, list_of_empty_shifts_in_schedule, list_of_gaps_in_schedule, + list_of_oncall_shifts_from_ical, list_users_to_notify_from_ical, ) from apps.schedules.models import CustomOnCallShift @@ -222,6 +225,148 @@ class OnCallSchedule(PolymorphicModel): self.cached_ical_file_overrides = None self.save(update_fields=["cached_ical_file_overrides", "prev_ical_file_overrides"]) + def filter_events(self, user_timezone, starting_date, days, with_empty=False, with_gap=False, filter_by=None): + """Return filtered events from schedule.""" + shifts = ( + list_of_oncall_shifts_from_ical( + self, starting_date, user_timezone, with_empty, with_gap, days=days, filter_by=filter_by + ) + or [] + ) + events = [] + for shift in shifts: + all_day = type(shift["start"]) == datetime.date + is_gap = shift.get("is_gap", False) + shift_json = { + "all_day": all_day, + "start": shift["start"], + # fix confusing end date for all-day event + "end": shift["end"] - timezone.timedelta(days=1) if all_day else shift["end"], + "users": [ + { + "display_name": user.username, + "pk": user.public_primary_key, + } + for user in shift["users"] + ], + "missing_users": shift["missing_users"], + "priority_level": shift["priority"] if shift["priority"] != 0 else None, + "source": shift["source"], + "calendar_type": shift["calendar_type"], + "is_empty": len(shift["users"]) == 0 and not is_gap, + "is_gap": is_gap, + "is_override": shift["calendar_type"] == OnCallSchedule.TYPE_ICAL_OVERRIDES, + "shift": { + "pk": shift["shift_pk"], + }, + } + events.append(shift_json) + + return events + + def final_events(self, user_tz, starting_date, days): + """Return schedule final events, after resolving shifts and overrides.""" + events = self.filter_events(user_tz, starting_date, days=days, with_empty=True, with_gap=True) + events = self._resolve_schedule(events) + return events + + def _resolve_schedule(self, events): + """Calculate final schedule shifts considering rotations and overrides.""" + if not events: + return [] + + # sort schedule events by (type desc, priority desc, start timestamp asc) + events.sort( + key=lambda e: ( + -e["calendar_type"] if e["calendar_type"] else 0, # overrides: 1, shifts: 0, gaps: None + -e["priority_level"] if e["priority_level"] else 0, + e["start"], + ) + ) + + def _merge_intervals(evs): + """Keep track of scheduled intervals.""" + if not evs: + return [] + intervals = [[e["start"], e["end"]] for e in evs] + result = [intervals[0]] + for interval in intervals[1:]: + previous_interval = result[-1] + if previous_interval[0] <= interval[0] <= previous_interval[1]: + previous_interval[1] = max(previous_interval[1], interval[1]) + else: + result.append(interval) + return result + + # iterate over events, reserving schedule slots based on their priority + # if the expected slot was already scheduled for a higher priority event, + # split the event, or fix start/end timestamps accordingly + + # include overrides from start + resolved = [e for e in events if e["calendar_type"] == OnCallSchedule.TYPE_ICAL_OVERRIDES] + intervals = _merge_intervals(resolved) + + pending = events[len(resolved) :] + if not pending: + return resolved + + current_event_idx = 0 # current event to resolve + current_interval_idx = 0 # current scheduled interval being checked + current_priority = pending[0]["priority_level"] # current priority level being resolved + + while current_event_idx < len(pending): + ev = pending[current_event_idx] + + if ev["priority_level"] != current_priority: + # update scheduled intervals on priority change + # and start from the beginning for the new priority level + resolved.sort(key=lambda e: e["start"]) + intervals = _merge_intervals(resolved) + current_interval_idx = 0 + current_priority = ev["priority_level"] + + if current_interval_idx >= len(intervals): + # event outside scheduled intervals, add to resolved + resolved.append(ev) + current_event_idx += 1 + elif ev["start"] < intervals[current_interval_idx][0] and ev["end"] <= intervals[current_interval_idx][0]: + # event starts and ends outside an already scheduled interval, add to resolved + resolved.append(ev) + current_event_idx += 1 + elif ev["start"] < intervals[current_interval_idx][0] and ev["end"] > intervals[current_interval_idx][0]: + # event starts outside interval but overlaps with an already scheduled interval + # 1. add a split event copy to schedule the time before the already scheduled interval + to_add = ev.copy() + to_add["end"] = intervals[current_interval_idx][0] + resolved.append(to_add) + # 2. check if there is still time to be scheduled after the current scheduled interval ends + if ev["end"] > intervals[current_interval_idx][1]: + # event ends after current interval, update event start timestamp to match the interval end + # and process the updated event as any other event + ev["start"] = intervals[current_interval_idx][1] + else: + # done, go to next event + current_event_idx += 1 + elif ev["start"] >= intervals[current_interval_idx][0] and ev["end"] <= intervals[current_interval_idx][1]: + # event inside an already scheduled interval, ignore (go to next) + current_event_idx += 1 + elif ( + ev["start"] >= intervals[current_interval_idx][0] + and ev["start"] < intervals[current_interval_idx][1] + and ev["end"] > intervals[current_interval_idx][1] + ): + # event starts inside a scheduled interval but ends out of it + # update the event start timestamp to match the interval end + ev["start"] = intervals[current_interval_idx][1] + # move to next interval and process the updated event as any other event + current_interval_idx += 1 + elif ev["start"] >= intervals[current_interval_idx][1]: + # event starts after the current interval, move to next interval and go through it + current_interval_idx += 1 + + resolved.sort(key=lambda e: e["start"]) + return resolved + class OnCallScheduleICal(OnCallSchedule): # For the ical schedule both primary and overrides icals are imported via ical url diff --git a/engine/apps/schedules/tests/test_on_call_schedule.py b/engine/apps/schedules/tests/test_on_call_schedule.py new file mode 100644 index 00000000..11f4be13 --- /dev/null +++ b/engine/apps/schedules/tests/test_on_call_schedule.py @@ -0,0 +1,322 @@ +import pytest +from django.utils import timezone + +from apps.schedules.models import CustomOnCallShift, OnCallSchedule, OnCallScheduleWeb +from common.constants.role import Role + + +@pytest.mark.django_db +def test_filter_events(make_organization, make_user_for_organization, make_schedule, make_on_call_shift): + organization = make_organization() + schedule = make_schedule( + organization, + schedule_class=OnCallScheduleWeb, + name="test_web_schedule", + ) + user = make_user_for_organization(organization) + viewer = make_user_for_organization(organization, role=Role.VIEWER) + now = timezone.now().replace(hour=0, minute=0, second=0, microsecond=0) + start_date = now - timezone.timedelta(days=7) + + data = { + "start": start_date + timezone.timedelta(days=1, hours=10), + "rotation_start": start_date + timezone.timedelta(days=1, hours=10), + "duration": timezone.timedelta(hours=4), + "priority_level": 1, + "frequency": CustomOnCallShift.FREQUENCY_DAILY, + "schedule": schedule, + } + on_call_shift = make_on_call_shift( + organization=organization, shift_type=CustomOnCallShift.TYPE_ROLLING_USERS_EVENT, **data + ) + on_call_shift.add_rolling_users([[user]]) + + # add empty shift + data = { + "start": start_date + timezone.timedelta(days=1, hours=20), + "rotation_start": start_date + timezone.timedelta(days=1, hours=20), + "duration": timezone.timedelta(hours=2), + "priority_level": 1, + "frequency": CustomOnCallShift.FREQUENCY_WEEKLY, + "schedule": schedule, + } + empty_shift = make_on_call_shift( + organization=organization, shift_type=CustomOnCallShift.TYPE_ROLLING_USERS_EVENT, **data + ) + empty_shift.add_rolling_users([[viewer]]) + + # override: 22-23 + override_data = { + "start": start_date + timezone.timedelta(hours=22), + "rotation_start": start_date + timezone.timedelta(hours=22), + "duration": timezone.timedelta(hours=1), + "schedule": schedule, + } + override = make_on_call_shift( + organization=organization, shift_type=CustomOnCallShift.TYPE_OVERRIDE, **override_data + ) + override.add_rolling_users([[user]]) + + # filter primary non-empty shifts only + events = schedule.filter_events("UTC", start_date, days=3, filter_by=OnCallSchedule.TYPE_ICAL_PRIMARY) + expected = [ + { + "calendar_type": OnCallSchedule.TYPE_ICAL_PRIMARY, + "start": on_call_shift.start + timezone.timedelta(days=i), + "end": on_call_shift.start + timezone.timedelta(days=i) + on_call_shift.duration, + "all_day": False, + "is_override": False, + "is_empty": False, + "is_gap": False, + "priority_level": on_call_shift.priority_level, + "missing_users": [], + "users": [{"display_name": user.username, "pk": user.public_primary_key}], + "shift": {"pk": on_call_shift.public_primary_key}, + "source": "api", + } + for i in range(2) + ] + assert events == expected + + # filter overrides only + events = schedule.filter_events("UTC", start_date, days=3, filter_by=OnCallSchedule.TYPE_ICAL_OVERRIDES) + expected_override = [ + { + "calendar_type": OnCallSchedule.TYPE_ICAL_OVERRIDES, + "start": override.start, + "end": override.start + override.duration, + "all_day": False, + "is_override": True, + "is_empty": False, + "is_gap": False, + "priority_level": None, + "missing_users": [], + "users": [{"display_name": user.username, "pk": user.public_primary_key}], + "shift": {"pk": override.public_primary_key}, + "source": "api", + } + ] + assert events == expected_override + + # no type filter + events = schedule.filter_events("UTC", start_date, days=3) + assert events == expected_override + expected + + +@pytest.mark.django_db +def test_filter_events_include_gaps(make_organization, make_user_for_organization, make_schedule, make_on_call_shift): + organization = make_organization() + schedule = make_schedule( + organization, + schedule_class=OnCallScheduleWeb, + name="test_web_schedule", + ) + user = make_user_for_organization(organization) + now = timezone.now().replace(hour=0, minute=0, second=0, microsecond=0) + start_date = now - timezone.timedelta(days=7) + + data = { + "start": start_date + timezone.timedelta(hours=10), + "rotation_start": start_date + timezone.timedelta(days=1, hours=10), + "duration": timezone.timedelta(hours=8), + "priority_level": 1, + "frequency": CustomOnCallShift.FREQUENCY_DAILY, + "schedule": schedule, + } + on_call_shift = make_on_call_shift( + organization=organization, shift_type=CustomOnCallShift.TYPE_ROLLING_USERS_EVENT, **data + ) + on_call_shift.add_rolling_users([[user]]) + + events = schedule.filter_events( + "UTC", start_date, days=1, filter_by=OnCallSchedule.TYPE_ICAL_PRIMARY, with_gap=True + ) + expected = [ + { + "calendar_type": None, + "start": start_date + timezone.timedelta(milliseconds=1), + "end": on_call_shift.start, + "all_day": False, + "is_override": False, + "is_empty": False, + "is_gap": True, + "priority_level": None, + "missing_users": [], + "users": [], + "shift": {"pk": None}, + "source": None, + }, + { + "calendar_type": OnCallSchedule.TYPE_ICAL_PRIMARY, + "start": on_call_shift.start, + "end": on_call_shift.start + on_call_shift.duration, + "all_day": False, + "is_override": False, + "is_empty": False, + "is_gap": False, + "priority_level": on_call_shift.priority_level, + "missing_users": [], + "users": [{"display_name": user.username, "pk": user.public_primary_key}], + "shift": {"pk": on_call_shift.public_primary_key}, + "source": "api", + }, + { + "calendar_type": None, + "start": on_call_shift.start + on_call_shift.duration, + "end": on_call_shift.start + timezone.timedelta(hours=13, minutes=59, seconds=59, milliseconds=1), + "all_day": False, + "is_override": False, + "is_empty": False, + "is_gap": True, + "priority_level": None, + "missing_users": [], + "users": [], + "shift": {"pk": None}, + "source": None, + }, + ] + assert events == expected + + +@pytest.mark.django_db +def test_filter_events_include_empty(make_organization, make_user_for_organization, make_schedule, make_on_call_shift): + organization = make_organization() + schedule = make_schedule( + organization, + schedule_class=OnCallScheduleWeb, + name="test_web_schedule", + ) + user = make_user_for_organization(organization, role=Role.VIEWER) + now = timezone.now().replace(hour=0, minute=0, second=0, microsecond=0) + start_date = now - timezone.timedelta(days=7) + + data = { + "start": start_date + timezone.timedelta(hours=10), + "rotation_start": start_date + timezone.timedelta(days=1, hours=10), + "duration": timezone.timedelta(hours=8), + "priority_level": 1, + "frequency": CustomOnCallShift.FREQUENCY_DAILY, + "schedule": schedule, + } + on_call_shift = make_on_call_shift( + organization=organization, shift_type=CustomOnCallShift.TYPE_ROLLING_USERS_EVENT, **data + ) + on_call_shift.add_rolling_users([[user]]) + + events = schedule.filter_events( + "UTC", start_date, days=1, filter_by=OnCallSchedule.TYPE_ICAL_PRIMARY, with_empty=True + ) + expected = [ + { + "calendar_type": OnCallSchedule.TYPE_ICAL_PRIMARY, + "start": on_call_shift.start, + "end": on_call_shift.start + on_call_shift.duration, + "all_day": False, + "is_override": False, + "is_empty": True, + "is_gap": False, + "priority_level": on_call_shift.priority_level, + "missing_users": [user.username], + "users": [], + "shift": {"pk": on_call_shift.public_primary_key}, + "source": "api", + } + ] + assert events == expected + + +@pytest.mark.django_db +def test_final_schedule_events(make_organization, make_user_for_organization, make_on_call_shift, make_schedule): + organization = make_organization() + schedule = make_schedule( + organization, + schedule_class=OnCallScheduleWeb, + name="test_web_schedule", + ) + + now = timezone.now().replace(hour=0, minute=0, second=0, microsecond=0) + start_date = now - timezone.timedelta(days=7) + + user_a, user_b, user_c, user_d, user_e = (make_user_for_organization(organization, username=i) for i in "ABCDE") + + shifts = ( + # user, priority, start time (h), duration (hs) + (user_a, 1, 10, 5), # r1-1: 10-15 / A + (user_b, 1, 11, 2), # r1-2: 11-13 / B + (user_a, 1, 16, 3), # r1-3: 16-19 / A + (user_a, 1, 21, 1), # r1-4: 21-22 / A + (user_b, 1, 22, 2), # r1-5: 22-00 / B + (user_c, 2, 12, 2), # r2-1: 12-14 / C + (user_d, 2, 14, 1), # r2-2: 14-15 / D + (user_d, 2, 17, 1), # r2-3: 17-18 / D + (user_d, 2, 20, 3), # r2-4: 20-23 / D + ) + for user, priority, start_h, duration in shifts: + data = { + "start": start_date + timezone.timedelta(hours=start_h), + "rotation_start": start_date + timezone.timedelta(hours=start_h), + "duration": timezone.timedelta(hours=duration), + "priority_level": priority, + "frequency": CustomOnCallShift.FREQUENCY_DAILY, + "schedule": schedule, + } + on_call_shift = make_on_call_shift( + organization=organization, shift_type=CustomOnCallShift.TYPE_RECURRENT_EVENT, **data + ) + on_call_shift.users.add(user) + + # override: 22-23 / E + override_data = { + "start": start_date + timezone.timedelta(hours=22), + "rotation_start": start_date + timezone.timedelta(hours=22), + "duration": timezone.timedelta(hours=1), + "schedule": schedule, + } + override = make_on_call_shift( + organization=organization, shift_type=CustomOnCallShift.TYPE_OVERRIDE, **override_data + ) + override.add_rolling_users([[user_e]]) + + returned_events = schedule.final_events("UTC", start_date, days=1) + + expected = ( + # start (h), duration (H), user, priority, is_gap, is_override + (0, 10, None, None, True, False), # 0-10 gap + (10, 2, "A", 1, False, False), # 10-12 A + (11, 1, "B", 1, False, False), # 11-12 B + (12, 2, "C", 2, False, False), # 12-14 C + (14, 1, "D", 2, False, False), # 14-15 D + (15, 1, None, None, True, False), # 15-16 gap + (16, 1, "A", 1, False, False), # 16-17 A + (17, 1, "D", 2, False, False), # 17-18 D + (18, 1, "A", 1, False, False), # 18-19 A + (19, 1, None, None, True, False), # 19-20 gap + (20, 2, "D", 2, False, False), # 20-22 D + (22, 1, "E", None, False, True), # 22-23 E (override) + (23, 1, "B", 1, False, False), # 23-00 B + ) + expected_events = [ + { + "calendar_type": 1 if is_override else None if is_gap else 0, + "end": start_date + timezone.timedelta(hours=start + duration), + "is_gap": is_gap, + "is_override": is_override, + "priority_level": priority, + "start": start_date + timezone.timedelta(hours=start, milliseconds=1 if start == 0 else 0), + "user": user, + } + for start, duration, user, priority, is_gap, is_override in expected + ] + returned_events = [ + { + "calendar_type": e["calendar_type"], + "end": e["end"], + "is_gap": e["is_gap"], + "is_override": e["is_override"], + "priority_level": e["priority_level"], + "start": e["start"], + "user": e["users"][0]["display_name"] if e["users"] else None, + } + for e in returned_events + ] + assert returned_events == expected_events