This PR adds support for migrating data from OpsGenie to Grafana IRM. Closes https://github.com/grafana/irm/issues/1179
156 lines
5.5 KiB
Python
156 lines
5.5 KiB
Python
from lib.common.report import TAB
|
|
from lib.common.resources.users import match_user
|
|
from lib.oncall.api_client import OnCallAPIClient
|
|
from lib.opsgenie.api_client import OpsGenieAPIClient
|
|
from lib.opsgenie.config import (
|
|
MIGRATE_USERS,
|
|
MODE,
|
|
MODE_PLAN,
|
|
UNSUPPORTED_INTEGRATION_TO_WEBHOOKS,
|
|
)
|
|
from lib.opsgenie.report import (
|
|
escalation_policy_report,
|
|
format_escalation_policy,
|
|
format_integration,
|
|
format_schedule,
|
|
format_user,
|
|
integration_report,
|
|
schedule_report,
|
|
user_report,
|
|
)
|
|
from lib.opsgenie.resources.escalation_policies import (
|
|
filter_escalation_policies,
|
|
match_escalation_policy,
|
|
match_users_and_schedules_for_escalation_policy,
|
|
migrate_escalation_policy,
|
|
)
|
|
from lib.opsgenie.resources.integrations import (
|
|
filter_integrations,
|
|
match_integration,
|
|
migrate_integration,
|
|
)
|
|
from lib.opsgenie.resources.notification_rules import migrate_notification_rules
|
|
from lib.opsgenie.resources.schedules import (
|
|
filter_schedules,
|
|
match_schedule,
|
|
match_users_for_schedule,
|
|
migrate_schedule,
|
|
)
|
|
from lib.opsgenie.resources.users import filter_users
|
|
|
|
|
|
def migrate() -> None:
|
|
client = OpsGenieAPIClient()
|
|
|
|
if MIGRATE_USERS:
|
|
print("▶ Fetching users...")
|
|
users = client.list_users()
|
|
users = filter_users(users)
|
|
else:
|
|
print("▶ Skipping user migration as MIGRATE_USERS is false...")
|
|
users = []
|
|
|
|
oncall_users = OnCallAPIClient.list_users_with_notification_rules()
|
|
|
|
print("▶ Fetching schedules...")
|
|
schedules = client.list_schedules()
|
|
schedules = filter_schedules(schedules)
|
|
oncall_schedules = OnCallAPIClient.list_all("schedules")
|
|
|
|
print("▶ Fetching escalation policies...")
|
|
escalation_policies = client.list_escalation_policies()
|
|
escalation_policies = filter_escalation_policies(escalation_policies)
|
|
oncall_escalation_chains = OnCallAPIClient.list_all("escalation_chains")
|
|
|
|
print("▶ Fetching integrations...")
|
|
integrations = client.list_integrations()
|
|
integrations = filter_integrations(integrations)
|
|
oncall_integrations = OnCallAPIClient.list_all("integrations")
|
|
|
|
# Match users with their Grafana OnCall counterparts
|
|
if MIGRATE_USERS:
|
|
print("\n▶ Matching users...")
|
|
for user in users:
|
|
match_user(user, oncall_users)
|
|
print(user_report(users))
|
|
|
|
# Match schedules with their Grafana OnCall counterparts
|
|
print("\n▶ Matching schedules...")
|
|
user_id_map = {
|
|
u["id"]: u["oncall_user"]["id"] for u in users if u.get("oncall_user")
|
|
}
|
|
for schedule in schedules:
|
|
match_schedule(schedule, oncall_schedules, user_id_map)
|
|
match_users_for_schedule(schedule, users)
|
|
print(schedule_report(schedules))
|
|
|
|
# Match escalation policies with their Grafana OnCall counterparts
|
|
print("\n▶ Matching escalation policies...")
|
|
for policy in escalation_policies:
|
|
match_escalation_policy(policy, oncall_escalation_chains)
|
|
match_users_and_schedules_for_escalation_policy(policy, users, schedules)
|
|
print(escalation_policy_report(escalation_policies))
|
|
|
|
# Match integrations with their Grafana OnCall counterparts
|
|
print("\n▶ Matching integrations...")
|
|
for integration in integrations:
|
|
match_integration(integration, oncall_integrations)
|
|
print(integration_report(integrations))
|
|
|
|
if MODE == MODE_PLAN:
|
|
return
|
|
|
|
# Migrate users
|
|
if MIGRATE_USERS:
|
|
print("\n▶ Migrating users...")
|
|
for user in users:
|
|
if user.get("oncall_user"):
|
|
print(f"{TAB}Migrating {format_user(user)}...")
|
|
migrate_notification_rules(user)
|
|
|
|
# Migrate schedules
|
|
print("\n▶ Migrating schedules...")
|
|
for schedule in schedules:
|
|
if not schedule.get("migration_errors"):
|
|
print(f"{TAB}Migrating {format_schedule(schedule)}...")
|
|
migrate_schedule(schedule, user_id_map)
|
|
|
|
# Migrate escalation policies
|
|
print("\n▶ Migrating escalation policies...")
|
|
for policy in escalation_policies:
|
|
if all(rule["notifyType"] != "default" for rule in policy["rules"]):
|
|
print(
|
|
f"{TAB}Skipping migrating {format_escalation_policy(policy)} because all of its rules "
|
|
"have a non-default notifyType"
|
|
)
|
|
continue
|
|
elif any(rule["notifyType"] != "default" for rule in policy["rules"]):
|
|
print(
|
|
f"{TAB}Migrating {format_escalation_policy(policy)} but some of its rules "
|
|
"have a non-default notifyType, and those rules will not be migrated"
|
|
)
|
|
else:
|
|
print(f"{TAB}Migrating {format_escalation_policy(policy)}...")
|
|
|
|
migrate_escalation_policy(policy, users, schedules)
|
|
|
|
# Migrate integrations
|
|
print("\n▶ Migrating integrations...")
|
|
for integration in integrations:
|
|
print(f"{TAB}Migrating {format_integration(integration)}...")
|
|
|
|
if (
|
|
integration["oncall_type"] is None
|
|
and not UNSUPPORTED_INTEGRATION_TO_WEBHOOKS
|
|
):
|
|
print(
|
|
f"{TAB}Skipping {format_integration(integration)} because it is not supported and UNSUPPORTED_INTEGRATION_TO_WEBHOOKS is false"
|
|
)
|
|
continue
|
|
elif integration["oncall_type"] is None and UNSUPPORTED_INTEGRATION_TO_WEBHOOKS:
|
|
print(
|
|
f"{TAB}Migrating {format_integration(integration)} as webhook because it is not supported and UNSUPPORTED_INTEGRATION_TO_WEBHOOKS is true"
|
|
)
|
|
continue
|
|
|
|
migrate_integration(integration)
|