shift swap requests model + CRUD endpoints (#2597)

# What this PR does

This PR should allow us to start working on _most_ of the remaining
tasks for this feature set.
- Adds a basic `ShiftSwapRequest` model + CRUD endpoints. 
- Adds a `POST /api/internal/v1/shift_swaps/<id>/take` endpoint which
allows a benefactor to take a request (only when certain conditions
about the ssr are met)

Closes #2587 

## Checklist

- [x] Unit, integration, and e2e (if applicable) tests updated
- [x] Documentation added (or `pr:no public docs` PR label added if not
required) will be done in #2589
- [x] `CHANGELOG.md` updated (or `pr:no changelog` PR label added if not
required) (will update once we ship the finalized feature set)
This commit is contained in:
Joey Orlando 2023-07-21 21:35:19 +02:00 committed by GitHub
parent 3d708767dc
commit 74b919ee3e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
29 changed files with 1410 additions and 80 deletions

View file

@ -73,6 +73,12 @@ define run_ui_docker_command
$(call run_docker_compose_command,run --rm oncall_ui sh -c '$(1)')
endef
# always use settings.ci-test django settings file when running the tests
# if we use settings.dev it's very possible that some fail just based on the settings alone
define run_backend_tests
$(call run_engine_docker_command,pytest --ds=settings.ci-test $(1))
endef
# touch SQLITE_DB_FILE if it does not exist and DB is eqaul to SQLITE_PROFILE
start: ## start all of the docker containers
ifeq ($(DB),$(SQLITE_PROFILE))
@ -121,9 +127,11 @@ install-precommit-hook: install-pre-commit
pre-commit install
test: ## run backend tests
# always use settings.ci-test django settings file when running the tests
# if we use settings.dev it's very possible that some fail just based on the settings alone
$(call run_engine_docker_command,pytest --ds=settings.ci-test)
$(call run_backend_tests)
test-dev: ## very similar to `test` command, but allows you to pass arbitray args to pytest
## for example, `make test-dev ARGS="--last-failed --pdb"
$(call run_backend_tests,$(ARGS))
start-celery-beat: ## start celery beat
$(call run_engine_docker_command,celery -A engine beat -l info)

View file

@ -265,7 +265,9 @@ class RBACPermission(permissions.BasePermission):
for permission_class, actions in rbac_object_permissions.items():
if action in actions:
return permission_class.has_object_permission(request, view, obj)
return False
# Note: if an endpoint is not found within the rbac_object_permissions dictionary,
# that means object permissions are not relevant to this endpoint. Return True (authorized)
# has_object_permission is called after has_permission, so return True if in view there is not
# RBAC_OBJECT_PERMISSIONS_ATTR attr which mean no additional check involving object required

View file

@ -4,9 +4,8 @@ from apps.api.serializers.schedule_base import ScheduleBaseSerializer
from apps.schedules.models import OnCallScheduleCalendar
from apps.schedules.tasks import schedule_notify_about_empty_shifts_in_schedule, schedule_notify_about_gaps_in_schedule
from apps.slack.models import SlackChannel, SlackUserGroup
from common.api_helpers.custom_fields import OrganizationFilteredPrimaryKeyRelatedField
from common.api_helpers.custom_fields import OrganizationFilteredPrimaryKeyRelatedField, TimeZoneField
from common.api_helpers.utils import validate_ical_url
from common.timezones import TimeZoneField
class ScheduleCalendarSerializer(ScheduleBaseSerializer):

View file

@ -2,8 +2,7 @@ from apps.api.serializers.schedule_base import ScheduleBaseSerializer
from apps.schedules.models import OnCallScheduleWeb
from apps.schedules.tasks import schedule_notify_about_empty_shifts_in_schedule, schedule_notify_about_gaps_in_schedule
from apps.slack.models import SlackChannel, SlackUserGroup
from common.api_helpers.custom_fields import OrganizationFilteredPrimaryKeyRelatedField
from common.timezones import TimeZoneField
from common.api_helpers.custom_fields import OrganizationFilteredPrimaryKeyRelatedField, TimeZoneField
class ScheduleWebSerializer(ScheduleBaseSerializer):

View file

@ -0,0 +1,80 @@
import datetime
from django.utils import timezone
from rest_framework import serializers
from apps.schedules.models import OnCallSchedule, ShiftSwapRequest
from common.api_helpers.custom_fields import OrganizationFilteredPrimaryKeyRelatedField, TimeZoneAwareDatetimeField
from common.api_helpers.mixins import EagerLoadingMixin
class ShiftSwapRequestSerializer(EagerLoadingMixin, serializers.ModelSerializer):
id = serializers.CharField(read_only=True, source="public_primary_key")
schedule = OrganizationFilteredPrimaryKeyRelatedField(queryset=OnCallSchedule.objects)
created_at = TimeZoneAwareDatetimeField(read_only=True)
updated_at = TimeZoneAwareDatetimeField(read_only=True)
swap_start = TimeZoneAwareDatetimeField()
swap_end = TimeZoneAwareDatetimeField()
beneficiary = serializers.CharField(read_only=True, source="beneficiary.public_primary_key")
benefactor = serializers.SerializerMethodField(read_only=True)
SELECT_RELATED = [
"schedule",
"beneficiary",
"benefactor",
]
class Meta:
model = ShiftSwapRequest
fields = [
"id",
"created_at",
"updated_at",
"status",
"schedule",
"swap_start",
"swap_end",
"description",
"beneficiary",
"benefactor",
]
read_only_fields = [
"status",
]
def get_benefactor(self, obj) -> str | None:
return obj.benefactor.public_primary_key if obj.benefactor else None
@staticmethod
def validate_start_and_end_times(swap_start: datetime.datetime, swap_end: datetime.datetime) -> None:
if timezone.now() > swap_start:
raise serializers.ValidationError("swap_start must be a datetime in the future")
if swap_start > swap_end:
raise serializers.ValidationError("swap_end must occur after swap_start")
def validate(self, data):
swap_start = data.get("swap_start", None)
swap_end = data.get("swap_end", None)
if self.partial: # self.partial is true when it's a "partial update" aka PATCH
# if any time related field is specified then we will enforce that they must all be specified
time_fields = [swap_start, swap_end]
any_time_fields_specified = any(time_fields)
all_time_fields_specified = all(time_fields)
if any_time_fields_specified and not all_time_fields_specified:
raise serializers.ValidationError(
"when doing a partial update on time related fields, both start and end times must be specified"
)
elif all_time_fields_specified:
self.validate_start_and_end_times(swap_start, swap_end)
else:
self.validate_start_and_end_times(swap_start, swap_end)
# TODO: we should validate that the beneficiary actually has shifts for the specified schedule
# between swap_start and swap_end
return data

View file

@ -11,10 +11,9 @@ from apps.base.utils import live_settings
from apps.oss_installation.utils import cloud_user_identity_status
from apps.user_management.models import User
from apps.user_management.models.user import default_working_hours
from common.api_helpers.custom_fields import TeamPrimaryKeyRelatedField
from common.api_helpers.custom_fields import TeamPrimaryKeyRelatedField, TimeZoneField
from common.api_helpers.mixins import EagerLoadingMixin
from common.api_helpers.utils import check_phone_number_is_valid
from common.timezones import TimeZoneField
from .custom_serializers import DynamicFieldsModelSerializer
from .organization import FastOrganizationSerializer

View file

@ -286,6 +286,25 @@ class TestRBACPermission:
assert RBACPermission().has_object_permission(request, apiview, None) is True
assert RBACPermission().has_object_permission(request, viewset, None) is True
def test_has_object_permission_returns_true_if_action_omitted_from_rbac_object_permissions(self) -> None:
action1 = "hello"
action2 = "world"
class MockedPermissionClass:
def has_object_permission(self, _req, _view, _obj) -> None:
return True
rbac_object_permissions = {MockedPermissionClass(): (action1,)}
# only action1 is specified in rbac_object_permissions, lets make a request with action2
# we should get back authorized
request = MockedRequest(None, action2)
apiview = MockedAPIView(rbac_object_permissions=rbac_object_permissions)
viewset = MockedViewSet(action2, rbac_object_permissions=rbac_object_permissions)
assert RBACPermission().has_object_permission(request, apiview, None) is True
assert RBACPermission().has_object_permission(request, viewset, None) is True
def test_has_object_permission_works_when_permission_class_specified_for_action(self) -> None:
action = "hello"
mocked_permission_class_response = "asdfasdfasdf"

View file

@ -0,0 +1,696 @@
import datetime
import json
from unittest.mock import patch
import pytest
from django.urls import reverse
from django.utils import timezone
from rest_framework import status
from rest_framework.response import Response
from rest_framework.test import APIClient
from apps.api.permissions import LegacyAccessControlRole
from apps.schedules.models import OnCallScheduleWeb, ShiftSwapRequest
from common.insight_log import EntityEvent
description = "my shift swap request"
tomorrow = timezone.now() + datetime.timedelta(days=1)
two_days_from_now = tomorrow + datetime.timedelta(days=1)
mock_success_response = Response(status=status.HTTP_200_OK)
@pytest.fixture
def ssr_setup(
make_schedule, make_organization_and_user_with_plugin_token, make_user_for_organization, make_shift_swap_request
):
def _ssr_setup(beneficiary_role=None, benefactor_role=None, **kwargs):
organization, beneficiary, token = make_organization_and_user_with_plugin_token(role=beneficiary_role)
benefactor = make_user_for_organization(organization, role=benefactor_role)
schedule = make_schedule(organization, schedule_class=OnCallScheduleWeb)
ssr = make_shift_swap_request(schedule, beneficiary, swap_start=tomorrow, swap_end=two_days_from_now, **kwargs)
return ssr, beneficiary, token, benefactor
return _ssr_setup
def _convert_dt_to_sr(dt: datetime.datetime) -> str:
return dt.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
def _construct_serialized_object(ssr: ShiftSwapRequest, status="open", description=None, benefactor=None):
return {
"id": ssr.public_primary_key,
"created_at": _convert_dt_to_sr(ssr.created_at),
"updated_at": _convert_dt_to_sr(ssr.updated_at),
"schedule": ssr.schedule.public_primary_key,
"swap_start": _convert_dt_to_sr(ssr.swap_start),
"swap_end": _convert_dt_to_sr(ssr.swap_end),
"beneficiary": ssr.beneficiary.public_primary_key,
"status": status,
"benefactor": benefactor,
"description": description,
}
def _build_expected_update_response(ssr, modified_data, updated_at_ts, **kwargs):
"""
updated_at timestamp will obviously be bumped when we do a PUT/PATCH
"""
return _construct_serialized_object(ssr, **kwargs) | modified_data | {"updated_at": updated_at_ts}
@pytest.mark.django_db
def test_list(ssr_setup, make_user_auth_headers):
ssr, beneficiary, token, _ = ssr_setup(description=description)
client = APIClient()
url = reverse("api-internal:shift_swap-list")
expected_payload = {
"next": None,
"previous": None,
"page_size": 50,
"count": 1,
"current_page_number": 1,
"total_pages": 1,
"results": [
_construct_serialized_object(ssr, description=description),
],
}
response = client.get(url, format="json", **make_user_auth_headers(beneficiary, token))
assert response.status_code == status.HTTP_200_OK
assert response.json() == expected_payload
@patch("apps.api.views.shift_swap.ShiftSwapViewSet.list", return_value=mock_success_response)
@pytest.mark.django_db
@pytest.mark.parametrize(
"role,expected_status",
[
(LegacyAccessControlRole.ADMIN, status.HTTP_200_OK),
(LegacyAccessControlRole.EDITOR, status.HTTP_200_OK),
(LegacyAccessControlRole.VIEWER, status.HTTP_200_OK),
],
)
def test_list_permissions(
mock_endpoint_handler,
ssr_setup,
make_user_auth_headers,
role,
expected_status,
):
_, beneficiary, token, _ = ssr_setup(beneficiary_role=role)
client = APIClient()
url = reverse("api-internal:shift_swap-list")
response = client.get(url, format="json", **make_user_auth_headers(beneficiary, token))
assert response.status_code == expected_status
@pytest.mark.django_db
def test_retrieve(ssr_setup, make_user_auth_headers):
ssr, beneficiary, token, _ = ssr_setup(description=description)
client = APIClient()
url = reverse("api-internal:shift_swap-detail", kwargs={"pk": ssr.public_primary_key})
response = client.get(url, format="json", **make_user_auth_headers(beneficiary, token))
assert response.status_code == status.HTTP_200_OK
assert response.json() == _construct_serialized_object(ssr, description=description)
@patch("apps.api.views.shift_swap.ShiftSwapViewSet.retrieve", return_value=mock_success_response)
@pytest.mark.django_db
@pytest.mark.parametrize(
"role,expected_status",
[
(LegacyAccessControlRole.ADMIN, status.HTTP_200_OK),
(LegacyAccessControlRole.EDITOR, status.HTTP_200_OK),
(LegacyAccessControlRole.VIEWER, status.HTTP_200_OK),
],
)
def test_retrieve_permissions(
mock_endpoint_handler,
ssr_setup,
make_user_auth_headers,
role,
expected_status,
):
ssr, _, token, benefactor = ssr_setup(benefactor_role=role)
client = APIClient()
url = reverse("api-internal:shift_swap-detail", kwargs={"pk": ssr.public_primary_key})
response = client.get(url, format="json", **make_user_auth_headers(benefactor, token))
assert response.status_code == expected_status
@patch("apps.api.views.shift_swap.write_resource_insight_log")
@pytest.mark.django_db
def test_create(
mock_write_resource_insight_log, make_organization_and_user_with_plugin_token, make_schedule, make_user_auth_headers
):
organization, user, token = make_organization_and_user_with_plugin_token()
schedule = make_schedule(organization, schedule_class=OnCallScheduleWeb)
client = APIClient()
url = reverse("api-internal:shift_swap-list")
data = {
"schedule": schedule.public_primary_key,
"description": "hellooooo world",
"swap_start": tomorrow,
"swap_end": two_days_from_now,
}
response = client.post(url, data, format="json", **make_user_auth_headers(user, token))
ssr = ShiftSwapRequest.objects.get(public_primary_key=response.json()["id"])
expected_response = _construct_serialized_object(ssr) | {
**data,
"swap_start": _convert_dt_to_sr(tomorrow),
"swap_end": _convert_dt_to_sr(two_days_from_now),
}
assert response.status_code == status.HTTP_201_CREATED
assert response.json() == expected_response
mock_write_resource_insight_log.assert_called_once_with(instance=ssr, author=user, event=EntityEvent.CREATED)
@pytest.mark.django_db
@pytest.mark.parametrize(
"swap_start,expected_persisted_value",
[
# UTC format
("2285-07-20T12:00:00Z", "2285-07-20T12:00:00.000000Z"),
# UTC format w/ microseconds
("2285-07-20T12:00:00.245652Z", "2285-07-20T12:00:00.245652Z"),
# UTC offset w/ colons + no microseconds
("2285-07-20T12:00:00+07:00", "2285-07-20T05:00:00.000000Z"),
# UTC offset w/ colons + microseconds
("2285-07-20T12:00:00.245652+07:00", "2285-07-20T05:00:00.245652Z"),
# UTC offset w/ no colons + no microseconds
("2285-07-20T12:00:00+0700", "2285-07-20T05:00:00.000000Z"),
# UTC offset w/ no colons + microseconds
("2285-07-20T12:00:00.245652+0700", "2285-07-20T05:00:00.245652Z"),
("2285-07-20 12:00:00", None),
("22850720T120000Z", None),
],
)
def test_create_swap_start_and_swap_end_must_include_time_zone(
make_organization_and_user_with_plugin_token,
make_schedule,
make_user_auth_headers,
swap_start,
expected_persisted_value,
):
organization, user, token = make_organization_and_user_with_plugin_token()
schedule = make_schedule(organization, schedule_class=OnCallScheduleWeb)
client = APIClient()
url = reverse("api-internal:shift_swap-list")
start_year = "2285"
end_year = "2286"
swap_end = swap_start.replace(start_year, end_year)
data = {
"schedule": schedule.public_primary_key,
"description": "hellooooo world",
"swap_start": swap_start,
"swap_end": swap_end,
}
response = client.post(url, data, format="json", **make_user_auth_headers(user, token))
if expected_persisted_value:
ssr = ShiftSwapRequest.objects.get(public_primary_key=response.json()["id"])
assert response.status_code == status.HTTP_201_CREATED
assert response.json() == _construct_serialized_object(ssr) | {
**data,
"swap_start": expected_persisted_value,
"swap_end": expected_persisted_value.replace(start_year, end_year),
}
else:
assert response.status_code == status.HTTP_400_BAD_REQUEST
@patch("apps.api.views.shift_swap.ShiftSwapViewSet.create", return_value=mock_success_response)
@pytest.mark.django_db
@pytest.mark.parametrize(
"role,expected_status",
[
(LegacyAccessControlRole.ADMIN, status.HTTP_200_OK),
(LegacyAccessControlRole.EDITOR, status.HTTP_200_OK),
(LegacyAccessControlRole.VIEWER, status.HTTP_403_FORBIDDEN),
],
)
def test_create_permissions(
mock_endpoint_handler,
ssr_setup,
make_user_auth_headers,
role,
expected_status,
):
_, _, token, benefactor = ssr_setup(benefactor_role=role)
client = APIClient()
url = reverse("api-internal:shift_swap-list")
response = client.post(url, format="json", **make_user_auth_headers(benefactor, token))
assert response.status_code == expected_status
@patch("apps.api.views.shift_swap.write_resource_insight_log")
@pytest.mark.django_db
def test_update(mock_write_resource_insight_log, ssr_setup, make_user_auth_headers):
ssr, beneficiary, token, _ = ssr_setup(description=description)
insights_log_prev_state = ssr.insight_logs_serialized
client = APIClient()
url = reverse("api-internal:shift_swap-detail", kwargs={"pk": ssr.public_primary_key})
auth_headers = make_user_auth_headers(beneficiary, token)
data = {
"description": "hellooooo world",
"schedule": ssr.schedule.public_primary_key,
"swap_start": _convert_dt_to_sr(ssr.swap_start),
"swap_end": _convert_dt_to_sr(ssr.swap_end),
}
response = client.put(url, data=json.dumps(data), content_type="application/json", **auth_headers)
response_json = response.json()
expected_response = _build_expected_update_response(ssr, data, response_json["updated_at"], description=description)
assert response.status_code == status.HTTP_200_OK
assert response_json == expected_response
response = client.get(url, format="json", **auth_headers)
assert response.status_code == status.HTTP_200_OK
assert response.json() == expected_response
ssr.refresh_from_db()
mock_write_resource_insight_log.assert_called_once_with(
instance=ssr,
author=beneficiary,
event=EntityEvent.UPDATED,
prev_state=insights_log_prev_state,
new_state=ssr.insight_logs_serialized,
)
@pytest.mark.django_db
@pytest.mark.parametrize(
"swap_start,expected_persisted_value",
[
# UTC format
("2285-07-20T12:00:00Z", "2285-07-20T12:00:00.000000Z"),
# UTC format w/ microseconds
("2285-07-20T12:00:00.245652Z", "2285-07-20T12:00:00.245652Z"),
# UTC offset w/ colons + no microseconds
("2285-07-20T12:00:00+07:00", "2285-07-20T05:00:00.000000Z"),
# UTC offset w/ colons + microseconds
("2285-07-20T12:00:00.245652+07:00", "2285-07-20T05:00:00.245652Z"),
# UTC offset w/ no colons + no microseconds
("2285-07-20T12:00:00+0700", "2285-07-20T05:00:00.000000Z"),
# UTC offset w/ no colons + microseconds
("2285-07-20T12:00:00.245652+0700", "2285-07-20T05:00:00.245652Z"),
("2285-07-20 12:00:00", None),
("22850720T120000Z", None),
],
)
def test_update_swap_start_and_swap_end_must_include_time_zone(
ssr_setup,
make_user_auth_headers,
swap_start,
expected_persisted_value,
):
ssr, beneficiary, token, _ = ssr_setup()
client = APIClient()
url = reverse("api-internal:shift_swap-detail", kwargs={"pk": ssr.public_primary_key})
start_year = "2285"
end_year = "2286"
swap_end = swap_start.replace(start_year, end_year)
data = {
"schedule": ssr.schedule.public_primary_key,
"swap_start": swap_start,
"swap_end": swap_end,
}
response = client.put(url, data, format="json", **make_user_auth_headers(beneficiary, token))
if expected_persisted_value:
ssr = ShiftSwapRequest.objects.get(public_primary_key=response.json()["id"])
assert response.status_code == status.HTTP_200_OK
assert response.json() == _construct_serialized_object(ssr) | {
**data,
"swap_start": expected_persisted_value,
"swap_end": expected_persisted_value.replace(start_year, end_year),
}
else:
assert response.status_code == status.HTTP_400_BAD_REQUEST
@pytest.mark.django_db
@pytest.mark.parametrize(
"role,expected_status",
[
(LegacyAccessControlRole.ADMIN, status.HTTP_200_OK),
(LegacyAccessControlRole.EDITOR, status.HTTP_200_OK),
(LegacyAccessControlRole.VIEWER, status.HTTP_403_FORBIDDEN),
],
)
def test_update_own_ssr_permissions(ssr_setup, make_user_auth_headers, role, expected_status):
ssr, beneficiary, token, _ = ssr_setup(beneficiary_role=role)
client = APIClient()
url = reverse("api-internal:shift_swap-detail", kwargs={"pk": ssr.public_primary_key})
data = {
"description": "hellooooo world",
"schedule": ssr.schedule.public_primary_key,
"swap_start": _convert_dt_to_sr(ssr.swap_start),
"swap_end": _convert_dt_to_sr(ssr.swap_end),
}
response = client.put(
url, data=json.dumps(data), content_type="application/json", **make_user_auth_headers(beneficiary, token)
)
assert response.status_code == expected_status
@pytest.mark.django_db
def test_update_others_ssr_permissions(ssr_setup, make_user_auth_headers):
ssr, _, token, benefactor = ssr_setup()
assert benefactor.role == LegacyAccessControlRole.ADMIN
client = APIClient()
url = reverse("api-internal:shift_swap-detail", kwargs={"pk": ssr.public_primary_key})
response = client.put(
url, data=json.dumps({}), content_type="application/json", **make_user_auth_headers(benefactor, token)
)
assert response.status_code == status.HTTP_403_FORBIDDEN
@patch("apps.api.views.shift_swap.write_resource_insight_log")
@pytest.mark.django_db
def test_partial_update(mock_write_resource_insight_log, ssr_setup, make_user_auth_headers):
ssr, beneficiary, token, _ = ssr_setup(description=description)
insights_log_prev_state = ssr.insight_logs_serialized
client = APIClient()
url = reverse("api-internal:shift_swap-detail", kwargs={"pk": ssr.public_primary_key})
auth_headers = make_user_auth_headers(beneficiary, token)
data = {"description": "this is a shift swap request"}
response = client.patch(url, data=json.dumps(data), content_type="application/json", **auth_headers)
response_json = response.json()
expected_response = _build_expected_update_response(ssr, data, response_json["updated_at"], description=description)
assert response.status_code == status.HTTP_200_OK
assert response.json() == expected_response
response = client.get(url, format="json", **auth_headers)
assert response.status_code == status.HTTP_200_OK
assert response.json() == expected_response
ssr.refresh_from_db()
mock_write_resource_insight_log.assert_called_once_with(
instance=ssr,
author=beneficiary,
event=EntityEvent.UPDATED,
prev_state=insights_log_prev_state,
new_state=ssr.insight_logs_serialized,
)
@pytest.mark.django_db
def test_partial_update_time_related_fields(ssr_setup, make_user_auth_headers):
ssr, beneficiary, token, _ = ssr_setup()
client = APIClient()
url = reverse("api-internal:shift_swap-detail", kwargs={"pk": ssr.public_primary_key})
auth_headers = make_user_auth_headers(beneficiary, token)
# but if we do PATCH a time related field, we must specify all the time fields
swap_start = {"swap_start": _convert_dt_to_sr(tomorrow + datetime.timedelta(days=5))}
swap_end = {"swap_end": _convert_dt_to_sr(tomorrow + datetime.timedelta(days=10))}
valid = swap_start | swap_end
for case in [swap_start, swap_end]:
response = client.patch(url, data=json.dumps(case), content_type="application/json", **auth_headers)
assert response.status_code == status.HTTP_400_BAD_REQUEST
# valid way to patch time related fields
response = client.patch(url, data=json.dumps(valid), content_type="application/json", **auth_headers)
response_json = response.json()
expected_response = _build_expected_update_response(ssr, valid, response_json["updated_at"])
assert response.status_code == status.HTTP_200_OK
assert response.json() == expected_response
response = client.get(url, format="json", **auth_headers)
assert response.status_code == status.HTTP_200_OK
assert response.json() == expected_response
@pytest.mark.django_db
@pytest.mark.parametrize(
"role,expected_status",
[
(LegacyAccessControlRole.ADMIN, status.HTTP_200_OK),
(LegacyAccessControlRole.EDITOR, status.HTTP_200_OK),
(LegacyAccessControlRole.VIEWER, status.HTTP_403_FORBIDDEN),
],
)
def test_partial_update_own_ssr_permissions(ssr_setup, make_user_auth_headers, role, expected_status):
ssr, beneficiary, token, _ = ssr_setup(beneficiary_role=role)
client = APIClient()
url = reverse("api-internal:shift_swap-detail", kwargs={"pk": ssr.public_primary_key})
response = client.patch(
url,
data=json.dumps({"description": "foo"}),
content_type="application/json",
**make_user_auth_headers(beneficiary, token),
)
assert response.status_code == expected_status
@pytest.mark.django_db
def test_partial_update_others_ssr_permissions(ssr_setup, make_user_auth_headers):
ssr, _, token, benefactor = ssr_setup()
assert benefactor.role == LegacyAccessControlRole.ADMIN
client = APIClient()
url = reverse("api-internal:shift_swap-detail", kwargs={"pk": ssr.public_primary_key})
response = client.patch(
url, data=json.dumps({}), content_type="application/json", **make_user_auth_headers(benefactor, token)
)
assert response.status_code == status.HTTP_403_FORBIDDEN
@pytest.mark.django_db
def test_benefactor_and_beneficiary_are_read_only_fields(ssr_setup, make_user_auth_headers):
ssr, beneficiary, token, benefactor = ssr_setup(description=description)
client = APIClient()
list_url = reverse("api-internal:shift_swap-list")
detail_url = reverse("api-internal:shift_swap-detail", kwargs={"pk": ssr.public_primary_key})
auth_headers = make_user_auth_headers(beneficiary, token)
base_data = {
"description": "hellooooo world",
"schedule": ssr.schedule.public_primary_key,
"swap_start": _convert_dt_to_sr(ssr.swap_start),
"swap_end": _convert_dt_to_sr(ssr.swap_end),
}
update_beneficiary = {"beneficiary": benefactor.public_primary_key}
update_benefactor = {"benefactor": beneficiary.public_primary_key}
def _assert_beneficiary_hasnt_changed(resp):
assert resp.json()["beneficiary"] == beneficiary.public_primary_key
def _assert_benefactor_is_still_none(resp):
assert resp.json()["benefactor"] is None
response = client.post(
list_url, data=json.dumps(base_data | update_beneficiary), content_type="application/json", **auth_headers
)
_assert_beneficiary_hasnt_changed(response)
response = client.post(
list_url, data=json.dumps(base_data | update_benefactor), content_type="application/json", **auth_headers
)
_assert_benefactor_is_still_none(response)
response = client.put(
detail_url, data=json.dumps(base_data | update_beneficiary), content_type="application/json", **auth_headers
)
_assert_beneficiary_hasnt_changed(response)
response = client.put(
detail_url, data=json.dumps(base_data | update_benefactor), content_type="application/json", **auth_headers
)
_assert_benefactor_is_still_none(response)
response = client.patch(
detail_url, data=json.dumps(base_data | update_beneficiary), content_type="application/json", **auth_headers
)
_assert_beneficiary_hasnt_changed(response)
response = client.patch(
detail_url, data=json.dumps(base_data | update_benefactor), content_type="application/json", **auth_headers
)
_assert_benefactor_is_still_none(response)
@patch("apps.api.views.shift_swap.write_resource_insight_log")
@pytest.mark.django_db
def test_delete(mock_write_resource_insight_log, ssr_setup, make_user_auth_headers):
ssr, beneficiary, token, _ = ssr_setup()
client = APIClient()
url = reverse("api-internal:shift_swap-detail", kwargs={"pk": ssr.public_primary_key})
auth_headers = make_user_auth_headers(beneficiary, token)
response = client.delete(url, **auth_headers)
assert response.status_code == status.HTTP_204_NO_CONTENT
response = client.get(url, format="json", **auth_headers)
assert response.status_code == status.HTTP_404_NOT_FOUND
mock_write_resource_insight_log.assert_called_once_with(
instance=ssr,
author=beneficiary,
event=EntityEvent.DELETED,
)
@pytest.mark.django_db
@pytest.mark.parametrize(
"role,expected_status",
[
(LegacyAccessControlRole.ADMIN, status.HTTP_204_NO_CONTENT),
(LegacyAccessControlRole.EDITOR, status.HTTP_204_NO_CONTENT),
(LegacyAccessControlRole.VIEWER, status.HTTP_403_FORBIDDEN),
],
)
def test_delete_own_ssr_permissions(ssr_setup, make_user_auth_headers, role, expected_status):
ssr, beneficiary, token, _ = ssr_setup(beneficiary_role=role)
client = APIClient()
url = reverse("api-internal:shift_swap-detail", kwargs={"pk": ssr.public_primary_key})
response = client.delete(url, format="json", **make_user_auth_headers(beneficiary, token))
assert response.status_code == expected_status
@pytest.mark.django_db
def test_delete_others_ssr_permissions(ssr_setup, make_user_auth_headers):
ssr, _, token, benefactor = ssr_setup()
assert benefactor.role == LegacyAccessControlRole.ADMIN
client = APIClient()
url = reverse("api-internal:shift_swap-detail", kwargs={"pk": ssr.public_primary_key})
response = client.delete(url, format="json", **make_user_auth_headers(benefactor, token))
assert response.status_code == status.HTTP_403_FORBIDDEN
@pytest.mark.django_db
def test_take(ssr_setup, make_user_auth_headers):
ssr, _, token, benefactor = ssr_setup()
client = APIClient()
url = reverse("api-internal:shift_swap-take", kwargs={"pk": ssr.public_primary_key})
auth_headers = make_user_auth_headers(benefactor, token)
response = client.post(url, format="json", **auth_headers)
response_json = response.json()
updated_at = response_json["updated_at"]
expected_response = _build_expected_update_response(
ssr, {}, updated_at, status="taken", benefactor=benefactor.public_primary_key
)
assert response.status_code == status.HTTP_200_OK
assert response_json == expected_response
assert updated_at != _convert_dt_to_sr(ssr.updated_at) # validate that updated_at is auto-updated on take
url = reverse("api-internal:shift_swap-detail", kwargs={"pk": ssr.public_primary_key})
response = client.get(url, format="json", **auth_headers)
response_json = response.json()
assert response.status_code == status.HTTP_200_OK
assert response_json == expected_response
@pytest.mark.django_db
def test_benficiary_tries_to_take_their_own_ssr(ssr_setup, make_user_auth_headers):
ssr, beneficiary, token, _ = ssr_setup()
client = APIClient()
url = reverse("api-internal:shift_swap-take", kwargs={"pk": ssr.public_primary_key})
response = client.post(url, format="json", **make_user_auth_headers(beneficiary, token))
assert response.status_code == status.HTTP_400_BAD_REQUEST
@pytest.mark.django_db
def test_take_already_taken_ssr(ssr_setup, make_user_auth_headers):
ssr, _, token, benefactor = ssr_setup()
client = APIClient()
url = reverse("api-internal:shift_swap-take", kwargs={"pk": ssr.public_primary_key})
response = client.post(url, format="json", **make_user_auth_headers(benefactor, token))
assert response.status_code == status.HTTP_200_OK
response = client.post(url, format="json", **make_user_auth_headers(benefactor, token))
assert response.status_code == status.HTTP_400_BAD_REQUEST
@pytest.mark.django_db
def test_take_past_due_ssr(ssr_setup, make_user_auth_headers):
ssr, _, token, benefactor = ssr_setup()
client = APIClient()
url = reverse("api-internal:shift_swap-take", kwargs={"pk": ssr.public_primary_key})
ssr.swap_start = tomorrow - datetime.timedelta(days=5)
ssr.save()
response = client.post(url, format="json", **make_user_auth_headers(benefactor, token))
assert response.status_code == status.HTTP_400_BAD_REQUEST
@pytest.mark.django_db
def test_take_deleted_ssr(ssr_setup, make_user_auth_headers):
ssr, _, token, benefactor = ssr_setup()
client = APIClient()
url = reverse("api-internal:shift_swap-take", kwargs={"pk": ssr.public_primary_key})
ssr.delete()
response = client.post(url, format="json", **make_user_auth_headers(benefactor, token))
assert response.status_code == status.HTTP_404_NOT_FOUND
@patch("apps.api.views.shift_swap.ShiftSwapViewSet.take", return_value=mock_success_response)
@pytest.mark.django_db
@pytest.mark.parametrize(
"role,expected_status",
[
(LegacyAccessControlRole.ADMIN, status.HTTP_200_OK),
(LegacyAccessControlRole.EDITOR, status.HTTP_200_OK),
(LegacyAccessControlRole.VIEWER, status.HTTP_403_FORBIDDEN),
],
)
def test_take_permissions(
mock_endpoint_handler,
ssr_setup,
make_user_auth_headers,
role,
expected_status,
):
ssr, _, token, benefactor = ssr_setup(benefactor_role=role)
client = APIClient()
url = reverse("api-internal:shift_swap-take", kwargs={"pk": ssr.public_primary_key})
response = client.post(url, format="json", **make_user_auth_headers(benefactor, token))
assert response.status_code == expected_status

View file

@ -1,3 +1,4 @@
from django.conf import settings
from django.urls import include, path, re_path
from common.api_helpers.optional_slash_router import OptionalSlashRouter, optional_slash_path
@ -27,6 +28,7 @@ from .views.public_api_tokens import PublicApiTokenView
from .views.resolution_note import ResolutionNoteView
from .views.route_regex_debugger import RouteRegexDebuggerView
from .views.schedule import ScheduleView
from .views.shift_swap import ShiftSwapViewSet
from .views.slack_channel import SlackChannelView
from .views.slack_team_settings import (
AcknowledgeReminderOptionsAPIView,
@ -65,6 +67,9 @@ router.register(r"tokens", PublicApiTokenView, basename="api_token")
router.register(r"live_settings", LiveSettingViewSet, basename="live_settings")
router.register(r"oncall_shifts", OnCallShiftView, basename="oncall_shifts")
if settings.FEATURE_SHIFT_SWAPS_ENABLED:
router.register(r"shift_swaps", ShiftSwapViewSet, basename="shift_swap")
urlpatterns = [
path("", include(router.urls)),
optional_slash_path("user", CurrentUserView.as_view(), name="api-user"),

View file

@ -0,0 +1,90 @@
import logging
from rest_framework import status
from rest_framework.decorators import action
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
from rest_framework.viewsets import ModelViewSet
from apps.api.permissions import IsOwner, RBACPermission
from apps.api.serializers.shift_swap import ShiftSwapRequestSerializer
from apps.auth_token.auth import PluginAuthentication
from apps.mobile_app.auth import MobileAppAuthTokenAuthentication
from apps.schedules import exceptions
from apps.schedules.models import ShiftSwapRequest
from common.api_helpers.exceptions import BadRequest
from common.api_helpers.mixins import PublicPrimaryKeyMixin
from common.api_helpers.paginators import FiftyPageSizePaginator
from common.insight_log import EntityEvent, write_resource_insight_log
logger = logging.getLogger(__name__)
class ShiftSwapViewSet(PublicPrimaryKeyMixin, ModelViewSet):
authentication_classes = (PluginAuthentication, MobileAppAuthTokenAuthentication)
permission_classes = (IsAuthenticated, RBACPermission)
rbac_permissions = {
# TODO: add note to public documentation about these permissions also giving access to shift swaps
# unless we want to make a separate resource type for them?
"metadata": [RBACPermission.Permissions.SCHEDULES_READ],
"list": [RBACPermission.Permissions.SCHEDULES_READ],
"retrieve": [RBACPermission.Permissions.SCHEDULES_READ],
"create": [RBACPermission.Permissions.SCHEDULES_WRITE],
"update": [RBACPermission.Permissions.SCHEDULES_WRITE],
"partial_update": [RBACPermission.Permissions.SCHEDULES_WRITE],
"destroy": [RBACPermission.Permissions.SCHEDULES_WRITE],
"take": [RBACPermission.Permissions.SCHEDULES_WRITE],
}
is_beneficiary = IsOwner(ownership_field="beneficiary")
rbac_object_permissions = {
is_beneficiary: [
"update",
"partial_update",
"destroy",
],
}
model = ShiftSwapRequest
serializer_class = ShiftSwapRequestSerializer
pagination_class = FiftyPageSizePaginator
def get_queryset(self):
queryset = ShiftSwapRequest.objects.filter(schedule__organization=self.request.auth.organization)
return self.serializer_class.setup_eager_loading(queryset)
def perform_destroy(self, instance):
super().perform_destroy(instance)
write_resource_insight_log(instance=instance, author=self.request.user, event=EntityEvent.DELETED)
def perform_create(self, serializer):
beneficiary = self.request.user
serializer.save(beneficiary=beneficiary)
write_resource_insight_log(instance=serializer.instance, author=beneficiary, event=EntityEvent.CREATED)
def perform_update(self, serializer):
prev_state = serializer.instance.insight_logs_serialized
serializer.save()
new_state = serializer.instance.insight_logs_serialized
write_resource_insight_log(
instance=serializer.instance,
author=self.request.user,
event=EntityEvent.UPDATED,
prev_state=prev_state,
new_state=new_state,
)
@action(methods=["post"], detail=True)
def take(self, request, pk) -> Response:
shift_swap = self.get_object()
try:
shift_swap.take(request.user)
except exceptions.ShiftSwapRequestNotOpenForTaking:
raise BadRequest(detail="The shift swap request is not in a state which allows it to be taken")
except exceptions.BeneficiaryCannotTakeOwnShiftSwapRequest:
raise BadRequest(detail="A shift swap request cannot be created and taken by the same user")
return Response(ShiftSwapRequestSerializer(shift_swap).data, status=status.HTTP_200_OK)

View file

@ -1,7 +1,7 @@
from rest_framework import serializers
from apps.mobile_app.models import MobileAppUserSettings
from common.timezones import TimeZoneField
from common.api_helpers.custom_fields import TimeZoneField
class MobileAppUserSettingsSerializer(serializers.ModelSerializer):

View file

@ -7,12 +7,12 @@ from apps.user_management.models import User
from common.api_helpers.custom_fields import (
RollingUsersField,
TeamPrimaryKeyRelatedField,
TimeZoneField,
UsersFilteredByOrganizationField,
)
from common.api_helpers.exceptions import BadRequest
from common.api_helpers.mixins import EagerLoadingMixin
from common.api_helpers.utils import CurrentOrganizationDefault
from common.timezones import TimeZoneField
class CustomOnCallShiftTypeField(fields.CharField):

View file

@ -5,9 +5,8 @@ from apps.schedules.tasks import (
schedule_notify_about_empty_shifts_in_schedule,
schedule_notify_about_gaps_in_schedule,
)
from common.api_helpers.custom_fields import UsersFilteredByOrganizationField
from common.api_helpers.custom_fields import TimeZoneField, UsersFilteredByOrganizationField
from common.api_helpers.exceptions import BadRequest
from common.timezones import TimeZoneField
class ScheduleCalendarSerializer(ScheduleBaseSerializer):

View file

@ -5,8 +5,7 @@ from apps.schedules.tasks import (
schedule_notify_about_empty_shifts_in_schedule,
schedule_notify_about_gaps_in_schedule,
)
from common.api_helpers.custom_fields import TeamPrimaryKeyRelatedField, UsersFilteredByOrganizationField
from common.timezones import TimeZoneField
from common.api_helpers.custom_fields import TeamPrimaryKeyRelatedField, TimeZoneField, UsersFilteredByOrganizationField
class ScheduleWebSerializer(ScheduleBaseSerializer):

View file

@ -0,0 +1,10 @@
class BeneficiaryCannotTakeOwnShiftSwapRequest(Exception):
"""
Raised when a beneficiary tries to 'take' their own shift swap request
"""
class ShiftSwapRequestNotOpenForTaking(Exception):
"""
Indicates that the shift swap request is not in a state which allows it to be assigned to a benefactor (aka "taken")
"""

View file

@ -0,0 +1,33 @@
# Generated by Django 3.2.20 on 2023-07-21 18:55
import apps.schedules.models.shift_swap_request
import django.core.validators
from django.db import migrations, models
import django.db.models.deletion
class Migration(migrations.Migration):
dependencies = [
('user_management', '0013_alter_organization_acknowledge_remind_timeout'),
('schedules', '0013_auto_20230517_0510'),
]
operations = [
migrations.CreateModel(
name='ShiftSwapRequest',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('public_primary_key', models.CharField(default=apps.schedules.models.shift_swap_request.generate_public_primary_key_for_shift_swap_request, max_length=20, unique=True, validators=[django.core.validators.MinLengthValidator(13)])),
('created_at', models.DateTimeField(auto_now_add=True)),
('updated_at', models.DateTimeField(auto_now=True)),
('deleted_at', models.DateTimeField(null=True)),
('swap_start', models.DateTimeField()),
('swap_end', models.DateTimeField()),
('description', models.TextField(default=None, max_length=3000, null=True)),
('benefactor', models.ForeignKey(null=True, on_delete=django.db.models.deletion.CASCADE, related_name='taken_shift_swap_requests', to='user_management.user')),
('beneficiary', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='created_shift_swap_requests', to='user_management.user')),
('schedule', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='shift_swap_requests', to='schedules.oncallschedule')),
],
),
]

