PD migrator: migrate on-call shifts using public API (#1317)
Allow PD migrator tool to migrate on-call shifts when migrating schedules (currently it migrates schedules using PD ICal file): https://github.com/grafana/oncall/issues/1283. This PR will allow to select the mode of schedule migration via `SCHEDULE_MIGRATION_MODE_WEB` env variable (`ical` or `web`). Due to differences in the scheduling systems of PD and OnCall, it's not always possible to migrate shifts automatically (migration plan will show any schedules and layers that can't be migrated). PD rotations that will be possible to migrate: - Any rotation without restrictions ("restriction" is a PD term for describing active periods for rotation) - Daily rotations with daily restrictions - Weekly rotations with weekly restrictions - Some weekly rotations with daily restrictions - Some daily rotations with weekly restrictions There will be a separate PR to update the [instruction](https://github.com/grafana/oncall/tree/dev/tools/pagerduty-migrator#readme) since this one is pretty huge already.
This commit is contained in:
parent
53af4783de
commit
a2eed312f9
13 changed files with 2723 additions and 46 deletions
|
|
@ -5,6 +5,12 @@ All notable changes to this project will be documented in this file.
|
|||
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
|
||||
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
|
||||
|
||||
## Unreleased
|
||||
|
||||
### Changed
|
||||
|
||||
- Allow creating schedules with type "web" using public API
|
||||
|
||||
## v1.1.28 (2023-02-23)
|
||||
|
||||
### Fixed
|
||||
|
|
|
|||
|
|
@ -238,7 +238,7 @@ class CustomOnCallShiftSerializer(EagerLoadingMixin, serializers.ModelSerializer
|
|||
data["users"] = []
|
||||
if data.get("rolling_users", []) is None: # terraform case
|
||||
data["rolling_users"] = []
|
||||
if data.get("source") != CustomOnCallShift.SOURCE_TERRAFORM:
|
||||
if data.get("source") not in (CustomOnCallShift.SOURCE_TERRAFORM, CustomOnCallShift.SOURCE_WEB):
|
||||
data["source"] = CustomOnCallShift.SOURCE_API
|
||||
if data.get("start") is not None:
|
||||
self._validate_start(data["start"])
|
||||
|
|
|
|||
|
|
@ -232,25 +232,6 @@ def test_get_web_schedule(
|
|||
assert response.json() == result
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_create_web_schedule(make_organization_and_user_with_token):
|
||||
_, _, token = make_organization_and_user_with_token()
|
||||
client = APIClient()
|
||||
|
||||
url = reverse("api-public:schedules-list")
|
||||
|
||||
data = {
|
||||
"team_id": None,
|
||||
"name": "schedule test name",
|
||||
"time_zone": "Europe/Moscow",
|
||||
"type": "web",
|
||||
}
|
||||
|
||||
response = client.post(url, data=data, format="json", HTTP_AUTHORIZATION=f"{token}")
|
||||
assert response.status_code == status.HTTP_400_BAD_REQUEST
|
||||
assert response.json() == {"detail": "Web schedule creation is not enabled through API"}
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_update_web_schedule(
|
||||
make_organization_and_user_with_token,
|
||||
|
|
|
|||
|
|
@ -56,9 +56,6 @@ class OnCallScheduleChannelView(RateLimitHeadersMixin, UpdateSerializerMixin, Mo
|
|||
raise NotFound
|
||||
|
||||
def perform_create(self, serializer):
|
||||
if serializer.validated_data["type"] == "web":
|
||||
raise BadRequest(detail="Web schedule creation is not enabled through API")
|
||||
|
||||
serializer.save()
|
||||
instance = serializer.instance
|
||||
|
||||
|
|
|
|||
|
|
@ -49,7 +49,7 @@ def main() -> None:
|
|||
]
|
||||
|
||||
print("▶ Fetching schedules...")
|
||||
schedules = session.list_all("schedules")
|
||||
schedules = session.list_all("schedules", params={"include[]": "schedule_layers"})
|
||||
oncall_schedules = oncall_api_client.list_all("schedules")
|
||||
|
||||
print("▶ Fetching escalation policies...")
|
||||
|
|
@ -72,8 +72,12 @@ def main() -> None:
|
|||
for user in users:
|
||||
match_user(user, oncall_users)
|
||||
|
||||
user_id_map = {
|
||||
u["id"]: u["oncall_user"]["id"] if u["oncall_user"] else None for u in users
|
||||
}
|
||||
|
||||
for schedule in schedules:
|
||||
match_schedule(schedule, oncall_schedules)
|
||||
match_schedule(schedule, oncall_schedules, user_id_map)
|
||||
match_users_for_schedule(schedule, users)
|
||||
|
||||
for policy in escalation_policies:
|
||||
|
|
@ -105,8 +109,8 @@ def main() -> None:
|
|||
|
||||
print("▶ Migrating schedules...")
|
||||
for schedule in schedules:
|
||||
if not schedule["unmatched_users"]:
|
||||
migrate_schedule(schedule)
|
||||
if not schedule["unmatched_users"] and not schedule["migration_errors"]:
|
||||
migrate_schedule(schedule, user_id_map)
|
||||
print(TAB + format_schedule(schedule))
|
||||
|
||||
print("▶ Migrating escalation policies...")
|
||||
|
|
|
|||
|
|
@ -35,3 +35,9 @@ PAGERDUTY_TO_ONCALL_VENDOR_MAP = {
|
|||
"Elastic Alerts": "elastalert",
|
||||
"Firebase": "fabric",
|
||||
}
|
||||
|
||||
SCHEDULE_MIGRATION_MODE_ICAL = "ical"
|
||||
SCHEDULE_MIGRATION_MODE_WEB = "web"
|
||||
SCHEDULE_MIGRATION_MODE = os.getenv(
|
||||
"SCHEDULE_MIGRATION_MODE", SCHEDULE_MIGRATION_MODE_ICAL
|
||||
)
|
||||
|
|
|
|||
|
|
@ -17,10 +17,18 @@ def format_user(user: dict) -> str:
|
|||
|
||||
|
||||
def format_schedule(schedule: dict) -> str:
|
||||
if schedule["unmatched_users"]:
|
||||
if schedule["unmatched_users"] and schedule["migration_errors"]:
|
||||
result = "{} {} — schedule references unmatched users and some layers cannot be migrated".format(
|
||||
ERROR_SIGN, schedule["name"]
|
||||
)
|
||||
elif schedule["unmatched_users"]:
|
||||
result = "{} {} — schedule references unmatched users".format(
|
||||
ERROR_SIGN, schedule["name"]
|
||||
)
|
||||
elif schedule["migration_errors"]:
|
||||
result = "{} {} — some layers cannot be migrated".format(
|
||||
ERROR_SIGN, schedule["name"]
|
||||
)
|
||||
else:
|
||||
result = "{} {}".format(SUCCESS_SIGN, schedule["name"])
|
||||
|
||||
|
|
@ -29,7 +37,7 @@ def format_schedule(schedule: dict) -> str:
|
|||
|
||||
def format_escalation_policy(policy: dict) -> str:
|
||||
if policy["unmatched_users"] and policy["flawed_schedules"]:
|
||||
result = "{} {} — policy references unmatched users and schedules with unmatched users".format(
|
||||
result = "{} {} — policy references unmatched users and schedules that cannot be migrated".format(
|
||||
ERROR_SIGN, policy["name"]
|
||||
)
|
||||
elif policy["unmatched_users"]:
|
||||
|
|
@ -37,7 +45,7 @@ def format_escalation_policy(policy: dict) -> str:
|
|||
ERROR_SIGN, policy["name"]
|
||||
)
|
||||
elif policy["flawed_schedules"]:
|
||||
result = "{} {} — policy references schedules with unmatched users".format(
|
||||
result = "{} {} — policy references schedules that cannot be migrated".format(
|
||||
ERROR_SIGN, policy["name"]
|
||||
)
|
||||
else:
|
||||
|
|
@ -61,7 +69,7 @@ def format_integration(integration: dict) -> str:
|
|||
|
||||
elif integration["is_escalation_policy_flawed"]:
|
||||
policy_name = integration["service"]["escalation_policy"]["summary"]
|
||||
result = "{} {} — escalation policy '{}' references unmatched users or schedules with unmatched users".format(
|
||||
result = "{} {} — escalation policy '{}' references unmatched users or schedules that cannot be migrated".format(
|
||||
ERROR_SIGN, result, policy_name
|
||||
)
|
||||
else:
|
||||
|
|
@ -85,10 +93,16 @@ def user_report(users: list[dict]) -> str:
|
|||
def schedule_report(schedules: list[dict]) -> str:
|
||||
result = "Schedule report:"
|
||||
|
||||
for schedule in sorted(schedules, key=lambda s: bool(s["unmatched_users"])):
|
||||
for schedule in sorted(
|
||||
schedules, key=lambda s: bool(s["unmatched_users"] or s["migration_errors"])
|
||||
):
|
||||
result += "\n" + TAB + format_schedule(schedule)
|
||||
|
||||
if not schedule["unmatched_users"] and schedule["oncall_schedule"]:
|
||||
if (
|
||||
not schedule["unmatched_users"]
|
||||
and schedule["oncall_schedule"]
|
||||
and not schedule["migration_errors"]
|
||||
):
|
||||
result += " (existing schedule with name '{}' will be deleted)".format(
|
||||
schedule["oncall_schedule"]["name"]
|
||||
)
|
||||
|
|
@ -96,6 +110,9 @@ def schedule_report(schedules: list[dict]) -> str:
|
|||
for user in schedule["unmatched_users"]:
|
||||
result += "\n" + TAB * 2 + format_user(user)
|
||||
|
||||
for error in schedule["migration_errors"]:
|
||||
result += "\n" + TAB * 2 + "{} {}".format(ERROR_SIGN, error)
|
||||
|
||||
return result
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -1,27 +1,563 @@
|
|||
import datetime
|
||||
from dataclasses import dataclass
|
||||
from enum import Enum
|
||||
from typing import Optional
|
||||
from uuid import uuid4
|
||||
|
||||
from migrator import oncall_api_client
|
||||
from migrator.config import (
|
||||
SCHEDULE_MIGRATION_MODE,
|
||||
SCHEDULE_MIGRATION_MODE_ICAL,
|
||||
SCHEDULE_MIGRATION_MODE_WEB,
|
||||
)
|
||||
|
||||
|
||||
def match_schedule(schedule: dict, oncall_schedules: list[dict]) -> None:
|
||||
def match_schedule(
|
||||
schedule: dict, oncall_schedules: list[dict], user_id_map: dict[str, str]
|
||||
) -> None:
|
||||
oncall_schedule = None
|
||||
for candidate in oncall_schedules:
|
||||
if schedule["name"].lower().strip() == candidate["name"].lower().strip():
|
||||
oncall_schedule = candidate
|
||||
|
||||
schedule["migration_errors"] = []
|
||||
if SCHEDULE_MIGRATION_MODE == SCHEDULE_MIGRATION_MODE_WEB:
|
||||
_, errors = Schedule.from_dict(schedule).to_oncall_schedule(user_id_map)
|
||||
schedule["migration_errors"] = errors
|
||||
|
||||
schedule["oncall_schedule"] = oncall_schedule
|
||||
|
||||
|
||||
def migrate_schedule(schedule: dict) -> None:
|
||||
def migrate_schedule(schedule: dict, user_id_map: dict[str, str]) -> None:
|
||||
if schedule["oncall_schedule"]:
|
||||
oncall_api_client.delete(
|
||||
"schedules/{}".format(schedule["oncall_schedule"]["id"])
|
||||
)
|
||||
|
||||
payload = {
|
||||
"name": schedule["name"],
|
||||
"type": "ical",
|
||||
"ical_url_primary": schedule["http_cal_url"],
|
||||
"team_id": None,
|
||||
}
|
||||
oncall_schedule = oncall_api_client.create("schedules", payload)
|
||||
if SCHEDULE_MIGRATION_MODE == SCHEDULE_MIGRATION_MODE_WEB:
|
||||
# Migrate shifts
|
||||
oncall_schedule = Schedule.from_dict(schedule).migrate(user_id_map)
|
||||
elif SCHEDULE_MIGRATION_MODE == SCHEDULE_MIGRATION_MODE_ICAL:
|
||||
# Migrate using ICal URL
|
||||
payload = {
|
||||
"name": schedule["name"],
|
||||
"type": "ical",
|
||||
"ical_url_primary": schedule["http_cal_url"],
|
||||
"team_id": None,
|
||||
}
|
||||
oncall_schedule = oncall_api_client.create("schedules", payload)
|
||||
else:
|
||||
raise ValueError("Invalid schedule migration mode")
|
||||
|
||||
schedule["oncall_schedule"] = oncall_schedule
|
||||
|
||||
|
||||
def duration_to_frequency_and_interval(duration: datetime.timedelta) -> tuple[str, int]:
|
||||
"""
|
||||
Convert a duration to shift frequency and interval.
|
||||
For example, 1 day duration returns ("daily", 1), 14 days returns ("weekly", 2),
|
||||
"""
|
||||
seconds = int(duration.total_seconds())
|
||||
|
||||
assert seconds >= 3600, "Rotation must be at least 1 hour"
|
||||
hours = seconds // 3600
|
||||
|
||||
if hours >= 24 and hours % 24 == 0:
|
||||
days = hours // 24
|
||||
if days >= 7 and days % 7 == 0:
|
||||
weeks = days // 7
|
||||
return "weekly", weeks
|
||||
else:
|
||||
return "daily", days
|
||||
else:
|
||||
return "hourly", hours
|
||||
|
||||
|
||||
def _pd_datetime_to_dt(text: str) -> datetime.datetime:
|
||||
"""
|
||||
Convert a PagerDuty datetime string to a datetime object.
|
||||
"""
|
||||
return datetime.datetime.fromisoformat(text)
|
||||
|
||||
|
||||
def _dt_to_oncall_datetime(dt: datetime.datetime) -> str:
|
||||
"""
|
||||
Convert a datetime object to an OnCall datetime string.
|
||||
"""
|
||||
return dt.strftime("%Y-%m-%dT%H:%M:%S")
|
||||
|
||||
|
||||
@dataclass
|
||||
class Schedule:
|
||||
"""
|
||||
Utility class for converting a PagerDuty schedule to an OnCall schedule.
|
||||
A PagerDuty schedule has multiple layers, each with a rotation of users.
|
||||
"""
|
||||
|
||||
name: str
|
||||
time_zone: str
|
||||
layers: list["Layer"]
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, schedule: dict) -> "Schedule":
|
||||
"""
|
||||
Create a Schedule object from a PagerDuty API response for a schedule.
|
||||
"""
|
||||
|
||||
layers = []
|
||||
# PagerDuty API returns layers in reverse order (e.g. Layer 3, Layer 2, Layer 1)
|
||||
for level, layer_dict in enumerate(reversed(schedule["schedule_layers"])):
|
||||
layer = Layer.from_dict(layer_dict, level)
|
||||
|
||||
# skip any layers that have already ended
|
||||
if layer.end and layer.end < datetime.datetime.now(datetime.timezone.utc):
|
||||
continue
|
||||
|
||||
layers.append(layer)
|
||||
|
||||
return cls(
|
||||
name=schedule["name"],
|
||||
time_zone=schedule["time_zone"],
|
||||
layers=layers,
|
||||
)
|
||||
|
||||
def to_oncall_schedule(
|
||||
self, user_id_map: dict[str, str]
|
||||
) -> tuple[Optional[dict], list[str]]:
|
||||
"""
|
||||
Convert a Schedule object to an OnCall schedule.
|
||||
Note that it also returns shifts, but these are not created at the same time as the schedule (see migrate method for more info).
|
||||
"""
|
||||
|
||||
shifts = []
|
||||
errors = []
|
||||
for layer in self.layers:
|
||||
# A single PagerDuty layer can result in multiple OnCall shifts
|
||||
layer_shifts, error = layer.to_oncall_shifts(user_id_map)
|
||||
|
||||
if layer_shifts:
|
||||
shifts += layer_shifts
|
||||
|
||||
if error:
|
||||
error_text = f"{layer.name}: {error}"
|
||||
|
||||
# If a layer has a single user, it's likely can be easily tweaked in PD to make it possible to migrate
|
||||
if len(set(layer.user_ids)) == 1:
|
||||
error_text += " Layer has a single user, consider simplifying the rotation in PD."
|
||||
|
||||
errors.append(error_text)
|
||||
|
||||
if errors:
|
||||
return None, errors
|
||||
|
||||
return {
|
||||
"name": self.name,
|
||||
"type": "web",
|
||||
"team_id": None,
|
||||
"time_zone": self.time_zone,
|
||||
"shifts": shifts,
|
||||
}, []
|
||||
|
||||
def migrate(self, user_id_map: dict[str, str]) -> dict:
|
||||
"""
|
||||
Create an OnCall schedule and its shifts.
|
||||
First create the shifts, then create a schedule with shift IDs provided.
|
||||
"""
|
||||
|
||||
schedule, errors = self.to_oncall_schedule(user_id_map)
|
||||
assert not errors, "Unexpected errors: {}".format(errors)
|
||||
|
||||
# Create shifts in OnCall
|
||||
shift_ids = []
|
||||
for shift in schedule["shifts"]:
|
||||
created_shift = oncall_api_client.create("on_call_shifts", shift)
|
||||
shift_ids.append(created_shift["id"])
|
||||
|
||||
# Create schedule in OnCall with shift IDs provided
|
||||
schedule["shifts"] = shift_ids
|
||||
new_schedule = oncall_api_client.create("schedules", schedule)
|
||||
|
||||
return new_schedule
|
||||
|
||||
|
||||
@dataclass
|
||||
class Layer:
|
||||
"""
|
||||
Utility class for converting a PagerDuty schedule layer to OnCall shifts.
|
||||
"""
|
||||
|
||||
name: str
|
||||
level: int
|
||||
|
||||
rotation_virtual_start: datetime.datetime
|
||||
rotation_turn_length: datetime.timedelta
|
||||
|
||||
start: datetime.datetime
|
||||
end: Optional[datetime.datetime]
|
||||
|
||||
user_ids: list[str]
|
||||
restrictions: list["Restriction"]
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, layer: dict, level: int) -> "Layer":
|
||||
"""
|
||||
Create a Layer object from a PagerDuty API response for a schedule layer.
|
||||
Converts PagerDuty datetime strings to datetime objects for easier manipulation.
|
||||
"""
|
||||
return cls(
|
||||
name=layer["name"],
|
||||
level=level,
|
||||
rotation_virtual_start=_pd_datetime_to_dt(layer["rotation_virtual_start"]),
|
||||
rotation_turn_length=datetime.timedelta(
|
||||
seconds=layer["rotation_turn_length_seconds"]
|
||||
),
|
||||
start=_pd_datetime_to_dt(layer["start"]),
|
||||
end=_pd_datetime_to_dt(layer["end"]) if layer["end"] else None,
|
||||
user_ids=[u["user"]["id"] for u in layer["users"]],
|
||||
restrictions=[Restriction.from_dict(r) for r in layer["restrictions"]],
|
||||
)
|
||||
|
||||
def to_oncall_shifts(
|
||||
self, user_id_map: dict[str, str]
|
||||
) -> tuple[Optional[list[dict]], Optional[str]]:
|
||||
frequency, interval = duration_to_frequency_and_interval(
|
||||
self.rotation_turn_length
|
||||
)
|
||||
rolling_users = []
|
||||
for user_id in self.user_ids:
|
||||
oncall_user_id = user_id_map[user_id]
|
||||
rolling_users.append([oncall_user_id])
|
||||
|
||||
if not self.restrictions:
|
||||
return [
|
||||
{
|
||||
"name": uuid4().hex,
|
||||
"level": self.level,
|
||||
"type": "rolling_users",
|
||||
"rotation_start": _dt_to_oncall_datetime(self.start),
|
||||
"until": _dt_to_oncall_datetime(self.end) if self.end else None,
|
||||
"start": _dt_to_oncall_datetime(self.rotation_virtual_start),
|
||||
"duration": int(self.rotation_turn_length.total_seconds()),
|
||||
"frequency": frequency,
|
||||
"interval": interval,
|
||||
"rolling_users": rolling_users,
|
||||
"start_rotation_from_user_index": 0,
|
||||
"week_start": "MO",
|
||||
"time_zone": self.rotation_virtual_start.tzname(),
|
||||
"source": 0, # 0 is alias for "web"
|
||||
}
|
||||
], None
|
||||
|
||||
restrictions_type = self.restrictions[0].type
|
||||
|
||||
if (frequency, restrictions_type) in (
|
||||
("daily", Restriction.Type.DAILY),
|
||||
("weekly", Restriction.Type.WEEKLY),
|
||||
):
|
||||
# TODO: some of this can use by_day?
|
||||
shifts, _ = self._generate_shifts(unique=False)
|
||||
shifts = [(s[0], s[1], "MO", None) for s in shifts]
|
||||
|
||||
elif frequency == "weekly" and restrictions_type == Restriction.Type.DAILY:
|
||||
shifts, is_split = self._generate_shifts(unique=True)
|
||||
|
||||
if is_split:
|
||||
return (
|
||||
None,
|
||||
f"Cannot migrate {interval}-weekly rotation with daily restrictions that are split by handoff.",
|
||||
)
|
||||
|
||||
# repeat ["MO", "TU", "WE", "TH", "FR", "SA", "SU"] shift for the number of weeks in the rotation
|
||||
shifts_for_multiple_weeks = []
|
||||
for shift in shifts:
|
||||
for week in range(interval):
|
||||
start = shift[0] + datetime.timedelta(weeks=week)
|
||||
end = shift[1] + datetime.timedelta(weeks=week)
|
||||
shifts_for_multiple_weeks.append((start, end))
|
||||
|
||||
shifts = [
|
||||
(
|
||||
shift[0],
|
||||
shift[1],
|
||||
["MO", "TU", "WE", "TH", "FR", "SA", "SU"][
|
||||
shift[0].date().weekday()
|
||||
],
|
||||
["MO", "TU", "WE", "TH", "FR", "SA", "SU"],
|
||||
)
|
||||
for shift in shifts_for_multiple_weeks
|
||||
]
|
||||
|
||||
elif (
|
||||
frequency == "daily"
|
||||
and restrictions_type == Restriction.Type.WEEKLY
|
||||
and interval == 1
|
||||
):
|
||||
# the only case when it's possible to migrate a daily rotation with weekly restrictions
|
||||
# is when the restrictions start at the same time as the shift start
|
||||
# and the restrictions are a multiple of 24 hours
|
||||
restrictions = Restriction.merge_restrictions(self.restrictions)
|
||||
for restriction in restrictions:
|
||||
if (
|
||||
not restriction.start_time_of_day
|
||||
== self.rotation_virtual_start.time()
|
||||
):
|
||||
return (
|
||||
None,
|
||||
f"Cannot migrate {interval}-daily rotation with weekly restrictions that start at a different time than the shift start.",
|
||||
)
|
||||
if not restriction.duration % datetime.timedelta(
|
||||
days=1
|
||||
) == datetime.timedelta(0):
|
||||
return (
|
||||
None,
|
||||
f"Cannot migrate {interval}-daily rotation with weekly restrictions that have durations that are not a multiple of a 24 hours.",
|
||||
)
|
||||
|
||||
# get the first restriction and use its start time as the start of the shift
|
||||
restriction, shift_start = Restriction.current_or_next_restriction(
|
||||
restrictions, self.rotation_virtual_start
|
||||
)
|
||||
shift_end = shift_start + datetime.timedelta(days=1)
|
||||
|
||||
# determine which days of the week are covered
|
||||
by_day = set()
|
||||
for restriction in restrictions:
|
||||
days = restriction.duration // datetime.timedelta(days=1)
|
||||
|
||||
for day in range(days):
|
||||
weekday = ["MO", "TU", "WE", "TH", "FR", "SA", "SU"][
|
||||
(restriction.start_day_of_week + day) % 7
|
||||
]
|
||||
by_day.add(weekday)
|
||||
|
||||
# sort by_day so that the order is always the same
|
||||
by_day = sorted(
|
||||
list(by_day),
|
||||
key=lambda d: ["MO", "TU", "WE", "TH", "FR", "SA", "SU"].index(d),
|
||||
)
|
||||
|
||||
shifts = [(shift_start, shift_end, "MO", by_day)]
|
||||
|
||||
else:
|
||||
restrictions_type_verbal = (
|
||||
"daily" if restrictions_type == Restriction.Type.DAILY else "weekly"
|
||||
)
|
||||
return (
|
||||
None,
|
||||
f"Cannot migrate {interval}-{frequency} rotation with {restrictions_type_verbal} restrictions.",
|
||||
)
|
||||
|
||||
payloads = []
|
||||
for shift in shifts:
|
||||
payload = {
|
||||
"name": uuid4().hex,
|
||||
"level": self.level,
|
||||
"type": "rolling_users",
|
||||
"rotation_start": _dt_to_oncall_datetime(self.start),
|
||||
"until": _dt_to_oncall_datetime(self.end) if self.end else None,
|
||||
"start": _dt_to_oncall_datetime(shift[0]),
|
||||
"duration": int((shift[1] - shift[0]).total_seconds()),
|
||||
"frequency": frequency,
|
||||
"interval": interval,
|
||||
"by_day": shift[3],
|
||||
"rolling_users": rolling_users,
|
||||
"start_rotation_from_user_index": 0,
|
||||
"week_start": shift[2],
|
||||
"time_zone": self.rotation_virtual_start.tzname(),
|
||||
"source": 0, # 0 is alias for "web"
|
||||
}
|
||||
payloads.append(payload)
|
||||
return payloads, None
|
||||
|
||||
def _generate_shifts(
|
||||
self, unique: bool = True
|
||||
) -> tuple[list[tuple[datetime.datetime, datetime.datetime]], bool]:
|
||||
"""
|
||||
Returns a list of (start, end) tuples representing the shifts in this layer.
|
||||
Note that these are not the actual shifts for OnCall API but rather a list of
|
||||
unique intervals generated by traversing the restrictions.
|
||||
|
||||
Also returns a boolean indicating whether there are restrictions split by on-call handoff.
|
||||
"""
|
||||
|
||||
start = self.rotation_virtual_start
|
||||
end = self.rotation_virtual_start + self.rotation_turn_length
|
||||
|
||||
# Convert restrictions to weekly restrictions, then merge overlapping ones
|
||||
restrictions = []
|
||||
for restriction in self.restrictions:
|
||||
restrictions += restriction.to_weekly_restrictions()
|
||||
restrictions = Restriction.merge_restrictions(restrictions)
|
||||
|
||||
is_split = False
|
||||
current = start
|
||||
shifts = []
|
||||
unique_shift_times = []
|
||||
|
||||
while current < end:
|
||||
restriction, restriction_start = Restriction.current_or_next_restriction(
|
||||
restrictions, current
|
||||
)
|
||||
restriction_end = restriction_start + restriction.duration
|
||||
|
||||
shift_start = max(current, restriction_start)
|
||||
shift_end = min(restriction_end, end)
|
||||
|
||||
# If the next restriction starts after the end of the rotation or shift is empty, we're done
|
||||
if restriction_start > end or shift_start == shift_end:
|
||||
break
|
||||
|
||||
# Check if restriction is split by handoff
|
||||
if (shift_start == start and shift_start > restriction_start) or (
|
||||
shift_end == end and shift_end < restriction_end
|
||||
):
|
||||
is_split = True
|
||||
|
||||
shift = (shift_start, shift_end)
|
||||
|
||||
# check that we haven't already added this shift
|
||||
if (
|
||||
not unique
|
||||
or (shift[0].time(), shift[1].time()) not in unique_shift_times
|
||||
):
|
||||
shifts.append(shift)
|
||||
unique_shift_times.append((shift[0].time(), shift[1].time()))
|
||||
|
||||
current = shift_end
|
||||
|
||||
return shifts, is_split
|
||||
|
||||
|
||||
@dataclass
|
||||
class Restriction:
|
||||
"""
|
||||
Utility class for representing a restriction on a rotation in PagerDuty.
|
||||
"""
|
||||
|
||||
class Type(Enum):
|
||||
DAILY = "daily_restriction"
|
||||
WEEKLY = "weekly_restriction"
|
||||
|
||||
type: Type
|
||||
start_time_of_day: datetime.time
|
||||
duration: datetime.timedelta
|
||||
start_day_of_week: Optional[int] # this is only present for weekly restrictions
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, restriction: dict) -> "Restriction":
|
||||
"""
|
||||
Create a Restriction object from PagerDuty's API representation.
|
||||
Converts PagerDuty datetime strings to datetime objects for easier manipulation as well.
|
||||
"""
|
||||
|
||||
# PagerDuty's API uses 1-indexed days of the week, converting to 0-indexed for ease of use
|
||||
start_day_of_week = restriction.get("start_day_of_week")
|
||||
if start_day_of_week is not None:
|
||||
start_day_of_week -= 1
|
||||
|
||||
return cls(
|
||||
type=Restriction.Type(restriction["type"]),
|
||||
start_time_of_day=datetime.time.fromisoformat(
|
||||
restriction["start_time_of_day"]
|
||||
),
|
||||
duration=datetime.timedelta(seconds=restriction["duration_seconds"]),
|
||||
start_day_of_week=start_day_of_week,
|
||||
)
|
||||
|
||||
def to_weekly_restrictions(self) -> list["Restriction"]:
|
||||
"""
|
||||
Convert a daily restriction to a list of weekly restrictions.
|
||||
Daily restriction is basically 7 weekly restrictions with the same start time and duration,
|
||||
but different days of the week (e.g. 9am-5pm daily restriction is the same as 9am-5pm on Monday, 9am-5pm on Tuesday, etc.)
|
||||
|
||||
Converting to weekly restrictions makes it easier to work with restrictions and only care about weekly ones.
|
||||
"""
|
||||
|
||||
if self.type == Restriction.Type.WEEKLY:
|
||||
return [self]
|
||||
else:
|
||||
return [
|
||||
Restriction(
|
||||
type=Restriction.Type.WEEKLY,
|
||||
start_time_of_day=self.start_time_of_day,
|
||||
duration=self.duration,
|
||||
start_day_of_week=day,
|
||||
)
|
||||
for day in range(7)
|
||||
]
|
||||
|
||||
@staticmethod
|
||||
def merge_restrictions(restrictions: list["Restriction"]) -> list["Restriction"]:
|
||||
"""
|
||||
Merge a list of weekly restrictions into a list of the fewest possible weekly restrictions that cover the same time period.
|
||||
Example: (9am - 5pm restriction on Monday, 10am - 4pm restriction on Monday) -> (9am - 5pm restriction on Monday).
|
||||
|
||||
Only works on weekly restrictions, as daily restrictions are converted to weekly restrictions first.
|
||||
"""
|
||||
|
||||
assert all(r.type == Restriction.Type.WEEKLY for r in restrictions)
|
||||
|
||||
restrictions = sorted(
|
||||
restrictions, key=lambda r: (r.start_day_of_week, r.start_time_of_day)
|
||||
)
|
||||
merged = []
|
||||
|
||||
for restriction in restrictions:
|
||||
restriction_start = datetime.datetime.combine(
|
||||
datetime.date.min
|
||||
+ datetime.timedelta(days=restriction.start_day_of_week),
|
||||
restriction.start_time_of_day,
|
||||
)
|
||||
restriction_end = restriction_start + restriction.duration
|
||||
|
||||
if not merged:
|
||||
merged.append(restriction)
|
||||
continue
|
||||
|
||||
last = merged[-1]
|
||||
last_start = datetime.datetime.combine(
|
||||
datetime.date.min + datetime.timedelta(days=last.start_day_of_week),
|
||||
last.start_time_of_day,
|
||||
)
|
||||
last_end = last_start + last.duration
|
||||
|
||||
if last_end < restriction_start:
|
||||
merged.append(restriction)
|
||||
else:
|
||||
restriction.start_day_of_week = last_start.weekday()
|
||||
restriction.start_time_of_day = last_start.time()
|
||||
restriction.duration = max(last_end, restriction_end) - last_start
|
||||
merged = merged[:-1] + [restriction]
|
||||
|
||||
return merged
|
||||
|
||||
@staticmethod
|
||||
def current_or_next_restriction(
|
||||
restrictions: list["Restriction"], dt: datetime.datetime
|
||||
) -> tuple["Restriction", datetime.datetime]:
|
||||
"""
|
||||
Get the current or next restriction for a given datetime.
|
||||
This is useful for finding all the restrictions that apply to a given rotation shift.
|
||||
"""
|
||||
assert all(r.type == Restriction.Type.WEEKLY for r in restrictions)
|
||||
|
||||
for weeks in (-1, 0, 1): # check last week, this week, and next week
|
||||
for restriction in restrictions:
|
||||
restriction_date = (
|
||||
dt.date()
|
||||
- datetime.timedelta(days=dt.weekday())
|
||||
+ datetime.timedelta(days=restriction.start_day_of_week)
|
||||
+ datetime.timedelta(weeks=weeks)
|
||||
)
|
||||
restriction_start = datetime.datetime.combine(
|
||||
restriction_date,
|
||||
restriction.start_time_of_day,
|
||||
tzinfo=datetime.timezone.utc,
|
||||
)
|
||||
restriction_end = restriction_start + restriction.duration
|
||||
|
||||
if restriction_end > dt:
|
||||
return restriction, restriction_start
|
||||
|
||||
# there should always be a restriction
|
||||
raise ValueError("No restriction found for given datetime")
|
||||
|
|
|
|||
|
|
@ -53,7 +53,7 @@ def match_users_and_schedules_for_escalation_policy(
|
|||
if not schedule:
|
||||
continue
|
||||
|
||||
if schedule["unmatched_users"]:
|
||||
if schedule["unmatched_users"] or schedule["migration_errors"]:
|
||||
flawed_schedule_ids.add(target_id)
|
||||
|
||||
policy["unmatched_users"] = [
|
||||
|
|
|
|||
|
|
@ -785,6 +785,7 @@ expected_schedules_result = [
|
|||
"on_call_now": [],
|
||||
"slack": None,
|
||||
},
|
||||
"migration_errors": [],
|
||||
"unmatched_users": [
|
||||
{
|
||||
"id": "TESTUSER2",
|
||||
|
|
@ -842,6 +843,7 @@ expected_schedules_result = [
|
|||
"teams": [],
|
||||
"oncall_schedule": None,
|
||||
"unmatched_users": [],
|
||||
"migration_errors": [],
|
||||
},
|
||||
{
|
||||
"id": "TESTSCH3",
|
||||
|
|
@ -866,6 +868,7 @@ expected_schedules_result = [
|
|||
"escalation_policies": [],
|
||||
"teams": [],
|
||||
"oncall_schedule": None,
|
||||
"migration_errors": [],
|
||||
"unmatched_users": [
|
||||
{
|
||||
"id": "TESTUSER2",
|
||||
|
|
@ -924,6 +927,7 @@ expected_schedules_result = [
|
|||
"teams": [],
|
||||
"oncall_schedule": None,
|
||||
"unmatched_users": [],
|
||||
"migration_errors": [],
|
||||
},
|
||||
]
|
||||
expected_escalation_policies_result = [
|
||||
|
|
@ -1028,6 +1032,7 @@ expected_escalation_policies_result = [
|
|||
"on_call_now": [],
|
||||
"slack": None,
|
||||
},
|
||||
"migration_errors": [],
|
||||
"unmatched_users": [
|
||||
{
|
||||
"id": "TESTUSER2",
|
||||
|
|
@ -1570,7 +1575,7 @@ def test_match_user_not_found():
|
|||
|
||||
def test_match_schedule():
|
||||
for schedule in pd_schedules_payload:
|
||||
match_schedule(schedule, oncall_schedules_payload)
|
||||
match_schedule(schedule, oncall_schedules_payload, user_id_map={})
|
||||
match_users_for_schedule(schedule, pd_users_payload)
|
||||
|
||||
assert pd_schedules_payload == expected_schedules_result
|
||||
|
|
|
|||
|
|
@ -16,7 +16,7 @@ def test_match_schedule_name_case_insensitive():
|
|||
pd_schedule = {"name": "Test"}
|
||||
oncall_schedules = [{"name": "test"}]
|
||||
|
||||
match_schedule(pd_schedule, oncall_schedules)
|
||||
match_schedule(pd_schedule, oncall_schedules, user_id_map={})
|
||||
assert pd_schedule["oncall_schedule"] == oncall_schedules[0]
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -7,7 +7,7 @@ def test_match_schedule_name_extra_spaces():
|
|||
pd_schedule = {"name": " test "}
|
||||
oncall_schedules = [{"name": "test"}]
|
||||
|
||||
match_schedule(pd_schedule, oncall_schedules)
|
||||
match_schedule(pd_schedule, oncall_schedules, user_id_map={})
|
||||
assert pd_schedule["oncall_schedule"] == oncall_schedules[0]
|
||||
|
||||
|
||||
|
|
|
|||
2125
tools/pagerduty-migrator/migrator/tests/test_schedules.py
Normal file
2125
tools/pagerduty-migrator/migrator/tests/test_schedules.py
Normal file
File diff suppressed because it is too large
Load diff
Loading…
Add table
Reference in a new issue