oncall-engine/tools/migrators/lib/pagerduty/resources/rulesets.py
Joey Orlando c46dff09d9
Splunk OnCall migration tool (#4267)
# What this PR does

Refactors the PagerDuty migration script to be a bit more generic + adds
a migration script to migrate from Splunk OnCall (VictorOps)

tldr;
```bash
❯ docker build -t oncall-migrator .
[+] Building 0.4s (10/10) FINISHED
❯ docker run --rm \
-e MIGRATING_FROM="pagerduty" \
-e MODE="plan" \
-e ONCALL_API_URL="http://localhost:8080" \
-e ONCALL_API_TOKEN="<ONCALL_API_TOKEN>" \
-e PAGERDUTY_API_TOKEN="<PAGERDUTY_API_TOKEN>" \
oncall-migrator
running pagerduty migration script...

❯ docker run --rm \
-e MIGRATING_FROM="splunk" \
-e MODE="plan" \
-e ONCALL_API_URL="http://localhost:8080" \
-e ONCALL_API_TOKEN="<ONCALL_API_TOKEN>" \
-e SPLUNK_API_ID="<SPLUNK_API_ID>" \
-e SPLUNK_API_KEY="<SPLUNK_API_KEY>" \
oncall-migrator
migrating from splunk oncall...
```

https://www.loom.com/share/a855062d436a4ef79f030e22528d8c71

## Checklist

- [x] Unit, integration, and e2e (if applicable) tests updated
- [x] Documentation added (or `pr:no public docs` PR label added if not
required)
- [x] Added the relevant release notes label (see labels prefixed w/
`release:`). These labels dictate how your PR will
    show up in the autogenerated release notes.
2024-05-14 13:53:59 +00:00

202 lines
7 KiB
Python

from lib.oncall.api_client import OnCallAPIClient
from lib.pagerduty.config import EXPERIMENTAL_MIGRATE_EVENT_RULES_LONG_NAMES
from lib.utils import find_by_id
def match_ruleset(
ruleset: dict,
oncall_integrations: list[dict],
escalation_policies: list[dict],
services: list[dict],
integrations: list[dict],
) -> None:
# Find existing integration with the same name
oncall_integration = None
name = _generate_ruleset_name(ruleset, services, integrations)
for candidate in oncall_integrations:
if candidate["name"].lower().strip() == name.lower().strip():
oncall_integration = candidate
ruleset["oncall_integration"] = oncall_integration
ruleset["oncall_name"] = name
# Find services that use escalation policies that cannot be migrated
service_ids = [
r["actions"]["route"]["value"]
for r in ruleset["rules"]
if not r["disabled"] and r["actions"]["route"]
]
escalation_policy_ids = []
for service_id in service_ids:
service = find_by_id(services, service_id)
# Sometimes service cannot be found, e.g. when it is deleted but still referenced in ruleset
if service:
escalation_policy_ids.append(service["escalation_policy"]["id"])
flawed_escalation_policies = []
for escalation_policy_id in escalation_policy_ids:
escalation_policy = find_by_id(escalation_policies, escalation_policy_id)
if bool(
escalation_policy["unmatched_users"]
or escalation_policy["flawed_schedules"]
):
flawed_escalation_policies.append(escalation_policy)
ruleset["flawed_escalation_policies"] = flawed_escalation_policies
def migrate_ruleset(
ruleset: dict, escalation_policies: list[dict], services: list[dict]
) -> None:
# Delete existing integration with the same name
if ruleset["oncall_integration"]:
OnCallAPIClient.delete(
"integrations/{}".format(ruleset["oncall_integration"]["id"])
)
# Create new integration with type "webhook"
integration_payload = {
"name": ruleset["oncall_name"],
"type": "webhook",
"team_id": None,
}
integration = OnCallAPIClient.create("integrations", integration_payload)
# Migrate rules that are not disabled and not catch-all
rules = [r for r in ruleset["rules"] if not r["disabled"] and not r["catch_all"]]
for rule in sorted(rules, key=lambda r: r["position"]):
service_id = (
rule["actions"]["route"]["value"] if rule["actions"]["route"] else None
)
escalation_chain_id = _pd_service_id_to_oncall_escalation_chain_id(
service_id, services, escalation_policies
)
filtering_term = transform_condition_to_jinja(rule["conditions"])
route_payload = {
"routing_type": "jinja2",
"routing_regex": filtering_term,
"integration_id": integration["id"],
"escalation_chain_id": escalation_chain_id,
}
OnCallAPIClient.create("routes", route_payload)
# Migrate catch-all rule
catch_all_rule = [r for r in ruleset["rules"] if r["catch_all"]][0]
catch_all_service_id = (
catch_all_rule["actions"]["route"]["value"]
if catch_all_rule["actions"]["route"]
else None
)
catch_all_escalation_chain_id = _pd_service_id_to_oncall_escalation_chain_id(
catch_all_service_id, services, escalation_policies
)
if catch_all_escalation_chain_id:
# Get the default route and update it to use appropriate escalation chain
routes = OnCallAPIClient.list_all(
"routes/?integration_id={}".format(integration["id"])
)
default_route_id = routes[-1]["id"]
OnCallAPIClient.update(
f"routes/{default_route_id}",
{"escalation_chain_id": catch_all_escalation_chain_id},
)
def transform_condition_to_jinja(condition):
"""
Transform PD event rule condition to Jinja2 template
"""
operator = condition["operator"]
assert operator in ("and", "or")
# Insert "and" or "or" between subconditions
template = f" {operator} ".join(
[
"(" + transform_subcondition_to_jinja(subcondition) + ")"
for subcondition in condition["subconditions"]
]
)
template = "{{ " + template + " }}"
return template
def transform_subcondition_to_jinja(subcondition):
"""
Transform PD event rule subcondition to Jinja2 template.
"""
operator = subcondition["operator"]
path = subcondition["parameters"]["path"]
value = subcondition["parameters"]["value"]
if value:
value = value.replace('"', '\\"').replace("'", "\\'")
OPERATOR_TO_JINJA_TEMPLATE = {
"exists": "{path} is defined",
"nexists": "{path} is not defined",
"equals": '{path} == "{value}"',
"nequals": '{path} != "{value}"',
"contains": '"{value}" in {path}',
"ncontains": '"{value}" not in {path}',
"matches": '{path} | regex_match("{value}")',
"nmatches": 'not ({path} | regex_match("{value}"))',
}
jinja_template = OPERATOR_TO_JINJA_TEMPLATE[operator].format(path=path, value=value)
return jinja_template
def _pd_service_id_to_oncall_escalation_chain_id(
service_id, services, escalation_policies
):
"""
Helper function to get the OnCall escalation chain ID from a PD service ID.
"""
if service_id is None:
return None
service = find_by_id(services, service_id)
if service is None:
# Service cannot be found, e.g. when it is deleted but still referenced in ruleset
return None
escalation_policy_id = service["escalation_policy"]["id"]
escalation_policy = find_by_id(escalation_policies, escalation_policy_id)
escalation_chain_id = escalation_policy["oncall_escalation_chain"]["id"]
return escalation_chain_id
def _generate_ruleset_name(ruleset, services, integrations):
result = "{} Ruleset".format(ruleset["name"])
if not EXPERIMENTAL_MIGRATE_EVENT_RULES_LONG_NAMES:
return result
service_ids = [
r["actions"]["route"]["value"]
for r in sorted(ruleset["rules"], key=lambda r: r["position"])
if not r["disabled"] and r["actions"]["route"]
]
ruleset_services = [find_by_id(services, service_id) for service_id in service_ids]
ruleset_services = [s for s in ruleset_services if s is not None]
if not ruleset_services:
return result
service_names = []
for service in ruleset_services:
service_name = service["name"]
service_integrations = [
integration
for integration in integrations
if integration["service"]["id"] == service["id"]
]
if service_integrations:
service_name += " ({})".format(
", ".join([integration["name"] for integration in service_integrations])
)
service_names.append(service_name)
# OnCall limit for integration name is 150 chars
return "{}: {}".format(result, ", ".join(service_names))[:150]