Merge pull request #75 from vadimkerr/pagerduty-migrator

Add PagerDuty migrator
This commit is contained in:
Ildar Iskhakov 2022-06-14 19:52:12 +03:00 committed by GitHub
commit 3a455e170b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
19 changed files with 2501 additions and 0 deletions

View file

@ -0,0 +1,2 @@
[settings]
profile=black

View file

@ -0,0 +1,10 @@
FROM python:3.9-alpine
ENV PYTHONUNBUFFERED=1
WORKDIR /app
COPY requirements.txt requirements.txt
RUN python3 -m pip install -r requirements.txt
COPY . .
CMD ["python3", "-m" , "migrator"]

View file

@ -0,0 +1,87 @@
# PagerDuty to Grafana OnCall migrator tool
This tool helps to migrate PagerDuty configuration to Grafana OnCall.
Resources that can be migrated using this tool:
* User notification rules
* Escalation policies
* On-call schedules
* Integrations (services)
## Limitations
* Not all integration types are supported (e.g. inbound email is not supported)
* Not all notification methods are supported (e.g. emails are not supported)
* Migrated on-call schedules in Grafana OnCall will use ICalendar files from PagerDuty
* Delays between migrated notification/escalation rules could be slightly different from original. E.g. if you have a 4-minute delay between rules in PagerDuty, the resulting delay in Grafana OnCall will be 5 minutes
## Prerequisites
1. Make sure you have `docker` installed
2. Build the docker image: `docker build -t pd-oncall-migrator .`
3. Obtain a PagerDuty API token: https://support.pagerduty.com/docs/api-access-keys
4. Obtain a Grafana OnCall API token and API URL on the "Settings" page of your Grafana OnCall instance
## Migration plan
Before starting the migration process, it's useful to see a migration plan by running the tool in `plan` mode:
```shell
docker run --rm \
-e PAGERDUTY_API_TOKEN="<PAGERDUTY_API_TOKEN>" \
-e ONCALL_API_URL="<ONCALL_API_URL>" \
-e ONCALL_API_TOKEN="<ONCALL_API_TOKEN>" \
-e MODE="plan" \
pd-oncall-migrator
```
Please read the generated report carefully since depending on the content of the report, some PagerDuty resources could not be migrated and some existing Grafana OnCall resources could be deleted.
Note that users are matched by email, so if there are users in the report with "no Grafana OnCall user found with this email" error, it's possible to fix it by adding these users to your Grafana organization.
### Example migration plan
```text
User notification rules report:
✅ John Doe (john.doe@example.com) (existing notification rules will be deleted)
❌ Ben Thompson (ben@example.com) — no Grafana OnCall user found with this email
Schedule report:
✅ Support (existing schedule with name 'Support' will be deleted)
✅ Support-shadow
❌ DevOps — schedule references unmatched users
❌ Ben Thompson (ben@example.com) — no Grafana OnCall user found with this email
Escalation policy report:
✅ Support
❌ DevOps Escalation Policy — policy references unmatched users and schedules with unmatched users
❌ Ben Thompson (ben@example.com) — no Grafana OnCall user found with this email
❌ DevOps — schedule references unmatched users
Integration report:
✅ Support - Prometheus (existing integration with name 'Support - Prometheus' will be deleted)
❌ DevOps - Prometheus — escalation policy 'DevOps Escalation Policy' references unmatched users or schedules with unmatched users
❌ DevOps - Email — cannot find appropriate Grafana OnCall integration type
```
## Migration
Once you are happy with the migration report, start the migration by setting the `MODE` environment variable to `migrate`:
```shell
docker run --rm \
-e PAGERDUTY_API_TOKEN="<PAGERDUTY_API_TOKEN>" \
-e ONCALL_API_URL="<ONCALL_API_URL>" \
-e ONCALL_API_TOKEN="<ONCALL_API_TOKEN>" \
-e ONCALL_DEFAULT_CONTACT_METHOD="sms" \
-e MODE="migrate" \
pd-oncall-migrator
```
It's possible to specify a default contact method type for user notification rules that cannot be migrated as-is by changing the `ONCALL_DEFAULT_CONTACT_METHOD` env variable. Options are: `sms`, `phone_call`, `slack`, `telegram` (default is `sms`).
### After migration
* Connect integrations (press the "How to connect" button on the integration page)
* Make sure users connect their phone numbers, Slack accounts, etc. in their user settings
* At some point you would probably want to recreate schedules using Google Calendar or Terraform to be able to modify migrated on-call schedules in Grafana OnCall

