PD migrator: add ability to migrate event rules (#1555)

# What this PR does
Adds an ability to migrate global event rulesets to OnCall integrations.

## Which issue(s) this PR fixes
Related to https://github.com/grafana/oncall/issues/1300
This commit is contained in:
Vadim Stepanov 2023-03-16 14:58:21 +00:00 committed by GitHub
parent 3de7766389
commit 4aa4adfecb
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 233 additions and 9 deletions

View file

@ -3,15 +3,22 @@ import datetime
from pdpyras import APISession
from migrator import oncall_api_client
from migrator.config import MODE, MODE_PLAN, PAGERDUTY_API_TOKEN
from migrator.config import (
EXPERIMENTAL_MIGRATE_EVENT_RULES,
MODE,
MODE_PLAN,
PAGERDUTY_API_TOKEN,
)
from migrator.report import (
TAB,
escalation_policy_report,
format_escalation_policy,
format_integration,
format_ruleset,
format_schedule,
format_user,
integration_report,
ruleset_report,
schedule_report,
user_report,
)
@ -26,6 +33,7 @@ from migrator.resources.integrations import (
migrate_integration,
)
from migrator.resources.notification_rules import migrate_notification_rules
from migrator.resources.rulesets import match_ruleset, migrate_ruleset
from migrator.resources.schedules import match_schedule, migrate_schedule
from migrator.resources.users import (
match_user,
@ -88,6 +96,14 @@ def main() -> None:
oncall_integrations = oncall_api_client.list_all("integrations")
rulesets = None
if EXPERIMENTAL_MIGRATE_EVENT_RULES:
print("▶ Fetching event rules (rulesets) ...")
rulesets = session.list_all("rulesets")
for ruleset in rulesets:
rules = session.list_all(f"rulesets/{ruleset['id']}/rules")
ruleset["rules"] = rules
for user in users:
match_user(user, oncall_users)
@ -108,15 +124,18 @@ def main() -> None:
match_integration_type(integration, vendors)
match_escalation_policy_for_integration(integration, escalation_policies)
if rulesets is not None:
for ruleset in rulesets:
match_ruleset(ruleset, oncall_integrations, escalation_policies, services)
if MODE == MODE_PLAN:
print()
print(user_report(users))
print()
print(schedule_report(schedules))
print()
print(escalation_policy_report(escalation_policies))
print()
print(integration_report(integrations))
print(user_report(users), end="\n\n")
print(schedule_report(schedules), end="\n\n")
print(escalation_policy_report(escalation_policies), end="\n\n")
print(integration_report(integrations), end="\n\n")
if rulesets is not None:
print(ruleset_report(rulesets), end="\n\n")
return
@ -147,6 +166,13 @@ def main() -> None:
migrate_integration(integration, escalation_policies)
print(TAB + format_integration(integration))
if rulesets is not None:
print("▶ Migrating event rules (rulesets) ...")
for ruleset in rulesets:
if not ruleset["flawed_escalation_policies"]:
migrate_ruleset(ruleset, escalation_policies, services)
print(TAB + format_ruleset(ruleset))
if __name__ == "__main__":
main()

View file

@ -41,3 +41,8 @@ SCHEDULE_MIGRATION_MODE_WEB = "web"
SCHEDULE_MIGRATION_MODE = os.getenv(
"SCHEDULE_MIGRATION_MODE", SCHEDULE_MIGRATION_MODE_ICAL
)
# Experimental feature to migrate PD rulesets to OnCall integrations
EXPERIMENTAL_MIGRATE_EVENT_RULES = (
os.getenv("EXPERIMENTAL_MIGRATE_EVENT_RULES", "false").lower() == "true"
)

View file

@ -163,3 +163,36 @@ def integration_report(integrations: list[dict]) -> str:
)
return result
def format_ruleset(ruleset: dict) -> str:
if ruleset["flawed_escalation_policies"]:
escalation_policy_names = [
p["name"] for p in ruleset["flawed_escalation_policies"]
]
result = "{} {} — escalation policies '{}' reference unmatched users or schedules that cannot be migrated".format(
ERROR_SIGN, ruleset["name"], ", ".join(escalation_policy_names)
)
else:
result = "{} {}".format(SUCCESS_SIGN, ruleset["name"])
return result
def ruleset_report(rulesets: list[dict]) -> str:
result = "Event rules (rulesets) report:"
for ruleset in sorted(
rulesets,
key=lambda r: bool(r["flawed_escalation_policies"]),
reverse=True,
):
result += "\n" + TAB + format_ruleset(ruleset)
if not ruleset["flawed_escalation_policies"] and ruleset["oncall_integration"]:
result += (
" (existing integration with name '{} Ruleset' will be deleted)".format(
ruleset["name"]
)
)
return result

View file

@ -0,0 +1,160 @@
from migrator import oncall_api_client
from migrator.utils import find_by_id
def match_ruleset(
ruleset: dict,
oncall_integrations: list[dict],
escalation_policies: list[dict],
services: list[dict],
) -> None:
# Find existing integration with the same name
oncall_integration = None
name = "{} Ruleset".format(ruleset["name"]).lower().strip()
for candidate in oncall_integrations:
if candidate["name"].lower().strip() == name:
oncall_integration = candidate
ruleset["oncall_integration"] = oncall_integration
# Find services that use escalation policies that cannot be migrated
service_ids = [
r["actions"]["route"]["value"]
for r in ruleset["rules"]
if not r["disabled"] and r["actions"]["route"]
]
escalation_policy_ids = [
find_by_id(services, service_id)["escalation_policy"]["id"]
for service_id in service_ids
]
flawed_escalation_policies = []
for escalation_policy_id in escalation_policy_ids:
escalation_policy = find_by_id(escalation_policies, escalation_policy_id)
if bool(
escalation_policy["unmatched_users"]
or escalation_policy["flawed_schedules"]
):
flawed_escalation_policies.append(escalation_policy)
ruleset["flawed_escalation_policies"] = flawed_escalation_policies
def migrate_ruleset(
ruleset: dict, escalation_policies: list[dict], services: list[dict]
) -> None:
# Delete existing integration with the same name
if ruleset["oncall_integration"]:
oncall_api_client.delete(
"integrations/{}".format(ruleset["oncall_integration"]["id"])
)
# Create new integration with type "webhook"
integration_payload = {
"name": "{} Ruleset".format(ruleset["name"]),
"type": "webhook",
"team_id": None,
}
integration = oncall_api_client.create("integrations", integration_payload)
# Migrate rules that are not disabled and not catch-all
rules = [r for r in ruleset["rules"] if not r["disabled"] and not r["catch_all"]]
for rule in rules:
service_id = (
rule["actions"]["route"]["value"] if rule["actions"]["route"] else None
)
escalation_chain_id = _pd_service_id_to_oncall_escalation_chain_id(
service_id, services, escalation_policies
)
filtering_term = transform_condition_to_jinja(rule["conditions"])
route_payload = {
"routing_type": "jinja2",
"routing_regex": filtering_term,
"position": rule["position"],
"integration_id": integration["id"],
"escalation_chain_id": escalation_chain_id,
}
oncall_api_client.create("routes", route_payload)
# Migrate catch-all rule
catch_all_rule = [r for r in ruleset["rules"] if r["catch_all"]][0]
catch_all_service_id = (
catch_all_rule["actions"]["route"]["value"]
if catch_all_rule["actions"]["route"]
else None
)
catch_all_escalation_chain_id = _pd_service_id_to_oncall_escalation_chain_id(
catch_all_service_id, services, escalation_policies
)
if catch_all_escalation_chain_id:
# Get the default route and update it to use appropriate escalation chain
routes = oncall_api_client.list_all(
"routes/?integration_id={}".format(integration["id"])
)
default_route_id = routes[-1]["id"]
oncall_api_client.update(
f"routes/{default_route_id}",
{"escalation_chain_id": catch_all_escalation_chain_id},
)
def transform_condition_to_jinja(condition):
"""
Transform PD event rule condition to Jinja2 template
"""
operator = condition["operator"]
assert operator in ("and", "or")
# Insert "and" or "or" between subconditions
template = f" {operator} ".join(
[
"(" + transform_subcondition_to_jinja(subcondition) + ")"
for subcondition in condition["subconditions"]
]
)
template = "{{ " + template + " }}"
return template
def transform_subcondition_to_jinja(subcondition):
"""
Transform PD event rule subcondition to Jinja2 template.
"""
operator = subcondition["operator"]
path = subcondition["parameters"]["path"]
value = subcondition["parameters"]["value"]
if value:
value = value.replace('"', '\\"').replace("'", "\\'")
OPERATOR_TO_JINJA_TEMPLATE = {
"exists": "{path} is defined",
"nexists": "{path} is not defined",
"equals": '{path} == "{value}"',
"nequals": '{path} != "{value}"',
"contains": '"{value}" in {path}',
"ncontains": '"{value}" not in {path}',
"matches": '{path} | regex_match("{value}")',
"nmatches": 'not ({path} | regex_match("{value}"))',
}
jinja_template = OPERATOR_TO_JINJA_TEMPLATE[operator].format(path=path, value=value)
return jinja_template
def _pd_service_id_to_oncall_escalation_chain_id(
service_id, services, escalation_policies
):
"""
Helper function to get the OnCall escalation chain ID from a PD service ID.
"""
if service_id is None:
return None
service = find_by_id(services, service_id)
escalation_policy_id = service["escalation_policy"]["id"]
escalation_policy = find_by_id(escalation_policies, escalation_policy_id)
escalation_chain_id = escalation_policy["oncall_escalation_chain"]["id"]
return escalation_chain_id