commit
41e68a6db6
19 changed files with 2777 additions and 59 deletions
11
CHANGELOG.md
11
CHANGELOG.md
|
|
@ -5,6 +5,17 @@ All notable changes to this project will be documented in this file.
|
|||
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
|
||||
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
|
||||
|
||||
## v1.1.29 (2023-02-23)
|
||||
|
||||
### Changed
|
||||
|
||||
- Allow creating schedules with type "web" using public API
|
||||
|
||||
### Fixed
|
||||
|
||||
- Fixed minor issue during the sync process where an HTTP 302 (redirect) status code from the Grafana
|
||||
instance would cause the sync to not properly finish
|
||||
|
||||
## v1.1.28 (2023-02-23)
|
||||
|
||||
### Fixed
|
||||
|
|
|
|||
|
|
@ -18,8 +18,27 @@ def notify_all_task(alert_group_pk, escalation_policy_snapshot_order=None):
|
|||
|
||||
alert_group = AlertGroup.all_objects.get(pk=alert_group_pk)
|
||||
|
||||
# check alert group state before notifying all users in the channel
|
||||
if alert_group.resolved or alert_group.acknowledged or alert_group.silenced:
|
||||
task_logger.info(f"alert_group {alert_group.pk} was resolved, acked or silenced forever. No need to notify all")
|
||||
return
|
||||
|
||||
escalation_snapshot = alert_group.escalation_snapshot
|
||||
escalation_policy_snapshot = escalation_snapshot.escalation_policies_snapshots[escalation_policy_snapshot_order]
|
||||
try:
|
||||
escalation_policy_snapshot = escalation_snapshot.escalation_policies_snapshots[escalation_policy_snapshot_order]
|
||||
except IndexError:
|
||||
escalation_policy_snapshot = None
|
||||
|
||||
if not escalation_policy_snapshot:
|
||||
# The step has an incorrect order. Probably the order was changed manually with terraform.
|
||||
# It is a quick fix, tasks notify_all_task and notify_group_task should be refactored to avoid getting snapshot
|
||||
# by order
|
||||
task_logger.warning(
|
||||
f"escalation_policy_snapshot for alert_group {alert_group.pk} with order "
|
||||
f"{escalation_policy_snapshot_order} is not found. Skip step"
|
||||
)
|
||||
return
|
||||
|
||||
escalation_policy_pk = escalation_policy_snapshot.id
|
||||
escalation_policy = EscalationPolicy.objects.filter(pk=escalation_policy_pk).first()
|
||||
escalation_policy_step = escalation_policy_snapshot.step
|
||||
|
|
|
|||
|
|
@ -20,6 +20,10 @@ def notify_group_task(alert_group_pk, escalation_policy_snapshot_order=None):
|
|||
EscalationDeliveryStep = scenario_step.ScenarioStep.get_step("escalation_delivery", "EscalationDeliveryStep")
|
||||
|
||||
alert_group = AlertGroup.all_objects.get(pk=alert_group_pk)
|
||||
# check alert group state before notifying all users in the group
|
||||
if alert_group.resolved or alert_group.acknowledged or alert_group.silenced:
|
||||
task_logger.info(f"alert_group {alert_group.pk} was resolved, acked or silenced. No need to notify group")
|
||||
return
|
||||
|
||||
organization = alert_group.channel.organization
|
||||
slack_team_identity = organization.slack_team_identity
|
||||
|
|
@ -31,7 +35,21 @@ def notify_group_task(alert_group_pk, escalation_policy_snapshot_order=None):
|
|||
step = EscalationDeliveryStep(slack_team_identity, organization)
|
||||
|
||||
escalation_snapshot = alert_group.escalation_snapshot
|
||||
escalation_policy_snapshot = escalation_snapshot.escalation_policies_snapshots[escalation_policy_snapshot_order]
|
||||
try:
|
||||
escalation_policy_snapshot = escalation_snapshot.escalation_policies_snapshots[escalation_policy_snapshot_order]
|
||||
except IndexError:
|
||||
escalation_policy_snapshot = None
|
||||
|
||||
if not escalation_policy_snapshot:
|
||||
# The step has an incorrect order. Probably the order was changed manually with terraform.
|
||||
# It is a quick fix, tasks notify_all_task and notify_group_task should be refactored to avoid getting snapshot
|
||||
# by order
|
||||
task_logger.warning(
|
||||
f"escalation_policy_snapshot for alert_group {alert_group.pk} with order "
|
||||
f"{escalation_policy_snapshot_order} is not found. Skip step"
|
||||
)
|
||||
return
|
||||
|
||||
escalation_policy_pk = escalation_policy_snapshot.id
|
||||
escalation_policy = EscalationPolicy.objects.filter(pk=escalation_policy_pk).first()
|
||||
escalation_policy_step = escalation_policy_snapshot.step
|
||||
|
|
|
|||
|
|
@ -152,7 +152,7 @@ class GrafanaAPIClient(APIClient):
|
|||
|
||||
def is_rbac_enabled_for_organization(self) -> bool:
|
||||
_, resp_status = self.api_head(self.USER_PERMISSION_ENDPOINT)
|
||||
return resp_status["status_code"] == status.HTTP_200_OK
|
||||
return resp_status["connected"]
|
||||
|
||||
def get_users(self, rbac_is_enabled_for_org: bool, **kwargs) -> List[GrafanaUserWithPermissions]:
|
||||
users, _ = self.api_get("api/org/users", **kwargs)
|
||||
|
|
|
|||
|
|
@ -1,7 +1,6 @@
|
|||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
from rest_framework import status
|
||||
|
||||
from apps.grafana_plugin.helpers.client import GrafanaAPIClient
|
||||
|
||||
|
|
@ -44,17 +43,17 @@ class TestGetUsersPermissions:
|
|||
|
||||
class TestIsRbacEnabledForOrganization:
|
||||
@pytest.mark.parametrize(
|
||||
"grafana_api_status_code,expected",
|
||||
"api_response_connected,expected",
|
||||
[
|
||||
(status.HTTP_200_OK, True),
|
||||
(status.HTTP_404_NOT_FOUND, False),
|
||||
(True, True),
|
||||
(False, False),
|
||||
],
|
||||
)
|
||||
@patch("apps.grafana_plugin.helpers.client.GrafanaAPIClient.api_head")
|
||||
def test_it_returns_based_on_status_code_of_head_call(
|
||||
self, mocked_grafana_api_client_api_head, grafana_api_status_code, expected
|
||||
self, mocked_grafana_api_client_api_head, api_response_connected, expected
|
||||
):
|
||||
mocked_grafana_api_client_api_head.return_value = (None, {"status_code": grafana_api_status_code})
|
||||
mocked_grafana_api_client_api_head.return_value = (None, {"connected": api_response_connected})
|
||||
|
||||
api_client = GrafanaAPIClient(API_URL, API_TOKEN)
|
||||
assert api_client.is_rbac_enabled_for_organization() == expected
|
||||
|
|
|
|||
|
|
@ -238,7 +238,7 @@ class CustomOnCallShiftSerializer(EagerLoadingMixin, serializers.ModelSerializer
|
|||
data["users"] = []
|
||||
if data.get("rolling_users", []) is None: # terraform case
|
||||
data["rolling_users"] = []
|
||||
if data.get("source") != CustomOnCallShift.SOURCE_TERRAFORM:
|
||||
if data.get("source") not in (CustomOnCallShift.SOURCE_TERRAFORM, CustomOnCallShift.SOURCE_WEB):
|
||||
data["source"] = CustomOnCallShift.SOURCE_API
|
||||
if data.get("start") is not None:
|
||||
self._validate_start(data["start"])
|
||||
|
|
|
|||
|
|
@ -232,25 +232,6 @@ def test_get_web_schedule(
|
|||
assert response.json() == result
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_create_web_schedule(make_organization_and_user_with_token):
|
||||
_, _, token = make_organization_and_user_with_token()
|
||||
client = APIClient()
|
||||
|
||||
url = reverse("api-public:schedules-list")
|
||||
|
||||
data = {
|
||||
"team_id": None,
|
||||
"name": "schedule test name",
|
||||
"time_zone": "Europe/Moscow",
|
||||
"type": "web",
|
||||
}
|
||||
|
||||
response = client.post(url, data=data, format="json", HTTP_AUTHORIZATION=f"{token}")
|
||||
assert response.status_code == status.HTTP_400_BAD_REQUEST
|
||||
assert response.json() == {"detail": "Web schedule creation is not enabled through API"}
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_update_web_schedule(
|
||||
make_organization_and_user_with_token,
|
||||
|
|
|
|||
|
|
@ -56,9 +56,6 @@ class OnCallScheduleChannelView(RateLimitHeadersMixin, UpdateSerializerMixin, Mo
|
|||
raise NotFound
|
||||
|
||||
def perform_create(self, serializer):
|
||||
if serializer.validated_data["type"] == "web":
|
||||
raise BadRequest(detail="Web schedule creation is not enabled through API")
|
||||
|
||||
serializer.save()
|
||||
instance = serializer.instance
|
||||
|
||||
|
|
|
|||
|
|
@ -31,7 +31,7 @@ def sync_organization(organization):
|
|||
_sync_instance_info(organization)
|
||||
|
||||
_, check_token_call_status = grafana_api_client.check_token()
|
||||
if check_token_call_status["status_code"] == 200:
|
||||
if check_token_call_status["connected"]:
|
||||
organization.api_token_status = Organization.API_TOKEN_STATUS_OK
|
||||
sync_users_and_teams(grafana_api_client, organization)
|
||||
organization.last_time_synced = timezone.now()
|
||||
|
|
|
|||
|
|
@ -134,7 +134,7 @@ def test_sync_organization(make_organization, make_team, make_user_for_organizat
|
|||
},
|
||||
)
|
||||
|
||||
api_check_token_call_status = {"status_code": 200}
|
||||
api_check_token_call_status = {"connected": True}
|
||||
|
||||
with patch.object(GrafanaAPIClient, "is_rbac_enabled_for_organization", return_value=False):
|
||||
with patch.object(GrafanaAPIClient, "get_users", return_value=api_users_response):
|
||||
|
|
@ -203,7 +203,7 @@ def test_sync_organization_is_rbac_permissions_enabled_open_source(make_organiza
|
|||
"userId": 1,
|
||||
},
|
||||
)
|
||||
api_check_token_call_status = {"status_code": 200}
|
||||
api_check_token_call_status = {"connected": True}
|
||||
|
||||
with patch.object(GrafanaAPIClient, "is_rbac_enabled_for_organization", return_value=grafana_api_response):
|
||||
with patch.object(GrafanaAPIClient, "get_users", return_value=api_users_response):
|
||||
|
|
@ -230,7 +230,7 @@ def test_sync_organization_is_rbac_permissions_enabled_cloud(mocked_gcom_client,
|
|||
stack_id = 5
|
||||
organization = make_organization(stack_id=stack_id)
|
||||
|
||||
api_check_token_call_status = {"status_code": 200}
|
||||
api_check_token_call_status = {"connected": True}
|
||||
|
||||
mocked_gcom_client.return_value.is_rbac_enabled_for_stack.return_value = gcom_api_response
|
||||
|
||||
|
|
|
|||
|
|
@ -49,7 +49,7 @@ def main() -> None:
|
|||
]
|
||||
|
||||
print("▶ Fetching schedules...")
|
||||
schedules = session.list_all("schedules")
|
||||
schedules = session.list_all("schedules", params={"include[]": "schedule_layers"})
|
||||
oncall_schedules = oncall_api_client.list_all("schedules")
|
||||
|
||||
print("▶ Fetching escalation policies...")
|
||||
|
|
@ -72,8 +72,12 @@ def main() -> None:
|
|||
for user in users:
|
||||
match_user(user, oncall_users)
|
||||
|
||||
user_id_map = {
|
||||
u["id"]: u["oncall_user"]["id"] if u["oncall_user"] else None for u in users
|
||||
}
|
||||
|
||||
for schedule in schedules:
|
||||
match_schedule(schedule, oncall_schedules)
|
||||
match_schedule(schedule, oncall_schedules, user_id_map)
|
||||
match_users_for_schedule(schedule, users)
|
||||
|
||||
for policy in escalation_policies:
|
||||
|
|
@ -105,8 +109,8 @@ def main() -> None:
|
|||
|
||||
print("▶ Migrating schedules...")
|
||||
for schedule in schedules:
|
||||
if not schedule["unmatched_users"]:
|
||||
migrate_schedule(schedule)
|
||||
if not schedule["unmatched_users"] and not schedule["migration_errors"]:
|
||||
migrate_schedule(schedule, user_id_map)
|
||||
print(TAB + format_schedule(schedule))
|
||||
|
||||
print("▶ Migrating escalation policies...")
|
||||
|
|
|
|||
|
|
@ -35,3 +35,9 @@ PAGERDUTY_TO_ONCALL_VENDOR_MAP = {
|
|||
"Elastic Alerts": "elastalert",
|
||||
"Firebase": "fabric",
|
||||
}
|
||||
|
||||
SCHEDULE_MIGRATION_MODE_ICAL = "ical"
|
||||
SCHEDULE_MIGRATION_MODE_WEB = "web"
|
||||
SCHEDULE_MIGRATION_MODE = os.getenv(
|
||||
"SCHEDULE_MIGRATION_MODE", SCHEDULE_MIGRATION_MODE_ICAL
|
||||
)
|
||||
|
|
|
|||
|
|
@ -17,10 +17,18 @@ def format_user(user: dict) -> str:
|
|||
|
||||
|
||||
def format_schedule(schedule: dict) -> str:
|
||||
if schedule["unmatched_users"]:
|
||||
if schedule["unmatched_users"] and schedule["migration_errors"]:
|
||||
result = "{} {} — schedule references unmatched users and some layers cannot be migrated".format(
|
||||
ERROR_SIGN, schedule["name"]
|
||||
)
|
||||
elif schedule["unmatched_users"]:
|
||||
result = "{} {} — schedule references unmatched users".format(
|
||||
ERROR_SIGN, schedule["name"]
|
||||
)
|
||||
elif schedule["migration_errors"]:
|
||||
result = "{} {} — some layers cannot be migrated".format(
|
||||
ERROR_SIGN, schedule["name"]
|
||||
)
|
||||
else:
|
||||
result = "{} {}".format(SUCCESS_SIGN, schedule["name"])
|
||||
|
||||
|
|
@ -29,7 +37,7 @@ def format_schedule(schedule: dict) -> str:
|
|||
|
||||
def format_escalation_policy(policy: dict) -> str:
|
||||
if policy["unmatched_users"] and policy["flawed_schedules"]:
|
||||
result = "{} {} — policy references unmatched users and schedules with unmatched users".format(
|
||||
result = "{} {} — policy references unmatched users and schedules that cannot be migrated".format(
|
||||
ERROR_SIGN, policy["name"]
|
||||
)
|
||||
elif policy["unmatched_users"]:
|
||||
|
|
@ -37,7 +45,7 @@ def format_escalation_policy(policy: dict) -> str:
|
|||
ERROR_SIGN, policy["name"]
|
||||
)
|
||||
elif policy["flawed_schedules"]:
|
||||
result = "{} {} — policy references schedules with unmatched users".format(
|
||||
result = "{} {} — policy references schedules that cannot be migrated".format(
|
||||
ERROR_SIGN, policy["name"]
|
||||
)
|
||||
else:
|
||||
|
|
@ -61,7 +69,7 @@ def format_integration(integration: dict) -> str:
|
|||
|
||||
elif integration["is_escalation_policy_flawed"]:
|
||||
policy_name = integration["service"]["escalation_policy"]["summary"]
|
||||
result = "{} {} — escalation policy '{}' references unmatched users or schedules with unmatched users".format(
|
||||
result = "{} {} — escalation policy '{}' references unmatched users or schedules that cannot be migrated".format(
|
||||
ERROR_SIGN, result, policy_name
|
||||
)
|
||||
else:
|
||||
|
|
@ -85,10 +93,16 @@ def user_report(users: list[dict]) -> str:
|
|||
def schedule_report(schedules: list[dict]) -> str:
|
||||
result = "Schedule report:"
|
||||
|
||||
for schedule in sorted(schedules, key=lambda s: bool(s["unmatched_users"])):
|
||||
for schedule in sorted(
|
||||
schedules, key=lambda s: bool(s["unmatched_users"] or s["migration_errors"])
|
||||
):
|
||||
result += "\n" + TAB + format_schedule(schedule)
|
||||
|
||||
if not schedule["unmatched_users"] and schedule["oncall_schedule"]:
|
||||
if (
|
||||
not schedule["unmatched_users"]
|
||||
and schedule["oncall_schedule"]
|
||||
and not schedule["migration_errors"]
|
||||
):
|
||||
result += " (existing schedule with name '{}' will be deleted)".format(
|
||||
schedule["oncall_schedule"]["name"]
|
||||
)
|
||||
|
|
@ -96,6 +110,9 @@ def schedule_report(schedules: list[dict]) -> str:
|
|||
for user in schedule["unmatched_users"]:
|
||||
result += "\n" + TAB * 2 + format_user(user)
|
||||
|
||||
for error in schedule["migration_errors"]:
|
||||
result += "\n" + TAB * 2 + "{} {}".format(ERROR_SIGN, error)
|
||||
|
||||
return result
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -1,27 +1,563 @@
|
|||
import datetime
|
||||
from dataclasses import dataclass
|
||||
from enum import Enum
|
||||
from typing import Optional
|
||||
from uuid import uuid4
|
||||
|
||||
from migrator import oncall_api_client
|
||||
from migrator.config import (
|
||||
SCHEDULE_MIGRATION_MODE,
|
||||
SCHEDULE_MIGRATION_MODE_ICAL,
|
||||
SCHEDULE_MIGRATION_MODE_WEB,
|
||||
)
|
||||
|
||||
|
||||
def match_schedule(schedule: dict, oncall_schedules: list[dict]) -> None:
|
||||
def match_schedule(
|
||||
schedule: dict, oncall_schedules: list[dict], user_id_map: dict[str, str]
|
||||
) -> None:
|
||||
oncall_schedule = None
|
||||
for candidate in oncall_schedules:
|
||||
if schedule["name"].lower().strip() == candidate["name"].lower().strip():
|
||||
oncall_schedule = candidate
|
||||
|
||||
schedule["migration_errors"] = []
|
||||
if SCHEDULE_MIGRATION_MODE == SCHEDULE_MIGRATION_MODE_WEB:
|
||||
_, errors = Schedule.from_dict(schedule).to_oncall_schedule(user_id_map)
|
||||
schedule["migration_errors"] = errors
|
||||
|
||||
schedule["oncall_schedule"] = oncall_schedule
|
||||
|
||||
|
||||
def migrate_schedule(schedule: dict) -> None:
|
||||
def migrate_schedule(schedule: dict, user_id_map: dict[str, str]) -> None:
|
||||
if schedule["oncall_schedule"]:
|
||||
oncall_api_client.delete(
|
||||
"schedules/{}".format(schedule["oncall_schedule"]["id"])
|
||||
)
|
||||
|
||||
payload = {
|
||||
"name": schedule["name"],
|
||||
"type": "ical",
|
||||
"ical_url_primary": schedule["http_cal_url"],
|
||||
"team_id": None,
|
||||
}
|
||||
oncall_schedule = oncall_api_client.create("schedules", payload)
|
||||
if SCHEDULE_MIGRATION_MODE == SCHEDULE_MIGRATION_MODE_WEB:
|
||||
# Migrate shifts
|
||||
oncall_schedule = Schedule.from_dict(schedule).migrate(user_id_map)
|
||||
elif SCHEDULE_MIGRATION_MODE == SCHEDULE_MIGRATION_MODE_ICAL:
|
||||
# Migrate using ICal URL
|
||||
payload = {
|
||||
"name": schedule["name"],
|
||||
"type": "ical",
|
||||
"ical_url_primary": schedule["http_cal_url"],
|
||||
"team_id": None,
|
||||
}
|
||||
oncall_schedule = oncall_api_client.create("schedules", payload)
|
||||
else:
|
||||
raise ValueError("Invalid schedule migration mode")
|
||||
|
||||
schedule["oncall_schedule"] = oncall_schedule
|
||||
|
||||
|
||||
def duration_to_frequency_and_interval(duration: datetime.timedelta) -> tuple[str, int]:
|
||||
"""
|
||||
Convert a duration to shift frequency and interval.
|
||||
For example, 1 day duration returns ("daily", 1), 14 days returns ("weekly", 2),
|
||||
"""
|
||||
seconds = int(duration.total_seconds())
|
||||
|
||||
assert seconds >= 3600, "Rotation must be at least 1 hour"
|
||||
hours = seconds // 3600
|
||||
|
||||
if hours >= 24 and hours % 24 == 0:
|
||||
days = hours // 24
|
||||
if days >= 7 and days % 7 == 0:
|
||||
weeks = days // 7
|
||||
return "weekly", weeks
|
||||
else:
|
||||
return "daily", days
|
||||
else:
|
||||
return "hourly", hours
|
||||
|
||||
|
||||
def _pd_datetime_to_dt(text: str) -> datetime.datetime:
|
||||
"""
|
||||
Convert a PagerDuty datetime string to a datetime object.
|
||||
"""
|
||||
return datetime.datetime.fromisoformat(text)
|
||||
|
||||
|
||||
def _dt_to_oncall_datetime(dt: datetime.datetime) -> str:
|
||||
"""
|
||||
Convert a datetime object to an OnCall datetime string.
|
||||
"""
|
||||
return dt.strftime("%Y-%m-%dT%H:%M:%S")
|
||||
|
||||
|
||||
@dataclass
|
||||
class Schedule:
|
||||
"""
|
||||
Utility class for converting a PagerDuty schedule to an OnCall schedule.
|
||||
A PagerDuty schedule has multiple layers, each with a rotation of users.
|
||||
"""
|
||||
|
||||
name: str
|
||||
time_zone: str
|
||||
layers: list["Layer"]
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, schedule: dict) -> "Schedule":
|
||||
"""
|
||||
Create a Schedule object from a PagerDuty API response for a schedule.
|
||||
"""
|
||||
|
||||
layers = []
|
||||
# PagerDuty API returns layers in reverse order (e.g. Layer 3, Layer 2, Layer 1)
|
||||
for level, layer_dict in enumerate(reversed(schedule["schedule_layers"])):
|
||||
layer = Layer.from_dict(layer_dict, level)
|
||||
|
||||
# skip any layers that have already ended
|
||||
if layer.end and layer.end < datetime.datetime.now(datetime.timezone.utc):
|
||||
continue
|
||||
|
||||
layers.append(layer)
|
||||
|
||||
return cls(
|
||||
name=schedule["name"],
|
||||
time_zone=schedule["time_zone"],
|
||||
layers=layers,
|
||||
)
|
||||
|
||||
def to_oncall_schedule(
|
||||
self, user_id_map: dict[str, str]
|
||||
) -> tuple[Optional[dict], list[str]]:
|
||||
"""
|
||||
Convert a Schedule object to an OnCall schedule.
|
||||
Note that it also returns shifts, but these are not created at the same time as the schedule (see migrate method for more info).
|
||||
"""
|
||||
|
||||
shifts = []
|
||||
errors = []
|
||||
for layer in self.layers:
|
||||
# A single PagerDuty layer can result in multiple OnCall shifts
|
||||
layer_shifts, error = layer.to_oncall_shifts(user_id_map)
|
||||
|
||||
if layer_shifts:
|
||||
shifts += layer_shifts
|
||||
|
||||
if error:
|
||||
error_text = f"{layer.name}: {error}"
|
||||
|
||||
# If a layer has a single user, it's likely can be easily tweaked in PD to make it possible to migrate
|
||||
if len(set(layer.user_ids)) == 1:
|
||||
error_text += " Layer has a single user, consider simplifying the rotation in PD."
|
||||
|
||||
errors.append(error_text)
|
||||
|
||||
if errors:
|
||||
return None, errors
|
||||
|
||||
return {
|
||||
"name": self.name,
|
||||
"type": "web",
|
||||
"team_id": None,
|
||||
"time_zone": self.time_zone,
|
||||
"shifts": shifts,
|
||||
}, []
|
||||
|
||||
def migrate(self, user_id_map: dict[str, str]) -> dict:
|
||||
"""
|
||||
Create an OnCall schedule and its shifts.
|
||||
First create the shifts, then create a schedule with shift IDs provided.
|
||||
"""
|
||||
|
||||
schedule, errors = self.to_oncall_schedule(user_id_map)
|
||||
assert not errors, "Unexpected errors: {}".format(errors)
|
||||
|
||||
# Create shifts in OnCall
|
||||
shift_ids = []
|
||||
for shift in schedule["shifts"]:
|
||||
created_shift = oncall_api_client.create("on_call_shifts", shift)
|
||||
shift_ids.append(created_shift["id"])
|
||||
|
||||
# Create schedule in OnCall with shift IDs provided
|
||||
schedule["shifts"] = shift_ids
|
||||
new_schedule = oncall_api_client.create("schedules", schedule)
|
||||
|
||||
return new_schedule
|
||||
|
||||
|
||||
@dataclass
|
||||
class Layer:
|
||||
"""
|
||||
Utility class for converting a PagerDuty schedule layer to OnCall shifts.
|
||||
"""
|
||||
|
||||
name: str
|
||||
level: int
|
||||
|
||||
rotation_virtual_start: datetime.datetime
|
||||
rotation_turn_length: datetime.timedelta
|
||||
|
||||
start: datetime.datetime
|
||||
end: Optional[datetime.datetime]
|
||||
|
||||
user_ids: list[str]
|
||||
restrictions: list["Restriction"]
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, layer: dict, level: int) -> "Layer":
|
||||
"""
|
||||
Create a Layer object from a PagerDuty API response for a schedule layer.
|
||||
Converts PagerDuty datetime strings to datetime objects for easier manipulation.
|
||||
"""
|
||||
return cls(
|
||||
name=layer["name"],
|
||||
level=level,
|
||||
rotation_virtual_start=_pd_datetime_to_dt(layer["rotation_virtual_start"]),
|
||||
rotation_turn_length=datetime.timedelta(
|
||||
seconds=layer["rotation_turn_length_seconds"]
|
||||
),
|
||||
start=_pd_datetime_to_dt(layer["start"]),
|
||||
end=_pd_datetime_to_dt(layer["end"]) if layer["end"] else None,
|
||||
user_ids=[u["user"]["id"] for u in layer["users"]],
|
||||
restrictions=[Restriction.from_dict(r) for r in layer["restrictions"]],
|
||||
)
|
||||
|
||||
def to_oncall_shifts(
|
||||
self, user_id_map: dict[str, str]
|
||||
) -> tuple[Optional[list[dict]], Optional[str]]:
|
||||
frequency, interval = duration_to_frequency_and_interval(
|
||||
self.rotation_turn_length
|
||||
)
|
||||
rolling_users = []
|
||||
for user_id in self.user_ids:
|
||||
oncall_user_id = user_id_map[user_id]
|
||||
rolling_users.append([oncall_user_id])
|
||||
|
||||
if not self.restrictions:
|
||||
return [
|
||||
{
|
||||
"name": uuid4().hex,
|
||||
"level": self.level,
|
||||
"type": "rolling_users",
|
||||
"rotation_start": _dt_to_oncall_datetime(self.start),
|
||||
"until": _dt_to_oncall_datetime(self.end) if self.end else None,
|
||||
"start": _dt_to_oncall_datetime(self.rotation_virtual_start),
|
||||
"duration": int(self.rotation_turn_length.total_seconds()),
|
||||
"frequency": frequency,
|
||||
"interval": interval,
|
||||
"rolling_users": rolling_users,
|
||||
"start_rotation_from_user_index": 0,
|
||||
"week_start": "MO",
|
||||
"time_zone": self.rotation_virtual_start.tzname(),
|
||||
"source": 0, # 0 is alias for "web"
|
||||
}
|
||||
], None
|
||||
|
||||
restrictions_type = self.restrictions[0].type
|
||||
|
||||
if (frequency, restrictions_type) in (
|
||||
("daily", Restriction.Type.DAILY),
|
||||
("weekly", Restriction.Type.WEEKLY),
|
||||
):
|
||||
# TODO: some of this can use by_day?
|
||||
shifts, _ = self._generate_shifts(unique=False)
|
||||
shifts = [(s[0], s[1], "MO", None) for s in shifts]
|
||||
|
||||
elif frequency == "weekly" and restrictions_type == Restriction.Type.DAILY:
|
||||
shifts, is_split = self._generate_shifts(unique=True)
|
||||
|
||||
if is_split:
|
||||
return (
|
||||
None,
|
||||
f"Cannot migrate {interval}-weekly rotation with daily restrictions that are split by handoff.",
|
||||
)
|
||||
|
||||
# repeat ["MO", "TU", "WE", "TH", "FR", "SA", "SU"] shift for the number of weeks in the rotation
|
||||
shifts_for_multiple_weeks = []
|
||||
for shift in shifts:
|
||||
for week in range(interval):
|
||||
start = shift[0] + datetime.timedelta(weeks=week)
|
||||
end = shift[1] + datetime.timedelta(weeks=week)
|
||||
shifts_for_multiple_weeks.append((start, end))
|
||||
|
||||
shifts = [
|
||||
(
|
||||
shift[0],
|
||||
shift[1],
|
||||
["MO", "TU", "WE", "TH", "FR", "SA", "SU"][
|
||||
shift[0].date().weekday()
|
||||
],
|
||||
["MO", "TU", "WE", "TH", "FR", "SA", "SU"],
|
||||
)
|
||||
for shift in shifts_for_multiple_weeks
|
||||
]
|
||||
|
||||
elif (
|
||||
frequency == "daily"
|
||||
and restrictions_type == Restriction.Type.WEEKLY
|
||||
and interval == 1
|
||||
):
|
||||
# the only case when it's possible to migrate a daily rotation with weekly restrictions
|
||||
# is when the restrictions start at the same time as the shift start
|
||||
# and the restrictions are a multiple of 24 hours
|
||||
restrictions = Restriction.merge_restrictions(self.restrictions)
|
||||
for restriction in restrictions:
|
||||
if (
|
||||
not restriction.start_time_of_day
|
||||
== self.rotation_virtual_start.time()
|
||||
):
|
||||
return (
|
||||
None,
|
||||
f"Cannot migrate {interval}-daily rotation with weekly restrictions that start at a different time than the shift start.",
|
||||
)
|
||||
if not restriction.duration % datetime.timedelta(
|
||||
days=1
|
||||
) == datetime.timedelta(0):
|
||||
return (
|
||||
None,
|
||||
f"Cannot migrate {interval}-daily rotation with weekly restrictions that have durations that are not a multiple of a 24 hours.",
|
||||
)
|
||||
|
||||
# get the first restriction and use its start time as the start of the shift
|
||||
restriction, shift_start = Restriction.current_or_next_restriction(
|
||||
restrictions, self.rotation_virtual_start
|
||||
)
|
||||
shift_end = shift_start + datetime.timedelta(days=1)
|
||||
|
||||
# determine which days of the week are covered
|
||||
by_day = set()
|
||||
for restriction in restrictions:
|
||||
days = restriction.duration // datetime.timedelta(days=1)
|
||||
|
||||
for day in range(days):
|
||||
weekday = ["MO", "TU", "WE", "TH", "FR", "SA", "SU"][
|
||||
(restriction.start_day_of_week + day) % 7
|
||||
]
|
||||
by_day.add(weekday)
|
||||
|
||||
# sort by_day so that the order is always the same
|
||||
by_day = sorted(
|
||||
list(by_day),
|
||||
key=lambda d: ["MO", "TU", "WE", "TH", "FR", "SA", "SU"].index(d),
|
||||
)
|
||||
|
||||
shifts = [(shift_start, shift_end, "MO", by_day)]
|
||||
|
||||
else:
|
||||
restrictions_type_verbal = (
|
||||
"daily" if restrictions_type == Restriction.Type.DAILY else "weekly"
|
||||
)
|
||||
return (
|
||||
None,
|
||||
f"Cannot migrate {interval}-{frequency} rotation with {restrictions_type_verbal} restrictions.",
|
||||
)
|
||||
|
||||
payloads = []
|
||||
for shift in shifts:
|
||||
payload = {
|
||||
"name": uuid4().hex,
|
||||
"level": self.level,
|
||||
"type": "rolling_users",
|
||||
"rotation_start": _dt_to_oncall_datetime(self.start),
|
||||
"until": _dt_to_oncall_datetime(self.end) if self.end else None,
|
||||
"start": _dt_to_oncall_datetime(shift[0]),
|
||||
"duration": int((shift[1] - shift[0]).total_seconds()),
|
||||
"frequency": frequency,
|
||||
"interval": interval,
|
||||
"by_day": shift[3],
|
||||
"rolling_users": rolling_users,
|
||||
"start_rotation_from_user_index": 0,
|
||||
"week_start": shift[2],
|
||||
"time_zone": self.rotation_virtual_start.tzname(),
|
||||
"source": 0, # 0 is alias for "web"
|
||||
}
|
||||
payloads.append(payload)
|
||||
return payloads, None
|
||||
|
||||
def _generate_shifts(
|
||||
self, unique: bool = True
|
||||
) -> tuple[list[tuple[datetime.datetime, datetime.datetime]], bool]:
|
||||
"""
|
||||
Returns a list of (start, end) tuples representing the shifts in this layer.
|
||||
Note that these are not the actual shifts for OnCall API but rather a list of
|
||||
unique intervals generated by traversing the restrictions.
|
||||
|
||||
Also returns a boolean indicating whether there are restrictions split by on-call handoff.
|
||||
"""
|
||||
|
||||
start = self.rotation_virtual_start
|
||||
end = self.rotation_virtual_start + self.rotation_turn_length
|
||||
|
||||
# Convert restrictions to weekly restrictions, then merge overlapping ones
|
||||
restrictions = []
|
||||
for restriction in self.restrictions:
|
||||
restrictions += restriction.to_weekly_restrictions()
|
||||
restrictions = Restriction.merge_restrictions(restrictions)
|
||||
|
||||
is_split = False
|
||||
current = start
|
||||
shifts = []
|
||||
unique_shift_times = []
|
||||
|
||||
while current < end:
|
||||
restriction, restriction_start = Restriction.current_or_next_restriction(
|
||||
restrictions, current
|
||||
)
|
||||
restriction_end = restriction_start + restriction.duration
|
||||
|
||||
shift_start = max(current, restriction_start)
|
||||
shift_end = min(restriction_end, end)
|
||||
|
||||
# If the next restriction starts after the end of the rotation or shift is empty, we're done
|
||||
if restriction_start > end or shift_start == shift_end:
|
||||
break
|
||||
|
||||
# Check if restriction is split by handoff
|
||||
if (shift_start == start and shift_start > restriction_start) or (
|
||||
shift_end == end and shift_end < restriction_end
|
||||
):
|
||||
is_split = True
|
||||
|
||||
shift = (shift_start, shift_end)
|
||||
|
||||
# check that we haven't already added this shift
|
||||
if (
|
||||
not unique
|
||||
or (shift[0].time(), shift[1].time()) not in unique_shift_times
|
||||
):
|
||||
shifts.append(shift)
|
||||
unique_shift_times.append((shift[0].time(), shift[1].time()))
|
||||
|
||||
current = shift_end
|
||||
|
||||
return shifts, is_split
|
||||
|
||||
|
||||
@dataclass
|
||||
class Restriction:
|
||||
"""
|
||||
Utility class for representing a restriction on a rotation in PagerDuty.
|
||||
"""
|
||||
|
||||
class Type(Enum):
|
||||
DAILY = "daily_restriction"
|
||||
WEEKLY = "weekly_restriction"
|
||||
|
||||
type: Type
|
||||
start_time_of_day: datetime.time
|
||||
duration: datetime.timedelta
|
||||
start_day_of_week: Optional[int] # this is only present for weekly restrictions
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, restriction: dict) -> "Restriction":
|
||||
"""
|
||||
Create a Restriction object from PagerDuty's API representation.
|
||||
Converts PagerDuty datetime strings to datetime objects for easier manipulation as well.
|
||||
"""
|
||||
|
||||
# PagerDuty's API uses 1-indexed days of the week, converting to 0-indexed for ease of use
|
||||
start_day_of_week = restriction.get("start_day_of_week")
|
||||
if start_day_of_week is not None:
|
||||
start_day_of_week -= 1
|
||||
|
||||
return cls(
|
||||
type=Restriction.Type(restriction["type"]),
|
||||
start_time_of_day=datetime.time.fromisoformat(
|
||||
restriction["start_time_of_day"]
|
||||
),
|
||||
duration=datetime.timedelta(seconds=restriction["duration_seconds"]),
|
||||
start_day_of_week=start_day_of_week,
|
||||
)
|
||||
|
||||
def to_weekly_restrictions(self) -> list["Restriction"]:
|
||||
"""
|
||||
Convert a daily restriction to a list of weekly restrictions.
|
||||
Daily restriction is basically 7 weekly restrictions with the same start time and duration,
|
||||
but different days of the week (e.g. 9am-5pm daily restriction is the same as 9am-5pm on Monday, 9am-5pm on Tuesday, etc.)
|
||||
|
||||
Converting to weekly restrictions makes it easier to work with restrictions and only care about weekly ones.
|
||||
"""
|
||||
|
||||
if self.type == Restriction.Type.WEEKLY:
|
||||
return [self]
|
||||
else:
|
||||
return [
|
||||
Restriction(
|
||||
type=Restriction.Type.WEEKLY,
|
||||
start_time_of_day=self.start_time_of_day,
|
||||
duration=self.duration,
|
||||
start_day_of_week=day,
|
||||
)
|
||||
for day in range(7)
|
||||
]
|
||||
|
||||
@staticmethod
def merge_restrictions(restrictions: list["Restriction"]) -> list["Restriction"]:
    """
    Merge weekly restrictions into the fewest weekly restrictions covering the
    same time period.

    Example: (9am-5pm restriction on Monday, 10am-4pm restriction on Monday)
    -> (9am-5pm restriction on Monday).

    Only works on weekly restrictions; convert daily restrictions with
    to_weekly_restrictions first. Overlapping or touching restrictions are
    combined into a single span.

    Unlike a naive in-place merge, this never mutates the input objects:
    combined spans are represented by freshly constructed Restriction
    instances, so callers' restrictions are left intact.
    """
    assert all(r.type == Restriction.Type.WEEKLY for r in restrictions)

    def _interval(r: "Restriction") -> tuple[datetime.datetime, datetime.datetime]:
        # Anchor each restriction to a reference week so (weekday, time)
        # pairs become comparable datetimes. datetime.date.min is a Monday,
        # so the anchored datetime's weekday() equals start_day_of_week.
        start = datetime.datetime.combine(
            datetime.date.min + datetime.timedelta(days=r.start_day_of_week),
            r.start_time_of_day,
        )
        return start, start + r.duration

    merged: list["Restriction"] = []

    # Sort chronologically within the week so overlaps are always adjacent.
    for restriction in sorted(
        restrictions, key=lambda r: (r.start_day_of_week, r.start_time_of_day)
    ):
        start, end = _interval(restriction)

        if merged:
            last_start, last_end = _interval(merged[-1])
            if last_end >= start:
                # Overlaps (or touches) the previous span: replace it with a
                # new Restriction spanning both, leaving inputs untouched.
                merged[-1] = Restriction(
                    type=Restriction.Type.WEEKLY,
                    start_time_of_day=last_start.time(),
                    duration=max(last_end, end) - last_start,
                    start_day_of_week=last_start.weekday(),
                )
                continue

        merged.append(restriction)

    return merged
|
||||
|
||||
@staticmethod
def current_or_next_restriction(
    restrictions: list["Restriction"], dt: datetime.datetime
) -> tuple["Restriction", datetime.datetime]:
    """
    Get the current or next restriction for a given datetime.

    Returns the restriction together with its concrete start datetime
    (timezone-aware, UTC). This is useful for finding all the restrictions
    that apply to a given rotation shift.

    NOTE(review): restriction starts are built as UTC-aware datetimes and
    compared to `dt`, so `dt` must be timezone-aware — confirm callers.
    NOTE(review): within each candidate week the first restriction in list
    order whose end is after `dt` wins, which assumes `restrictions` is
    sorted chronologically (e.g. output of merge_restrictions) — TODO confirm.
    """
    assert all(r.type == Restriction.Type.WEEKLY for r in restrictions)

    for weeks in (-1, 0, 1):  # check last week, this week, and next week
        for restriction in restrictions:
            # Calendar date of this restriction's weekday in the candidate
            # week: dt.date() - dt.weekday() days is the Monday of dt's week.
            restriction_date = (
                dt.date()
                - datetime.timedelta(days=dt.weekday())
                + datetime.timedelta(days=restriction.start_day_of_week)
                + datetime.timedelta(weeks=weeks)
            )
            restriction_start = datetime.datetime.combine(
                restriction_date,
                restriction.start_time_of_day,
                tzinfo=datetime.timezone.utc,
            )
            restriction_end = restriction_start + restriction.duration

            # First restriction that hasn't ended yet is either ongoing at
            # dt or the next one to start.
            if restriction_end > dt:
                return restriction, restriction_start

    # there should always be a restriction
    raise ValueError("No restriction found for given datetime")
|
||||
|
|
|
|||
|
|
@ -53,7 +53,7 @@ def match_users_and_schedules_for_escalation_policy(
|
|||
if not schedule:
|
||||
continue
|
||||
|
||||
if schedule["unmatched_users"]:
|
||||
if schedule["unmatched_users"] or schedule["migration_errors"]:
|
||||
flawed_schedule_ids.add(target_id)
|
||||
|
||||
policy["unmatched_users"] = [
|
||||
|
|
|
|||
|
|
@ -785,6 +785,7 @@ expected_schedules_result = [
|
|||
"on_call_now": [],
|
||||
"slack": None,
|
||||
},
|
||||
"migration_errors": [],
|
||||
"unmatched_users": [
|
||||
{
|
||||
"id": "TESTUSER2",
|
||||
|
|
@ -842,6 +843,7 @@ expected_schedules_result = [
|
|||
"teams": [],
|
||||
"oncall_schedule": None,
|
||||
"unmatched_users": [],
|
||||
"migration_errors": [],
|
||||
},
|
||||
{
|
||||
"id": "TESTSCH3",
|
||||
|
|
@ -866,6 +868,7 @@ expected_schedules_result = [
|
|||
"escalation_policies": [],
|
||||
"teams": [],
|
||||
"oncall_schedule": None,
|
||||
"migration_errors": [],
|
||||
"unmatched_users": [
|
||||
{
|
||||
"id": "TESTUSER2",
|
||||
|
|
@ -924,6 +927,7 @@ expected_schedules_result = [
|
|||
"teams": [],
|
||||
"oncall_schedule": None,
|
||||
"unmatched_users": [],
|
||||
"migration_errors": [],
|
||||
},
|
||||
]
|
||||
expected_escalation_policies_result = [
|
||||
|
|
@ -1028,6 +1032,7 @@ expected_escalation_policies_result = [
|
|||
"on_call_now": [],
|
||||
"slack": None,
|
||||
},
|
||||
"migration_errors": [],
|
||||
"unmatched_users": [
|
||||
{
|
||||
"id": "TESTUSER2",
|
||||
|
|
@ -1570,7 +1575,7 @@ def test_match_user_not_found():
|
|||
|
||||
def test_match_schedule():
|
||||
for schedule in pd_schedules_payload:
|
||||
match_schedule(schedule, oncall_schedules_payload)
|
||||
match_schedule(schedule, oncall_schedules_payload, user_id_map={})
|
||||
match_users_for_schedule(schedule, pd_users_payload)
|
||||
|
||||
assert pd_schedules_payload == expected_schedules_result
|
||||
|
|
|
|||
|
|
@ -16,7 +16,7 @@ def test_match_schedule_name_case_insensitive():
|
|||
pd_schedule = {"name": "Test"}
|
||||
oncall_schedules = [{"name": "test"}]
|
||||
|
||||
match_schedule(pd_schedule, oncall_schedules)
|
||||
match_schedule(pd_schedule, oncall_schedules, user_id_map={})
|
||||
assert pd_schedule["oncall_schedule"] == oncall_schedules[0]
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -7,7 +7,7 @@ def test_match_schedule_name_extra_spaces():
|
|||
pd_schedule = {"name": " test "}
|
||||
oncall_schedules = [{"name": "test"}]
|
||||
|
||||
match_schedule(pd_schedule, oncall_schedules)
|
||||
match_schedule(pd_schedule, oncall_schedules, user_id_map={})
|
||||
assert pd_schedule["oncall_schedule"] == oncall_schedules[0]
|
||||
|
||||
|
||||
|
|
|
|||
2125
tools/pagerduty-migrator/migrator/tests/test_schedules.py
Normal file
2125
tools/pagerduty-migrator/migrator/tests/test_schedules.py
Normal file
File diff suppressed because it is too large
Load diff
Loading…
Add table
Reference in a new issue