View file

@ -5,3 +5,4 @@ from .on_call_schedule import ( # noqa: F401
OnCallScheduleICal,
OnCallScheduleWeb,
)
from .shift_swap_request import ShiftSwapRequest # noqa: F401

View file

@ -0,0 +1,151 @@
import enum
import typing
from django.conf import settings
from django.core.validators import MinLengthValidator
from django.db import models
from django.utils import timezone
from apps.schedules import exceptions
from common.public_primary_keys import generate_public_primary_key, increase_public_primary_key_length
if typing.TYPE_CHECKING:
from apps.user_management.models import User
def generate_public_primary_key_for_shift_swap_request() -> str:
prefix = "SSR"
new_public_primary_key = generate_public_primary_key(prefix)
failure_counter = 0
while ShiftSwapRequest.objects.filter(public_primary_key=new_public_primary_key).exists():
new_public_primary_key = increase_public_primary_key_length(
failure_counter=failure_counter, prefix=prefix, model_name="ShiftSwapRequest"
)
failure_counter += 1
return new_public_primary_key
class ShiftSwapRequestQueryset(models.QuerySet):
def delete(self):
self.update(deleted_at=timezone.now())
class ShiftSwapRequestManager(models.Manager):
def get_queryset(self):
return ShiftSwapRequestQueryset(self.model, using=self._db).filter(deleted_at=None)
def hard_delete(self):
return self.get_queryset().hard_delete()
class ShiftSwapRequest(models.Model):
objects = ShiftSwapRequestManager()
objects_with_deleted = models.Manager()
public_primary_key = models.CharField(
max_length=20,
validators=[MinLengthValidator(settings.PUBLIC_PRIMARY_KEY_MIN_LENGTH + 1)],
unique=True,
default=generate_public_primary_key_for_shift_swap_request,
)
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
deleted_at = models.DateTimeField(null=True)
schedule = models.ForeignKey(
to="schedules.OnCallSchedule", null=False, on_delete=models.CASCADE, related_name="shift_swap_requests"
)
swap_start = models.DateTimeField()
"""
so long as objects are created through the internal API, `swap_start` is guaranteed to be in UTC
(see the internal API serializer for more details)
"""
swap_end = models.DateTimeField()
"""
so long as objects are created through the internal API, `swap_end` is guaranteed to be in UTC
(see the internal API serializer for more details)
"""
description = models.TextField(max_length=3000, default=None, null=True)
beneficiary = models.ForeignKey(
to="user_management.User", null=False, on_delete=models.CASCADE, related_name="created_shift_swap_requests"
)
"""
the person who is relieved from (part of) their shift(s)
"""
benefactor = models.ForeignKey(
to="user_management.User", null=True, on_delete=models.CASCADE, related_name="taken_shift_swap_requests"
)
"""
the person taking on shift workload from the beneficiary
"""
class Statuses(enum.StrEnum):
OPEN = "open"
TAKEN = "taken"
PAST_DUE = "past_due"
DELETED = "deleted"
def __str__(self) -> str:
return f"{self.schedule.name} {self.beneficiary.username} {self.swap_start} - {self.swap_end}"
def delete(self):
self.deleted_at = timezone.now()
self.save()
def hard_delete(self):
super().delete()
@property
def status(self) -> str:
if self.deleted_at is not None:
return self.Statuses.DELETED
elif self.benefactor is not None:
return self.Statuses.TAKEN
elif timezone.now() > self.swap_start:
return self.Statuses.PAST_DUE
return self.Statuses.OPEN
def take(self, benefactor: "User") -> None:
if benefactor == self.beneficiary:
raise exceptions.BeneficiaryCannotTakeOwnShiftSwapRequest()
if self.status != self.Statuses.OPEN:
raise exceptions.ShiftSwapRequestNotOpenForTaking()
self.benefactor = benefactor
self.save()
# TODO: implement the actual override logic in https://github.com/grafana/oncall/issues/2590
# Insight logs
@property
def insight_logs_verbal(self):
return str(self)
@property
def insight_logs_type_verbal(self):
# TODO: add this resource type to the insight logs public docs
return "shift_swap_request"
@property
def insight_logs_serialized(self):
return {
"description": self.description,
"schedule": self.schedule.name,
"swap_start": self.swap_start,
"swap_end": self.swap_end,
"beneficiary": self.beneficiary.username,
"benefactor": self.benefactor.username if self.benefactor else None,
}
@property
def insight_logs_metadata(self):
return {}

