oncall-engine/engine/apps/schedules/ical_utils.py
Matias Bordese f3f7c17f8b
fix: update missing users / empty shifts check (#5322)
Related to https://github.com/grafana/oncall-private/issues/2950

- Represent missing users in schedule events (so they are displayed in
the web UI)
- Fix schedule checks for gaps/empty shifts so they send notifications
2025-01-02 13:50:09 +00:00

985 lines
39 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
import datetime
import logging
import re
import typing
from collections import namedtuple
from typing import TYPE_CHECKING
import pytz
import requests
from django.core.cache import cache
from django.db.models import Q
from icalendar import Calendar
from icalendar import Event as IcalEvent
from apps.api.permissions import RBACPermission
from apps.schedules.constants import (
CALENDAR_TYPE_FINAL,
ICAL_ATTENDEE,
ICAL_DATETIME_END,
ICAL_DATETIME_START,
ICAL_DESCRIPTION,
ICAL_LOCATION,
ICAL_PRIORITY,
ICAL_RECURRENCE_ID,
ICAL_SEQUENCE,
ICAL_STATUS,
ICAL_STATUS_CANCELLED,
ICAL_SUMMARY,
ICAL_UID,
RE_EVENT_UID_EXPORT,
RE_EVENT_UID_V1,
RE_EVENT_UID_V2,
RE_PRIORITY,
SCHEDULE_ONCALL_CACHE_KEY_PREFIX,
SCHEDULE_ONCALL_CACHE_TTL,
)
from apps.schedules.ical_events import ical_events
from common.cache import ensure_cache_key_allocates_to_the_same_hash_slot
from common.timezones import is_valid_timezone
from common.utils import timed_lru_cache
"""
This is a hack to allow us to load models for type checking without circular dependencies.
This module likely needs to be refactored to be part of the OnCallSchedule module.
"""
# These imports are only evaluated by static type checkers, so the annotations in this
# module can reference the models without importing them at runtime.
if TYPE_CHECKING:
    from apps.schedules.models import OnCallSchedule
    from apps.schedules.models.on_call_schedule import OnCallScheduleQuerySet
    from apps.user_management.models import Organization, User

# Module-level logger; its level is explicitly set to DEBUG here.
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
class MissingUser:
    """Placeholder standing in for a missing user of a rolling users shift."""

    # label returned as the username of every missing user
    DISPLAY_NAME = "(missing)"

    def __init__(self, pk):
        # identifier the missing user was referenced by
        self.pk = pk

    @property
    def username(self):
        """Return the shared placeholder label rather than a real username."""
        return self.DISPLAY_NAME
# Record describing a shift event that resolved to no users
# (instances are built in list_of_empty_shifts_in_schedule).
EmptyShift = namedtuple(
    "EmptyShift",
    ["start", "end", "summary", "description", "attendee", "all_day", "calendar_type", "calendar_tz", "shift_pk"],
)
EmptyShifts = typing.List[EmptyShift]

# Start/end pair; detect_gaps stores None for a boundary that is left open.
DatetimeInterval = namedtuple("DatetimeInterval", ["start", "end"])
DatetimeIntervals = typing.List[DatetimeInterval]

IcalEvents = typing.List[IcalEvent]
# Mapping of schedule -> list of users on call for that schedule.
SchedulesOnCallUsers = typing.Dict["OnCallSchedule", typing.List["User"]]
def users_in_ical(
    usernames_from_ical: typing.List[str],
    organization: "Organization",
) -> typing.List["User"]:
    """
    Resolve identifiers found in an ical feed to `User` objects of an organization.

    A user matches when its username is listed in `usernames_from_ical`, or when its lower-cased
    e-mail equals one of the lower-cased identifiers. Matches are then narrowed down to the users
    holding the notifications-read permission.

    Parameters
    ----------
    usernames_from_ical : typing.List[str]
        Usernames (or e-mails) present in the ical feed
    organization : apps.user_management.models.organization.Organization
        Organization whose users are searched
    """
    lowered_identifiers = [identifier.lower() for identifier in usernames_from_ical]
    username_or_email_match = Q(username__in=usernames_from_ical) | Q(email__lower__in=lowered_identifiers)

    # NOTE: organization is select_related because the permission check below
    # accesses u.organization for every candidate
    candidates = organization.users.filter(username_or_email_match).distinct().select_related("organization")

    notifications_read = RBACPermission.Permissions.NOTIFICATIONS_READ
    # checking the permission on this already-filtered subset is cheaper than
    # expressing it as a regex query
    permitted = []
    for candidate in candidates:
        if notifications_read.user_has_permission(candidate):
            permitted.append(candidate)
    return permitted
@timed_lru_cache(timeout=100)
def memoized_users_in_ical(
    usernames_from_ical: typing.Tuple[str, ...], organization: "Organization"
) -> typing.List["User"]:
    """
    Cached wrapper around `users_in_ical`.

    Callers in this module pass the usernames as a tuple; presumably this keeps the
    arguments hashable for the cache decorator — confirm against `timed_lru_cache`.
    """
    # using in-memory cache instead of redis to avoid pickling python objects
    return users_in_ical(usernames_from_ical, organization)
# used for display schedule events on web
def list_of_oncall_shifts_from_ical(
    schedule: "OnCallSchedule",
    datetime_start: datetime.datetime,
    datetime_end: datetime.datetime,
    with_empty_shifts: bool = False,
    with_gaps: bool = False,
    filter_by: str | None = None,
    from_cached_final: bool = False,
):
    """
    Parse the schedule's ical data and return its shift events sorted by start.

    Each event is a dict with the keys "start", "end", "users", "missing_users", "priority",
    "source", "calendar_type" and "shift_pk". When `with_gaps` is set and no all-day event was
    found, uncovered intervals are added as extra dicts flagged with "is_gap": True.

    The events come either from the cached final schedule (`from_cached_final`) or from the
    schedule's current calendars; `filter_by` keeps only calendars of the given type and
    `with_empty_shifts` is forwarded to `get_shifts_dict`.

    Returns None when there are no events at all.
    """
    from apps.schedules.models import OnCallSchedule

    # when there is more than one calendar, the primary calendar always comes first
    calendars: typing.Tuple[typing.Optional[Calendar], ...]
    if from_cached_final:
        calendars = (Calendar.from_ical(schedule.cached_ical_final_schedule),)
    else:
        calendars = schedule.get_icalendars()

    timed_shifts = []
    all_day_shifts = []
    for position, calendar in enumerate(calendars):
        if calendar is None:
            continue

        calendar_type: str | int
        if from_cached_final:
            calendar_type = CALENDAR_TYPE_FINAL
        else:
            calendar_type = OnCallSchedule.PRIMARY if position == 0 else OnCallSchedule.OVERRIDES

        if filter_by is not None and filter_by != calendar_type:
            continue

        calendar_timed, calendar_all_day = get_shifts_dict(
            calendar, calendar_type, schedule, datetime_start, datetime_end, with_empty_shifts
        )
        timed_shifts += calendar_timed
        all_day_shifts += calendar_all_day

    if with_gaps and not all_day_shifts:
        covered = [DatetimeInterval(shift["start"], shift["end"]) for shift in timed_shifts]
        for gap in detect_gaps(covered, datetime_start, datetime_end):
            # an open gap boundary (None) falls back to the boundary of the requested window
            timed_shifts.append(
                {
                    "start": gap.start or datetime_start,
                    "end": gap.end or datetime_end,
                    "users": [],
                    "missing_users": [],
                    "priority": None,
                    "source": None,
                    "calendar_type": None,
                    "is_gap": True,
                    "shift_pk": None,
                }
            )

    utc = pytz.timezone("UTC")
    midnight = datetime.datetime.min.time()

    def start_as_datetime(shift):
        # plain dates (all-day events) are compared as midnight UTC of that day;
        # the exact type check keeps datetime.datetime values untouched
        begins = shift["start"]
        if type(begins) is datetime.date:
            return datetime.datetime.combine(begins, midnight, tzinfo=utc)
        return begins

    ordered = sorted(timed_shifts + all_day_shifts, key=start_as_datetime)
    # if there are no events, return None
    return ordered or None
def get_shifts_dict(
    calendar: Calendar,
    calendar_type: str | int,
    schedule: "OnCallSchedule",
    datetime_start: datetime.datetime,
    datetime_end: datetime.datetime,
    with_empty_shifts: bool = False,
):
    """
    Turn the events of one calendar within [datetime_start, datetime_end] into shift dicts.

    Returns a pair of lists: shifts with datetime boundaries (converted to UTC) and shifts whose
    start is a plain date (all-day events, boundaries kept as-is). Cancelled events are skipped,
    and events without any resolved user are only included when `with_empty_shifts` is set.
    """
    timed_shifts = []
    all_day_shifts = []
    for event in ical_events.get_events_from_ical_between(calendar, datetime_start, datetime_end):
        if event.get(ICAL_STATUS) == ICAL_STATUS_CANCELLED:
            # ignore cancelled events
            continue

        sequence = event.get(ICAL_SEQUENCE)
        recurrence_id = event.get(ICAL_RECURRENCE_ID)
        if recurrence_id:
            recurrence_id = recurrence_id.dt.isoformat()

        priority = parse_priority_from_string(event.get(ICAL_SUMMARY, "[L0]"))
        pk, source = parse_event_uid(event.get(ICAL_UID), sequence=sequence, recurrence_id=recurrence_id)
        users = get_users_from_ical_event(event, schedule.organization)
        missing_users = get_missing_users_from_ical_event(event, schedule.organization)

        event_calendar_type = calendar_type
        if calendar_type == CALENDAR_TYPE_FINAL:
            # events of the final calendar carry their original calendar type in the event itself
            is_override = (
                event.get(ICAL_PRIORITY, "") == schedule.OVERRIDES
                # keep for backwards compatibility (to be removed later once schedules are refreshed)
                or event.get(ICAL_LOCATION, "") == schedule.CALENDAR_TYPE_VERBAL[schedule.OVERRIDES]
            )
            event_calendar_type = schedule.OVERRIDES if is_override else schedule.PRIMARY

        # an on-call shift needs an actual user, unless empty shifts were requested
        if not users and not with_empty_shifts:
            continue

        shared_fields = {
            "users": users,
            "missing_users": missing_users,
            "priority": priority,
            "source": source,
            "calendar_type": event_calendar_type,
            "shift_pk": pk,
        }

        if type(event[ICAL_DATETIME_START].dt) is datetime.date:
            all_day_shifts.append(
                {
                    "start": event[ICAL_DATETIME_START].dt,
                    "end": event[ICAL_DATETIME_END].dt,
                    **shared_fields,
                }
            )
            continue

        start, end = ical_events.get_start_and_end_with_respect_to_event_type(event)
        if start < end:
            timed_shifts.append(
                {
                    "start": start.astimezone(datetime.timezone.utc),
                    "end": end.astimezone(datetime.timezone.utc),
                    **shared_fields,
                }
            )
    return timed_shifts, all_day_shifts
def list_of_empty_shifts_in_schedule(
    schedule: "OnCallSchedule", start_date: datetime.date, end_date: datetime.date
) -> EmptyShifts:
    """
    Return the shifts between `start_date` and `end_date` that resolve to no users.

    Every calendar of the schedule is inspected (the first one is treated as the primary
    calendar, any other as overrides). Events that share UID, summary, description and
    attendee are reported only once. The result is sorted by start.
    """
    # Calculate lookup window in schedule's tz
    # (get_icalendar_tz_or_utc provides the fallback when the ical has no usable tz)
    from apps.schedules.models import OnCallSchedule

    calendars = schedule.get_icalendars()
    empty_shifts: EmptyShifts = []
    for idx, calendar in enumerate(calendars):
        if calendar is not None:
            if idx == 0:
                calendar_type = OnCallSchedule.PRIMARY
            else:
                calendar_type = OnCallSchedule.OVERRIDES

            calendar_tz = get_icalendar_tz_or_utc(calendar)

            # utcoffset can technically return None, but we're confident it is a timedelta here
            # (the offset is the calendar timezone's offset at the current moment)
            schedule_timezone_offset: datetime.timedelta = datetime.datetime.now().astimezone(calendar_tz).utcoffset()  # type: ignore[assignment]

            # window starts 1ms after midnight of start_date and ends at the last instant of end_date
            start_datetime = datetime.datetime.combine(start_date, datetime.time.min) + datetime.timedelta(
                milliseconds=1
            )
            # NOTE(review): start_datetime/end_datetime are naive here, so astimezone() interprets
            # them in the process's local timezone — confirm the service runs with UTC local time
            start_datetime_with_offset = (start_datetime - schedule_timezone_offset).astimezone(datetime.timezone.utc)
            end_datetime = datetime.datetime.combine(end_date, datetime.time.max)
            end_datetime_with_offset = (end_datetime - schedule_timezone_offset).astimezone(datetime.timezone.utc)

            events = ical_events.get_events_from_ical_between(
                calendar, start_datetime_with_offset, end_datetime_with_offset
            )

            # Keep hashes of checked events to include only first recurrent event into result
            checked_events = set()
            empty_shifts_per_calendar = []
            for event in events:
                users = get_users_from_ical_event(event, schedule.organization)
                if len(users) == 0:
                    summary = event.get(ICAL_SUMMARY, "")
                    description = event.get(ICAL_DESCRIPTION, "")
                    attendee = event.get(ICAL_ATTENDEE, "")
                    pk, _ = parse_event_uid(event.get(ICAL_UID))

                    event_hash = hash(f"{event[ICAL_UID]}{summary}{description}{attendee}")
                    if event_hash in checked_events:
                        continue

                    checked_events.add(event_hash)

                    start, end, all_day = event_start_end_all_day_with_respect_to_type(event, calendar_tz)
                    # only non-all-day boundaries are converted to UTC here
                    if not all_day:
                        start = start.astimezone(datetime.timezone.utc)
                        end = end.astimezone(datetime.timezone.utc)

                    empty_shifts_per_calendar.append(
                        EmptyShift(
                            start=start,
                            end=end,
                            summary=summary,
                            description=description,
                            attendee=attendee,
                            all_day=all_day,
                            calendar_type=calendar_type,
                            calendar_tz=calendar_tz,
                            shift_pk=pk,
                        )
                    )
            empty_shifts.extend(empty_shifts_per_calendar)

    return sorted(empty_shifts, key=lambda dt: dt.start)
def list_users_to_notify_from_ical(
    schedule: "OnCallSchedule",
    events_datetime: typing.Optional[datetime.datetime] = None,
    from_cached_final: bool = False,
) -> typing.List["User"]:
    """
    Return the users on call at a single instant.

    The instant is `events_datetime`, or the current UTC time when it is not given; it is used
    as both the start and the end of the looked-up period.
    """
    moment = events_datetime or datetime.datetime.now(datetime.timezone.utc)
    return list_users_to_notify_from_ical_for_period(
        schedule, moment, moment, from_cached_final=from_cached_final
    )
def list_users_to_notify_from_ical_for_period(
    schedule: "OnCallSchedule",
    start_datetime: datetime.datetime,
    end_datetime: datetime.datetime,
    from_cached_final: bool = False,
) -> typing.List["User"]:
    """
    Return the users attached to the schedule's events between two datetimes.

    Events are read from the cached final schedule when `from_cached_final` is set and that
    cache is populated; otherwise the final events are computed. Users are then looked up by
    the "email" value of every user entry of every event.
    """
    use_cached_final = from_cached_final and schedule.cached_ical_final_schedule
    events = (
        schedule.filter_events(start_datetime, end_datetime, from_cached_final=True)
        if use_cached_final
        else schedule.final_events(start_datetime, end_datetime)
    )
    usernames: typing.List[str] = [
        user_entry["email"] for event in events for user_entry in event.get("users", [])
    ]
    return memoized_users_in_ical(tuple(usernames), schedule.organization)
def get_oncall_users_for_multiple_schedules(
    schedules: typing.List["OnCallSchedule"], events_datetime=None
) -> SchedulesOnCallUsers:
    """
    Map every given schedule to the users on call at `events_datetime`.

    `events_datetime` defaults to the current UTC time. An empty dict is returned when no
    schedules are given.
    """
    # Exit early if there are no schedules
    if not schedules:
        return {}

    if events_datetime is None:
        events_datetime = datetime.datetime.now(datetime.timezone.utc)

    return {
        schedule: list_users_to_notify_from_ical(schedule, events_datetime=events_datetime)
        for schedule in schedules
    }
def _generate_cache_key_for_schedule_oncall_users(schedule: "OnCallSchedule") -> str:
    """Build the cache key under which the on-call users of `schedule` are stored."""
    raw_key = f"{SCHEDULE_ONCALL_CACHE_KEY_PREFIX}{schedule.public_primary_key}"
    return ensure_cache_key_allocates_to_the_same_hash_slot(raw_key, SCHEDULE_ONCALL_CACHE_KEY_PREFIX)
def update_cached_oncall_users_for_schedule(schedule: "OnCallSchedule"):
    """Recompute the on-call users of `schedule` and cache their public primary keys."""
    users_by_schedule = get_oncall_users_for_multiple_schedules([schedule])
    user_public_primary_keys = [user.public_primary_key for user in users_by_schedule.get(schedule, [])]
    cache_key = _generate_cache_key_for_schedule_oncall_users(schedule)
    cache.set(cache_key, user_public_primary_keys, timeout=SCHEDULE_ONCALL_CACHE_TTL)
def get_cached_oncall_users_for_multiple_schedules(schedules: typing.List["OnCallSchedule"]) -> SchedulesOnCallUsers:
    """
    More "performant" version of `apps.schedules.ical_utils.get_oncall_users_for_multiple_schedules`
    which caches results (for `SCHEDULE_ONCALL_CACHE_TTL`).

    The cache results are stored in the following format:
    - `<SCHEDULE_ONCALL_CACHE_KEY_PREFIX><schedule_public_primary_key>`:
      [list of oncall user public_primary_keys for the schedule]

    Cached entries are returned when they exist; results for the remaining schedules are
    calculated and written to the cache.

    Note: since we cannot cache Python objects we cache the primary keys of schedules/users and
    then fetch these from the db in two queries (one for users, one for schedules). Cached entries
    whose schedule no longer exists are dropped, and so are cached users that no longer exist.
    """
    from apps.schedules.models import OnCallSchedule
    from apps.user_management.models import User

    def _get_schedule_public_primary_key_from_schedule_oncall_users_cache_key(cache_key: str) -> str:
        """
        remove any brackets that might be included in the cache key (when redis cluster is active).
        See `_generate_cache_key_for_schedule_oncall_users` just above
        """
        cache_key = cache_key.replace("{", "").replace("}", "")
        return cache_key.replace(SCHEDULE_ONCALL_CACHE_KEY_PREFIX, "")

    cache_keys = [_generate_cache_key_for_schedule_oncall_users(schedule) for schedule in schedules]

    # get_many returns a dictionary with all the keys we asked for that actually exist
    # in the cache (and haven't expired)
    cached_results = cache.get_many(cache_keys)

    # for the results returned from the cache we need to determine which schedule and user objects
    # we will need to fetch from the database (only their primary keys are cached)
    schedule_public_primary_keys_to_fetch_from_db: typing.Set[str] = set()
    user_public_primary_keys_to_fetch_from_db: typing.Set[str] = set()
    for cache_key, oncall_user_public_primary_keys in cached_results.items():
        schedule_public_primary_keys_to_fetch_from_db.add(
            _get_schedule_public_primary_key_from_schedule_oncall_users_cache_key(cache_key)
        )
        user_public_primary_keys_to_fetch_from_db.update(oncall_user_public_primary_keys)

    # calculate results for schedules that weren't in the cache; a set (and the dict itself)
    # keeps each membership test constant-time instead of scanning a list per schedule
    schedule_public_primary_keys_to_calculate = {
        _get_schedule_public_primary_key_from_schedule_oncall_users_cache_key(cache_key)
        for cache_key in cache_keys
        if cache_key not in cached_results
    }
    schedules_to_calculate = [
        schedule for schedule in schedules if schedule.public_primary_key in schedule_public_primary_keys_to_calculate
    ]
    results = get_oncall_users_for_multiple_schedules(schedules_to_calculate)

    # update the cache with the new results we just got back
    new_results_to_update_in_cache: typing.Dict[str, typing.List[str]] = {
        _generate_cache_key_for_schedule_oncall_users(schedule): [user.public_primary_key for user in oncall_users]
        for schedule, oncall_users in results.items()
    }
    cache.set_many(new_results_to_update_in_cache, timeout=SCHEDULE_ONCALL_CACHE_TTL)

    # make two queries to the database, one to fetch the schedule objects we need and the other to fetch
    # the user objects we need
    schedules_by_public_primary_key = {
        schedule.public_primary_key: schedule
        for schedule in OnCallSchedule.objects.filter(
            public_primary_key__in=schedule_public_primary_keys_to_fetch_from_db
        )
    }
    users_by_public_primary_key = {
        user.public_primary_key: user
        for user in User.objects.filter(public_primary_key__in=user_public_primary_keys_to_fetch_from_db)
    }

    # revisit our cached_results and this time populate the results with the actual objects
    for cache_key, oncall_user_public_primary_keys in cached_results.items():
        schedule_public_primary_key = _get_schedule_public_primary_key_from_schedule_oncall_users_cache_key(cache_key)
        schedule = schedules_by_public_primary_key.get(schedule_public_primary_key)
        if schedule is None:
            # schedule might have been deleted
            continue

        results[schedule] = [
            users_by_public_primary_key[user_public_primary_key]
            for user_public_primary_key in oncall_user_public_primary_keys
            # filter out any users that we couldn't find in the database (e.g. deleted)
            if user_public_primary_key in users_by_public_primary_key
        ]

    return results
def parse_username_from_string(string: str) -> str:
    """
    Extract the on-call shift user from the given string by dropping the
    first `RE_PRIORITY` match and the surrounding whitespace.

    Example input:
    [L1] bob@company.com
    Example output:
    bob@company.com
    """
    trimmed = string.strip()
    without_priority = re.sub(RE_PRIORITY, "", trimmed, count=1)
    return without_priority.strip()
def parse_priority_from_string(string: str) -> int:
    """
    Extract the on-call shift priority from the given string.

    The first `RE_PRIORITY` match is used; 0 is returned when nothing matches
    or when the parsed value is lower than 1.

    Example input:
    [L1] @alex @bob
    Example output:
    1
    """
    found = re.findall(RE_PRIORITY, string.strip())
    if not found:
        return 0
    # values below 1 collapse to the default priority 0
    return max(int(found[0]), 0)
def parse_event_uid(string: str, sequence: str = None, recurrence_id: str = None):
    """
    Split an ical event UID into a shift identifier and a verbal source.

    UIDs starting with "oncall" are matched with `RE_EVENT_UID_V2`, UIDs starting with "amixr"
    with `RE_EVENT_UID_V1`, anything else with `RE_EVENT_UID_EXPORT`. When the pattern does not
    match, the UID itself (suffixed with `sequence` and/or `recurrence_id` when given) is used
    as the identifier.

    Returns a `(pk, source_verbal)` tuple; `source_verbal` is None when the UID carries no source.
    """
    pk = None
    source = None

    if string.startswith("oncall"):
        matched = RE_EVENT_UID_V2.match(string)
        if matched:
            _, pk, _, _, source = matched.groups()
    elif string.startswith("amixr"):
        # eventually this path would be automatically deprecated
        # once all ical representations are refreshed
        matched = RE_EVENT_UID_V1.match(string)
        if matched:
            _, _, _, source = matched.groups()
    else:
        matched = RE_EVENT_UID_EXPORT.match(string)
        if matched:
            pk, _, _ = matched.groups()

    if not matched:
        # fallback to use the UID string as the rotation ID
        pk = string
        # in ical imported calendars, sequence and/or recurrence_id
        # distinguish main recurring event vs instance modification
        # (see https://icalendar.org/iCalendar-RFC-5545/3-8-4-4-recurrence-id.html)
        if sequence:
            pk = f"{pk}_{sequence}"
        if recurrence_id:
            pk = f"{pk}_{recurrence_id}"

    if source is None:
        return pk, None

    source_index = int(source)
    # imported here, as in the other helpers of this module, rather than at module level
    from apps.schedules.models import CustomOnCallShift

    return pk, CustomOnCallShift.SOURCE_CHOICES[source_index][1]
def get_usernames_from_ical_event(event):
    """
    Collect the usernames referenced by an ical event together with its priority.

    Usernames are parsed, in this order, from the event summary, description and attendee
    (each only when present). The priority is parsed from the summary.

    Returns a `(usernames, priority)` tuple.
    """
    priority = parse_priority_from_string(event.get(ICAL_SUMMARY, "[L0]"))

    usernames_found = [
        parse_username_from_string(event[field]) for field in (ICAL_SUMMARY, ICAL_DESCRIPTION) if field in event
    ]

    # PagerDuty adds only one attendee and in this case event[ICAL_ATTENDEE] is string.
    # If several attendees were added to the event then event[ICAL_ATTENDEE] will be a list
    # (E.g. several invited in Google cal), which is not parsed here.
    if ICAL_ATTENDEE in event and isinstance(event[ICAL_ATTENDEE], str):
        usernames_found.append(parse_username_from_string(event[ICAL_ATTENDEE]))

    return usernames_found, priority
def get_missing_users_from_ical_event(event, organization: "Organization"):
    """
    Return the non-empty usernames of the event that did not resolve to a user.

    A username counts as resolved when it equals the username of a found user, or when its
    lower-cased form equals the lower-cased e-mail of a found user.
    """
    all_usernames, _ = get_usernames_from_ical_event(event)
    users = list(get_users_from_ical_event(event, organization))
    known_usernames = {user.username for user in users}
    known_emails = {user.email.lower() for user in users}

    missing = []
    for candidate in all_usernames:
        if candidate == "":
            continue
        if candidate in known_usernames or candidate.lower() in known_emails:
            continue
        missing.append(candidate)
    return missing
def get_users_from_ical_event(event, organization: "Organization") -> typing.List["User"]:
    """Resolve the usernames of an ical event to users; an event without usernames yields []."""
    usernames_from_ical, _ = get_usernames_from_ical_event(event)
    if not usernames_from_ical:
        return []
    # the cached lookup is called with a tuple of the usernames
    return memoized_users_in_ical(tuple(usernames_from_ical), organization)
def is_icals_equal_line_by_line(first, second):
    """
    Compare two ical texts line by line.

    The texts are equal when they have the same number of lines and every line of `first`
    either starts with "DTSTAMP" (such lines are not compared) or equals the line at the
    same position in `second`.
    """
    first_lines = first.split("\n")
    second_lines = second.split("\n")
    if len(first_lines) != len(second_lines):
        return False
    return all(
        mine.startswith("DTSTAMP") or mine == theirs
        for mine, theirs in zip(first_lines, second_lines)
    )
def is_icals_equal(first, second):
    """
    Tell whether two ical texts describe the same calendar.

    Calendars whose PRODID is one of the oncall-generated ones are compared line by line;
    any other calendar is compared by the UID -> SEQUENCE mapping of its VEVENT components.
    """
    oncall_generated_prodids = ("-//My calendar product//amixr//", "-//web schedule//oncall//")

    first_cal = Calendar.from_ical(first)
    if first_cal.get("PRODID", None) in oncall_generated_prodids:
        # Schedules generated by oncall do not support the SEQUENCE field yet, but since we
        # generate them ourselves, equal calendars are sure to have equal lines.
        return is_icals_equal_line_by_line(first, second)

    # External calendars are compared by events, since they sometimes contain
    # different lines even for equal calendars.
    second_cal = Calendar.from_ical(second)

    def sequence_by_uid(cal):
        # only consider VEVENT components
        return {
            component.get("UID", None): component.get("SEQUENCE", None)
            for component in cal.subcomponents
            if component.name == "VEVENT"
        }

    # check events and their respective sequences are equal
    return sequence_by_uid(first_cal) == sequence_by_uid(second_cal)
def ical_date_to_datetime(date, tz, start):
    """
    Convert a plain ical date into a datetime; other values are returned unchanged.

    :param date: value of an ical date/datetime property
    :param tz: timezone used for the conversion
    :param start: whether the value is the start of an event; for a falsy value one second
        is subtracted from the converted date
    :return: tuple of (possibly converted value, whether the input was a plain date)
    """
    datetime_to_combine = datetime.time.min
    all_day = False
    # exact type check: datetime.datetime is a subclass of datetime.date and must not match
    if type(date) is datetime.date:
        all_day = True
        # offset of tz at the current moment (not at the event's date)
        calendar_timezone_offset = datetime.datetime.now().astimezone(tz).utcoffset()
        # NOTE(review): combine() yields a naive datetime, which astimezone() interprets in the
        # process's local timezone — confirm the service runs with UTC local time
        date = datetime.datetime.combine(date, datetime_to_combine).astimezone(tz) - calendar_timezone_offset
        if not start:
            date -= datetime.timedelta(seconds=1)
    return date, all_day
def calculate_shift_diff(shifts: list, prev_shifts: list) -> typing.Tuple[bool, list]:
    """
    Compare the current shifts with the previous ones.

    Shifts are compared on a fixed subset of their fields. Returns a tuple of:
    - whether anything changed (a different number of shifts, or a shift without an equal
      counterpart among the previous shifts)
    - the current shifts that have no equal counterpart among the previous shifts
    """
    compared_fields = ["users", "end", "start", "all_day", "priority_level", "shift"]

    def comparable(shift):
        # projection of a shift onto the compared fields
        return {field: value for field, value in shift.items() if field in compared_fields}

    previous = [comparable(shift) for shift in prev_shifts]
    diff = [shift for shift in shifts if comparable(shift) not in previous]
    shift_changed = bool(diff) or len(shifts) != len(prev_shifts)
    return shift_changed, diff
def get_icalendar_tz_or_utc(icalendar):
    """
    Return the timezone declared by the calendar's X-WR-TIMEZONE property, or UTC.

    The declared name (default "UTC") is first checked with `is_valid_timezone`, whose truthy
    result is returned as-is (presumably a pytz timezone — confirm against `common.timezones`).
    Otherwise the name is treated as a Windows timezone name and converted to IANA. When that
    fails too, UTC is returned.

    The fallback is a tzinfo object rather than the string "UTC", so that callers can pass the
    result to `datetime.astimezone()` in every case.
    """
    calendar_timezone = icalendar.get("X-WR-TIMEZONE", "UTC")

    if pytz_timezone := is_valid_timezone(calendar_timezone):
        return pytz_timezone

    # try to convert the timezone from windows to iana
    converted_timezone = convert_windows_timezone_to_iana(calendar_timezone)
    if converted_timezone is None:
        return pytz.utc
    return pytz.timezone(converted_timezone)
def fetch_ical_file_or_get_error(ical_url: str) -> typing.Tuple[str | None, str | None]:
    """
    Download an ical file and make sure it can be parsed.

    Returns `(ical_text, None)` on success and `(None, error_message)` when the download
    raises a `requests` exception or parsing raises `ValueError`.
    """
    try:
        fetched = fetch_ical_file(ical_url)
        # parsed only to validate the payload, the parsed calendar is discarded
        Calendar.from_ical(fetched)
    except requests.exceptions.RequestException:
        return None, "iCal download failed"
    except ValueError:
        return None, "wrong iCal"
    # TODO: catch icalendar exceptions
    return fetched, None
def fetch_ical_file(ical_url: str) -> str:
    """Download `ical_url` (10 seconds timeout) and return the response body as text."""
    # without user-agent header google calendar sometimes returns text/html instead of text/calendar
    response = requests.get(ical_url, headers={"User-Agent": "Grafana OnCall"}, timeout=10)
    logger.info(f"fetch_ical_file: content-type={response.headers.get('Content-Type')}")
    return response.text
def create_base_icalendar(name: str) -> Calendar:
    """Create an empty calendar named `name` carrying the standard header properties."""
    header_properties = (
        ("calscale", "GREGORIAN"),
        ("x-wr-calname", name),
        ("x-wr-timezone", "UTC"),
        ("version", "2.0"),
        ("prodid", "//Grafana Labs//Grafana On-Call//"),
        # suggested minimum interval for polling for changes
        ("REFRESH-INTERVAL;VALUE=DURATION", "PT1H"),
    )
    cal = Calendar()
    for property_name, property_value in header_properties:
        cal.add(property_name, property_value)
    return cal
def get_user_events_from_calendars(
    ical_obj: Calendar, calendar: Calendar, user: "User", name: typing.Optional[str] = None
) -> None:
    """
    Copy the events of `calendar` that belong to `user` into `ical_obj`.

    An event belongs to the user when the first username parsed from it equals the user's
    username, or equals the user's e-mail ignoring case. When `name` is given, the summary of
    every copied event is prefixed with it. Events from which no username can be parsed are
    skipped.

    :param ical_obj: calendar receiving the matching events (modified in place)
    :param calendar: calendar to read events from; a falsy value is ignored
    :param user: user whose events are collected
    :param name: optional prefix for the summary of the copied events
    """
    if not calendar:
        return

    for component in calendar.walk():
        if component.name != "VEVENT":
            continue

        usernames, _ = get_usernames_from_ical_event(component)
        if not usernames:
            # nothing identifies a user in this event; indexing the empty list would raise IndexError
            continue

        event_user_value = usernames[0]
        if event_user_value == user.username or event_user_value.lower() == user.email.lower():
            if name:
                component["SUMMARY"] = "{}: {}".format(name, component["SUMMARY"])
            ical_obj.add_component(component)
def _get_ical_data_final_schedule(schedule: "OnCallSchedule") -> str | None:
    """Return the cached final ical of the schedule, refreshing it first when it is missing."""
    cached = schedule.cached_ical_final_schedule
    if cached is not None:
        return cached

    schedule.refresh_ical_final_schedule()
    # casting is safe here. cached_ical_final_schedule is updated inside of refresh_ical_final_schedule
    return typing.cast(str, schedule.cached_ical_final_schedule)
def ical_export_from_schedule(schedule: "OnCallSchedule") -> bytes:
    """Return the final ical of the schedule encoded as bytes."""
    return _get_ical_data_final_schedule(schedule).encode()
def user_ical_export(user: "User", schedules: "OnCallScheduleQuerySet") -> bytes:
    """
    Build an ical export with the events of `user` across the given schedules.

    The summary of every exported event is prefixed with the name of its schedule.
    """
    export = create_base_icalendar("On-Call Schedule for {0}".format(user.username))
    for schedule in schedules:
        schedule_name = schedule.name
        final_calendar = Calendar.from_ical(_get_ical_data_final_schedule(schedule))
        get_user_events_from_calendars(export, final_calendar, user, name=schedule_name)
    return export.to_ical()
def detect_gaps(intervals: DatetimeIntervals, start: datetime.datetime, end: datetime.datetime) -> DatetimeIntervals:
    """
    Find the parts of [start, end] that are not covered by the given intervals.

    A gap before the first interval has `start=None` and a gap after the last covered moment
    has `end=None`. No gaps are reported when there are no intervals at all.
    """
    ordered = sorted(intervals, key=lambda dt: dt.start)
    if not ordered:
        return []

    gaps: DatetimeIntervals = []
    current, *remaining = ordered
    if current.start > start:
        gaps.append(DatetimeInterval(None, current.start))

    for candidate in remaining:
        overlaps, merged = merge_if_overlaps(current, candidate)
        if not overlaps:
            gaps.append(DatetimeInterval(current.end, candidate.start))
        current = merged

    if current.end < end:
        gaps.append(DatetimeInterval(current.end, None))
    return gaps
def merge_if_overlaps(a: DatetimeInterval, b: DatetimeInterval) -> typing.Tuple[bool, DatetimeInterval]:
    """
    Merge interval `b` into interval `a` when they overlap.

    Intervals count as overlapping when `b` does not end after `a`, or when `b` starts less
    than one minute after `a` ends. Returns `(True, merged interval)` in that case and
    `(False, b)` otherwise.
    """
    if a.end >= b.end:
        # b does not extend past a
        return True, DatetimeInterval(a.start, a.end)

    distance = b.start - a.end
    if distance < datetime.timedelta(minutes=1):
        return True, DatetimeInterval(a.start, b.end)
    return False, DatetimeInterval(b.start, b.end)
def start_end_with_respect_to_all_day(event: IcalEvent, calendar_tz):
    """Return the event's start and end after passing both through `ical_date_to_datetime`."""
    raw_start = event[ICAL_DATETIME_START].dt
    raw_end = event[ICAL_DATETIME_END].dt
    # the all-day flag returned by the conversion is not needed here
    start = ical_date_to_datetime(raw_start, calendar_tz, start=True)[0]
    end = ical_date_to_datetime(raw_end, calendar_tz, start=False)[0]
    return start, end
def event_start_end_all_day_with_respect_to_type(event: IcalEvent, calendar_tz):
    """
    Return `(start, end, all_day)` for an event.

    An event is all-day when its start is a plain date; its boundaries are then computed by
    `start_end_with_respect_to_all_day`, otherwise by the `ical_events` helper.
    """
    # exact type check: datetime.datetime is a subclass of datetime.date and must not match
    all_day = type(event[ICAL_DATETIME_START].dt) is datetime.date
    if all_day:
        start, end = start_end_with_respect_to_all_day(event, calendar_tz)
    else:
        start, end = ical_events.get_start_and_end_with_respect_to_event_type(event)
    return start, end, all_day
# Windows timezone name -> IANA timezone name.
# Conversion info taken from https://raw.githubusercontent.com/unicode-org/cldr/main/common/supplemental/windowsZones.xml
# Also see https://gist.github.com/mrled/8d29fde758cfc7dd0b52f3bbf2b8f06e
# Kept at module level so the table is built once instead of on every conversion.
_WINDOWS_TO_IANA_TIMEZONES = {
    "AUS Central Standard Time": "Australia/Darwin",
    "AUS Eastern Standard Time": "Australia/Sydney",
    "Afghanistan Standard Time": "Asia/Kabul",
    "Alaskan Standard Time": "America/Anchorage",
    "Aleutian Standard Time": "America/Adak",
    "Altai Standard Time": "Asia/Barnaul",
    "Arab Standard Time": "Asia/Riyadh",
    "Arabian Standard Time": "Asia/Dubai",
    "Arabic Standard Time": "Asia/Baghdad",
    "Argentina Standard Time": "America/Buenos_Aires",
    "Astrakhan Standard Time": "Europe/Astrakhan",
    "Atlantic Standard Time": "America/Halifax",
    "Aus Central W. Standard Time": "Australia/Eucla",
    "Azerbaijan Standard Time": "Asia/Baku",
    "Azores Standard Time": "Atlantic/Azores",
    "Bahia Standard Time": "America/Bahia",
    "Bangladesh Standard Time": "Asia/Dhaka",
    "Belarus Standard Time": "Europe/Minsk",
    "Bougainville Standard Time": "Pacific/Bougainville",
    "Canada Central Standard Time": "America/Regina",
    "Cape Verde Standard Time": "Atlantic/Cape_Verde",
    "Caucasus Standard Time": "Asia/Yerevan",
    "Cen. Australia Standard Time": "Australia/Adelaide",
    "Central America Standard Time": "America/Guatemala",
    "Central Asia Standard Time": "Asia/Almaty",
    "Central Brazilian Standard Time": "America/Cuiaba",
    "Central Europe Standard Time": "Europe/Budapest",
    "Central European Standard Time": "Europe/Warsaw",
    "Central Pacific Standard Time": "Pacific/Guadalcanal",
    "Central Standard Time": "America/Chicago",
    "Central Standard Time (Mexico)": "America/Mexico_City",
    "Chatham Islands Standard Time": "Pacific/Chatham",
    "China Standard Time": "Asia/Shanghai",
    "Cuba Standard Time": "America/Havana",
    "Dateline Standard Time": "Etc/GMT+12",
    "E. Africa Standard Time": "Africa/Nairobi",
    "E. Australia Standard Time": "Australia/Brisbane",
    "E. Europe Standard Time": "Europe/Chisinau",
    "E. South America Standard Time": "America/Sao_Paulo",
    "Easter Island Standard Time": "Pacific/Easter",
    "Eastern Standard Time": "America/New_York",
    "Eastern Standard Time (Mexico)": "America/Cancun",
    "Egypt Standard Time": "Africa/Cairo",
    "Ekaterinburg Standard Time": "Asia/Yekaterinburg",
    "FLE Standard Time": "Europe/Kiev",
    "Fiji Standard Time": "Pacific/Fiji",
    "GMT Standard Time": "Europe/London",
    "GTB Standard Time": "Europe/Bucharest",
    "Georgian Standard Time": "Asia/Tbilisi",
    "Greenland Standard Time": "America/Godthab",
    "Greenwich Standard Time": "Atlantic/Reykjavik",
    "Haiti Standard Time": "America/Port-au-Prince",
    "Hawaiian Standard Time": "Pacific/Honolulu",
    "India Standard Time": "Asia/Calcutta",
    "Iran Standard Time": "Asia/Tehran",
    "Israel Standard Time": "Asia/Jerusalem",
    "Jordan Standard Time": "Asia/Amman",
    "Kaliningrad Standard Time": "Europe/Kaliningrad",
    "Korea Standard Time": "Asia/Seoul",
    "Libya Standard Time": "Africa/Tripoli",
    "Line Islands Standard Time": "Pacific/Kiritimati",
    "Lord Howe Standard Time": "Australia/Lord_Howe",
    "Magadan Standard Time": "Asia/Magadan",
    "Magallanes Standard Time": "America/Punta_Arenas",
    "Marquesas Standard Time": "Pacific/Marquesas",
    "Mauritius Standard Time": "Indian/Mauritius",
    "Middle East Standard Time": "Asia/Beirut",
    "Montevideo Standard Time": "America/Montevideo",
    "Morocco Standard Time": "Africa/Casablanca",
    "Mountain Standard Time": "America/Denver",
    "Mountain Standard Time (Mexico)": "America/Chihuahua",
    "Myanmar Standard Time": "Asia/Rangoon",
    "N. Central Asia Standard Time": "Asia/Novosibirsk",
    "Namibia Standard Time": "Africa/Windhoek",
    "Nepal Standard Time": "Asia/Katmandu",
    "New Zealand Standard Time": "Pacific/Auckland",
    "Newfoundland Standard Time": "America/St_Johns",
    "Norfolk Standard Time": "Pacific/Norfolk",
    "North Asia East Standard Time": "Asia/Irkutsk",
    "North Asia Standard Time": "Asia/Krasnoyarsk",
    "North Korea Standard Time": "Asia/Pyongyang",
    "Omsk Standard Time": "Asia/Omsk",
    "Pacific SA Standard Time": "America/Santiago",
    "Pacific Standard Time": "America/Los_Angeles",
    "Pacific Standard Time (Mexico)": "America/Tijuana",
    "Pakistan Standard Time": "Asia/Karachi",
    "Paraguay Standard Time": "America/Asuncion",
    "Qyzylorda Standard Time": "Asia/Qyzylorda",
    "Romance Standard Time": "Europe/Paris",
    "Russia Time Zone 10": "Asia/Srednekolymsk",
    "Russia Time Zone 11": "Asia/Kamchatka",
    "Russia Time Zone 3": "Europe/Samara",
    "Russian Standard Time": "Europe/Moscow",
    "SA Eastern Standard Time": "America/Cayenne",
    "SA Pacific Standard Time": "America/Bogota",
    "SA Western Standard Time": "America/La_Paz",
    "SE Asia Standard Time": "Asia/Bangkok",
    "Saint Pierre Standard Time": "America/Miquelon",
    "Sakhalin Standard Time": "Asia/Sakhalin",
    "Samoa Standard Time": "Pacific/Apia",
    "Sao Tome Standard Time": "Africa/Sao_Tome",
    "Saratov Standard Time": "Europe/Saratov",
    "Singapore Standard Time": "Asia/Singapore",
    "South Africa Standard Time": "Africa/Johannesburg",
    "South Sudan Standard Time": "Africa/Juba",
    "Sri Lanka Standard Time": "Asia/Colombo",
    "Sudan Standard Time": "Africa/Khartoum",
    "Syria Standard Time": "Asia/Damascus",
    "Taipei Standard Time": "Asia/Taipei",
    "Tasmania Standard Time": "Australia/Hobart",
    "Tocantins Standard Time": "America/Araguaina",
    "Tokyo Standard Time": "Asia/Tokyo",
    "Tomsk Standard Time": "Asia/Tomsk",
    "Tonga Standard Time": "Pacific/Tongatapu",
    "Transbaikal Standard Time": "Asia/Chita",
    "Turkey Standard Time": "Europe/Istanbul",
    "Turks And Caicos Standard Time": "America/Grand_Turk",
    "US Eastern Standard Time": "America/Indianapolis",
    "US Mountain Standard Time": "America/Phoenix",
    "UTC": "Etc/UTC",
    "UTC+12": "Etc/GMT-12",
    "UTC+13": "Etc/GMT-13",
    "UTC-02": "Etc/GMT+2",
    "UTC-08": "Etc/GMT+8",
    "UTC-09": "Etc/GMT+9",
    "UTC-11": "Etc/GMT+11",
    "Ulaanbaatar Standard Time": "Asia/Ulaanbaatar",
    "Venezuela Standard Time": "America/Caracas",
    "Vladivostok Standard Time": "Asia/Vladivostok",
    "Volgograd Standard Time": "Europe/Volgograd",
    "W. Australia Standard Time": "Australia/Perth",
    "W. Central Africa Standard Time": "Africa/Lagos",
    "W. Europe Standard Time": "Europe/Berlin",
    "W. Mongolia Standard Time": "Asia/Hovd",
    "West Asia Standard Time": "Asia/Tashkent",
    "West Bank Standard Time": "Asia/Hebron",
    "West Pacific Standard Time": "Pacific/Port_Moresby",
    "Yakutsk Standard Time": "Asia/Yakutsk",
    "Yukon Standard Time": "America/Whitehorse",
}


def convert_windows_timezone_to_iana(tz_name: str) -> str | None:
    """
    Convert a Windows timezone name to its IANA equivalent.

    Returns the IANA name, or None when `tz_name` is not a known Windows timezone name.
    The lookup is an exact, case-sensitive match.

    NOTE: This mapping could be updated, and it's technically a guess.
    Also, there could probably be issues with DST for some timezones.
    """
    result = _WINDOWS_TO_IANA_TIMEZONES.get(tz_name)
    # arguments are passed to the logger so the message is only formatted when it is emitted
    logger.debug("Converting the timezone from Windows to IANA. '%s' -> '%s'", tz_name, result)
    return result