View file

@ -0,0 +1,128 @@
from pdpyras import APISession
from migrator import oncall_api_client
from migrator.config import MODE, MODE_PLAN, PAGERDUTY_API_TOKEN
from migrator.report import (
TAB,
escalation_policy_report,
format_escalation_policy,
format_integration,
format_schedule,
format_user,
integration_report,
schedule_report,
user_report,
)
from migrator.resources.escalation_policies import (
match_escalation_policy,
match_escalation_policy_for_integration,
migrate_escalation_policy,
)
from migrator.resources.integrations import (
match_integration,
match_integration_type,
migrate_integration,
)
from migrator.resources.notification_rules import migrate_notification_rules
from migrator.resources.schedules import match_schedule, migrate_schedule
from migrator.resources.users import (
match_user,
match_users_and_schedules_for_escalation_policy,
match_users_for_schedule,
)
def main() -> None:
session = APISession(PAGERDUTY_API_TOKEN)
print("▶ Fetching users...")
users = session.list_all("users", params={"include[]": "notification_rules"})
oncall_users = oncall_api_client.list_all("users")
oncall_notification_rules = oncall_api_client.list_all(
"personal_notification_rules/?important=false"
)
for user in oncall_users:
user["notification_rules"] = [
rule for rule in oncall_notification_rules if rule["user_id"] == user["id"]
]
print("▶ Fetching schedules...")
schedules = session.list_all("schedules")
oncall_schedules = oncall_api_client.list_all("schedules")
print("▶ Fetching escalation policies...")
escalation_policies = session.list_all("escalation_policies")
oncall_escalation_chains = oncall_api_client.list_all("escalation_chains")
print("▶ Fetching integrations...")
services = session.list_all("services", params={"include[]": "integrations"})
vendors = session.list_all("vendors")
integrations = []
for service in services:
service_integrations = service.pop("integrations")
for integration in service_integrations:
integration["service"] = service
integrations.append(integration)
oncall_integrations = oncall_api_client.list_all("integrations")
for user in users:
match_user(user, oncall_users)
for schedule in schedules:
match_schedule(schedule, oncall_schedules)
match_users_for_schedule(schedule, users)
for policy in escalation_policies:
match_escalation_policy(policy, oncall_escalation_chains)
match_users_and_schedules_for_escalation_policy(policy, users, schedules)
for integration in integrations:
match_integration(integration, oncall_integrations)
match_integration_type(integration, vendors)
match_escalation_policy_for_integration(integration, escalation_policies)
if MODE == MODE_PLAN:
print()
print(user_report(users))
print()
print(schedule_report(schedules))
print()
print(escalation_policy_report(escalation_policies))
print()
print(integration_report(integrations))
return
print("▶ Migrating user notification rules...")
for user in users:
if user["oncall_user"]:
migrate_notification_rules(user)
print(TAB + format_user(user))
print("▶ Migrating schedules...")
for schedule in schedules:
if not schedule["unmatched_users"]:
migrate_schedule(schedule)
print(TAB + format_schedule(schedule))
print("▶ Migrating escalation policies...")
for policy in escalation_policies:
if not policy["unmatched_users"] and not policy["flawed_schedules"]:
migrate_escalation_policy(policy, users, schedules)
print(TAB + format_escalation_policy(policy))
print("▶ Migrating integrations...")
for integration in integrations:
if (
integration["oncall_type"]
and not integration["is_escalation_policy_flawed"]
):
migrate_integration(integration, escalation_policies)
print(TAB + format_integration(integration))
if __name__ == "__main__":
main()

