PD migrator: include service & integration names to ruleset (#1687)

# What this PR does
Allows to include service & integration names to migrated ruleset name
when passing `EXPERIMENTAL_MIGRATE_EVENT_RULES_LONG_NAMES=True`.
This commit is contained in:
Vadim Stepanov 2023-03-31 18:12:45 +01:00 committed by GitHub
parent cf026c6c9a
commit 47b0ab4465
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 53 additions and 8 deletions

View file

@ -126,7 +126,13 @@ def main() -> None:
if rulesets is not None:
for ruleset in rulesets:
match_ruleset(ruleset, oncall_integrations, escalation_policies, services)
match_ruleset(
ruleset,
oncall_integrations,
escalation_policies,
services,
integrations,
)
if MODE == MODE_PLAN:
print(user_report(users), end="\n\n")

View file

@ -46,3 +46,7 @@ SCHEDULE_MIGRATION_MODE = os.getenv(
EXPERIMENTAL_MIGRATE_EVENT_RULES = (
os.getenv("EXPERIMENTAL_MIGRATE_EVENT_RULES", "false").lower() == "true"
)
# Set to true to include service & integration names in the ruleset name
EXPERIMENTAL_MIGRATE_EVENT_RULES_LONG_NAMES = (
os.getenv("EXPERIMENTAL_MIGRATE_EVENT_RULES_LONG_NAMES", "false").lower() == "true"
)

View file

@ -189,10 +189,8 @@ def ruleset_report(rulesets: list[dict]) -> str:
):
result += "\n" + TAB + format_ruleset(ruleset)
if not ruleset["flawed_escalation_policies"] and ruleset["oncall_integration"]:
result += (
" (existing integration with name '{} Ruleset' will be deleted)".format(
ruleset["name"]
)
result += " (existing integration with name '{}' will be deleted)".format(
ruleset["oncall_name"]
)
return result

View file

@ -1,4 +1,5 @@
from migrator import oncall_api_client
from migrator.config import EXPERIMENTAL_MIGRATE_EVENT_RULES_LONG_NAMES
from migrator.utils import find_by_id
@ -7,14 +8,16 @@ def match_ruleset(
oncall_integrations: list[dict],
escalation_policies: list[dict],
services: list[dict],
integrations: list[dict],
) -> None:
# Find existing integration with the same name
oncall_integration = None
name = "{} Ruleset".format(ruleset["name"]).lower().strip()
name = _generate_ruleset_name(ruleset, services, integrations)
for candidate in oncall_integrations:
if candidate["name"].lower().strip() == name:
if candidate["name"].lower().strip() == name.lower().strip():
oncall_integration = candidate
ruleset["oncall_integration"] = oncall_integration
ruleset["oncall_name"] = name
# Find services that use escalation policies that cannot be migrated
service_ids = [
@ -52,7 +55,7 @@ def migrate_ruleset(
# Create new integration with type "webhook"
integration_payload = {
"name": "{} Ruleset".format(ruleset["name"]),
"name": ruleset["oncall_name"],
"type": "webhook",
"team_id": None,
}
@ -163,3 +166,37 @@ def _pd_service_id_to_oncall_escalation_chain_id(
escalation_chain_id = escalation_policy["oncall_escalation_chain"]["id"]
return escalation_chain_id
def _generate_ruleset_name(ruleset, services, integrations):
result = "{} Ruleset".format(ruleset["name"])
if not EXPERIMENTAL_MIGRATE_EVENT_RULES_LONG_NAMES:
return result
service_ids = [
r["actions"]["route"]["value"]
for r in sorted(ruleset["rules"], key=lambda r: r["position"])
if not r["disabled"] and r["actions"]["route"]
]
ruleset_services = [find_by_id(services, service_id) for service_id in service_ids]
ruleset_services = [s for s in ruleset_services if s is not None]
if not ruleset_services:
return result
service_names = []
for service in ruleset_services:
service_name = service["name"]
service_integrations = [
integration
for integration in integrations
if integration["service"]["id"] == service["id"]
]
if service_integrations:
service_name += " ({})".format(
", ".join([integration["name"] for integration in service_integrations])
)
service_names.append(service_name)
# OnCall limit for integration name is 150 chars
return "{}: {}".format(result, ", ".join(service_names))[:150]