Merge remote-tracking branch 'origin/on-call-shifts-update-logic' into new-schedules

This commit is contained in:
Maxim 2022-09-01 11:28:30 +03:00
commit abf06833db
6 changed files with 328 additions and 25 deletions

View file

@ -0,0 +1,14 @@
import re
ICAL_DATETIME_START = "DTSTART"
ICAL_DATETIME_END = "DTEND"
ICAL_DATETIME_STAMP = "DTSTAMP"
ICAL_SUMMARY = "SUMMARY"
ICAL_DESCRIPTION = "DESCRIPTION"
ICAL_ATTENDEE = "ATTENDEE"
ICAL_UID = "UID"
ICAL_RRULE = "RRULE"
ICAL_UNTIL = "UNTIL"
RE_PRIORITY = re.compile(r"^\[L(\d)\]")
RE_EVENT_UID_V1 = re.compile(r"amixr-([\w\d-]+)-U(\d+)-E(\d+)-S(\d+)")
RE_EVENT_UID_V2 = re.compile(r"oncall-([\w\d-]+)-PK([\w\d]+)-U(\d+)-E(\d+)-S(\d+)")

View file

@ -1,11 +1,22 @@
from collections import defaultdict
from datetime import datetime
from typing import List
from typing import List, Tuple
from django.apps import apps
from django.utils import timezone
from icalendar import Calendar, Event
from recurring_ical_events import UnfoldableCalendar, compare_greater, is_event, time_span_contains_event
from apps.schedules.constants import (
ICAL_DATETIME_END,
ICAL_DATETIME_STAMP,
ICAL_DATETIME_START,
ICAL_RRULE,
ICAL_UID,
ICAL_UNTIL,
RE_EVENT_UID_V1,
RE_EVENT_UID_V2,
)
from apps.schedules.ical_events.proxy.ical_proxy import IcalService
EXTRA_LOOKUP_DAYS = 16
@ -19,6 +30,17 @@ class AmixrUnfoldableCalendar(UnfoldableCalendar):
So i took part of code from 0.1.20b0 but leave 0.1.16b in requirements.
"""
class RepeatedEvent(UnfoldableCalendar.RepeatedEvent):
class Repetition(UnfoldableCalendar.RepeatedEvent.Repetition):
"""
A repetition of an event. Overridden version of
recurring_ical_events.UnfoldableCalendar.RepeatedEvent.Repetition. This is overridden to remove the 'RRULE'
param from ATTRIBUTES_TO_DELETE_ON_COPY, because the 'UNTIL' param must be stored in repetition events to
calculate its end date.
"""
ATTRIBUTES_TO_DELETE_ON_COPY = ["RDATE", "EXDATE"]
def between(self, start, stop):
"""Return events at a time between start (inclusive) and end (inclusive)"""
span_start = self.to_datetime(start)
@ -83,6 +105,29 @@ class AmixrRecurringIcalEventsAdapter(IcalService):
)
def filter_extra_days(event):
return time_span_contains_event(start_date, end_date, event["DTSTART"].dt, event["DTEND"].dt)
event_start, event_end = self.get_start_and_end_with_respect_to_event_type(event)
return time_span_contains_event(start_date, end_date, event_start, event_end)
return list(filter(filter_extra_days, events))
def get_start_and_end_with_respect_to_event_type(self, event: Event) -> Tuple[timezone.datetime, timezone.datetime]:
"""
Calculate start and end datetime
"""
CustomOnCallShift = apps.get_model("schedules", "CustomOnCallShift")
start = event[ICAL_DATETIME_START].dt
end = event[ICAL_DATETIME_END].dt
match = RE_EVENT_UID_V2.match(event[ICAL_UID]) or RE_EVENT_UID_V1.match(event[ICAL_UID])
# use different calculation rule for events from custom shifts generated at web
if match and int(match.groups()[-1]) == CustomOnCallShift.SOURCE_WEB:
rotation_start = event[ICAL_DATETIME_STAMP].dt
until_rrule = event.get(ICAL_RRULE, {}).get(ICAL_UNTIL)
if until_rrule:
until = until_rrule[0]
end = min(end, until)
start = max(start, rotation_start)
return start, end

View file

@ -1,7 +1,8 @@
from abc import ABC, abstractmethod
from datetime import datetime
from typing import List
from typing import List, Tuple
from django.utils import timezone
from icalendar import Calendar, Event
@ -10,6 +11,10 @@ class IcalService(ABC):
def get_events_from_ical_between(self, calendar: Calendar, start_date: datetime, end_date: datetime) -> List[Event]:
raise NotImplementedError
@abstractmethod
def get_start_and_end_with_respect_to_event_type(self, event: Event) -> Tuple[timezone.datetime, timezone.datetime]:
raise NotImplementedError
class IcalProxy(IcalService):
def __init__(self, ical_adapter: IcalService):
@ -17,3 +22,6 @@ class IcalProxy(IcalService):
def get_events_from_ical_between(self, calendar: Calendar, start_date: datetime, end_date: datetime) -> List[Event]:
return self.ical_adapter.get_events_from_ical_between(calendar, start_date, end_date)
def get_start_and_end_with_respect_to_event_type(self, event: Event) -> Tuple[timezone.datetime, timezone.datetime]:
return self.ical_adapter.get_start_and_end_with_respect_to_event_type(event)

View file

@ -13,6 +13,17 @@ from django.db.models import Q
from django.utils import timezone
from icalendar import Calendar
from apps.schedules.constants import (
ICAL_ATTENDEE,
ICAL_DATETIME_END,
ICAL_DATETIME_START,
ICAL_DESCRIPTION,
ICAL_SUMMARY,
ICAL_UID,
RE_EVENT_UID_V1,
RE_EVENT_UID_V2,
RE_PRIORITY,
)
from apps.schedules.ical_events import ical_events
from common.constants.role import Role
from common.utils import timed_lru_cache
@ -68,15 +79,6 @@ def memoized_users_in_ical(usernames_from_ical, organization):
return users_in_ical(usernames_from_ical, organization)
ICAL_DATETIME_START = "DTSTART"
ICAL_DATETIME_END = "DTEND"
ICAL_SUMMARY = "SUMMARY"
ICAL_DESCRIPTION = "DESCRIPTION"
ICAL_ATTENDEE = "ATTENDEE"
ICAL_UID = "UID"
RE_PRIORITY = re.compile(r"^\[L(\d)\]")
RE_EVENT_UID_V1 = re.compile(r"amixr-([\w\d-]+)-U(\d+)-E(\d+)-S(\d+)")
RE_EVENT_UID_V2 = re.compile(r"oncall-([\w\d-]+)-PK([\w\d]+)-U(\d+)-E(\d+)-S(\d+)")
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
@ -187,13 +189,11 @@ def get_shifts_dict(calendar, calendar_type, schedule, datetime_start, datetime_
}
)
else:
start = event[ICAL_DATETIME_START].dt.astimezone(pytz.UTC)
end = event[ICAL_DATETIME_END].dt.astimezone(pytz.UTC)
start, end = ical_events.get_start_and_end_with_respect_to_event_type(event)
result_datetime.append(
{
"start": start,
"end": end,
"start": start.astimezone(pytz.UTC),
"end": end.astimezone(pytz.UTC),
"users": users,
"missing_users": missing_users,
"priority": priority,

View file

@ -280,7 +280,7 @@ class CustomOnCallShift(models.Model):
# rolling_users shift converts to several ical events
if self.type in (CustomOnCallShift.TYPE_ROLLING_USERS_EVENT, CustomOnCallShift.TYPE_OVERRIDE):
# generate initial iCal for counting rotation start date
event_ical = self.generate_ical(self.start, user_counter=0)
event_ical = self.generate_ical(self.start)
rotations_created = 0
all_rotation_checked = False
@ -301,13 +301,16 @@ class CustomOnCallShift(models.Model):
if not start: # means that rotation ends before next event starts
all_rotation_checked = True
break
elif start >= self.rotation_start: # event has already started, generate iCal for each user
elif (
self.source == CustomOnCallShift.SOURCE_WEB and start + self.duration > self.rotation_start
) or start >= self.rotation_start:
# event has already started, generate iCal for each user
for user_counter, user in enumerate(users, start=1):
event_ical = self.generate_ical(start, user_counter, user, counter, time_zone)
result += event_ical
rotations_created += 1
else: # generate default iCal to calculate the date for the next rotation
event_ical = self.generate_ical(start, user_counter=0)
event_ical = self.generate_ical(start)
if rotations_created == len(users_queue): # means that we generated iCal for every user group
all_rotation_checked = True
@ -319,14 +322,14 @@ class CustomOnCallShift(models.Model):
result += self.generate_ical(self.start, user_counter, user, time_zone=time_zone)
return result
def generate_ical(self, start, user_counter, user=None, counter=1, time_zone="UTC"):
def generate_ical(self, start, user_counter=0, user=None, counter=1, time_zone="UTC"):
event = Event()
event["uid"] = f"oncall-{self.uuid}-PK{self.public_primary_key}-U{user_counter}-E{counter}-S{self.source}"
if user:
event.add("summary", self.get_summary_with_user_for_ical(user))
event.add("dtstart", self.convert_dt_to_schedule_timezone(start, time_zone))
event.add("dtend", self.convert_dt_to_schedule_timezone(start + self.duration, time_zone))
event.add("dtstamp", timezone.now())
event.add("dtstamp", self.rotation_start)
if self.event_ical_rules:
event.add("rrule", self.event_ical_rules)
try:
@ -407,7 +410,9 @@ class CustomOnCallShift(models.Model):
for event in ical_iter:
if end_date: # end_date exists for long events with frequency weekly and monthly
if end_date >= event.start >= next_event_start:
if event.start >= self.rotation_start:
if (
self.source == CustomOnCallShift.SOURCE_WEB and event.stop > self.rotation_start
) or event.start >= self.rotation_start:
next_event = event
break
else:

View file

@ -477,7 +477,7 @@ def test_rolling_users_with_diff_start_and_rotation_start_daily(
"duration": timezone.timedelta(seconds=1800),
"frequency": CustomOnCallShift.FREQUENCY_DAILY,
"schedule": schedule,
"until": now + timezone.timedelta(days=6, minutes=1),
"until": now + timezone.timedelta(days=6, minutes=10),
}
rolling_users = [[user_1], [user_2], [user_3]]
on_call_shift = make_on_call_shift(
@ -535,7 +535,7 @@ def test_rolling_users_with_diff_start_and_rotation_start_weekly(
"duration": timezone.timedelta(seconds=1800),
"frequency": CustomOnCallShift.FREQUENCY_WEEKLY,
"schedule": schedule,
"until": now + timezone.timedelta(days=42, minutes=1),
"until": now + timezone.timedelta(days=42, minutes=10),
}
rolling_users = [[user_1], [user_2], [user_3]]
on_call_shift = make_on_call_shift(
@ -767,6 +767,237 @@ def test_rolling_users_with_diff_start_and_rotation_start_monthly_by_monthday(
assert len(users_on_call) == 0
@pytest.mark.django_db
def test_get_oncall_users_with_respect_to_rotation_start_and_until_dates_hourly(
make_organization_and_user,
make_on_call_shift,
make_schedule,
):
"""Test calculation start and end event dates for one event with respect to rotation start and until"""
organization, user = make_organization_and_user()
schedule = make_schedule(organization, schedule_class=OnCallScheduleWeb)
now = timezone.now().replace(microsecond=0)
data = {
"priority_level": 1,
"start": now,
"rotation_start": now + timezone.timedelta(minutes=10),
"duration": timezone.timedelta(hours=1),
"frequency": CustomOnCallShift.FREQUENCY_HOURLY,
"schedule": schedule,
"until": now + timezone.timedelta(minutes=40),
"source": CustomOnCallShift.SOURCE_WEB,
}
rolling_users = [[user]]
on_call_shift = make_on_call_shift(
organization=organization, shift_type=CustomOnCallShift.TYPE_ROLLING_USERS_EVENT, **data
)
on_call_shift.add_rolling_users(rolling_users)
date = now + timezone.timedelta(minutes=2)
user_on_call_dates = [date + timezone.timedelta(minutes=10), date + timezone.timedelta(minutes=35)]
nobody_on_call_dates = [
date, # less than rotation start
date + timezone.timedelta(minutes=5), # less than rotation start
date + timezone.timedelta(minutes=40), # higher than until
]
for dt in user_on_call_dates:
users_on_call = list_users_to_notify_from_ical(schedule, dt)
assert len(users_on_call) == 1
assert user in users_on_call
for dt in nobody_on_call_dates:
users_on_call = list_users_to_notify_from_ical(schedule, dt)
assert len(users_on_call) == 0
@pytest.mark.django_db
def test_get_oncall_users_with_respect_to_rotation_start_and_until_dates_daily(
make_organization_and_user,
make_on_call_shift,
make_schedule,
):
"""Test calculation start and end event dates for one event with respect to rotation start and until"""
organization, user = make_organization_and_user()
schedule = make_schedule(organization, schedule_class=OnCallScheduleWeb)
now = timezone.now().replace(microsecond=0)
data = {
"priority_level": 1,
"start": now,
"rotation_start": now + timezone.timedelta(hours=5),
"duration": timezone.timedelta(days=1),
"frequency": CustomOnCallShift.FREQUENCY_DAILY,
"schedule": schedule,
"until": now + timezone.timedelta(hours=15),
"source": CustomOnCallShift.SOURCE_WEB,
}
rolling_users = [[user]]
on_call_shift = make_on_call_shift(
organization=organization, shift_type=CustomOnCallShift.TYPE_ROLLING_USERS_EVENT, **data
)
on_call_shift.add_rolling_users(rolling_users)
date = now + timezone.timedelta(minutes=5)
user_on_call_dates = [date + timezone.timedelta(hours=5), date + timezone.timedelta(hours=10)]
nobody_on_call_dates = [
date, # less than rotation start
date + timezone.timedelta(hours=4), # less than rotation start
date + timezone.timedelta(hours=15), # higher than until
]
for dt in user_on_call_dates:
users_on_call = list_users_to_notify_from_ical(schedule, dt)
assert len(users_on_call) == 1
assert user in users_on_call
for dt in nobody_on_call_dates:
users_on_call = list_users_to_notify_from_ical(schedule, dt)
assert len(users_on_call) == 0
@pytest.mark.django_db
def test_get_oncall_users_with_respect_to_rotation_start_and_until_dates_weekly(
make_organization_and_user,
make_on_call_shift,
make_schedule,
):
"""Test calculation start and end event dates for one event with respect to rotation start and until"""
organization, user = make_organization_and_user()
# simple weekly event
schedule = make_schedule(organization, schedule_class=OnCallScheduleWeb)
now = timezone.now().replace(microsecond=0)
data = {
"priority_level": 1,
"start": now,
"rotation_start": now + timezone.timedelta(days=1),
"duration": timezone.timedelta(days=7),
"frequency": CustomOnCallShift.FREQUENCY_WEEKLY,
"schedule": schedule,
"until": now + timezone.timedelta(days=6),
"week_start": now.weekday(),
"source": CustomOnCallShift.SOURCE_WEB,
}
rolling_users = [[user]]
on_call_shift = make_on_call_shift(
organization=organization, shift_type=CustomOnCallShift.TYPE_ROLLING_USERS_EVENT, **data
)
on_call_shift.add_rolling_users(rolling_users)
date = now + timezone.timedelta(minutes=5)
user_on_call_dates = [date + timezone.timedelta(days=1), date + timezone.timedelta(days=5)]
nobody_on_call_dates = [
date, # less than rotation start
date + timezone.timedelta(hours=23), # less than rotation start
date + timezone.timedelta(days=6), # higher than until
]
for dt in user_on_call_dates:
users_on_call = list_users_to_notify_from_ical(schedule, dt)
assert len(users_on_call) == 1
assert user in users_on_call
for dt in nobody_on_call_dates:
users_on_call = list_users_to_notify_from_ical(schedule, dt)
assert len(users_on_call) == 0
# weekly event with by_day
schedule_2 = make_schedule(organization, schedule_class=OnCallScheduleWeb)
today_weekday = now.weekday()
weekdays = [today_weekday, (today_weekday + 1) % 7, (today_weekday + 2) % 7, (today_weekday + 5) % 7]
by_day = [CustomOnCallShift.ICAL_WEEKDAY_MAP[day] for day in weekdays]
data = {
"priority_level": 1,
"start": now,
"rotation_start": now + timezone.timedelta(days=1),
"duration": timezone.timedelta(hours=12),
"frequency": CustomOnCallShift.FREQUENCY_WEEKLY,
"schedule": schedule_2,
"until": now + timezone.timedelta(days=4, hours=23),
"week_start": today_weekday,
"by_day": by_day,
"source": CustomOnCallShift.SOURCE_WEB,
}
on_call_shift_2 = make_on_call_shift(
organization=organization, shift_type=CustomOnCallShift.TYPE_ROLLING_USERS_EVENT, **data
)
on_call_shift_2.add_rolling_users(rolling_users)
date = now + timezone.timedelta(minutes=5)
user_on_call_dates = [date + timezone.timedelta(days=1), date + timezone.timedelta(days=2)]
nobody_on_call_dates = [
date, # less than rotation start
date + timezone.timedelta(hours=23), # less than rotation start
date + timezone.timedelta(days=3), # out of by_day
date + timezone.timedelta(days=4), # out of by_day
date + timezone.timedelta(days=5), # higher than until
]
for dt in user_on_call_dates:
users_on_call = list_users_to_notify_from_ical(schedule_2, dt)
assert len(users_on_call) == 1
assert user in users_on_call
for dt in nobody_on_call_dates:
users_on_call = list_users_to_notify_from_ical(schedule_2, dt)
assert len(users_on_call) == 0
@pytest.mark.django_db
def test_get_oncall_users_with_respect_to_rotation_start_and_until_dates_monthly(
make_organization_and_user,
make_on_call_shift,
make_schedule,
):
"""Test calculation start and end event dates for one event with respect to rotation start and until"""
organization, user = make_organization_and_user()
schedule = make_schedule(organization, schedule_class=OnCallScheduleWeb)
now = timezone.now().replace(microsecond=0)
data = {
"priority_level": 1,
"start": now,
"rotation_start": now + timezone.timedelta(days=5),
"duration": timezone.timedelta(days=30),
"frequency": CustomOnCallShift.FREQUENCY_MONTHLY,
"schedule": schedule,
"until": now + timezone.timedelta(days=15),
"source": CustomOnCallShift.SOURCE_WEB,
}
rolling_users = [[user]]
on_call_shift = make_on_call_shift(
organization=organization, shift_type=CustomOnCallShift.TYPE_ROLLING_USERS_EVENT, **data
)
on_call_shift.add_rolling_users(rolling_users)
date = now + timezone.timedelta(minutes=5)
user_on_call_dates = [date + timezone.timedelta(days=5), date + timezone.timedelta(days=10)]
nobody_on_call_dates = [
date, # less than rotation start
date + timezone.timedelta(days=4), # less than rotation start
date + timezone.timedelta(days=15), # higher than until
]
for dt in user_on_call_dates:
users_on_call = list_users_to_notify_from_ical(schedule, dt)
assert len(users_on_call) == 1
assert user in users_on_call
for dt in nobody_on_call_dates:
users_on_call = list_users_to_notify_from_ical(schedule, dt)
assert len(users_on_call) == 0
@pytest.mark.django_db
def test_get_oncall_users_for_empty_schedule(
make_organization,