diff --git a/tools/pagerduty-migrator/migrator/__main__.py b/tools/pagerduty-migrator/migrator/__main__.py index 4f544cb9..403f9232 100644 --- a/tools/pagerduty-migrator/migrator/__main__.py +++ b/tools/pagerduty-migrator/migrator/__main__.py @@ -3,15 +3,22 @@ import datetime from pdpyras import APISession from migrator import oncall_api_client -from migrator.config import MODE, MODE_PLAN, PAGERDUTY_API_TOKEN +from migrator.config import ( + EXPERIMENTAL_MIGRATE_EVENT_RULES, + MODE, + MODE_PLAN, + PAGERDUTY_API_TOKEN, +) from migrator.report import ( TAB, escalation_policy_report, format_escalation_policy, format_integration, + format_ruleset, format_schedule, format_user, integration_report, + ruleset_report, schedule_report, user_report, ) @@ -26,6 +33,7 @@ from migrator.resources.integrations import ( migrate_integration, ) from migrator.resources.notification_rules import migrate_notification_rules +from migrator.resources.rulesets import match_ruleset, migrate_ruleset from migrator.resources.schedules import match_schedule, migrate_schedule from migrator.resources.users import ( match_user, @@ -88,6 +96,14 @@ def main() -> None: oncall_integrations = oncall_api_client.list_all("integrations") + rulesets = None + if EXPERIMENTAL_MIGRATE_EVENT_RULES: + print("▶ Fetching event rules (rulesets) ...") + rulesets = session.list_all("rulesets") + for ruleset in rulesets: + rules = session.list_all(f"rulesets/{ruleset['id']}/rules") + ruleset["rules"] = rules + for user in users: match_user(user, oncall_users) @@ -108,15 +124,18 @@ def main() -> None: match_integration_type(integration, vendors) match_escalation_policy_for_integration(integration, escalation_policies) + if rulesets is not None: + for ruleset in rulesets: + match_ruleset(ruleset, oncall_integrations, escalation_policies, services) + if MODE == MODE_PLAN: - print() - print(user_report(users)) - print() - print(schedule_report(schedules)) - print() - print(escalation_policy_report(escalation_policies)) - print() - print(integration_report(integrations)) + print(user_report(users), end="\n\n") + print(schedule_report(schedules), end="\n\n") + print(escalation_policy_report(escalation_policies), end="\n\n") + print(integration_report(integrations), end="\n\n") + + if rulesets is not None: + print(ruleset_report(rulesets), end="\n\n") return @@ -147,6 +166,13 @@ def main() -> None: migrate_integration(integration, escalation_policies) print(TAB + format_integration(integration)) + if rulesets is not None: + print("▶ Migrating event rules (rulesets) ...") + for ruleset in rulesets: + if not ruleset["flawed_escalation_policies"]: + migrate_ruleset(ruleset, escalation_policies, services) + print(TAB + format_ruleset(ruleset)) + if __name__ == "__main__": main() diff --git a/tools/pagerduty-migrator/migrator/config.py b/tools/pagerduty-migrator/migrator/config.py index 35700c54..f2baeccb 100644 --- a/tools/pagerduty-migrator/migrator/config.py +++ b/tools/pagerduty-migrator/migrator/config.py @@ -41,3 +41,8 @@ SCHEDULE_MIGRATION_MODE_WEB = "web" SCHEDULE_MIGRATION_MODE = os.getenv( "SCHEDULE_MIGRATION_MODE", SCHEDULE_MIGRATION_MODE_ICAL ) + +# Experimental feature to migrate PD rulesets to OnCall integrations +EXPERIMENTAL_MIGRATE_EVENT_RULES = ( + os.getenv("EXPERIMENTAL_MIGRATE_EVENT_RULES", "false").lower() == "true" +) diff --git a/tools/pagerduty-migrator/migrator/report.py b/tools/pagerduty-migrator/migrator/report.py index ff975988..b9b6aeef 100644 --- a/tools/pagerduty-migrator/migrator/report.py +++ b/tools/pagerduty-migrator/migrator/report.py @@ -163,3 +163,36 @@ def integration_report(integrations: list[dict]) -> str: ) return result + + +def format_ruleset(ruleset: dict) -> str: + if ruleset["flawed_escalation_policies"]: + escalation_policy_names = [ + p["name"] for p in ruleset["flawed_escalation_policies"] + ] + result = "{} {} — escalation policies '{}' reference unmatched users or schedules that cannot be migrated".format( + ERROR_SIGN, ruleset["name"], ", ".join(escalation_policy_names) + ) + else: + result = "{} {}".format(SUCCESS_SIGN, ruleset["name"]) + + return result + + +def ruleset_report(rulesets: list[dict]) -> str: + result = "Event rules (rulesets) report:" + + for ruleset in sorted( + rulesets, + key=lambda r: bool(r["flawed_escalation_policies"]), + reverse=True, + ): + result += "\n" + TAB + format_ruleset(ruleset) + if not ruleset["flawed_escalation_policies"] and ruleset["oncall_integration"]: + result += ( + " (existing integration with name '{} Ruleset' will be deleted)".format( + ruleset["name"] + ) + ) + + return result diff --git a/tools/pagerduty-migrator/migrator/resources/rulesets.py b/tools/pagerduty-migrator/migrator/resources/rulesets.py new file mode 100644 index 00000000..0296c8b8 --- /dev/null +++ b/tools/pagerduty-migrator/migrator/resources/rulesets.py @@ -0,0 +1,160 @@ +from migrator import oncall_api_client +from migrator.utils import find_by_id + + +def match_ruleset( + ruleset: dict, + oncall_integrations: list[dict], + escalation_policies: list[dict], + services: list[dict], +) -> None: + # Find existing integration with the same name + oncall_integration = None + name = "{} Ruleset".format(ruleset["name"]).lower().strip() + for candidate in oncall_integrations: + if candidate["name"].lower().strip() == name: + oncall_integration = candidate + ruleset["oncall_integration"] = oncall_integration + + # Find services that use escalation policies that cannot be migrated + service_ids = [ + r["actions"]["route"]["value"] + for r in ruleset["rules"] + if not r["disabled"] and r["actions"]["route"] + ] + escalation_policy_ids = [ + find_by_id(services, service_id)["escalation_policy"]["id"] + for service_id in service_ids + ] + + flawed_escalation_policies = [] + for escalation_policy_id in escalation_policy_ids: + escalation_policy = find_by_id(escalation_policies, escalation_policy_id) + if bool( + escalation_policy["unmatched_users"] + or escalation_policy["flawed_schedules"] + ): + flawed_escalation_policies.append(escalation_policy) + + ruleset["flawed_escalation_policies"] = flawed_escalation_policies + + +def migrate_ruleset( + ruleset: dict, escalation_policies: list[dict], services: list[dict] +) -> None: + # Delete existing integration with the same name + if ruleset["oncall_integration"]: + oncall_api_client.delete( + "integrations/{}".format(ruleset["oncall_integration"]["id"]) + ) + + # Create new integration with type "webhook" + integration_payload = { + "name": "{} Ruleset".format(ruleset["name"]), + "type": "webhook", + "team_id": None, + } + integration = oncall_api_client.create("integrations", integration_payload) + + # Migrate rules that are not disabled and not catch-all + rules = [r for r in ruleset["rules"] if not r["disabled"] and not r["catch_all"]] + for rule in rules: + service_id = ( + rule["actions"]["route"]["value"] if rule["actions"]["route"] else None + ) + + escalation_chain_id = _pd_service_id_to_oncall_escalation_chain_id( + service_id, services, escalation_policies + ) + filtering_term = transform_condition_to_jinja(rule["conditions"]) + route_payload = { + "routing_type": "jinja2", + "routing_regex": filtering_term, + "position": rule["position"], + "integration_id": integration["id"], + "escalation_chain_id": escalation_chain_id, + } + oncall_api_client.create("routes", route_payload) + + # Migrate catch-all rule + catch_all_rule = [r for r in ruleset["rules"] if r["catch_all"]][0] + catch_all_service_id = ( + catch_all_rule["actions"]["route"]["value"] + if catch_all_rule["actions"]["route"] + else None + ) + catch_all_escalation_chain_id = _pd_service_id_to_oncall_escalation_chain_id( + catch_all_service_id, services, escalation_policies + ) + + if catch_all_escalation_chain_id: + # Get the default route and update it to use appropriate escalation chain + routes = oncall_api_client.list_all( + "routes/?integration_id={}".format(integration["id"]) + ) + default_route_id = routes[-1]["id"] + oncall_api_client.update( + f"routes/{default_route_id}", + {"escalation_chain_id": catch_all_escalation_chain_id}, + ) + + +def transform_condition_to_jinja(condition): + """ + Transform PD event rule condition to Jinja2 template + """ + + operator = condition["operator"] + assert operator in ("and", "or") + + # Insert "and" or "or" between subconditions + template = f" {operator} ".join( + [ + "(" + transform_subcondition_to_jinja(subcondition) + ")" + for subcondition in condition["subconditions"] + ] + ) + template = "{{ " + template + " }}" + return template + + +def transform_subcondition_to_jinja(subcondition): + """ + Transform PD event rule subcondition to Jinja2 template. + """ + operator = subcondition["operator"] + path = subcondition["parameters"]["path"] + value = subcondition["parameters"]["value"] + if value: + value = value.replace('"', '\\"').replace("'", "\\'") + + OPERATOR_TO_JINJA_TEMPLATE = { + "exists": "{path} is defined", + "nexists": "{path} is not defined", + "equals": '{path} == "{value}"', + "nequals": '{path} != "{value}"', + "contains": '"{value}" in {path}', + "ncontains": '"{value}" not in {path}', + "matches": '{path} | regex_match("{value}")', + "nmatches": 'not ({path} | regex_match("{value}"))', + } + jinja_template = OPERATOR_TO_JINJA_TEMPLATE[operator].format(path=path, value=value) + return jinja_template + + +def _pd_service_id_to_oncall_escalation_chain_id( + service_id, services, escalation_policies +): + """ + Helper function to get the OnCall escalation chain ID from a PD service ID. + """ + + if service_id is None: + return None + + service = find_by_id(services, service_id) + escalation_policy_id = service["escalation_policy"]["id"] + escalation_policy = find_by_id(escalation_policies, escalation_policy_id) + escalation_chain_id = escalation_policy["oncall_escalation_chain"]["id"] + + return escalation_chain_id