View file

@ -1,6 +1,12 @@
import factory
from apps.schedules.models import CustomOnCallShift, OnCallScheduleCalendar, OnCallScheduleICal, OnCallScheduleWeb
from apps.schedules.models import (
CustomOnCallShift,
OnCallScheduleCalendar,
OnCallScheduleICal,
OnCallScheduleWeb,
ShiftSwapRequest,
)
from common.utils import UniqueFaker
@ -35,3 +41,8 @@ class CustomOnCallShiftFactory(factory.DjangoModelFactory):
class Meta:
model = CustomOnCallShift
class ShiftSwapRequestFactory(factory.DjangoModelFactory):
class Meta:
model = ShiftSwapRequest

View file

@ -0,0 +1,124 @@
import datetime
import pytest
from django.utils import timezone
from apps.schedules import exceptions
from apps.schedules.models import OnCallScheduleWeb, ShiftSwapRequest
@pytest.fixture
def ssr_setup(make_schedule, make_organization_and_user, make_user_for_organization, make_shift_swap_request):
def _ssr_setup():
organization, beneficiary = make_organization_and_user()
benefactor = make_user_for_organization(organization)
schedule = make_schedule(organization, schedule_class=OnCallScheduleWeb)
tomorrow = timezone.now() + datetime.timedelta(days=1)
two_days_from_now = tomorrow + datetime.timedelta(days=1)
ssr = make_shift_swap_request(schedule, beneficiary, swap_start=tomorrow, swap_end=two_days_from_now)
return ssr, beneficiary, benefactor
return _ssr_setup
@pytest.mark.django_db
def test_soft_delete(ssr_setup):
ssr, _, _ = ssr_setup()
assert ssr.deleted_at is None
ssr.delete()
ssr.refresh_from_db()
assert ssr.deleted_at is not None
assert ShiftSwapRequest.objects.all().count() == 0
assert ShiftSwapRequest.objects_with_deleted.all().count() == 1
@pytest.mark.django_db
def test_status_open(ssr_setup) -> None:
ssr, _, _ = ssr_setup()
assert ssr.status == ShiftSwapRequest.Statuses.OPEN
@pytest.mark.django_db
def test_status_taken(ssr_setup) -> None:
ssr, _, benefactor = ssr_setup()
assert ssr.status == ShiftSwapRequest.Statuses.OPEN
ssr.benefactor = benefactor
ssr.save()
assert ssr.status == ShiftSwapRequest.Statuses.TAKEN
@pytest.mark.django_db
def test_status_past_due(ssr_setup) -> None:
ssr, _, _ = ssr_setup()
assert ssr.status == ShiftSwapRequest.Statuses.OPEN
ssr.swap_start = ssr.swap_start - datetime.timedelta(days=5)
ssr.save()
assert ssr.status == ShiftSwapRequest.Statuses.PAST_DUE
@pytest.mark.django_db
def test_status_deleted(ssr_setup) -> None:
ssr, _, _ = ssr_setup()
assert ssr.status == ShiftSwapRequest.Statuses.OPEN
ssr.delete()
assert ssr.status == ShiftSwapRequest.Statuses.DELETED
@pytest.mark.django_db
def test_take(ssr_setup) -> None:
ssr, _, benefactor = ssr_setup()
original_updated_at = ssr.updated_at
ssr.take(benefactor)
assert ssr.benefactor == benefactor
assert ssr.updated_at != original_updated_at
# TODO:
@pytest.mark.django_db
def test_take_only_works_for_open_requests(ssr_setup) -> None:
# already taken
ssr, _, benefactor = ssr_setup()
ssr.benefactor = benefactor
ssr.save()
assert ssr.status == ShiftSwapRequest.Statuses.TAKEN
with pytest.raises(exceptions.ShiftSwapRequestNotOpenForTaking):
ssr.take(benefactor)
# past due
ssr, _, benefactor = ssr_setup()
ssr.swap_start = ssr.swap_start - datetime.timedelta(days=5)
ssr.save()
assert ssr.status == ShiftSwapRequest.Statuses.PAST_DUE
with pytest.raises(exceptions.ShiftSwapRequestNotOpenForTaking):
ssr.take(benefactor)
# deleted
ssr, _, benefactor = ssr_setup()
ssr.delete()
assert ssr.status == ShiftSwapRequest.Statuses.DELETED
with pytest.raises(exceptions.ShiftSwapRequestNotOpenForTaking):
ssr.take(benefactor)
@pytest.mark.django_db
def test_take_own_ssr(ssr_setup) -> None:
ssr, beneficiary, _ = ssr_setup()
with pytest.raises(exceptions.BeneficiaryCannotTakeOwnShiftSwapRequest):
ssr.take(beneficiary)

