oncall-engine/engine/apps/schedules/models/on_call_schedule.py
Matias Bordese 4c92826c26
chore: update schedule checks notification period and improve wording (#5412)
Related to https://github.com/grafana/oncall-private/issues/2994

- Extend gaps/empty shift checks to consider 30 days (customizable via
param, eventually make it customizable per schedule?); ie. every week
(per beat schedule), check the schedule next 30 days
- Trigger checks via async task on schedule API updates (instead of a
sync call)
- Update notifications wording / link to schedule
2025-01-16 12:19:16 +00:00

1309 lines
54 KiB
Python

import copy
import datetime
import itertools
import re
import typing
from collections import defaultdict
from enum import Enum
import icalendar
import pytz
from django.conf import settings
from django.core.validators import MinLengthValidator
from django.db import models
from django.db.utils import DatabaseError
from django.utils import timezone
from django.utils.functional import cached_property
from polymorphic.managers import PolymorphicManager
from polymorphic.models import PolymorphicModel
from polymorphic.query import PolymorphicQuerySet
from apps.grafana_plugin.ui_url_builder import UIURLBuilder
from apps.schedules.constants import (
EXPORT_WINDOW_DAYS_AFTER,
EXPORT_WINDOW_DAYS_BEFORE,
ICAL_COMPONENT_VEVENT,
ICAL_DATETIME_END,
ICAL_DATETIME_STAMP,
ICAL_DATETIME_START,
ICAL_LAST_MODIFIED,
ICAL_PRIORITY,
ICAL_STATUS,
ICAL_STATUS_CANCELLED,
ICAL_SUMMARY,
ICAL_UID,
PREFETCHED_SHIFT_SWAPS,
SCHEDULE_CHECK_NEXT_DAYS,
)
from apps.schedules.ical_utils import (
EmptyShifts,
create_base_icalendar,
fetch_ical_file_or_get_error,
get_oncall_users_for_multiple_schedules,
list_of_empty_shifts_in_schedule,
list_of_oncall_shifts_from_ical,
)
from apps.schedules.models import CustomOnCallShift
from apps.user_management.models import User
from common.database import NON_POLYMORPHIC_CASCADE, NON_POLYMORPHIC_SET_NULL
from common.public_primary_keys import generate_public_primary_key, increase_public_primary_key_length
if typing.TYPE_CHECKING:
from django.db.models.manager import RelatedManager
from apps.alerts.models import EscalationPolicy
from apps.auth_token.models import ScheduleExportAuthToken
from apps.schedules.models import ShiftSwapRequest
from apps.slack.models import SlackChannel, SlackUserGroup
from apps.user_management.models import Organization, Team
RE_ICAL_SEARCH_USERNAME = r"SUMMARY:(\[L[0-9]+\] )?{}"
RE_ICAL_FETCH_USERNAME = re.compile(r"SUMMARY:(?:\[L[0-9]+\] )?([^\s]+)")
# Utility classes for schedule quality report
class QualityReportCommentType(str, Enum):
    """Severity of a comment included in a schedule quality report."""

    INFO = "info"
    WARNING = "warning"


class QualityReportComment(typing.TypedDict):
    """A single human-readable remark in a quality report."""

    type: QualityReportCommentType
    text: str


class QualityReportOverloadedUser(typing.TypedDict):
    """A user whose total on-call time is above the average of the users in the report."""

    id: str  # user public primary key
    username: str  # "unknown" when the user could not be loaded
    score: int  # percentage points above the average on-call duration


QualityReportOverloadedUsers = typing.List[QualityReportOverloadedUser]
QualityReportComments = typing.List[QualityReportComment]


class QualityReport(typing.TypedDict):
    """Payload returned by `OnCallSchedule.quality_report`."""

    total_score: int  # 0-100
    comments: QualityReportComments
    overloaded_users: QualityReportOverloadedUsers
class ScheduleEventUser(typing.TypedDict):
    """User details attached to a schedule event."""

    display_name: str  # populated with the user's username
    pk: str  # user public primary key
    email: str
    avatar_full: str


class SwapRequest(typing.TypedDict):
    """Shift swap request details attached to an event user."""

    pk: str  # swap request public primary key
    user: typing.Optional[ScheduleEventUser]  # original (beneficiary) user, set once the swap is taken


class MaybeSwappedScheduleEventUser(ScheduleEventUser):
    """Event user that may carry the swap request affecting it."""

    swap_request: typing.Optional[SwapRequest]


class ScheduleEventShift(typing.TypedDict):
    """Reference to the shift an event comes from."""

    pk: str


class ScheduleEvent(typing.TypedDict):
    """Event produced by `OnCallSchedule.filter_events` / `OnCallSchedule.final_events`."""

    all_day: bool  # True when the source shift start is a date rather than a datetime
    start: datetime.datetime
    end: datetime.datetime
    users: typing.List[MaybeSwappedScheduleEventUser]
    missing_users: typing.List[str]
    priority_level: typing.Optional[int]
    source: typing.Optional[str]
    calendar_type: typing.Optional[int]  # one of OnCallSchedule.TYPE_ICAL_*; None for gaps
    is_empty: bool  # no users assigned and not a gap
    is_gap: bool
    is_override: bool  # calendar_type is OnCallSchedule.TYPE_ICAL_OVERRIDES
    shift: ScheduleEventShift


class ScheduleFinalShift(typing.TypedDict):
    """Flattened representation of a final-schedule shift for a single user."""

    user_pk: str
    user_email: str
    user_username: str
    shift_start: str
    shift_end: str


ScheduleEvents = typing.List[ScheduleEvent]
# list of [start, end] pairs
ScheduleEventIntervals = typing.List[typing.List[datetime.datetime]]
ScheduleFinalShifts = typing.List[ScheduleFinalShift]
# user public primary key -> accumulated duration
DurationMap = typing.Dict[str, datetime.timedelta]
def generate_public_primary_key_for_oncall_schedule_channel():
    """Return a public primary key (prefix "S") not used by any existing OnCallSchedule.

    On every collision a new candidate is requested from
    `increase_public_primary_key_length`, passing the number of failures so far.
    """
    prefix = "S"
    candidate = generate_public_primary_key(prefix)
    attempts = 0
    while True:
        already_taken = OnCallSchedule.objects.filter(public_primary_key=candidate).exists()
        if not already_taken:
            return candidate
        candidate = increase_public_primary_key_length(
            failure_counter=attempts, prefix=prefix, model_name="OnCallSchedule"
        )
        attempts += 1
class OnCallScheduleQuerySet(PolymorphicQuerySet):
    """Queryset exposing schedule-specific helpers."""

    def get_oncall_users(self, events_datetime=None):
        """Return on-call users for all the schedules in this queryset.

        Delegates to `get_oncall_users_for_multiple_schedules`; `events_datetime`
        is passed through unchanged.
        """
        return get_oncall_users_for_multiple_schedules(self.all(), events_datetime)

    def related_to_user(self, user):
        """Filter schedules of the user's organization whose cached final iCal mentions the user.

        The match is a regex lookup over `cached_ical_final_schedule`, searching
        for a `SUMMARY:` line (optionally prefixed with a `[L<n>] ` marker)
        starting with the username.
        """
        # the username is arbitrary text: escape it so regex metacharacters
        # (e.g. "." or "+" in email-like usernames) are matched literally
        username_regex = RE_ICAL_SEARCH_USERNAME.format(re.escape(user.username))
        return self.filter(
            cached_ical_final_schedule__regex=username_regex,
            organization=user.organization,
        )
class OnCallSchedule(PolymorphicModel):
    """Polymorphic base model for on-call schedules.

    Holds the cached iCal representations of the schedule (primary, overrides and
    final), notification settings and the gaps / empty shifts check state.
    The `_ical_file_*` properties and `_refresh_*_ical_file` methods raise
    NotImplementedError here and are expected to be provided by subclasses.
    """

    # type hints for related objects / reverse relations
    custom_shifts: "RelatedManager['CustomOnCallShift']"
    organization: "Organization"
    shift_swap_requests: "RelatedManager['ShiftSwapRequest']"
    slack_channel: typing.Optional["SlackChannel"]
    team: typing.Optional["Team"]
    user_group: typing.Optional["SlackUserGroup"]

    objects: models.Manager["OnCallSchedule"] = PolymorphicManager.from_queryset(OnCallScheduleQuerySet)()

    # type of calendars in schedule
    TYPE_ICAL_PRIMARY, TYPE_ICAL_OVERRIDES, TYPE_CALENDAR = range(
        3
    )  # todo: discuss whether the third type is needed (these types are used by the frontend)

    PRIMARY, OVERRIDES = range(2)
    CALENDAR_TYPE_VERBAL = {PRIMARY: "primary", OVERRIDES: "overrides"}

    public_primary_key = models.CharField(
        max_length=20,
        validators=[MinLengthValidator(settings.PUBLIC_PRIMARY_KEY_MIN_LENGTH + 1)],
        unique=True,
        default=generate_public_primary_key_for_oncall_schedule_channel,
    )

    # cached iCal data; the prev_* fields keep the last cached value when the cache is dropped
    cached_ical_file_primary = models.TextField(null=True, default=None)
    prev_ical_file_primary = models.TextField(null=True, default=None)

    cached_ical_file_overrides = models.TextField(null=True, default=None)
    prev_ical_file_overrides = models.TextField(null=True, default=None)

    # iCal export of the resolved (final) schedule, written by refresh_ical_final_schedule
    cached_ical_final_schedule = models.TextField(null=True, default=None)

    organization = models.ForeignKey(
        "user_management.Organization", on_delete=NON_POLYMORPHIC_CASCADE, related_name="oncall_schedules"
    )

    team = models.ForeignKey(
        "user_management.Team",
        on_delete=NON_POLYMORPHIC_SET_NULL,
        related_name="oncall_schedules",
        null=True,
        default=None,
    )

    name = models.CharField(max_length=200)

    slack_channel = models.ForeignKey(
        "slack.SlackChannel",
        null=True,
        default=None,
        on_delete=models.SET_NULL,
        related_name="+",
    )

    # Slack user group to be updated when on-call users change for this schedule
    user_group = models.ForeignKey(
        to="slack.SlackUserGroup", null=True, on_delete=NON_POLYMORPHIC_SET_NULL, related_name="oncall_schedules"
    )

    # schedule reminder related fields
    class NotifyOnCallShiftFreq(models.IntegerChoices):
        NEVER = 0, "Never"
        EACH_SHIFT = 1, "Each shift"

    class NotifyEmptyOnCall(models.IntegerChoices):
        ALL = 0, "Notify all people in the channel"
        PREV = 1, "Mention person from the previous slot"
        NO_ONE = 2, "Inform about empty slot"

    current_shifts = models.TextField(null=False, default="{}")
    # Used to not drop current_shifts to use them when "Mention person from the previous slot"
    empty_oncall = models.BooleanField(default=True)

    notify_oncall_shift_freq = models.IntegerField(
        null=False,
        choices=NotifyOnCallShiftFreq.choices,
        default=NotifyOnCallShiftFreq.EACH_SHIFT,
    )

    mention_oncall_start = models.BooleanField(null=False, default=True)
    mention_oncall_next = models.BooleanField(null=False, default=False)

    notify_empty_oncall = models.IntegerField(
        null=False, choices=NotifyEmptyOnCall.choices, default=NotifyEmptyOnCall.ALL
    )

    # Gaps-checker related fields (has_gaps is updated by check_gaps_and_empty_shifts_for_next_days)
    has_gaps = models.BooleanField(default=False)
    gaps_report_sent_at = models.DateField(null=True, default=None)

    # empty shifts checker related fields (has_empty_shifts is updated by the same check)
    has_empty_shifts = models.BooleanField(default=False)
    empty_shifts_report_sent_at = models.DateField(null=True, default=None)
@property
def web_page_link(self) -> str:
    """URL built by `UIURLBuilder.schedules()` for this schedule's organization.

    Presumably the schedules list page in the web UI; confirm against UIURLBuilder.
    """
    url_builder = UIURLBuilder(self.organization)
    return url_builder.schedules()
@property
def web_detail_page_link(self) -> str:
    """URL built by `UIURLBuilder.schedule_detail()` for this schedule's public primary key."""
    url_builder = UIURLBuilder(self.organization)
    return url_builder.schedule_detail(self.public_primary_key)
@property
def slack_url(self) -> str:
    """Schedule detail link in `<url|label>` form, labelled with the schedule name."""
    link = self.web_detail_page_link
    label = self.name
    return f"<{link}|{label}>"
@property
def slack_channel_slack_id(self) -> typing.Optional[str]:
    """Slack ID of the related Slack channel, or None when no channel is set."""
    channel = self.slack_channel
    if not channel:
        return None
    return channel.slack_id
def get_icalendars(self) -> typing.Tuple[typing.Optional[icalendar.Calendar], typing.Optional[icalendar.Calendar]]:
    """Return a `(primary, overrides)` tuple of parsed calendars.

    Each item is None when the corresponding iCal data is falsy:
    - `_ical_file_(primary|overrides)` is None -> no cache, will trigger a refresh
    - `_ical_file_(primary|overrides)` == "" -> cached value for an empty schedule
    """

    def _parse(raw_ical) -> typing.Optional[icalendar.Calendar]:
        return icalendar.Calendar.from_ical(raw_ical) if raw_ical else None

    # primary calendar is always the first one
    primary_calendar = _parse(self._ical_file_primary)
    overrides_calendar = _parse(self._ical_file_overrides)
    return primary_calendar, overrides_calendar
def get_prev_and_current_ical_files(self):
    """Return `(previous, current)` iCal file pairs: primary calendar first, then overrides."""
    primary_pair = (self.prev_ical_file_primary, self.cached_ical_file_primary)
    overrides_pair = (self.prev_ical_file_overrides, self.cached_ical_file_overrides)
    return [primary_pair, overrides_pair]
def check_gaps_and_empty_shifts_for_next_days(self, days=SCHEDULE_CHECK_NEXT_DAYS) -> None:
    """Recompute `has_gaps` / `has_empty_shifts` for the next `days` days.

    Empty shifts are looked up among all the events in the window, gaps among the
    resolved (final) events. The schedule is saved only when a flag changed.
    """
    window_start = timezone.now()
    window_end = window_start + datetime.timedelta(days=days)

    all_events = self.filter_events(
        window_start,
        window_end,
        with_empty=True,
        with_gap=True,
        all_day_datetime=True,
    )
    # must be checked before resolving: _resolve_schedule consumes the events list
    # and drops empty events
    found_empty_shifts = any(event["is_empty"] for event in all_events)

    resolved_events = self._resolve_schedule(all_events, window_start, window_end)
    found_gaps = any(event["is_gap"] for event in resolved_events)

    if found_gaps == self.has_gaps and found_empty_shifts == self.has_empty_shifts:
        # nothing changed, avoid the DB write
        return

    self.has_gaps = found_gaps
    self.has_empty_shifts = found_empty_shifts
    self.save(update_fields=["has_gaps", "has_empty_shifts"])
def get_gaps_for_next_days(self, days=SCHEDULE_CHECK_NEXT_DAYS) -> ScheduleEvents:
    """Return the gap events of the final schedule for the next `days` days (from now)."""
    window_start = timezone.now()
    window_end = window_start + datetime.timedelta(days=days)

    gaps = []
    for event in self.final_events(window_start, window_end):
        if event["is_gap"]:
            gaps.append(event)
    return gaps
def get_empty_shifts_for_next_days(self, days=SCHEDULE_CHECK_NEXT_DAYS) -> EmptyShifts:
    """Return the empty shifts found between today's date and `days` days later.

    Delegates to `list_of_empty_shifts_in_schedule`, which receives dates (not datetimes).
    """
    start_date = timezone.now().date()
    end_date = start_date + datetime.timedelta(days=days)
    return list_of_empty_shifts_in_schedule(self, start_date, end_date)
def drop_cached_ical(self):
    """Drop the cached primary iCal file first, then the overrides one."""
    for drop_cache in (self._drop_primary_ical_file, self._drop_overrides_ical_file):
        drop_cache()
def refresh_ical_file(self):
    """Refresh the primary iCal file first, then the overrides one."""
    for refresh in (self._refresh_primary_ical_file, self._refresh_overrides_ical_file):
        refresh()
@property
def _ical_file_primary(self):
raise NotImplementedError
@property
def _ical_file_overrides(self):
raise NotImplementedError
def _refresh_primary_ical_file(self):
raise NotImplementedError
def _refresh_overrides_ical_file(self):
raise NotImplementedError
def _drop_primary_ical_file(self):
self.prev_ical_file_primary = self.cached_ical_file_primary
self.cached_ical_file_primary = None
self.save(update_fields=["cached_ical_file_primary", "prev_ical_file_primary"])
def _drop_overrides_ical_file(self):
self.prev_ical_file_overrides = self.cached_ical_file_overrides
self.cached_ical_file_overrides = None
self.save(update_fields=["cached_ical_file_overrides", "prev_ical_file_overrides"])
def related_users(self):
    """Return the organization users whose username appears in the cached final schedule.

    Usernames are extracted from the `SUMMARY:` lines of `cached_ical_final_schedule`;
    when there is no cached final schedule the lookup uses an empty username list.
    """
    cached_final = self.cached_ical_final_schedule
    usernames = RE_ICAL_FETCH_USERNAME.findall(cached_final) if cached_final else []
    return self.organization.users.filter(username__in=usernames)
def filter_events(
    self,
    datetime_start: datetime.datetime,
    datetime_end: datetime.datetime,
    with_empty: bool = False,
    with_gap: bool = False,
    filter_by: str | None = None,
    all_day_datetime: bool = False,
    ignore_untaken_swaps: bool = False,
    from_cached_final: bool = False,
    include_shift_info: bool = False,
) -> ScheduleEvents:
    """Return the schedule events in the given time range, before resolving priorities.

    Shifts are read via `list_of_oncall_shifts_from_ical`, converted to
    `ScheduleEvent` dicts, same-shift events are merged and swap requests applied.

    :param with_empty: passed to the iCal helper (include shifts without users)
    :param with_gap: passed to the iCal helper (include gap pseudo-events)
    :param filter_by: passed to the iCal helper as-is
    :param all_day_datetime: convert all-day event dates to UTC datetimes
        (start of the first day / last second of the last day)
    :param ignore_untaken_swaps: skip swap requests that were not taken
    :param from_cached_final: passed to the iCal helper (read from the cached final schedule)
    :param include_shift_info: add shift `name` and `type` under the event `shift` key
    :return: list of events; empty when the iCal helper raises ValueError
    """
    try:
        shifts = (
            list_of_oncall_shifts_from_ical(
                self,
                datetime_start,
                datetime_end,
                with_empty,
                with_gap,
                filter_by=filter_by,
                from_cached_final=from_cached_final,
            )
            or []
        )
    except ValueError:
        # raised when filtering events on a non-saved/deleted schedule
        return []

    # shift public primary key -> {"name", "type"}, only loaded when requested
    shifts_data = {}
    if include_shift_info:
        pks = set(shift["shift_pk"] for shift in shifts)
        shifts_from_db = CustomOnCallShift.objects.filter(
            organization=self.organization, public_primary_key__in=pks
        )
        shifts_data = {s.public_primary_key: {"name": s.name, "type": s.type} for s in shifts_from_db}

    events: ScheduleEvents = []
    for shift in shifts:
        start = shift["start"]
        # exact type check on purpose: datetime.datetime is a subclass of datetime.date
        all_day = type(start) is datetime.date
        # fix confusing end date for all-day event
        end = shift["end"] - datetime.timedelta(days=1) if all_day else shift["end"]
        if all_day and all_day_datetime:
            start = datetime.datetime.combine(start, datetime.datetime.min.time(), tzinfo=pytz.UTC)
            end = datetime.datetime.combine(end, datetime.datetime.max.time(), tzinfo=pytz.UTC).replace(
                microsecond=0
            )
        is_gap = shift.get("is_gap", False)
        shift_json: ScheduleEvent = {
            "all_day": all_day,
            "start": start,
            "end": end,
            "users": [
                {
                    "display_name": user.username,
                    "email": user.email,
                    "pk": user.public_primary_key,
                    "avatar_full": user.avatar_full_url(self.organization),
                }
                for user in shift["users"]
            ],
            "missing_users": shift["missing_users"],
            # a missing priority is reported as 0
            "priority_level": shift["priority"] or 0,
            "source": shift["source"],
            "calendar_type": shift["calendar_type"],
            "is_empty": len(shift["users"]) == 0 and not is_gap,
            "is_gap": is_gap,
            "is_override": shift["calendar_type"] == OnCallSchedule.TYPE_ICAL_OVERRIDES,
            "shift": {
                "pk": shift["shift_pk"],
            },
        }
        if include_shift_info and not is_gap:
            # fallback for shifts not found in the DB: no name, and the override
            # type only when the event comes from the overrides calendar
            no_data = {
                "name": None,
                "type": CustomOnCallShift.TYPE_OVERRIDE
                if shift["calendar_type"] == OnCallSchedule.TYPE_ICAL_OVERRIDES
                else None,
            }
            shift_data = shifts_data.get(shift["shift_pk"], no_data)
            shift_json["shift"].update(shift_data)
        events.append(shift_json)

    # combine multiple-users same-shift events into one
    events = self._merge_events(events)

    # annotate events with swap request details swapping users as needed
    events = self._apply_swap_requests(
        events, datetime_start, datetime_end, ignore_untaken_swaps=ignore_untaken_swaps
    )

    return events
def final_events(
    self,
    datetime_start: datetime.datetime,
    datetime_end: datetime.datetime,
    with_empty: bool = True,
    with_gap: bool = True,
    ignore_untaken_swaps: bool = False,
    include_shift_info: bool = False,
) -> ScheduleEvents:
    """Return the final schedule events for the time range.

    Events are fetched with all-day dates converted to datetimes and then
    resolved (shifts priorities and overrides) via `_resolve_schedule`.
    """
    unresolved_events = self.filter_events(
        datetime_start,
        datetime_end,
        with_empty=with_empty,
        with_gap=with_gap,
        all_day_datetime=True,
        ignore_untaken_swaps=ignore_untaken_swaps,
        include_shift_info=include_shift_info,
    )
    return self._resolve_schedule(unresolved_events, datetime_start, datetime_end)
def filter_swap_requests(
    self, datetime_start: datetime.datetime, datetime_end: datetime.datetime
) -> "RelatedManager['ShiftSwapRequest']":
    """Return the schedule swap requests overlapping the time range, ordered by creation time.

    The result is the union of the requests started before `datetime_start` that
    are still ongoing, and the ones starting inside the range.
    """
    # starting before but ongoing
    ongoing = self.shift_swap_requests.filter(swap_start__lt=datetime_start, swap_end__gte=datetime_start)
    # starting after but before end
    starting_in_range = self.shift_swap_requests.filter(
        swap_start__gte=datetime_start, swap_start__lte=datetime_end
    )
    return ongoing.union(starting_in_range).order_by("created_at")
def refresh_ical_final_schedule(self):
    """Rebuild and save `cached_ical_final_schedule`, the iCal export of the final schedule.

    One VEVENT is generated per (final event, user). Events present in the previously
    cached export but not generated now are kept flagged as cancelled, unless they
    ended (or were cancelled) before the start of the export window.
    """
    now = timezone.now()
    # window to consider: from now, -15 days + 6 months
    delta = EXPORT_WINDOW_DAYS_BEFORE
    days = EXPORT_WINDOW_DAYS_AFTER + delta
    # window starts at midnight `delta` days ago and ends at the last second of its last day
    datetime_start = now.replace(hour=0, minute=0, second=0, microsecond=0) - datetime.timedelta(days=delta)
    datetime_end = datetime_start + datetime.timedelta(days=days - 1, hours=23, minutes=59, seconds=59)

    # setup calendar with final schedule shift events
    calendar = create_base_icalendar(self.name)
    events = self.final_events(datetime_start, datetime_end, ignore_untaken_swaps=True)
    # UIDs generated in this run, used below to detect events that no longer exist
    updated_ids = set()
    for e in events:
        for u in e["users"]:
            event = icalendar.Event()
            event.add(ICAL_SUMMARY, u["display_name"])
            event.add(ICAL_DATETIME_START, e["start"])
            event.add(ICAL_DATETIME_END, e["end"])
            event.add(ICAL_DATETIME_STAMP, now)
            event.add(ICAL_LAST_MODIFIED, now)
            # set priority based on primary/overrides
            # 0: undefined priority, 1: high priority
            event.add(ICAL_PRIORITY, e["calendar_type"])
            # NOTE(review): the timestamp format is "%H%S" (hour + seconds, no minutes), so
            # same shift/user events starting at different minutes of the same hour get the
            # same UID; changing the format would alter the UIDs already exported - confirm
            event_uid = "{}-{}-{}".format(e["shift"]["pk"], e["start"].strftime("%Y%m%d%H%S"), u["pk"])
            event[ICAL_UID] = event_uid
            calendar.add_component(event)
            updated_ids.add(event_uid)

    # check previously cached final schedule for potentially cancelled events
    if self.cached_ical_final_schedule:
        previous = icalendar.Calendar.from_ical(self.cached_ical_final_schedule)
        for component in previous.walk():
            if component.name == ICAL_COMPONENT_VEVENT and component[ICAL_UID] not in updated_ids:
                # check if event was ended or cancelled, update ical
                dtend = component.get(ICAL_DATETIME_END)
                dtend_datetime = dtend.dt if dtend else None
                if dtend_datetime and type(dtend_datetime) is datetime.date:
                    # shift or overrides coming from ical calendars can be all day events, change to datetime
                    dtend_datetime = datetime.datetime.combine(
                        dtend.dt, datetime.datetime.min.time(), tzinfo=pytz.UTC
                    )
                if dtend_datetime and dtend_datetime < datetime_start:
                    # event ended before window start
                    continue
                is_cancelled = component.get(ICAL_STATUS)
                last_modified = component.get(ICAL_LAST_MODIFIED)
                if is_cancelled and last_modified and last_modified.dt < datetime_start:
                    # drop already ended events older than the window we consider
                    continue
                elif is_cancelled and not last_modified:
                    # set last_modified if it was missing (e.g. from previous export ical implementation)
                    component[ICAL_LAST_MODIFIED] = icalendar.vDatetime(now).to_ical()
                elif not is_cancelled:
                    # set the event as cancelled
                    # (the end is set to the start, making it a zero-length event)
                    component[ICAL_DATETIME_END] = component[ICAL_DATETIME_START]
                    component[ICAL_STATUS] = ICAL_STATUS_CANCELLED
                    component[ICAL_LAST_MODIFIED] = icalendar.vDatetime(now).to_ical()
                # include just cancelled events as well as those that were cancelled during the time window
                calendar.add_component(component)

    ical_data = calendar.to_ical().decode()
    self.cached_ical_final_schedule = ical_data
    self.save(update_fields=["cached_ical_final_schedule"])
def shifts_for_user(
    self,
    user: User,
    datetime_start: datetime.datetime,
    datetime_end: typing.Optional[datetime.datetime] = None,
    days: typing.Optional[int] = None,
) -> typing.Tuple[ScheduleEvents, ScheduleEvents, ScheduleEvents]:
    """Return the user's `(passed, current, upcoming)` shifts from the cached final schedule.

    NOTE: must specify at least `datetime_end` or `days`; when `days` is given it
    takes precedence to compute the end of the range.

    Raises ValueError when neither `datetime_end` nor `days` is provided.
    """
    if not (datetime_end or days):
        raise ValueError("Must specify at least `datetime_end` or `days`")

    now = timezone.now()

    if days is not None:
        datetime_end = datetime_start + datetime.timedelta(days=days)

    passed_shifts: ScheduleEvents = []
    current_shifts: ScheduleEvents = []
    upcoming_shifts: ScheduleEvents = []

    if self.cached_ical_final_schedule is None:
        # no final schedule info available
        return passed_shifts, current_shifts, upcoming_shifts

    events = self.filter_events(
        datetime_start, datetime_end, all_day_datetime=True, from_cached_final=True, include_shift_info=True
    )

    user_pk = user.public_primary_key
    for event in sorted(events, key=lambda e: e["start"]):
        if all(event_user["pk"] != user_pk for event_user in event["users"]):
            # user is not part of this event
            continue
        if event["end"] <= now:
            bucket = passed_shifts
        elif event["start"] <= now < event["end"]:
            bucket = current_shifts
        else:
            bucket = upcoming_shifts
        bucket.append(event)

    return passed_shifts, current_shifts, upcoming_shifts
def quality_report(self, date: typing.Optional[datetime.datetime], days: typing.Optional[int]) -> QualityReport:
    """
    Return schedule quality report to be used by the web UI.

    The total score is the average of two percentages:
    - the "good events" score: time covered by events that are not gaps nor empty,
      relative to the length of the considered period (capped at 100)
    - the balance score: how evenly the covered time is distributed among users

    :param date: start of the period to consider; defaults to the same time of day
        on the next Monday
    :param days: length of the period in days; defaults to 52 weeks
    :return: report with total score, comments and overloaded users

    TODO: Add scores on "inside working hours" and "balance outside working hours" when
    TODO: working hours editor is implemented in the web UI.
    """
    # get events to consider for calculation
    if date is None:
        today = timezone.now()
        # move forward to next Monday (weekday() is 0 for Monday), ie. start of next week in UTC;
        # the report is about upcoming shifts, so the period must not start in the past
        date = today + datetime.timedelta(days=7 - today.weekday())
    if days is None:
        days = 52 * 7  # consider next 52 weeks (~1 year)

    # period ends at the last second of its last day
    datetime_end = date + datetime.timedelta(days=days - 1, hours=23, minutes=59, seconds=59)
    events = self.final_events(date, datetime_end)

    # an event is “good” if it's not a gap and not empty
    good_events: ScheduleEvents = [event for event in events if not event["is_gap"] and not event["is_empty"]]
    if not good_events:
        return {
            "total_score": 0,
            "comments": [{"type": QualityReportCommentType.WARNING, "text": "Schedule is empty"}],
            "overloaded_users": [],
        }

    def event_duration(ev: ScheduleEvent) -> datetime.timedelta:
        return ev["end"] - ev["start"]

    def timedelta_sum(deltas: typing.Iterable[datetime.timedelta]) -> datetime.timedelta:
        return sum(deltas, start=datetime.timedelta())

    def score_to_percent(value: float) -> int:
        return round(value * 100)

    def get_duration_map(evs: ScheduleEvents) -> DurationMap:
        """Return a map of user PKs to total duration of events they are in."""
        result: DurationMap = defaultdict(datetime.timedelta)
        for ev in evs:
            for user in ev["users"]:
                user_pk = user["pk"]
                result[user_pk] += event_duration(ev)

        return result

    def get_balance_score_by_duration_map(dur_map: DurationMap) -> float:
        """
        Return a score between 0 and 1, based on how balanced the durations are in the duration map.
        The formula is taken from https://github.com/grafana/oncall/issues/118#issuecomment-1161787854.
        """
        if len(dur_map) <= 1:
            # a single user (or none) is always balanced
            return 1

        # average of min/max duration ratios over every pair of users
        result = 0.0
        for key_1, key_2 in itertools.combinations(dur_map, 2):
            duration_1 = dur_map[key_1]
            duration_2 = dur_map[key_2]

            result += min(duration_1, duration_2) / max(duration_1, duration_2)

        number_of_pairs = len(dur_map) * (len(dur_map) - 1) // 2
        return result / number_of_pairs

    # calculate good event score
    good_events_duration = timedelta_sum(event_duration(event) for event in good_events)
    good_event_score = min(good_events_duration / datetime.timedelta(days=days), 1)
    good_event_score = score_to_percent(good_event_score)

    # calculate balance score
    duration_map = get_duration_map(good_events)
    balance_score = get_balance_score_by_duration_map(duration_map)
    balance_score = score_to_percent(balance_score)

    # calculate overloaded users
    overloaded_users: QualityReportOverloadedUsers = []

    if balance_score >= 95:  # tolerate minor imbalance
        balance_score = 100
    else:
        average_duration = timedelta_sum(duration_map.values()) / len(duration_map)
        # users with more on-call time than the average (after rounding to percent)
        overloaded_user_pks = [
            user_pk
            for user_pk, duration in duration_map.items()
            if score_to_percent(duration / average_duration) > 100
        ]
        usernames = {
            u.public_primary_key: u.username
            for u in User.objects.filter(public_primary_key__in=overloaded_user_pks).only(
                "public_primary_key", "username"
            )
        }
        for user_pk in overloaded_user_pks:
            # score is the percentage above the average duration
            score = score_to_percent(duration_map[user_pk] / average_duration) - 100
            username = usernames.get(user_pk) or "unknown"  # fallback to "unknown" if user is not found
            overloaded_users.append({"id": user_pk, "username": username, "score": score})

        # show most overloaded users first
        overloaded_users.sort(key=lambda u: (-u["score"], u["username"]))

    # generate comments regarding gaps
    comments: QualityReportComments = []
    if good_event_score == 100:
        comments.append({"type": QualityReportCommentType.INFO, "text": "Schedule has no gaps"})
    else:
        not_covered = 100 - good_event_score
        comments.append(
            {"type": QualityReportCommentType.WARNING, "text": f"Schedule has gaps ({not_covered}% not covered)"}
        )

    # generate comments regarding balance
    if balance_score == 100:
        comments.append({"type": QualityReportCommentType.INFO, "text": "Schedule is perfectly balanced"})
    else:
        comments.append(
            {"type": QualityReportCommentType.WARNING, "text": "Schedule has balance issues (see overloaded users)"}
        )

    # calculate total score (weighted sum of good event score and balance score)
    total_score = round((good_event_score + balance_score) / 2)

    return {
        "total_score": total_score,
        "comments": comments,
        "overloaded_users": overloaded_users,
    }
def _apply_swap_requests(
    self,
    events: ScheduleEvents,
    datetime_start: datetime.datetime,
    datetime_end: datetime.datetime,
    ignore_untaken_swaps: bool = False,
) -> ScheduleEvents:
    """Apply swap requests details to schedule events.

    Events of the swap beneficiary overlapping the swap period are split so that
    only the overlapping part is annotated with the swap request; when the swap
    is taken, the user in that part is replaced by the benefactor.

    The `events` list is modified in place (and returned).

    :param ignore_untaken_swaps: when set, swaps that are not taken are skipped
    """
    # get swaps requests affecting this schedule / time range
    # (prefetched swaps, when set on the instance, are used instead of querying)
    prefetched_swaps = getattr(self, PREFETCHED_SHIFT_SWAPS, None)
    swaps = (
        prefetched_swaps
        if prefetched_swaps is not None
        else self.filter_swap_requests(datetime_start, datetime_end)
    )

    def _insert_event(index: int, event: ScheduleEvent) -> int:
        # add event, if any, to events list in the specified index
        # return incremented index if the event was added
        if event is None:
            return index
        events.insert(index, event)
        return index + 1

    # apply swaps sequentially
    for swap in swaps:
        if swap.is_past_due or (ignore_untaken_swaps and not swap.is_taken):
            # ignore expired requests, or untaken if specified
            continue

        i = 0
        while i < len(events):
            # each event is removed and re-inserted (possibly split in up to 3 events),
            # `i` always ends up pointing to the next unprocessed event
            event = events.pop(i)

            if event["start"] >= swap.swap_end or event["end"] <= swap.swap_start:
                # event outside the swap period, keep as it is and continue
                i = _insert_event(i, event)
                continue

            users = set(u["pk"] for u in event["users"])
            if swap.beneficiary.public_primary_key in users:
                # swap request affects current event

                split_before = None
                if event["start"] < swap.swap_start:
                    # partially included start -> split
                    split_before = copy.deepcopy(event)
                    split_before["end"] = swap.swap_start
                    # update event to swap
                    event["start"] = swap.swap_start

                split_after = None
                if event["end"] > swap.swap_end:
                    # partially included end -> split
                    split_after = copy.deepcopy(event)
                    split_after["start"] = swap.swap_end
                    # update event to swap
                    event["end"] = swap.swap_end

                # identify user to swap
                user_to_swap = None
                for u in event["users"]:
                    if u["pk"] == swap.beneficiary.public_primary_key:
                        user_to_swap = u
                        break

                # apply swap changes to event user
                swap_details = {"pk": swap.public_primary_key}
                if swap.benefactor:
                    # swap is taken, update user in shift
                    user_to_swap["pk"] = swap.benefactor.public_primary_key
                    user_to_swap["display_name"] = swap.benefactor.username
                    user_to_swap["email"] = swap.benefactor.email
                    user_to_swap["avatar_full"] = swap.benefactor.avatar_full_url(self.organization)
                    # add beneficiary user to details
                    swap_details["user"] = {
                        "display_name": swap.beneficiary.username,
                        "email": swap.beneficiary.email,
                        "pk": swap.beneficiary.public_primary_key,
                        "avatar_full": swap.beneficiary.avatar_full_url(self.organization),
                    }
                user_to_swap["swap_request"] = swap_details

                # update events list
                # keep first split event in its original index
                i = _insert_event(i, split_before)
                # insert updated swap-related event
                i = _insert_event(i, event)
                # keep second split event after swap
                i = _insert_event(i, split_after)
            else:
                # event for different user(s), keep as it is and continue
                i = _insert_event(i, event)

    return events
def _resolve_schedule(
    self, events: ScheduleEvents, datetime_start: datetime.datetime, datetime_end: datetime.datetime
) -> ScheduleEvents:
    """Calculate final schedule shifts considering rotations and overrides.

    Events are processed from the highest to the lowest precedence (overrides first,
    then by priority level); an event only gets the time not already taken by
    events with a higher precedence, being trimmed or split as needed.
    Empty events (no users) are discarded.

    Exclude events that after split/update are out of the requested (datetime_start, datetime_end) range.

    NOTE: the given `events` list is sorted and consumed in place.

    :return: resolved events sorted by start timestamp and shift pk
    """
    if not events:
        return []

    def event_start_cmp_key(e: ScheduleEvent) -> datetime.datetime:
        return e["start"]

    def event_cmp_key(e: ScheduleEvent) -> typing.Tuple[int, int, datetime.datetime]:
        """Sorting key criteria for events."""
        start = event_start_cmp_key(e)
        # type and priority are negated to get a descending order
        return (
            -e["calendar_type"] if e["calendar_type"] else 0,  # overrides: 1, shifts: 0, gaps: None
            -e["priority_level"] if e["priority_level"] else 0,
            start,
        )

    def insort_event(eventlist: ScheduleEvents, e: ScheduleEvent) -> None:
        """Insert event keeping ordering criteria into already sorted event list."""
        # linear scan: the event goes before the first one with an equal or greater key
        idx = 0
        for i in eventlist:
            if event_cmp_key(e) > event_cmp_key(i):
                idx += 1
            else:
                break
        eventlist.insert(idx, e)

    def _merge_intervals(evs: ScheduleEvents) -> ScheduleEventIntervals:
        """Keep track of scheduled intervals."""
        # expects events sorted by start; overlapping or adjacent intervals are merged
        if not evs:
            return []
        intervals = [[e["start"], e["end"]] for e in evs]
        result = [intervals[0]]
        for interval in intervals[1:]:
            previous_interval = result[-1]
            if previous_interval[0] <= interval[0] <= previous_interval[1]:
                previous_interval[1] = max(previous_interval[1], interval[1])
            else:
                result.append(interval)
        return result

    # sort schedule events by (type desc, priority desc, start timestamp asc)
    events.sort(key=event_cmp_key)

    # iterate over events, reserving schedule slots based on their priority
    # if the expected slot was already scheduled for a higher priority event,
    # split the event, or fix start/end timestamps accordingly

    intervals = []
    resolved: ScheduleEvents = []
    # same list object as `events`: it is consumed while resolving
    pending: ScheduleEvents = events
    current_interval_idx = 0  # current scheduled interval being checked
    current_type: typing.Optional[int] = OnCallSchedule.TYPE_ICAL_OVERRIDES  # current calendar type
    current_priority: typing.Optional[int] = None  # current priority level being resolved

    while pending:
        ev = pending.pop(0)

        if ev["is_empty"]:
            # exclude events without active users
            continue

        if ev["start"] >= datetime_end or ev["end"] <= datetime_start:
            # avoid including split events which now are outside the requested time range
            continue

        # api/terraform shifts could be missing a priority; assume None means 0
        priority = ev["priority_level"] or 0
        if priority != current_priority or current_type != ev["calendar_type"]:
            # update scheduled intervals on priority change
            # and start from the beginning for the new priority level
            # also for calendar event type (overrides first, then apply regular shifts)
            resolved.sort(key=event_start_cmp_key)
            intervals = _merge_intervals(resolved)
            current_interval_idx = 0
            current_priority = priority
            current_type = ev["calendar_type"]

        if current_interval_idx >= len(intervals):
            # event outside scheduled intervals, add to resolved
            # only if still starts before datetime_end
            if ev["start"] < datetime_end:
                resolved.append(ev)

        elif ev["start"] < intervals[current_interval_idx][0] and ev["end"] <= intervals[current_interval_idx][0]:
            # event starts and ends outside an already scheduled interval, add to resolved
            resolved.append(ev)

        elif ev["start"] < intervals[current_interval_idx][0] and ev["end"] > intervals[current_interval_idx][0]:
            # event starts outside interval but overlaps with an already scheduled interval
            # 1. add a split event copy to schedule the time before the already scheduled interval
            # (shallow copy: nested values are shared with the original event)
            to_add = ev.copy()
            to_add["end"] = intervals[current_interval_idx][0]
            if to_add["end"] >= datetime_start:
                # only include if updated event ends inside the requested time range
                resolved.append(to_add)
            # 2. check if there is still time to be scheduled after the current scheduled interval ends
            if ev["end"] > intervals[current_interval_idx][1]:
                # event ends after current interval, update event start timestamp to match the interval end
                # and process the updated event as any other event
                ev["start"] = intervals[current_interval_idx][1]
                if ev["start"] < datetime_end:
                    # only include event if it is still inside the requested time range
                    # reorder pending events after updating current event start date
                    # (ie. insert the event where it should be to keep the order criteria)
                    # TODO: switch to bisect insert on python 3.10 (or consider heapq)
                    insort_event(pending, ev)
            # done, go to next event

        elif ev["start"] >= intervals[current_interval_idx][0] and ev["end"] <= intervals[current_interval_idx][1]:
            # event inside an already scheduled interval, ignore (go to next)
            continue

        elif (
            ev["start"] >= intervals[current_interval_idx][0]
            and ev["start"] < intervals[current_interval_idx][1]
            and ev["end"] > intervals[current_interval_idx][1]
        ):
            # event starts inside a scheduled interval but ends out of it
            # update the event start timestamp to match the interval end
            ev["start"] = intervals[current_interval_idx][1]
            # unresolved, re-add to pending
            # TODO: switch to bisect insert on python 3.10 (or consider heapq)
            insort_event(pending, ev)

        elif ev["start"] >= intervals[current_interval_idx][1]:
            # event starts after the current interval, move to next interval and go through it
            current_interval_idx += 1
            # unresolved, re-add to pending
            # TODO: switch to bisect insert on python 3.10 (or consider heapq)
            insort_event(pending, ev)

    # final ordering: by start, using the shift pk (empty string when missing) to break ties
    resolved.sort(key=lambda e: (event_start_cmp_key(e), e["shift"]["pk"] or ""))
    return resolved
def _merge_events(self, events: ScheduleEvents) -> ScheduleEvents:
"""Merge user groups same-shift events."""
if events:
merged = [events[0]]
current = merged[0]
for next_event in events[1:]:
if (
current["start"] == next_event["start"]
and current["shift"]["pk"] is not None
and current["shift"]["pk"] == next_event["shift"]["pk"]
):
current["users"] += [u for u in next_event["users"] if u not in current["users"]]
current["missing_users"] += [
u for u in next_event["missing_users"] if u not in current["missing_users"]
]
else:
merged.append(next_event)
current = next_event
events = merged
return events
def _generate_ical_file_from_shifts(self, qs, extra_shifts=None, allow_empty_users=False):
"""Generate iCal events file from custom on-call shifts."""
# default to empty string since it is not possible to have a no-events ical file
ical = ""
if qs.exists() or extra_shifts is not None:
if extra_shifts is None:
extra_shifts = []
end_line = "END:VCALENDAR"
calendar = icalendar.Calendar()
calendar.add("prodid", "-//web schedule//oncall//")
calendar.add("version", "2.0")
calendar.add("method", "PUBLISH")
ical_file = calendar.to_ical().decode()
ical = ical_file.replace(end_line, "").strip()
ical = f"{ical}\r\n"
for event in itertools.chain(qs.all(), extra_shifts):
ical += event.convert_to_ical(allow_empty_users=allow_empty_users)
ical += f"{end_line}\r\n"
return ical
def preview_shift(self, custom_shift, datetime_start, datetime_end, updated_shift_pk=None):
    """Return unsaved rotation and final schedule preview events.

    The cached iCal file matching the shift type is temporarily replaced by
    one generated from the schedule shifts plus ``custom_shift``. Events are
    computed from it and the original cached value is always put back, even
    when computing the preview raises.

    Args:
        custom_shift: shift to preview; its ``type`` selects whether the
            overrides or the primary calendar is rebuilt.
        datetime_start: start of the time range to preview.
        datetime_end: end of the time range to preview.
        updated_shift_pk: public primary key of a shift being updated. When
            given, that shift is excluded from the generated calendar and the
            key is assigned to ``custom_shift`` for the preview.

    Returns:
        A ``(shift_events, final_events)`` tuple: copies of the events that
        belong to the previewed shift, and the resolved schedule events.

    Raises:
        ValueError: if the shift type is neither an override nor a rolling
            users event.
    """
    if custom_shift.type == CustomOnCallShift.TYPE_OVERRIDE:
        qs = self.custom_shifts.filter(type=CustomOnCallShift.TYPE_OVERRIDE)
        ical_attr = "cached_ical_file_overrides"
        ical_property = "_ical_file_overrides"
    elif custom_shift.type == CustomOnCallShift.TYPE_ROLLING_USERS_EVENT:
        qs = self.custom_shifts.exclude(type=CustomOnCallShift.TYPE_OVERRIDE)
        ical_attr = "cached_ical_file_primary"
        ical_property = "_ical_file_primary"
    else:
        raise ValueError("Invalid shift type")

    def _invalidate_cache(schedule, prop_name):
        """Invalidate cached property cache"""
        try:
            delattr(schedule, prop_name)
        except AttributeError:
            # nothing cached yet
            pass

    extra_shifts = [custom_shift]
    if updated_shift_pk is not None:
        # only reuse PK for preview when updating a rotation that won't be started after the update
        custom_shift.public_primary_key = updated_shift_pk
        qs = qs.exclude(public_primary_key=updated_shift_pk)

    ical_file = self._generate_ical_file_from_shifts(qs, extra_shifts=extra_shifts, allow_empty_users=True)

    original_value = getattr(self, ical_attr)
    _invalidate_cache(self, ical_property)
    setattr(self, ical_attr, ical_file)
    try:
        # filter events using a temporarily overridden calendar including the not-yet-saved shift
        events = self.filter_events(datetime_start, datetime_end, with_empty=True, with_gap=True)

        # return preview events for affected shifts
        # (copies are taken before resolving the final schedule from the same events)
        updated_shift_pks = {s.public_primary_key for s in extra_shifts}
        shift_events = [e.copy() for e in events if e["shift"]["pk"] in updated_shift_pks]
        final_events = self._resolve_schedule(events, datetime_start, datetime_end)
    finally:
        # always restore the real cached calendar, so a failed preview
        # does not leave the instance pointing at the temporary one
        _invalidate_cache(self, ical_property)
        setattr(self, ical_attr, original_value)

    return shift_events, final_events
# Insight logs
@property
def insight_logs_verbal(self):
    """Return the schedule name, used as its label in insight logs."""
    return self.name
@property
def insight_logs_serialized(self):
    """Return a dict describing the schedule for insight logs.

    Always includes the schedule name and its team (``"General"`` when the
    schedule has no team). When the organization has a Slack team identity,
    the Slack channel name and user group handle (if set) and the
    notification settings are included too.
    """
    serialized = {"name": self.name}

    team = self.team
    if team:
        serialized.update(team=team.name, team_id=team.public_primary_key)
    else:
        serialized["team"] = "General"

    if not self.organization.slack_team_identity:
        # no Slack integration, nothing else to report
        return serialized

    slack_channel = self.slack_channel
    if slack_channel is not None:
        serialized["slack_channel"] = slack_channel.name
    user_group = self.user_group
    if user_group is not None:
        serialized["user_group"] = user_group.handle

    serialized.update(
        notification_frequency=self.get_notify_oncall_shift_freq_display(),
        current_shift_notification=self.mention_oncall_start,
        next_shift_notification=self.mention_oncall_next,
        notify_empty_oncall=self.get_notify_empty_oncall_display(),
    )
    return serialized
@property
def insight_logs_metadata(self):
    """Return team metadata for insight logs (``"General"`` when there is no team)."""
    team = self.team
    if not team:
        return {"team": "General"}
    return {"team": team.name, "team_id": team.public_primary_key}
class OnCallScheduleICal(OnCallSchedule):
    """Schedule whose primary and overrides calendars are both imported from iCal URLs."""

    escalation_policies: "RelatedManager['EscalationPolicy']"
    objects: models.Manager["OnCallScheduleICal"]
    schedule_export_token: "RelatedManager['ScheduleExportAuthToken']"

    # For the ical schedule both primary and overrides icals are imported via ical url
    ical_url_primary = models.CharField(max_length=500, null=True, default=None)
    ical_file_error_primary = models.CharField(max_length=200, null=True, default=None)
    ical_url_overrides = models.CharField(max_length=500, null=True, default=None)
    ical_file_error_overrides = models.CharField(max_length=200, null=True, default=None)

    @cached_property
    def _ical_file_primary(self):
        """Return the primary iCal file, downloading it first when a URL is set and nothing is cached."""
        url = self.ical_url_primary
        if url is None or self.cached_ical_file_primary is not None:
            # either nothing to download from, or a file is cached already
            return self.cached_ical_file_primary

        fetched_file, fetch_error = fetch_ical_file_or_get_error(url)
        self.cached_ical_file_primary = fetched_file
        self.ical_file_error_primary = fetch_error
        self.save(update_fields=["cached_ical_file_primary", "ical_file_error_primary"])
        return self.cached_ical_file_primary

    @cached_property
    def _ical_file_overrides(self):
        """Return the overrides iCal file, downloading it first when a URL is set and nothing is cached."""
        url = self.ical_url_overrides
        if url is None or self.cached_ical_file_overrides is not None:
            # either nothing to download from, or a file is cached already
            return self.cached_ical_file_overrides

        fetched_file, fetch_error = fetch_ical_file_or_get_error(url)
        self.cached_ical_file_overrides = fetched_file
        self.ical_file_error_overrides = fetch_error
        self.save(update_fields=["cached_ical_file_overrides", "ical_file_error_overrides"])
        return self.cached_ical_file_overrides

    def _refresh_primary_ical_file(self):
        """Keep the current primary file as the previous one and re-download it when a URL is set."""
        self.prev_ical_file_primary = self.cached_ical_file_primary
        url = self.ical_url_primary
        if url is not None:
            fetched_file, fetch_error = fetch_ical_file_or_get_error(url)
            self.cached_ical_file_primary = fetched_file
            self.ical_file_error_primary = fetch_error
        # saved unconditionally so the previous-file field is persisted too
        self.save(update_fields=["cached_ical_file_primary", "prev_ical_file_primary", "ical_file_error_primary"])

    def _refresh_overrides_ical_file(self):
        """Keep the current overrides file as the previous one and re-download it when a URL is set."""
        self.prev_ical_file_overrides = self.cached_ical_file_overrides
        url = self.ical_url_overrides
        if url is not None:
            fetched_file, fetch_error = fetch_ical_file_or_get_error(url)
            self.cached_ical_file_overrides = fetched_file
            self.ical_file_error_overrides = fetch_error
        # saved unconditionally so the previous-file field is persisted too
        self.save(
            update_fields=["cached_ical_file_overrides", "prev_ical_file_overrides", "ical_file_error_overrides"]
        )

    # Insight logs
    @property
    def insight_logs_serialized(self):
        """Extend the base serialization with both calendar URLs."""
        serialized = super().insight_logs_serialized
        serialized["primary_calendar_url"] = self.ical_url_primary
        serialized["overrides_calendar_url"] = self.ical_url_overrides
        return serialized

    @property
    def insight_logs_type_verbal(self):
        """Return the schedule type label used in insight logs."""
        return "ical_schedule"
class OnCallScheduleCalendar(OnCallSchedule):
    """Schedule whose primary calendar is generated from custom on-call shifts.

    Overrides come either from web override shifts (when ``enable_web_overrides``
    is set) or from an imported iCal URL.
    """

    escalation_policies: "RelatedManager['EscalationPolicy']"
    objects: models.Manager["OnCallScheduleCalendar"]
    schedule_export_token: "RelatedManager['ScheduleExportAuthToken']"

    # For the calendar schedule only overrides ical is imported via ical url.
    ical_url_overrides = models.CharField(max_length=500, null=True, default=None)
    ical_file_error_overrides = models.CharField(max_length=200, null=True, default=None)

    # Primary ical is generated from custom_on_call_shifts.
    time_zone = models.CharField(max_length=100, default="UTC")
    custom_on_call_shifts = models.ManyToManyField("schedules.CustomOnCallShift", related_name="schedules")
    enable_web_overrides = models.BooleanField(default=False, null=True)

    @cached_property
    def _ical_file_primary(self):
        """Return cached ical file with iCal events from custom on-call shifts, generating it if missing."""
        if self.cached_ical_file_primary is not None:
            return self.cached_ical_file_primary

        self.cached_ical_file_primary = self._generate_ical_file_primary()
        self.save(update_fields=["cached_ical_file_primary"])
        return self.cached_ical_file_primary

    @cached_property
    def _ical_file_overrides(self):
        """Return the cached overrides iCal file, refreshing it first when nothing is cached."""
        if self.cached_ical_file_overrides is None:
            self._refresh_overrides_ical_file()
        return self.cached_ical_file_overrides

    def _refresh_primary_ical_file(self):
        """Keep the current primary file as the previous one and regenerate it from the shifts."""
        self.prev_ical_file_primary = self.cached_ical_file_primary
        self.cached_ical_file_primary = self._generate_ical_file_primary()
        self.save(update_fields=["cached_ical_file_primary", "prev_ical_file_primary"])

    def _refresh_overrides_ical_file(self):
        """Keep the current overrides file as the previous one and rebuild or re-download it."""
        self.prev_ical_file_overrides = self.cached_ical_file_overrides
        if self.enable_web_overrides:
            # web overrides
            override_shifts = self.custom_shifts.filter(type=CustomOnCallShift.TYPE_OVERRIDE)
            self.cached_ical_file_overrides = self._generate_ical_file_from_shifts(override_shifts)
        elif self.ical_url_overrides is not None:
            fetched_file, fetch_error = fetch_ical_file_or_get_error(self.ical_url_overrides)
            self.cached_ical_file_overrides = fetched_file
            self.ical_file_error_overrides = fetch_error
        # saved unconditionally so the previous-file field is persisted too
        self.save(
            update_fields=["cached_ical_file_overrides", "prev_ical_file_overrides", "ical_file_error_overrides"]
        )

    def _generate_ical_file_primary(self):
        """
        Generate iCal events file from custom on-call shifts (created via API)
        """
        if not self.custom_on_call_shifts.exists():
            # default to empty string since it is not possible to have a no-events ical file
            return ""

        closing_line = "END:VCALENDAR"
        calendar = icalendar.Calendar()
        for prop_name, prop_value in (
            ("prodid", "-//My calendar product//amixr//"),
            ("version", "2.0"),
            ("method", "PUBLISH"),
        ):
            calendar.add(prop_name, prop_value)

        # drop the calendar closing line so the events can be appended before it
        header = calendar.to_ical().decode().replace(closing_line, "").strip()
        chunks = [f"{header}\r\n"]
        for shift in self.custom_on_call_shifts.all():
            chunks.append(shift.convert_to_ical(self.time_zone))
        chunks.append(f"{closing_line}\r\n")
        return "".join(chunks)

    def preview_shift(self, custom_shift, datetime_start, datetime_end, updated_shift_pk=None):
        """Return unsaved rotation and final schedule preview events.

        Only override shifts can be previewed for this schedule type; any
        other shift type raises ``ValueError``.
        """
        if custom_shift.type != CustomOnCallShift.TYPE_OVERRIDE:
            raise ValueError("Invalid shift type")
        return super().preview_shift(
            custom_shift,
            datetime_start,
            datetime_end,
            updated_shift_pk=updated_shift_pk,
        )

    # Insight logs
    @property
    def insight_logs_type_verbal(self):
        """Return the schedule type label used in insight logs."""
        return "calendar_schedule"

    @property
    def insight_logs_serialized(self):
        """Extend the base serialization with the overrides calendar URL."""
        serialized = super().insight_logs_serialized
        serialized["overrides_calendar_url"] = self.ical_url_overrides
        return serialized
class OnCallScheduleWeb(OnCallSchedule):
    """Schedule whose primary and overrides calendars are both generated from its custom shifts."""

    escalation_policies: "RelatedManager['EscalationPolicy']"
    objects: models.Manager["OnCallScheduleWeb"]
    schedule_export_token: "RelatedManager['ScheduleExportAuthToken']"

    time_zone = models.CharField(max_length=100, default="UTC")

    def _generate_ical_file_primary(self):
        """Generate the primary iCal file from every shift that is not an override."""
        non_override_shifts = self.custom_shifts.exclude(type=CustomOnCallShift.TYPE_OVERRIDE)
        return self._generate_ical_file_from_shifts(non_override_shifts)

    def _generate_ical_file_overrides(self):
        """Generate the overrides iCal file from the override shifts only."""
        override_shifts = self.custom_shifts.filter(type=CustomOnCallShift.TYPE_OVERRIDE)
        return self._generate_ical_file_from_shifts(override_shifts)

    @cached_property
    def _ical_file_primary(self):
        """Return cached ical file with iCal events from custom on-call shifts."""
        if self.cached_ical_file_primary is not None:
            return self.cached_ical_file_primary

        self.cached_ical_file_primary = self._generate_ical_file_primary()
        try:
            self.save(update_fields=["cached_ical_file_primary"])
        except DatabaseError:
            # schedule may have been deleted from db
            return None
        return self.cached_ical_file_primary

    def _refresh_primary_ical_file(self):
        """Keep the current primary file as the previous one and regenerate it from the shifts."""
        self.prev_ical_file_primary = self.cached_ical_file_primary
        self.cached_ical_file_primary = self._generate_ical_file_primary()
        self.save(update_fields=["cached_ical_file_primary", "prev_ical_file_primary"])

    @cached_property
    def _ical_file_overrides(self):
        """Return cached ical file with iCal events from custom on-call overrides shifts."""
        if self.cached_ical_file_overrides is not None:
            return self.cached_ical_file_overrides

        self.cached_ical_file_overrides = self._generate_ical_file_overrides()
        try:
            self.save(update_fields=["cached_ical_file_overrides"])
        except DatabaseError:
            # schedule may have been deleted from db
            return None
        return self.cached_ical_file_overrides

    def _refresh_overrides_ical_file(self):
        """Keep the current overrides file as the previous one and regenerate it from the shifts."""
        self.prev_ical_file_overrides = self.cached_ical_file_overrides
        self.cached_ical_file_overrides = self._generate_ical_file_overrides()
        self.save(update_fields=["cached_ical_file_overrides", "prev_ical_file_overrides"])

    # Insight logs
    @property
    def insight_logs_type_verbal(self):
        """Return the schedule type label used in insight logs."""
        return "web_schedule"

    @property
    def insight_logs_serialized(self):
        """Extend the base serialization with the schedule time zone."""
        serialized = super().insight_logs_serialized
        serialized["time_zone"] = self.time_zone
        return serialized