View file

@ -0,0 +1,34 @@
import os
from urllib.parse import urljoin
MODE_PLAN = "plan"
MODE_MIGRATE = "migrate"
MODE = os.getenv("MODE", default=MODE_PLAN)
assert MODE in (MODE_PLAN, MODE_MIGRATE)
PAGERDUTY_API_TOKEN = os.environ["PAGERDUTY_API_TOKEN"]
ONCALL_API_TOKEN = os.environ["ONCALL_API_TOKEN"]
ONCALL_API_URL = urljoin(os.environ["ONCALL_API_URL"], "api/v1/")
ONCALL_DELAY_OPTIONS = [1, 5, 15, 30, 60]
ONCALL_DEFAULT_CONTACT_METHOD = "notify_by_" + os.getenv(
"ONCALL_DEFAULT_CONTACT_METHOD", default="sms"
)
PAGERDUTY_TO_ONCALL_CONTACT_METHOD_MAP = {
"sms_contact_method": "notify_by_sms",
"phone_contact_method": "notify_by_phone_call",
"email_contact_method": ONCALL_DEFAULT_CONTACT_METHOD,
"push_notification_contact_method": ONCALL_DEFAULT_CONTACT_METHOD,
}
PAGERDUTY_TO_ONCALL_VENDOR_MAP = {
"Datadog": "datadog",
"Pingdom": "pingdom",
"Prometheus": "alertmanager",
"PRTG": "prtg",
"Stackdriver": "stackdriver",
"UptimeRobot": "uptimerobot",
"New Relic": "newrelic",
"Zabbix Webhook (for 5.0 and 5.2)": "zabbix",
"Elastic Alerts": "elastalert",
"Firebase": "fabric",
}

View file

@ -0,0 +1,56 @@
from time import sleep
from urllib.parse import urljoin
import requests
from requests import HTTPError
from migrator.config import ONCALL_API_TOKEN, ONCALL_API_URL
def api_call(method: str, path: str, **kwargs) -> requests.Response:
url = urljoin(ONCALL_API_URL, path)
response = requests.request(
method, url, headers={"Authorization": ONCALL_API_TOKEN}, **kwargs
)
try:
response.raise_for_status()
except HTTPError as e:
if e.response.status_code == 429:
cooldown_seconds = int(e.response.headers["Retry-After"])
sleep(cooldown_seconds)
return api_call(method, path, **kwargs)
else:
raise
return response
def list_all(path: str) -> list[dict]:
response = api_call("get", path)
data = response.json()
results = data["results"]
while data["next"]:
response = api_call("get", data["next"])
data = response.json()
results += data["results"]
return results
def create(path: str, payload: dict) -> dict:
response = api_call("post", path, json=payload)
return response.json()
def delete(path: str) -> None:
api_call("delete", path)
def update(path: str, payload: dict) -> dict:
response = api_call("put", path, json=payload)
return response.json()

View file