View file

View file

@ -8,6 +8,7 @@ from rest_framework.relations import RelatedField
from apps.alerts.models import ChannelFilter
from apps.user_management.models import User
from common.api_helpers.exceptions import BadRequest
from common.timezones import raise_exception_if_not_valid_timezone
class OrganizationFilteredPrimaryKeyRelatedField(RelatedField):
@ -153,3 +154,53 @@ class RollingUsersField(serializers.ListField):
def to_representation(self, value):
result = [list(d.values()) for d in value]
return result
class TimeZoneField(serializers.CharField):
def _validator(self, value: str):
raise_exception_if_not_valid_timezone(value, serializers.ValidationError)
def __init__(self, **kwargs):
super().__init__(validators=[self._validator], **kwargs)
class TimeZoneAwareDatetimeField(serializers.DateTimeField):
"""
This serializer field ensures that datetimes are always
passed in ISO-8601 format (https://en.wikipedia.org/wiki/ISO_8601) with one caveat, timezone information MUST
be passed in. ISO-8601 allows timezone information to be optional.
All of the following would be considered valid datetimes by this field:
2023-07-20T18:35:19+00:00
2023-07-20T18:35:19Z
These are not valid:
2023-07-20 12:00:00
20230720T120000Z
This allows us to capture timezone information at insert/update time. Django converts/persists this information
in UTC, and then when it is read back, you can be 100% sure that you are working with a UTC timezone aware datetime.
Additionally, it standardizes how we format returned datetime strings.
"""
UTC_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
UTC_FORMAT_WITH_MICROSECONDS = "%Y-%m-%dT%H:%M:%S.%fZ"
UTC_OFFSET_FORMAT = "%Y-%m-%dT%H:%M:%S%z"
UTC_OFFSET_FORMAT_WITH_MICROSECONDS = "%Y-%m-%dT%H:%M:%S.%f%z"
"`%z` = UTC offset in the form +HHMM or -HHMM. (a colon separator can optionally be included)"
def __init__(self, **kwargs):
# we could use 'iso-8601' as a valid value to input_formats, however, see the note above about it
# allowing timezone naive datetimes
super().__init__(
format=self.UTC_FORMAT_WITH_MICROSECONDS,
input_formats=[
self.UTC_FORMAT,
self.UTC_FORMAT_WITH_MICROSECONDS,
self.UTC_OFFSET_FORMAT,
self.UTC_OFFSET_FORMAT_WITH_MICROSECONDS,
],
**kwargs,
)

