oncall-engine/tools/migrators/lib/opsgenie/resources/integrations.py
Joey Orlando e4728ea69f
feat: add opsgenie to migrator script (#5495)
This PR adds support for migrating data from OpsGenie to Grafana IRM.

Closes https://github.com/grafana/irm/issues/1179
2025-04-07 08:47:27 -04:00

63 lines
2.1 KiB
Python

import re
from typing import List
from lib.oncall.api_client import OnCallAPIClient
from lib.opsgenie.config import (
OPSGENIE_FILTER_INTEGRATION_REGEX,
OPSGENIE_FILTER_TEAM,
OPSGENIE_TO_ONCALL_VENDOR_MAP,
UNSUPPORTED_INTEGRATION_TO_WEBHOOKS,
)
def filter_integrations(integrations: list[dict]) -> list[dict]:
    """
    Narrow down the OpsGenie integrations according to the configured filters.

    Two optional filters are applied in sequence, each only when its config
    value is truthy:

    * ``OPSGENIE_FILTER_TEAM`` keeps integrations whose ``teamId`` equals it
      (integrations without a ``teamId`` key are dropped by this filter).
    * ``OPSGENIE_FILTER_INTEGRATION_REGEX`` keeps integrations whose ``name``
      matches the pattern; ``re.match`` semantics apply, so the pattern is
      anchored at the beginning of the name.

    When neither filter is configured the input list is returned unchanged.
    """
    kept = integrations

    if OPSGENIE_FILTER_TEAM:
        kept = [
            entry for entry in kept if entry.get("teamId") == OPSGENIE_FILTER_TEAM
        ]

    if OPSGENIE_FILTER_INTEGRATION_REGEX:
        # Compile once and reuse the bound matcher for every integration.
        name_matches = re.compile(OPSGENIE_FILTER_INTEGRATION_REGEX).match
        kept = [entry for entry in kept if name_matches(entry["name"])]

    return kept
def match_integration(integration: dict, oncall_integrations: List[dict]) -> None:
    """
    Match OpsGenie integration with Grafana OnCall integration + match opsgenie
    integration type with Grafana OnCall integration type.

    The ``integration`` dict is updated in place with two keys:

    * ``oncall_integration`` - the OnCall integration whose name equals the
      OpsGenie integration name (compared lowercased and with surrounding
      whitespace stripped), or ``None`` when there is no such integration.
      If several OnCall integrations share that name, the last one in
      ``oncall_integrations`` is used.
    * ``oncall_type`` - the value ``OPSGENIE_TO_ONCALL_VENDOR_MAP`` holds for
      the OpsGenie ``type``. When the lookup yields nothing (or a falsy value)
      and ``UNSUPPORTED_INTEGRATION_TO_WEBHOOKS`` is truthy, ``"webhook"`` is
      used instead; otherwise the (possibly ``None``) lookup result is stored.
    """
    # Normalise the OpsGenie name once instead of once per candidate.
    wanted_name = integration["name"].lower().strip()

    oncall_integration = None
    for candidate in oncall_integrations:
        # No early exit on purpose: a later candidate with the same name
        # overrides an earlier one.
        if candidate["name"].lower().strip() == wanted_name:
            oncall_integration = candidate
    integration["oncall_integration"] = oncall_integration

    integration_type = OPSGENIE_TO_ONCALL_VENDOR_MAP.get(integration["type"])
    if not integration_type and UNSUPPORTED_INTEGRATION_TO_WEBHOOKS:
        integration_type = "webhook"
    integration["oncall_type"] = integration_type
def migrate_integration(integration: dict) -> None:
    """
    Recreate an OpsGenie integration in Grafana OnCall.

    If ``integration["oncall_integration"]`` is truthy, that OnCall
    integration is deleted first. A new integration is then created from the
    OpsGenie ``name`` and the ``oncall_type`` stored on the dict, with
    ``team_id`` sent as ``None``. When the dict carries a truthy
    ``oncall_escalation_chain``, its ``id`` is included in the payload as
    ``escalation_chain_id``.

    The value returned by the create call replaces
    ``integration["oncall_integration"]``.
    """
    # Delete the existing OnCall integration before creating its replacement.
    if integration["oncall_integration"]:
        stale_path = f"integrations/{integration['oncall_integration']['id']}"
        OnCallAPIClient.delete(stale_path)

    escalation_chain = integration.get("oncall_escalation_chain")

    body = {
        "name": integration["name"],
        "type": integration["oncall_type"],
        "team_id": None,
    }
    if escalation_chain:
        body["escalation_chain_id"] = escalation_chain["id"]

    created = OnCallAPIClient.create("integrations", body)
    integration["oncall_integration"] = created