@ -0,0 +1,146 @@
TAB = " " * 4
SUCCESS_SIGN = ""
ERROR_SIGN = ""
def format_user(user: dict) -> str:
result = "{} ({})".format(user["name"], user["email"])
if user["oncall_user"]:
result = "{} {}".format(SUCCESS_SIGN, result)
else:
result = "{} {} — no Grafana OnCall user found with this email".format(
ERROR_SIGN, result
)
return result
def format_schedule(schedule: dict) -> str:
if schedule["unmatched_users"]:
result = "{} {} — schedule references unmatched users".format(
ERROR_SIGN, schedule["name"]
)
else:
result = "{} {}".format(SUCCESS_SIGN, schedule["name"])
return result
def format_escalation_policy(policy: dict) -> str:
if policy["unmatched_users"] and policy["flawed_schedules"]:
result = "{} {} — policy references unmatched users and schedules with unmatched users".format(
ERROR_SIGN, policy["name"]
)
elif policy["unmatched_users"]:
result = "{} {} — policy references unmatched users".format(
ERROR_SIGN, policy["name"]
)
elif policy["flawed_schedules"]:
result = "{} {} — policy references schedules with unmatched users".format(
ERROR_SIGN, policy["name"]
)
else:
result = "{} {}".format(SUCCESS_SIGN, policy["name"])
return result
def format_integration(integration: dict) -> str:
result = integration["service"]["name"] + " - " + integration["name"]
if not integration["oncall_type"]:
result = (
"{} {} — cannot find appropriate Grafana OnCall integration type".format(
ERROR_SIGN, result
)
)
elif integration["is_escalation_policy_flawed"]:
policy_name = integration["service"]["escalation_policy"]["summary"]
result = "{} {} — escalation policy '{}' references unmatched users or schedules with unmatched users".format(
ERROR_SIGN, result, policy_name
)
else:
result = "{} {}".format(SUCCESS_SIGN, result)
return result
def user_report(users: list[dict]) -> str:
result = "User notification rules report:"
for user in sorted(users, key=lambda u: bool(u["oncall_user"]), reverse=True):
result += "\n" + TAB + format_user(user)
if user["oncall_user"] and user["notification_rules"]:
result += " (existing notification rules will be deleted)"
return result
def schedule_report(schedules: list[dict]) -> str:
result = "Schedule report:"
for schedule in sorted(schedules, key=lambda s: bool(s["unmatched_users"])):
result += "\n" + TAB + format_schedule(schedule)
if not schedule["unmatched_users"]:
result += " (existing schedule with name '{}' will be deleted)".format(
schedule["name"]
)
for user in schedule["unmatched_users"]:
result += "\n" + TAB * 2 + format_user(user)
return result
def escalation_policy_report(policies: list[dict]) -> str:
result = "Escalation policy report: "
for policy in sorted(
policies, key=lambda p: bool(p["unmatched_users"] or p["flawed_schedules"])
):
result += f"\n" + TAB + format_escalation_policy(policy)
for user in policy["unmatched_users"]:
result += f"\n" + TAB * 2 + format_user(user)
for schedule in policy["flawed_schedules"]:
result += f"\n" + TAB * 2 + format_schedule(schedule)
if (
not policy["unmatched_users"]
and not policy["flawed_schedules"]
and policy["oncall_escalation_chain"]
):
result += (
" (existing escalation chain with name '{}' will be deleted)".format(
policy["name"]
)
)
return result
def integration_report(integrations: list[dict]) -> str:
result = "Integration report:"
for integration in sorted(
integrations,
key=lambda i: bool(i["oncall_type"] and not i["is_escalation_policy_flawed"]),
reverse=True,
):
result += f"\n" + TAB + format_integration(integration)
if (
integration["oncall_type"]
and not integration["is_escalation_policy_flawed"]
and integration["oncall_integration"]
):
result += (
" (existing integration with name '{} - {}' will be deleted)".format(
integration["service"]["name"], integration["name"]
)
)
return result

View file