View file

@ -0,0 +1,102 @@
import datetime
from zoneinfo import ZoneInfo
import pytest
import pytz
from rest_framework import serializers
import common.api_helpers.custom_fields as cf
class TestTimeZoneField:
@pytest.mark.parametrize("tz", pytz.all_timezones)
def test_valid_timezones(self, tz):
class MySerializer(serializers.Serializer):
tz = cf.TimeZoneField()
try:
serializer = MySerializer(data={"tz": tz})
serializer.is_valid(raise_exception=True)
assert serializer.validated_data["tz"] == tz
except Exception:
pytest.fail()
def test_invalid_timezone(self):
class MySerializer(serializers.Serializer):
tz = cf.TimeZoneField()
with pytest.raises(serializers.ValidationError, match="Invalid timezone"):
serializer = MySerializer(data={"tz": "potato"})
serializer.is_valid(raise_exception=True)
def test_it_works_with_allow_null(self):
class MySerializer(serializers.Serializer):
tz = cf.TimeZoneField(allow_null=True)
try:
serializer = MySerializer(data={"tz": None})
serializer.is_valid(raise_exception=True)
assert serializer.validated_data["tz"] is None
serializer = MySerializer(data={"tz": "UTC"})
serializer.is_valid(raise_exception=True)
assert serializer.validated_data["tz"] == "UTC"
except Exception:
pytest.fail()
def test_it_works_with_required(self):
class MySerializer(serializers.Serializer):
tz = cf.TimeZoneField(required=True)
with pytest.raises(serializers.ValidationError, match="This field is required"):
serializer = MySerializer(data={})
serializer.is_valid(raise_exception=True)
try:
serializer = MySerializer(data={"tz": "UTC"})
serializer.is_valid(raise_exception=True)
assert serializer.validated_data["tz"] == "UTC"
except Exception:
pytest.fail()
class TestTimeZoneAwareDatetimeField:
@pytest.mark.parametrize(
"test_case,expected_persisted_value",
[
# UTC format
("2023-07-20T12:00:00Z", datetime.datetime(2023, 7, 20, 12, 0, 0, tzinfo=ZoneInfo("UTC"))),
# UTC format w/ microseconds
("2023-07-20T12:00:00.245652Z", datetime.datetime(2023, 7, 20, 12, 0, 0, 245652, tzinfo=ZoneInfo("UTC"))),
# UTC offset w/ colons + no microseconds
("2023-07-20T12:00:00+07:00", datetime.datetime(2023, 7, 20, 5, 0, 0, tzinfo=ZoneInfo("UTC"))),
# UTC offset w/ colons + microseconds
(
"2023-07-20T12:00:00.245652+07:00",
datetime.datetime(2023, 7, 20, 5, 0, 0, 245652, tzinfo=ZoneInfo("UTC")),
),
# UTC offset w/ no colons + no microseconds
("2023-07-20T12:00:00+0700", datetime.datetime(2023, 7, 20, 5, 0, 0, tzinfo=ZoneInfo("UTC"))),
# UTC offset w/ no colons + microseconds
(
"2023-07-20T12:00:00.245652+0700",
datetime.datetime(2023, 7, 20, 5, 0, 0, 245652, tzinfo=ZoneInfo("UTC")),
),
("2023-07-20 12:00:00", None),
("20230720T120000Z", None),
],
)
def test_various_datetimes(self, test_case, expected_persisted_value):
class MySerializer(serializers.Serializer):
dt = cf.TimeZoneAwareDatetimeField()
serializer = MySerializer(data={"dt": test_case})
if expected_persisted_value:
serializer.is_valid(raise_exception=True)
assert serializer.validated_data["dt"] == expected_persisted_value
else:
with pytest.raises(serializers.ValidationError):
serializer.is_valid(raise_exception=True)

