Apply shift swap requests to schedule events (#2677)

Reflect swap requests details in schedule events.
This commit is contained in:
Matias Bordese 2023-07-28 15:53:27 -03:00 committed by GitHub
parent c5ec4093b2
commit ddd98e0c3f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 546 additions and 61 deletions

View file

@ -11,6 +11,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Fix one of the latest migrations failing on SQLite by @vadimkerr ([#2680](https://github.com/grafana/oncall/pull/2680))
### Added
- Apply swap requests details to schedule events ([#2677](https://github.com/grafana/oncall/pull/2677))
## v1.3.18 (2023-07-28)
### Changed

View file

@ -61,8 +61,7 @@ IcalEvents = typing.List[IcalEvent]
def users_in_ical(
usernames_from_ical: typing.List[str],
organization: "Organization",
users_to_filter: typing.Optional["UserQuerySet"] = None,
) -> typing.Sequence["User"]:
) -> "UserQuerySet":
"""
This method returns a sequence of `User` objects, filtered by users whose username, or case-insensitive e-mail,
is present in `usernames_from_ical`. If `include_viewers` is set to `True`, users are further filtered down
@ -74,25 +73,11 @@ def users_in_ical(
A list of usernames present in the ical feed
organization : apps.user_management.models.organization.Organization
The organization in question
include_viewers : bool
Whether or not the list should be further filtered to exclude users based on granted permissions
users_to_filter : typing.Optional[UserQuerySet]
Filter users without making SQL queries if users_to_filter arg is provided
users_to_filter is passed in `apps.schedules.ical_utils.get_oncall_users_for_multiple_schedules`
"""
from apps.user_management.models import User
emails_from_ical = [username.lower() for username in usernames_from_ical]
if users_to_filter is not None:
return list(
{
user
for user in users_to_filter
if user.username in usernames_from_ical or user.email.lower() in emails_from_ical
}
)
# users_found_in_ical = organization.users
users_found_in_ical = organization.users.filter(
**User.build_permissions_query(RBACPermission.Permissions.SCHEDULES_WRITE, organization)
@ -106,9 +91,7 @@ def users_in_ical(
@timed_lru_cache(timeout=100)
def memoized_users_in_ical(
usernames_from_ical: typing.List[str], organization: "Organization"
) -> typing.Sequence["User"]:
def memoized_users_in_ical(usernames_from_ical: typing.List[str], organization: "Organization") -> UserQuerySet:
# using in-memory cache instead of redis to avoid pickling python objects
return users_in_ical(usernames_from_ical, organization)
@ -336,7 +319,6 @@ def list_of_empty_shifts_in_schedule(
def list_users_to_notify_from_ical(
schedule: "OnCallSchedule",
events_datetime: typing.Optional[datetime.datetime] = None,
users_to_filter: typing.Optional["UserQuerySet"] = None,
) -> typing.Sequence["User"]:
"""
Retrieve on-call users for the current time
@ -346,7 +328,6 @@ def list_users_to_notify_from_ical(
schedule,
events_datetime,
events_datetime,
users_to_filter=users_to_filter,
)
@ -354,23 +335,20 @@ def list_users_to_notify_from_ical_for_period(
schedule: "OnCallSchedule",
start_datetime: datetime.datetime,
end_datetime: datetime.datetime,
users_to_filter=None,
) -> typing.Sequence["User"]:
) -> UserQuerySet:
users_found_in_ical: typing.Sequence["User"] = []
events = schedule.final_events(start_datetime, end_datetime)
usernames = []
for event in events:
usernames += [u["email"] for u in event.get("users", [])]
users_found_in_ical = users_in_ical(usernames, schedule.organization, users_to_filter=users_to_filter)
users_found_in_ical = users_in_ical(usernames, schedule.organization)
return users_found_in_ical
def get_oncall_users_for_multiple_schedules(
schedules: "OnCallScheduleQuerySet", events_datetime=None
) -> typing.Dict["OnCallSchedule", typing.List[User]]:
from apps.user_management.models import User
) -> typing.Dict["OnCallSchedule", UserQuerySet]:
if events_datetime is None:
events_datetime = datetime.datetime.now(timezone.utc)
@ -378,35 +356,11 @@ def get_oncall_users_for_multiple_schedules(
if not schedules.exists():
return {}
# Assume all schedules from the queryset belong to the same organization
organization = schedules[0].organization
# Gather usernames from all events from all schedules
usernames = set()
for schedule in schedules.all():
calendars = schedule.get_icalendars()
for calendar in calendars:
if calendar is None:
continue
events = ical_events.get_events_from_ical_between(calendar, events_datetime, events_datetime)
for event in events:
current_usernames, _ = get_usernames_from_ical_event(event)
usernames.update(current_usernames)
# Fetch relevant users from the db
emails = [username.lower() for username in usernames]
users = organization.users.filter(
Q(**User.build_permissions_query(RBACPermission.Permissions.SCHEDULES_WRITE, organization))
& (Q(username__in=usernames) | Q(email__lower__in=emails))
)
# Get on-call users
oncall_users = {}
for schedule in schedules.all():
# pass user list to list_users_to_notify_from_ical
schedule_oncall_users = list_users_to_notify_from_ical(
schedule, events_datetime=events_datetime, users_to_filter=users
)
schedule_oncall_users = list_users_to_notify_from_ical(schedule, events_datetime=events_datetime)
oncall_users.update({schedule.pk: schedule_oncall_users})
return oncall_users

View file

@ -1,3 +1,4 @@
import copy
import datetime
import itertools
import re
@ -92,6 +93,15 @@ class ScheduleEventUser(typing.TypedDict):
avatar_full: str
class SwapRequest(typing.TypedDict):
pk: str
user: typing.Optional[ScheduleEventUser]
class MaybeSwappedScheduleEventUser(ScheduleEventUser):
swap_request: typing.Optional[SwapRequest]
class ScheduleEventShift(typing.TypedDict):
pk: str
@ -100,7 +110,7 @@ class ScheduleEvent(typing.TypedDict):
all_day: bool
start: datetime.datetime
end: datetime.datetime
users: typing.List[ScheduleEventUser]
users: typing.List[MaybeSwappedScheduleEventUser]
missing_users: typing.List[str]
priority_level: typing.Optional[int]
source: typing.Optional[str]
@ -379,7 +389,12 @@ class OnCallSchedule(PolymorphicModel):
events.append(shift_json)
# combine multiple-users same-shift events into one
return self._merge_events(events)
events = self._merge_events(events)
# annotate events with swap request details swapping users as needed
events = self._apply_swap_requests(events, datetime_start, datetime_end)
return events
def final_events(self, datetime_start, datetime_end):
"""Return schedule final events, after resolving shifts and overrides."""
@ -601,6 +616,94 @@ class OnCallSchedule(PolymorphicModel):
"overloaded_users": overloaded_users,
}
def _apply_swap_requests(self, events, datetime_start, datetime_end) -> ScheduleEvents:
"""Apply swap requests details to schedule events."""
# get swaps requests affecting this schedule / time range
swaps = self.shift_swap_requests.filter( # starting before but ongoing
swap_start__lt=datetime_start, swap_end__gte=datetime_start
).union(
self.shift_swap_requests.filter( # starting after but before end
swap_start__gte=datetime_start, swap_start__lte=datetime_end
)
)
swaps = swaps.order_by("created_at")
def _insert_event(index, event):
# add event, if any, to events list in the specified index
# return incremented index if the event was added
if event is None:
return index
events.insert(index, event)
return index + 1
# apply swaps sequentially
for swap in swaps:
i = 0
while i < len(events):
event = events.pop(i)
if event["start"] > swap.swap_end or event["end"] < swap.swap_start:
# event outside the swap period, keep as it is and continue
i = _insert_event(i, event)
continue
users = set(u["pk"] for u in event["users"])
if swap.beneficiary.public_primary_key in users:
# swap request affects current event
split_before = None
if event["start"] < swap.swap_start:
# partially included start -> split
split_before = copy.deepcopy(event)
split_before["end"] = swap.swap_start
# update event to swap
event["start"] = swap.swap_start
split_after = None
if event["end"] > swap.swap_end:
# partially included end -> split
split_after = copy.deepcopy(event)
split_after["start"] = swap.swap_end
# update event to swap
event["end"] = swap.swap_end
# identify user to swap
user_to_swap = None
for u in event["users"]:
if u["pk"] == swap.beneficiary.public_primary_key:
user_to_swap = u
break
# apply swap changes to event user
swap_details = {"pk": swap.public_primary_key}
if swap.benefactor:
# swap is taken, update user in shift
user_to_swap["pk"] = swap.benefactor.public_primary_key
user_to_swap["display_name"] = swap.benefactor.username
user_to_swap["email"] = swap.benefactor.email
user_to_swap["avatar_full"] = swap.benefactor.avatar_full_url
# add beneficiary user to details
swap_details["user"] = {
"display_name": swap.beneficiary.username,
"email": swap.beneficiary.email,
"pk": swap.beneficiary.public_primary_key,
"avatar_full": swap.beneficiary.avatar_full_url,
}
user_to_swap["swap_request"] = swap_details
# update events list
# keep first split event in its original index
i = _insert_event(i, split_before)
# insert updated swap-related event
i = _insert_event(i, event)
# keep second split event after swap
i = _insert_event(i, split_after)
else:
# event for different user(s), keep as it is and continue
i = _insert_event(i, event)
return events
def _resolve_schedule(
self, events: ScheduleEvents, datetime_start: datetime.datetime, datetime_end: datetime.datetime
) -> ScheduleEvents:

View file

@ -7,6 +7,7 @@ from django.db import models
from django.utils import timezone
from apps.schedules import exceptions
from apps.schedules.tasks import refresh_ical_final_schedule
from common.public_primary_keys import generate_public_primary_key, increase_public_primary_key_length
if typing.TYPE_CHECKING:
@ -156,9 +157,13 @@ class ShiftSwapRequest(models.Model):
def delete(self):
self.deleted_at = timezone.now()
self.save()
# make sure final schedule ical representation is updated
refresh_ical_final_schedule.apply_async((self.schedule.pk,))
def hard_delete(self):
super().delete()
# make sure final schedule ical representation is updated
refresh_ical_final_schedule.apply_async((self.schedule.pk,))
def take(self, benefactor: "User") -> None:
if benefactor == self.beneficiary:
@ -169,7 +174,8 @@ class ShiftSwapRequest(models.Model):
self.benefactor = benefactor
self.save()
# TODO: implement the actual override logic in https://github.com/grafana/oncall/issues/2590
# make sure final schedule ical representation is updated
refresh_ical_final_schedule.apply_async((self.schedule.pk,))
# Insight logs
@property

View file

@ -1297,7 +1297,7 @@ def test_get_oncall_users_for_empty_schedule(
schedule = make_schedule(organization, schedule_class=OnCallScheduleCalendar)
schedules = OnCallSchedule.objects.filter(pk=schedule.pk)
assert schedules.get_oncall_users()[schedule.pk] == []
assert list(schedules.get_oncall_users()[schedule.pk]) == []
@pytest.mark.django_db
@ -1412,7 +1412,8 @@ def test_get_oncall_users_for_multiple_schedules_emails_case_insensitive(
schedules = OnCallSchedule.objects.filter(pk=schedule.pk)
oncall_users = schedules.get_oncall_users(events_datetime=events_datetime)
assert oncall_users == {schedule.pk: [user]}
assert len(oncall_users) == 1
assert list(oncall_users[schedule.pk]) == [user]
@pytest.mark.django_db

View file

@ -1837,3 +1837,414 @@ def test_event_until_non_utc(make_organization, make_schedule):
# check this works without raising exception
datetime_end = now + timezone.timedelta(days=7)
schedule.final_events(now, datetime_end)
@pytest.mark.django_db
@pytest.mark.parametrize("swap_taken", [False, True])
def test_swap_request_split_start(
make_organization,
make_user_for_organization,
make_schedule,
make_on_call_shift,
make_shift_swap_request,
swap_taken,
):
organization = make_organization()
user = make_user_for_organization(organization)
other_user = make_user_for_organization(organization)
schedule = make_schedule(organization, schedule_class=OnCallScheduleWeb)
today = timezone.now().replace(hour=0, minute=0, second=0, microsecond=0)
start = today + timezone.timedelta(hours=12)
duration = timezone.timedelta(hours=3)
data = {
"start": start,
"rotation_start": start,
"duration": duration,
"priority_level": 1,
"frequency": CustomOnCallShift.FREQUENCY_DAILY,
"schedule": schedule,
}
on_call_shift = make_on_call_shift(
organization=organization, shift_type=CustomOnCallShift.TYPE_ROLLING_USERS_EVENT, **data
)
on_call_shift.add_rolling_users([[user]])
tomorrow = today + timezone.timedelta(days=1)
# setup swap request
swap_request = make_shift_swap_request(
schedule,
user,
swap_start=tomorrow + timezone.timedelta(hours=13),
swap_end=tomorrow + timezone.timedelta(hours=18),
)
if swap_taken:
swap_request.take(other_user)
events = schedule.filter_events(today, today + timezone.timedelta(days=2))
expected = [
# start, end, swap requested
(start, start + duration, False), # today shift unchanged
(start + timezone.timedelta(days=1), start + timezone.timedelta(days=1, hours=1), False), # first split
(
start + timezone.timedelta(days=1, hours=1),
start + timezone.timedelta(days=1, hours=3),
True,
), # second split
]
returned = [(e["start"], e["end"], bool(e["users"][0].get("swap_request", False))) for e in events]
assert returned == expected
# check swap request details
assert events[2]["users"][0]["swap_request"]["pk"] == swap_request.public_primary_key
if swap_taken:
assert events[2]["users"][0]["pk"] == other_user.public_primary_key
assert events[2]["users"][0]["swap_request"]["user"]["pk"] == user.public_primary_key
else:
assert events[2]["users"][0]["pk"] == user.public_primary_key
@pytest.mark.django_db
@pytest.mark.parametrize("swap_taken", [False, True])
def test_swap_request_split_end(
make_organization,
make_user_for_organization,
make_schedule,
make_on_call_shift,
make_shift_swap_request,
swap_taken,
):
organization = make_organization()
user = make_user_for_organization(organization)
other_user = make_user_for_organization(organization)
schedule = make_schedule(organization, schedule_class=OnCallScheduleWeb)
today = timezone.now().replace(hour=0, minute=0, second=0, microsecond=0)
start = today + timezone.timedelta(hours=12)
duration = timezone.timedelta(hours=3)
data = {
"start": start,
"rotation_start": start,
"duration": duration,
"priority_level": 1,
"frequency": CustomOnCallShift.FREQUENCY_DAILY,
"schedule": schedule,
}
on_call_shift = make_on_call_shift(
organization=organization, shift_type=CustomOnCallShift.TYPE_ROLLING_USERS_EVENT, **data
)
on_call_shift.add_rolling_users([[user]])
tomorrow = today + timezone.timedelta(days=1)
# setup swap request
swap_request = make_shift_swap_request(
schedule,
user,
swap_start=tomorrow + timezone.timedelta(hours=10),
swap_end=tomorrow + timezone.timedelta(hours=13),
)
if swap_taken:
swap_request.take(other_user)
events = schedule.filter_events(today, today + timezone.timedelta(days=2))
expected = [
# start, end, swap requested
(start, start + duration, False), # today shift unchanged
(start + timezone.timedelta(days=1), start + timezone.timedelta(days=1, hours=1), True), # first split
(
start + timezone.timedelta(days=1, hours=1),
start + timezone.timedelta(days=1, hours=3),
False,
), # second split
]
returned = [(e["start"], e["end"], bool(e["users"][0].get("swap_request", False))) for e in events]
assert returned == expected
# check swap request details
assert events[1]["users"][0]["swap_request"]["pk"] == swap_request.public_primary_key
if swap_taken:
assert events[1]["users"][0]["pk"] == other_user.public_primary_key
assert events[1]["users"][0]["swap_request"]["user"]["pk"] == user.public_primary_key
else:
assert events[1]["users"][0]["pk"] == user.public_primary_key
@pytest.mark.django_db
@pytest.mark.parametrize("swap_taken", [False, True])
def test_swap_request_split_both(
make_organization,
make_user_for_organization,
make_schedule,
make_on_call_shift,
make_shift_swap_request,
swap_taken,
):
organization = make_organization()
user = make_user_for_organization(organization)
other_user = make_user_for_organization(organization)
schedule = make_schedule(organization, schedule_class=OnCallScheduleWeb)
today = timezone.now().replace(hour=0, minute=0, second=0, microsecond=0)
start = today + timezone.timedelta(hours=12)
duration = timezone.timedelta(hours=3)
data = {
"start": start,
"rotation_start": start,
"duration": duration,
"priority_level": 1,
"frequency": CustomOnCallShift.FREQUENCY_DAILY,
"schedule": schedule,
}
on_call_shift = make_on_call_shift(
organization=organization, shift_type=CustomOnCallShift.TYPE_ROLLING_USERS_EVENT, **data
)
on_call_shift.add_rolling_users([[user]])
tomorrow = today + timezone.timedelta(days=1)
# setup swap request
swap_request = make_shift_swap_request(
schedule,
user,
swap_start=tomorrow + timezone.timedelta(hours=13),
swap_end=tomorrow + timezone.timedelta(hours=14),
)
if swap_taken:
swap_request.take(other_user)
events = schedule.filter_events(today, today + timezone.timedelta(days=2))
expected = [
# start, end, swap requested
(start, start + duration, False), # today shift unchanged
(start + timezone.timedelta(days=1), start + timezone.timedelta(days=1, hours=1), False), # first split
(
start + timezone.timedelta(days=1, hours=1),
start + timezone.timedelta(days=1, hours=2),
True,
), # second split
(
start + timezone.timedelta(days=1, hours=2),
start + timezone.timedelta(days=1, hours=3),
False,
), # third split
]
returned = [(e["start"], e["end"], bool(e["users"][0].get("swap_request", False))) for e in events]
assert returned == expected
# check swap request details
assert events[2]["users"][0]["swap_request"]["pk"] == swap_request.public_primary_key
if swap_taken:
assert events[2]["users"][0]["pk"] == other_user.public_primary_key
assert events[2]["users"][0]["swap_request"]["user"]["pk"] == user.public_primary_key
else:
assert events[2]["users"][0]["pk"] == user.public_primary_key
# check cached final schedule reflects swap
# force final schedule export to consider 2 days only
with patch("apps.schedules.models.on_call_schedule.EXPORT_WINDOW_DAYS_AFTER", 2):
with patch("apps.schedules.models.on_call_schedule.EXPORT_WINDOW_DAYS_BEFORE", 0):
schedule.refresh_ical_final_schedule()
assert schedule.cached_ical_final_schedule
expected_events = [
# start, end, user
(start, start + duration, user.username), # today shift unchanged
(start + timezone.timedelta(days=1), start + timezone.timedelta(days=1, hours=1), user.username), # first split
(
start + timezone.timedelta(days=1, hours=1),
start + timezone.timedelta(days=1, hours=2),
other_user.username if swap_taken else user.username,
), # second split
(
start + timezone.timedelta(days=1, hours=2),
start + timezone.timedelta(days=1, hours=3),
user.username,
), # third split
]
calendar = icalendar.Calendar.from_ical(schedule.cached_ical_final_schedule)
for component in calendar.walk():
if component.name == ICAL_COMPONENT_VEVENT:
event = (
component[ICAL_DATETIME_START].dt,
component[ICAL_DATETIME_END].dt,
component[ICAL_SUMMARY],
)
assert event in expected_events
@pytest.mark.django_db
@pytest.mark.parametrize("swap_taken", [False, True])
def test_swap_request_whole_shift(
make_organization,
make_user_for_organization,
make_schedule,
make_on_call_shift,
make_shift_swap_request,
swap_taken,
):
organization = make_organization()
user = make_user_for_organization(organization)
other_user = make_user_for_organization(organization)
schedule = make_schedule(organization, schedule_class=OnCallScheduleWeb)
today = timezone.now().replace(hour=0, minute=0, second=0, microsecond=0)
start = today + timezone.timedelta(hours=12)
duration = timezone.timedelta(hours=3)
data = {
"start": start,
"rotation_start": start,
"duration": duration,
"priority_level": 1,
"frequency": CustomOnCallShift.FREQUENCY_DAILY,
"schedule": schedule,
}
on_call_shift = make_on_call_shift(
organization=organization, shift_type=CustomOnCallShift.TYPE_ROLLING_USERS_EVENT, **data
)
on_call_shift.add_rolling_users([[user]])
tomorrow = today + timezone.timedelta(days=1)
# setup swap request
swap_request = make_shift_swap_request(
schedule,
user,
swap_start=tomorrow + timezone.timedelta(hours=12),
swap_end=tomorrow + timezone.timedelta(hours=15),
)
if swap_taken:
swap_request.take(other_user)
events = schedule.filter_events(today, today + timezone.timedelta(days=2))
expected = [
# start, end, swap requested
(start, start + duration, False), # today shift unchanged
(start + timezone.timedelta(days=1), start + timezone.timedelta(days=1, hours=3), True), # no splits
]
returned = [(e["start"], e["end"], bool(e["users"][0].get("swap_request", False))) for e in events]
assert returned == expected
# check swap request details
assert events[1]["users"][0]["swap_request"]["pk"] == swap_request.public_primary_key
if swap_taken:
assert events[1]["users"][0]["pk"] == other_user.public_primary_key
assert events[1]["users"][0]["swap_request"]["user"]["pk"] == user.public_primary_key
else:
assert events[1]["users"][0]["pk"] == user.public_primary_key
@pytest.mark.django_db
@pytest.mark.parametrize("swap_taken", [False, True])
def test_swap_request_partial_replace(
make_organization,
make_user_for_organization,
make_schedule,
make_on_call_shift,
make_shift_swap_request,
swap_taken,
):
organization = make_organization()
user = make_user_for_organization(organization)
another_user = make_user_for_organization(organization)
other_user = make_user_for_organization(organization)
schedule = make_schedule(organization, schedule_class=OnCallScheduleWeb)
today = timezone.now().replace(hour=0, minute=0, second=0, microsecond=0)
start = today + timezone.timedelta(hours=12)
duration = timezone.timedelta(hours=3)
data = {
"start": start,
"rotation_start": start,
"duration": duration,
"priority_level": 1,
"frequency": CustomOnCallShift.FREQUENCY_DAILY,
"schedule": schedule,
}
on_call_shift = make_on_call_shift(
organization=organization, shift_type=CustomOnCallShift.TYPE_ROLLING_USERS_EVENT, **data
)
on_call_shift.add_rolling_users([[user, another_user]])
tomorrow = today + timezone.timedelta(days=1)
# setup swap request
swap_request = make_shift_swap_request(
schedule,
user,
swap_start=tomorrow + timezone.timedelta(hours=10),
swap_end=tomorrow + timezone.timedelta(hours=13),
)
if swap_taken:
swap_request.take(other_user)
events = schedule.filter_events(today, today + timezone.timedelta(days=2))
expected = [
# start, end, swap requested
(start, start + duration, False), # today shift unchanged
(start + timezone.timedelta(days=1), start + timezone.timedelta(days=1, hours=1), True), # first split
(
start + timezone.timedelta(days=1, hours=1),
start + timezone.timedelta(days=1, hours=3),
False,
), # second split
]
expected_user = user
if swap_taken:
expected_user = other_user
returned = [
(
e["start"],
e["end"],
bool([u for u in e["users"] if u["pk"] == expected_user.public_primary_key and u.get("swap_request")]),
)
for e in events
]
assert returned == expected
# check swap request details
user_pks = [u["pk"] for u in events[1]["users"]]
assert expected_user.public_primary_key in user_pks
if swap_taken:
for u in events[1]["users"]:
if u["pk"] == expected_user:
assert u["swap_request"]["pk"] == swap_request.public_primary_key
assert u["swap_request"]["user"]["pk"] == user.public_primary_key
@pytest.mark.django_db
def test_swap_request_no_changes(
make_organization,
make_user_for_organization,
make_schedule,
make_on_call_shift,
make_shift_swap_request,
):
organization = make_organization()
user = make_user_for_organization(organization)
other_user = make_user_for_organization(organization)
schedule = make_schedule(organization, schedule_class=OnCallScheduleWeb)
today = timezone.now().replace(hour=0, minute=0, second=0, microsecond=0)
start = today + timezone.timedelta(hours=12)
duration = timezone.timedelta(hours=3)
data = {
"start": start,
"rotation_start": start,
"duration": duration,
"priority_level": 1,
"frequency": CustomOnCallShift.FREQUENCY_DAILY,
"schedule": schedule,
}
on_call_shift = make_on_call_shift(
organization=organization, shift_type=CustomOnCallShift.TYPE_ROLLING_USERS_EVENT, **data
)
on_call_shift.add_rolling_users([[user]])
events_before = schedule.filter_events(today, today + timezone.timedelta(days=2))
# setup swap requests
tomorrow = today + timezone.timedelta(days=1)
make_shift_swap_request(schedule, other_user, swap_start=today, swap_end=tomorrow)
make_shift_swap_request(schedule, user, swap_start=today, swap_end=tomorrow, deleted_at=today)
make_shift_swap_request(
schedule, user, swap_start=today - timezone.timedelta(days=7), swap_end=tomorrow - timezone.timedelta(days=7)
)
events_after = schedule.filter_events(today, today + timezone.timedelta(days=2))
assert events_before == events_after

View file

@ -1,4 +1,5 @@
import datetime
from unittest.mock import patch
import pytest
@ -10,11 +11,15 @@ from apps.schedules.models import ShiftSwapRequest
def test_soft_delete(shift_swap_request_setup):
ssr, _, _ = shift_swap_request_setup()
assert ssr.deleted_at is None
ssr.delete()
with patch("apps.schedules.models.shift_swap_request.refresh_ical_final_schedule") as mock_refresh_final:
ssr.delete()
ssr.refresh_from_db()
assert ssr.deleted_at is not None
assert mock_refresh_final.apply_async.called_with((ssr.schedule.pk,))
assert ShiftSwapRequest.objects.all().count() == 0
assert ShiftSwapRequest.objects_with_deleted.all().count() == 1
@ -65,12 +70,13 @@ def test_take(shift_swap_request_setup) -> None:
ssr, _, benefactor = shift_swap_request_setup()
original_updated_at = ssr.updated_at
ssr.take(benefactor)
with patch("apps.schedules.models.shift_swap_request.refresh_ical_final_schedule") as mock_refresh_final:
ssr.take(benefactor)
assert ssr.benefactor == benefactor
assert ssr.updated_at != original_updated_at
# TODO:
# final schedule refresh was triggered
assert mock_refresh_final.apply_async.called_with((ssr.schedule.pk,))
@pytest.mark.django_db