@ -0,0 +1,127 @@
from migrator import oncall_api_client
from migrator.utils import find_by_id, transform_wait_delay
def match_escalation_policy(policy: dict, oncall_escalation_chains: list[dict]) -> None:
oncall_escalation_chain = None
for candidate in oncall_escalation_chains:
if candidate["name"] == policy["name"]:
oncall_escalation_chain = candidate
policy["oncall_escalation_chain"] = oncall_escalation_chain
def match_escalation_policy_for_integration(
integration: dict, escalation_policies: list[dict]
) -> None:
policy_id = integration["service"]["escalation_policy"]["id"]
policy = find_by_id(escalation_policies, policy_id)
integration["is_escalation_policy_flawed"] = bool(
policy["unmatched_users"] or policy["flawed_schedules"]
)
def migrate_escalation_policy(
escalation_policy: dict, users: list[dict], schedules: list[dict]
) -> None:
name = escalation_policy["name"]
rules = escalation_policy["escalation_rules"]
num_loops = escalation_policy["num_loops"]
if escalation_policy["oncall_escalation_chain"]:
oncall_api_client.delete(
"escalation_chains/{}".format(
escalation_policy["oncall_escalation_chain"]["id"]
)
)
oncall_escalation_chain_payload = {"name": name, "team_id": None}
oncall_escalation_chain = oncall_api_client.create(
"escalation_chains", oncall_escalation_chain_payload
)
escalation_policy["oncall_escalation_chain"] = oncall_escalation_chain
oncall_escalation_policies = transform_rules(
rules, oncall_escalation_chain["id"], users, schedules, num_loops
)
for policy in oncall_escalation_policies:
oncall_api_client.create("escalation_policies", policy)
def transform_rules(
rules: list[dict],
escalation_chain_id: str,
users: list[dict],
schedules: list[dict],
num_loops: int,
) -> list[dict]:
"""
Transform PagerDuty escalation policy rules to Grafana OnCall escalation policies.
"""
escalation_policies = []
for rule in rules:
escalation_policies += transform_rule(
rule, escalation_chain_id, users, schedules
)
if num_loops > 0:
escalation_policies.append(
{"escalation_chain_id": escalation_chain_id, "type": "repeat_escalation"}
)
return escalation_policies
def transform_rule(
rule: dict, escalation_chain_id: str, users: list[dict], schedules: list[dict]
) -> list[dict]:
targets = rule["targets"]
delay = rule["escalation_delay_in_minutes"]
schedule_targets = [
target for target in targets if target["type"] == "schedule_reference"
]
user_targets = [target for target in targets if target["type"] == "user_reference"]
escalation_policies = []
for target in schedule_targets:
schedule = find_by_id(schedules, target["id"])
if schedule is None:
continue
oncall_schedule_id = schedule["oncall_schedule"]["id"]
escalation_policy = {
"escalation_chain_id": escalation_chain_id,
"type": "notify_on_call_from_schedule",
"notify_on_call_from_schedule": oncall_schedule_id,
}
escalation_policies.append(escalation_policy)
if user_targets:
rule_users = [find_by_id(users, target["id"]) for target in user_targets]
oncall_user_ids = [
user["oncall_user"]["id"]
for user in rule_users
if user and user["oncall_user"]
]
user_escalation_policy = {
"escalation_chain_id": escalation_chain_id,
"type": "notify_persons",
"persons_to_notify": oncall_user_ids,
}
escalation_policies.append(user_escalation_policy)
if delay > 0:
wait_escalation_policy = {
"escalation_chain_id": escalation_chain_id,
"type": "wait",
"duration": transform_wait_delay(delay),
}
escalation_policies.append(wait_escalation_policy)
return escalation_policies

View file

@ -0,0 +1,63 @@
from migrator import oncall_api_client
from migrator.config import PAGERDUTY_TO_ONCALL_VENDOR_MAP
from migrator.utils import find_by_id
def match_integration(integration: dict, oncall_integrations: list[dict]) -> None:
oncall_integration = None
for candidate in oncall_integrations:
if candidate["name"] == "{} - {}".format(
integration["service"]["name"], integration["name"]
):
oncall_integration = candidate
integration["oncall_integration"] = oncall_integration
def match_integration_type(integration: dict, vendors: list[dict]) -> None:
vendors_map = {vendor["id"]: vendor for vendor in vendors}
if integration["type"] not in [
"generic_events_api_inbound_integration",
"events_api_v2_inbound_integration",
]:
integration["oncall_type"] = None
return
vendor_id = integration["vendor"]["id"]
vendor_name = vendors_map[vendor_id]["name"]
integration["oncall_type"] = PAGERDUTY_TO_ONCALL_VENDOR_MAP.get(vendor_name)
def migrate_integration(integration: dict, escalation_policies: list[dict]) -> None:
escalation_policy = find_by_id(
escalation_policies, integration["service"]["escalation_policy"]["id"]
)
oncall_escalation_chain = escalation_policy["oncall_escalation_chain"]
if integration["oncall_integration"]:
oncall_api_client.delete(
"integrations/{}".format(integration["oncall_integration"]["id"])
)
oncall_name = "{} - {}".format(integration["service"]["name"], integration["name"])
create_integration(
oncall_name,
integration["oncall_type"],
oncall_escalation_chain["id"],
)
def create_integration(
name: str, integration_type: str, escalation_chain_id: str
) -> None:
payload = {"name": name, "type": integration_type, "team_id": None}
integration = oncall_api_client.create("integrations", payload)
default_route_id = integration["default_route_id"]
oncall_api_client.update(
f"routes/{default_route_id}", {"escalation_chain_id": escalation_chain_id}
)