View file

@ -8,6 +8,8 @@ from common.ordered_model.ordered_model import OrderedModel
class TestOrderedModel(OrderedModel):
__test__ = False
test_field = models.CharField(max_length=255)
extra_field = models.IntegerField(null=True, default=None)
order_with_respect_to = ["test_field"]

View file

@ -1,6 +1,5 @@
import pytest
import pytz
from rest_framework import serializers
from rest_framework.exceptions import APIException
import common.timezones as tz
@ -42,56 +41,3 @@ def test_raise_exception_if_not_valid_timezone_custom_exception():
with pytest.raises(MyCustomException, match="Invalid timezone"):
tz.raise_exception_if_not_valid_timezone("asdfasfd", exception_class=MyCustomException)
class TestTimeZoneField:
@pytest.mark.parametrize("tz", pytz.all_timezones)
def test_valid_timezones(self, tz):
class MySerializer(serializers.Serializer):
tz = tz.TimeZoneField()
try:
serializer = MySerializer(data={"tz": tz})
serializer.is_valid(raise_exception=True)
assert serializer.validated_data["tz"] == tz
except Exception:
pytest.fail()
def test_invalid_timezone(self):
class MySerializer(serializers.Serializer):
tz = tz.TimeZoneField()
with pytest.raises(serializers.ValidationError, match="Invalid timezone"):
serializer = MySerializer(data={"tz": "potato"})
serializer.is_valid(raise_exception=True)
def test_it_works_with_allow_null(self):
class MySerializer(serializers.Serializer):
tz = tz.TimeZoneField(allow_null=True)
try:
serializer = MySerializer(data={"tz": None})
serializer.is_valid(raise_exception=True)
assert serializer.validated_data["tz"] is None
serializer = MySerializer(data={"tz": "UTC"})
serializer.is_valid(raise_exception=True)
assert serializer.validated_data["tz"] == "UTC"
except Exception:
pytest.fail()
def test_it_works_with_required(self):
class MySerializer(serializers.Serializer):
tz = tz.TimeZoneField(required=True)
with pytest.raises(serializers.ValidationError, match="This field is required"):
serializer = MySerializer(data={})
serializer.is_valid(raise_exception=True)
try:
serializer = MySerializer(data={"tz": "UTC"})
serializer.is_valid(raise_exception=True)
assert serializer.validated_data["tz"] == "UTC"
except Exception:
pytest.fail()