View file

@ -0,0 +1,85 @@
import copy
from migrator import oncall_api_client
from migrator.config import PAGERDUTY_TO_ONCALL_CONTACT_METHOD_MAP
from migrator.utils import remove_duplicates, transform_wait_delay
def remove_duplicate_rules_between_waits(rules: list[dict]) -> list[dict]:
"""
Remove duplicate rules in chunks between wait rules.
E.g. "SMS - SMS - 1min - Phone call" becomes "SMS - 1min - Phone call"
"""
rules_copy = copy.deepcopy(rules)
for method in set(PAGERDUTY_TO_ONCALL_CONTACT_METHOD_MAP.values()):
rules_copy = remove_duplicates(
rules_copy,
split_condition=lambda rule: rule["type"] == "wait",
duplicate_condition=lambda rule: rule["type"] == method,
)
return rules_copy
def migrate_notification_rules(user: dict) -> None:
notification_rules = [
rule for rule in user["notification_rules"] if rule["urgency"] == "high"
]
oncall_rules = transform_notification_rules(
notification_rules, user["oncall_user"]["id"]
)
for rule in user["oncall_user"]["notification_rules"]:
oncall_api_client.delete("personal_notification_rules/{}".format(rule["id"]))
for rule in oncall_rules:
oncall_api_client.create("personal_notification_rules", rule)
def transform_notification_rules(
notification_rules: list[dict], user_id: str
) -> list[dict]:
"""
Transform PagerDuty user notification rules to Grafana OnCall personal notification rules.
"""
notification_rules = sorted(
notification_rules, key=lambda rule: rule["start_delay_in_minutes"]
)
oncall_notification_rules = []
for idx, rule in enumerate(notification_rules):
delay = rule["start_delay_in_minutes"]
if idx > 0:
previous_delay = notification_rules[idx - 1]["start_delay_in_minutes"]
delay -= previous_delay
oncall_notification_rules += transform_notification_rule(rule, delay, user_id)
oncall_notification_rules = remove_duplicate_rules_between_waits(
oncall_notification_rules
)
return oncall_notification_rules
def transform_notification_rule(
notification_rule: dict, delay: int, user_id: str
) -> list[dict]:
contact_method_type = notification_rule["contact_method"]["type"]
oncall_type = PAGERDUTY_TO_ONCALL_CONTACT_METHOD_MAP[contact_method_type]
notify_rule = {"user_id": user_id, "type": oncall_type, "important": False}
if not delay:
return [notify_rule]
wait_rule = {
"user_id": user_id,
"type": "wait",
"duration": transform_wait_delay(delay),
"important": "False",
}
return [wait_rule, notify_rule]

View file

@ -0,0 +1,27 @@
from migrator import oncall_api_client
def match_schedule(schedule: dict, oncall_schedules: list[dict]) -> None:
oncall_schedule = None
for candidate in oncall_schedules:
if schedule["name"] == candidate["name"]:
oncall_schedule = candidate
schedule["oncall_schedule"] = oncall_schedule
def migrate_schedule(schedule: dict) -> None:
if schedule["oncall_schedule"]:
oncall_api_client.delete(
"schedules/{}".format(schedule["oncall_schedule"]["id"])
)
payload = {
"name": schedule["name"],
"type": "ical",
"ical_url_primary": schedule["http_cal_url"],
"team_id": None,
}
oncall_schedule = oncall_api_client.create("schedules", payload)
schedule["oncall_schedule"] = oncall_schedule

View file

@ -0,0 +1,64 @@
from migrator.utils import find_by_id
def match_user(user: dict, oncall_users: list[dict]) -> None:
oncall_user = None
for candidate_user in oncall_users:
if user["email"] == candidate_user["email"]:
oncall_user = candidate_user
break
user["oncall_user"] = oncall_user
def match_users_for_schedule(schedule: dict, users: list[dict]) -> None:
unmatched_users = []
for user_reference in schedule["users"]:
user = find_by_id(users, user_reference["id"])
if not user:
continue
if not user["oncall_user"]:
unmatched_users.append(user)
schedule["unmatched_users"] = unmatched_users
def match_users_and_schedules_for_escalation_policy(
policy: dict, users: list[dict], schedules: list[dict]
) -> None:
unmatched_user_ids = set()
flawed_schedule_ids = set()
for rule in policy["escalation_rules"]:
targets = rule["targets"]
for target in targets:
target_id = target["id"]
if target["type"] == "user_reference":
user = find_by_id(users, target_id)
if not user:
continue
if not user["oncall_user"]:
unmatched_user_ids.add(target_id)
elif target["type"] == "schedule_reference":
schedule = find_by_id(schedules, target_id)
if not schedule:
continue
if schedule["unmatched_users"]:
flawed_schedule_ids.add(target_id)
policy["unmatched_users"] = [
find_by_id(users, user_id) for user_id in unmatched_user_ids
]
policy["flawed_schedules"] = [
find_by_id(schedules, schedule_id) for schedule_id in flawed_schedule_ids
]

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,65 @@
from typing import Callable, Optional, TypeVar
from migrator.config import ONCALL_DELAY_OPTIONS
T = TypeVar("T")
def find(
lst: list[T], cond: Callable[[T], bool], reverse: bool = False
) -> Optional[int]:
indices = range(len(lst))
if reverse:
indices = indices[::-1]
for idx in indices:
if cond(lst[idx]):
return idx
return None
def split(lst: list[T], cond: Callable[[T], bool]) -> list[list[T]]:
idx = find(lst, cond)
if idx is None:
return [lst]
return [lst[: idx + 1]] + split(lst[idx + 1 :], cond)
def remove_duplicates(
lst: list[T],
split_condition: Callable[[T], bool],
duplicate_condition: Callable[[T], bool],
) -> list[T]:
result = []
chunks = split(lst, split_condition)
for chunk in chunks:
count = len([element for element in chunk if duplicate_condition(element)])
if count > 1:
for _ in range(count - 1):
idx = find(chunk, duplicate_condition, reverse=True)
del chunk[idx]
result += chunk
return result
def find_by_id(resources: list[dict], resource_id: str) -> Optional[dict]:
for resource in resources:
if resource["id"] == resource_id:
return resource
return None
def find_closest_value(lst: list[int], value: int) -> int:
return min(lst, key=lambda v: abs(v - value))
def transform_wait_delay(delay: int) -> int:
return find_closest_value(ONCALL_DELAY_OPTIONS, delay) * 60

View file

@ -0,0 +1,5 @@
[pytest]
env =
D:PAGERDUTY_API_TOKEN=test
D:ONCALL_API_TOKEN=test
D:ONCALL_API_URL=test

View file

@ -0,0 +1,6 @@
requests==2.27.1
pdpyras==4.5.0
isort==5.10.1
black==22.3.0
pytest==7.1.2
pytest-env==0.6.2