View file

@ -1,5 +1,4 @@
import pytz
from rest_framework import serializers
from common.api_helpers.exceptions import BadRequest
@ -20,11 +19,3 @@ def raise_exception_if_not_valid_timezone(timezone, exception_class=BadRequest):
"""
if not is_valid_timezone(timezone):
raise exception_class(detail="Invalid timezone")
class TimeZoneField(serializers.CharField):
def _validator(self, value: str):
raise_exception_if_not_valid_timezone(value, serializers.ValidationError)
def __init__(self, **kwargs):
super().__init__(validators=[self._validator], **kwargs)

View file

@ -65,6 +65,7 @@ from apps.schedules.tests.factories import (
OnCallScheduleCalendarFactory,
OnCallScheduleFactory,
OnCallScheduleICalFactory,
ShiftSwapRequestFactory,
)
from apps.slack.slack_client import SlackClientWithErrorHandling
from apps.slack.tests.factories import (
@ -96,6 +97,7 @@ register(EscalationPolicyFactory)
register(OnCallScheduleICalFactory)
register(OnCallScheduleCalendarFactory)
register(CustomOnCallShiftFactory)
register(ShiftSwapRequestFactory)
register(AlertFactory)
register(AlertGroupFactory)
register(AlertGroupLogRecordFactory)
@ -880,3 +882,11 @@ def make_organization_and_user_with_token(make_organization_and_user, make_publi
return organization, user, token
return _make_organization_and_user_with_token
@pytest.fixture
def make_shift_swap_request():
def _make_shift_swap_request(schedule, beneficiary, **kwargs):
return ShiftSwapRequestFactory(schedule=schedule, beneficiary=beneficiary, **kwargs)
return _make_shift_swap_request

View file

@ -65,6 +65,7 @@ FEATURE_MULTIREGION_ENABLED = getenv_boolean("FEATURE_MULTIREGION_ENABLED", defa
FEATURE_INBOUND_EMAIL_ENABLED = getenv_boolean("FEATURE_INBOUND_EMAIL_ENABLED", default=False)
FEATURE_PROMETHEUS_EXPORTER_ENABLED = getenv_boolean("FEATURE_PROMETHEUS_EXPORTER_ENABLED", default=False)
FEATURE_WEBHOOKS_2_ENABLED = getenv_boolean("FEATURE_WEBHOOKS_2_ENABLED", default=True)
FEATURE_SHIFT_SWAPS_ENABLED = getenv_boolean("FEATURE_SHIFT_SWAPS_ENABLED", default=False)
GRAFANA_CLOUD_ONCALL_HEARTBEAT_ENABLED = getenv_boolean("GRAFANA_CLOUD_ONCALL_HEARTBEAT_ENABLED", default=True)
GRAFANA_CLOUD_NOTIFICATIONS_ENABLED = getenv_boolean("GRAFANA_CLOUD_NOTIFICATIONS_ENABLED", default=True)

View file

@ -40,3 +40,5 @@ TWILIO_ACCOUNT_SID = "dummy_twilio_account_sid"
TWILIO_AUTH_TOKEN = "dummy_twilio_auth_token"
EXTRA_MESSAGING_BACKENDS = [("apps.base.tests.messaging_backend.TestOnlyBackend", 42)]
FEATURE_SHIFT_SWAPS_ENABLED = True