oncall-engine/tools/migrators/lib/pagerduty/resources/services.py
Joey Orlando 307afbe464
fix(pagerduty): apply several small PagerDuty migrator fixes (#5536)
This PR applies the diff described in
https://github.com/grafana/irm/issues/1863 to the PagerDuty migrator
tool.

Specifically, it:
- Fixes the target type check for user references in service filtering
- Ensures escalation policies are included in the PagerDuty API request
- Corrects the logging output for service migration errors
2025-05-12 19:33:31 -04:00

686 lines
23 KiB
Python

import json
import re
from typing import Any, Callable, Dict, List, Optional, Union

from pdpyras import APISession

from lib.common.report import TAB
from lib.grafana.service_model_client import ServiceModelClient
from lib.pagerduty.config import (
    PAGERDUTY_FILTER_SERVICE_REGEX,
    PAGERDUTY_FILTER_TEAM,
    PAGERDUTY_FILTER_USERS,
)
from lib.pagerduty.report import format_service
def _service_filter_reason(service: Dict[str, Any]) -> Optional[str]:
    """
    Explain why a service is rejected by the configured filters.

    The filters are checked in a fixed order (team, users, name regex) and
    the first one that fails decides the message.

    Args:
        service: Service dictionary to check
    Returns:
        The rejection message, or None when the service passes every
        configured filter
    """
    # Team filter: at least one team summary must equal the configured team.
    if PAGERDUTY_FILTER_TEAM:
        team_matches = (
            team["summary"] == PAGERDUTY_FILTER_TEAM
            for team in service.get("teams", [])
        )
        if not any(team_matches):
            return f"No teams found for team filter: {PAGERDUTY_FILTER_TEAM}"

    # User filter: not applied to entries typed as business services.
    if PAGERDUTY_FILTER_USERS and service.get("type") != "business_service":
        service_users = set()
        policy = service.get("escalation_policy")
        if policy:
            # Collect the ids of every user-reference target in the policy.
            service_users = {
                target["id"]
                for rule in policy.get("escalation_rules", [])
                for target in rule.get("targets", [])
                if target["type"] == "user_reference"
            }
        if not any(user_id in service_users for user_id in PAGERDUTY_FILTER_USERS):
            return f"No users found for user filter: {','.join(PAGERDUTY_FILTER_USERS)}"

    # Name filter: re.match anchors the pattern at the start of the name.
    if PAGERDUTY_FILTER_SERVICE_REGEX and not re.match(
        PAGERDUTY_FILTER_SERVICE_REGEX, service["name"]
    ):
        return f"Service name does not match regex: {PAGERDUTY_FILTER_SERVICE_REGEX}"

    return None


def filter_services(services: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """
    Keep only the services that satisfy every configured filter.

    Each rejected service is reported on stdout together with the reason,
    followed by a final count of rejected services (when there are any).

    Args:
        services: List of service dictionaries to filter
    Returns:
        The services that passed, in their original order
    """
    filtered_services = []
    filtered_out = 0

    for service in services:
        reason = _service_filter_reason(service)
        if reason is None:
            filtered_services.append(service)
            continue
        filtered_out += 1
        print(f"{TAB}Service {service['id']}: {reason}")

    if filtered_out > 0:
        print(f"Filtered out {filtered_out} services")

    return filtered_services
class BusinessService:
    """PagerDuty business service together with the metadata used by the migrator."""

    def __init__(self, service_data: Dict[str, Any]):
        """
        Build a business service from a raw API payload.

        Args:
            service_data: Raw business service data from the PagerDuty API
        """
        field = service_data.get

        # Keep the untouched payload so fields not copied below stay reachable.
        self.raw_data = service_data

        # Core identity and descriptive fields
        self.id = field("id")
        self.name = field("name", "")
        self.description = field("description", "")
        self.point_of_contact = field("point_of_contact", "")

        # Timestamps exactly as delivered by the API
        self.created_at = field("created_at")
        self.updated_at = field("updated_at")

        # Web and API links
        self.html_url = field("html_url")
        self.self_url = field("self")

        # Related entities
        self.teams = field("teams", [])

        # Starts empty; filled in later by the dependency-fetching step
        self.dependencies = []

    def __str__(self) -> str:
        return f"BusinessService(id={self.id}, name={self.name})"
class TechnicalService:
    """PagerDuty technical service together with the metadata needed for migration."""

    def __init__(self, service_data: Dict[str, Any]):
        """
        Build a technical service from a raw API payload.

        Args:
            service_data: Raw service data from the PagerDuty API
        """
        field = service_data.get

        # Keep the untouched payload so fields not copied below stay reachable.
        self.raw_data = service_data

        # Core identity and descriptive fields
        self.id = field("id")
        self.name = field("name", "")
        self.description = field("description", "")
        self.status = field("status", "")

        # Timestamps exactly as delivered by the API
        self.created_at = field("created_at")
        self.updated_at = field("updated_at")

        # Web and API links
        self.html_url = field("html_url")
        self.self_url = field("self")

        # Related entities
        self.escalation_policy = field("escalation_policy", {})
        self.teams = field("teams", [])

        # Starts empty; filled in later by the dependency-fetching step
        self.dependencies = []

    def __str__(self) -> str:
        return f"TechnicalService(id={self.id}, name={self.name})"
def fetch_services(
    session: APISession, include_integrations: bool = True, include_teams: bool = True
) -> List[TechnicalService]:
    """
    Load every PagerDuty technical service and wrap it in a TechnicalService.

    Escalation policies are always requested; integrations and teams are
    requested only when the corresponding flag is set.

    Args:
        session: Authenticated PagerDuty API session
        include_integrations: Whether to include integrations data
        include_teams: Whether to include teams data
    Returns:
        List of TechnicalService objects
    """
    optional_includes = (
        ("integrations", include_integrations),
        ("teams", include_teams),
    )
    includes = [name for name, wanted in optional_includes if wanted]
    # Unconditional entry, so the include list is never empty.
    includes.append("escalation_policies")

    raw_services = session.list_all("services", params={"include[]": includes})
    return [TechnicalService(raw) for raw in raw_services]
def fetch_service_dependencies(
    session: APISession, services: List[TechnicalService]
) -> None:
    """
    Populate ``dependencies`` on each technical service, in place.

    One request is issued per service against
    ``service_dependencies/technical_services/{id}``. A supporting service is
    recorded as a dependency only when it is one of the supplied services and
    is not the service itself. Failures for a single service are printed and
    do not stop the remaining services from being processed.

    Args:
        session: Authenticated PagerDuty API session
        services: List of TechnicalService objects to update with dependencies
    """
    # Lookup table from service id to service object
    by_id = dict((known.id, known) for known in services)

    print("Fetching service dependencies...")

    for service in services:
        try:
            response = session.get(
                f"service_dependencies/technical_services/{service.id}"
            )
            # The session may hand back an object exposing .json() or data
            # that is already parsed; accept both.
            payload = response.json() if hasattr(response, "json") else response

            usable = (
                bool(payload)
                and isinstance(payload, dict)
                and "relationships" in payload
            )
            if not usable:
                print(
                    f"No valid relationship data found for service {service.name} (ID: {service.id})"
                )
                continue

            for relationship in payload["relationships"]:
                if "supporting_service" not in relationship:
                    continue
                supporting_id = relationship["supporting_service"]["id"]
                # Ignore unknown services and self-references.
                if supporting_id not in by_id or supporting_id == service.id:
                    continue
                service.dependencies.append(by_id[supporting_id])
        except Exception as e:
            # Best effort: report the failure and move on to the next service.
            print(f"Error fetching dependencies for service {service.name}: {e}")

    print(f"Completed fetching dependencies for {len(services)} services.")
def fetch_business_services(session: APISession) -> List[BusinessService]:
    """
    Load every PagerDuty business service and wrap it in a BusinessService.

    Args:
        session: Authenticated PagerDuty API session
    Returns:
        List of BusinessService objects
    """
    return [
        BusinessService(raw) for raw in session.list_all("business_services")
    ]
def get_all_technical_services_with_metadata(
    session: APISession,
) -> List[TechnicalService]:
    """
    Return all technical services with their dependencies already resolved.

    Combines fetch_services (default includes) with
    fetch_service_dependencies, which fills in the dependencies in place.

    Args:
        session: Authenticated PagerDuty API session
    Returns:
        List of TechnicalService objects with all required metadata
    """
    technical_services = fetch_services(session)
    fetch_service_dependencies(session, technical_services)
    return technical_services
def fetch_business_service_dependencies(
    session: APISession,
    business_services: List[BusinessService],
    technical_services: Dict[str, Any],
) -> None:
    """
    Populate ``dependencies`` on each business service, in place.

    One request is issued per business service against
    ``service_dependencies/business_services/{id}``. A supporting service is
    recorded only when its id is a key of ``technical_services``. Failures
    for a single service are printed and do not stop the remaining services
    from being processed.

    Args:
        session: Authenticated PagerDuty API session
        business_services: List of BusinessService objects to update with dependencies
        technical_services: Dictionary mapping service IDs to technical service objects
    """
    print("Fetching business service dependencies...")

    for service in business_services:
        try:
            response = session.get(
                f"service_dependencies/business_services/{service.id}"
            )
            # Accept either an object exposing .json() or already-parsed data.
            payload = response.json() if hasattr(response, "json") else response

            usable = (
                bool(payload)
                and isinstance(payload, dict)
                and "relationships" in payload
            )
            if not usable:
                print(
                    f"No valid relationship data found for business service {service.name} (ID: {service.id})"
                )
                continue

            for relationship in payload["relationships"]:
                if "supporting_service" not in relationship:
                    continue
                supporting_id = relationship["supporting_service"]["id"]
                # Only supporting services known as technical services count.
                if supporting_id in technical_services:
                    service.dependencies.append(technical_services[supporting_id])
        except Exception as e:
            # Best effort: report the failure and move on to the next service.
            print(
                f"Error fetching dependencies for business service {service.name}: {e}"
            )

    print(
        f"Completed fetching dependencies for {len(business_services)} business services."
    )
def get_all_business_services_with_metadata(
    session: APISession, technical_services: Dict[str, Any]
) -> List[BusinessService]:
    """
    Return all business services with their dependencies already resolved.

    Combines fetch_business_services with fetch_business_service_dependencies,
    which fills in the dependencies in place.

    Args:
        session: Authenticated PagerDuty API session
        technical_services: Dictionary mapping service IDs to technical service objects
    Returns:
        List of BusinessService objects with all required metadata
    """
    loaded = fetch_business_services(session)
    fetch_business_service_dependencies(session, loaded, technical_services)
    return loaded
def _migrate_service_batch(
    client: ServiceModelClient,
    services: List[Any],
    migrate_func: Callable[[ServiceModelClient, Any, bool], Optional[Dict[str, Any]]],
    dry_run: bool = False,
) -> Dict[str, Any]:
    """
    Migrate a batch of services using the provided migration function.

    Args:
        client: The ServiceModelClient to use
        services: List of services to migrate
        migrate_func: Called as ``migrate_func(client, service, dry_run)`` for
            each service; expected to return the resulting component, or a
            falsy value when the service produced none
        dry_run: If True, only validate and log what would be done
    Returns:
        Dictionary mapping each service's ``id`` to the component returned
        for it. Services whose migration returned a falsy value are omitted.
    """
    created_components: Dict[str, Any] = {}
    for service in services:
        component = migrate_func(client, service, dry_run)
        # A falsy result means no component is available for this service.
        if component:
            created_components[service.id] = component
    return created_components
def _update_service_dependencies(
    client: ServiceModelClient,
    services: List[Any],
    created_components: Dict[str, Any],
    dry_run: bool = False,
) -> None:
    """
    Patch each migrated component with references to its dependencies.

    Only services that have a component in ``created_components`` and at
    least one dependency are considered, and only dependencies that also have
    a component are referenced. Nothing is patched when ``dry_run`` is set.

    Args:
        client: The ServiceModelClient to use
        services: List of services to update
        created_components: Dictionary of created components by service ID
        dry_run: If True, only validate and log what would be done
    """
    for service in services:
        if service.id not in created_components or not service.dependencies:
            continue

        component_name = created_components[service.id]["metadata"]["name"]

        depends_on_refs = []
        for dep in service.dependencies:
            if dep.id not in created_components:
                continue
            depends_on_refs.append(
                {
                    "apiVersion": "servicemodel.ext.grafana.com/v1alpha1",
                    "kind": "Component",
                    "name": created_components[dep.id]["metadata"]["name"],
                }
            )

        if not depends_on_refs or dry_run:
            continue

        # The patch carries only the dependsOnRefs field.
        patch_payload = {"spec": {"dependsOnRefs": depends_on_refs}}
        try:
            client.patch_component(component_name, patch_payload)
            print(f"Updated dependencies for service: {service.name}")
        except Exception as e:
            print(
                f"Failed to update dependencies for service {service.name}: {e}"
            )
            # Dump the payload to make the failure easier to debug.
            print(f"Patch payload: {json.dumps(patch_payload, indent=2)}")
def _transform_service(
    service: Union[TechnicalService, BusinessService]
) -> Dict[str, Any]:
    """
    Transform a PagerDuty service (technical or business) into a Component resource.

    The component name is derived from the service name: it is lowercased and
    every character outside ``a-z``, ``0-9``, ``-`` and ``.`` is replaced by
    ``-``. The result is cut to 253 characters and stripped of leading and
    trailing ``-``/``.``. If nothing usable remains, the lowercased service
    id is used instead. Names that the previous "lowercase and replace
    spaces" rule already turned into a valid identifier come out unchanged.

    Args:
        service: The PagerDuty service to transform (either TechnicalService or BusinessService)
    Returns:
        A dictionary containing the transformed service in Component format
    """
    is_technical = isinstance(service, TechnicalService)
    service_type = "service" if is_technical else "business_service"

    # NOTE(review): assumes Component names follow the default Kubernetes
    # rule for object names (RFC 1123 DNS subdomain, at most 253 characters)
    # - confirm against the service model API.
    name = re.sub(r"[^a-z0-9.-]", "-", (service.name or "").lower())
    name = name[:253].strip("-.")
    if not name and service.id:
        # Nothing usable in the display name; fall back to the service id.
        name = str(service.id).lower()

    annotations = {"pagerduty.com/service-id": service.id}

    # Technical services additionally carry their PagerDuty status.
    if is_technical and hasattr(service, "status"):
        annotations["pagerduty.com/status"] = service.status

    # Link back to PagerDuty when the URLs are known.
    if service.html_url:
        annotations["pagerduty.com/html-url"] = service.html_url
    if service.self_url:
        annotations["pagerduty.com/api-url"] = service.self_url

    return {
        "apiVersion": "servicemodel.ext.grafana.com/v1alpha1",
        "kind": "Component",
        "metadata": {"name": name, "annotations": annotations},
        "spec": {"type": service_type, "description": service.description},
    }
def _validate_component(component: Dict[str, Any]) -> List[str]:
"""
Validate a transformed Component resource.
Args:
component: The Component resource to validate
Returns:
List of validation errors. Empty list means valid.
"""
errors = []
# Check required fields
required_fields = [
("apiVersion", str),
("kind", str),
("metadata", dict),
("spec", dict),
]
for field, field_type in required_fields:
if field not in component:
errors.append(f"Missing required field: {field}")
elif not isinstance(component[field], field_type):
errors.append(f"Field {field} must be of type {field_type.__name__}")
# If we're missing required fields, don't continue with deeper validation
if errors:
return errors
# Check metadata requirements
metadata = component["metadata"]
if "name" not in metadata:
errors.append("metadata.name is required")
elif not isinstance(metadata["name"], str):
errors.append("metadata.name must be a string")
# Check required annotations
if "annotations" not in metadata:
errors.append("metadata.annotations is required")
else:
annotations = metadata["annotations"]
if "pagerduty.com/service-id" not in annotations:
errors.append("Required annotation missing: pagerduty.com/service-id")
if (
component["spec"]["type"] == "service"
and "pagerduty.com/status" not in annotations
):
errors.append("Required annotation missing: pagerduty.com/status")
# Check spec requirements
spec = component["spec"]
if "type" not in spec:
errors.append("spec.type is required")
elif not isinstance(spec["type"], str):
errors.append("spec.type must be a string")
elif spec["type"] not in ["service", "business_service"]:
errors.append("spec.type must be either 'service' or 'business_service'")
return errors
def _migrate_technical_service(
    client: ServiceModelClient, service: TechnicalService, dry_run: bool = False
) -> Optional[Dict[str, Any]]:
    """
    Migrate one technical service into Grafana's service model.

    The outcome is recorded on the service via ``migration_errors`` and
    ``preserved`` and one report line is printed. A component that already
    exists under the same name is returned as-is and never re-created.

    Args:
        client: The ServiceModelClient to use
        service: The technical service to migrate
        dry_run: If True, only validate and log what would be done
    Returns:
        The existing, would-be or newly created component on success;
        None when validation fails or an exception is raised
    """

    def _report(errors: Any, suffix: str = "") -> None:
        # Record a "not preserved" outcome first, then print the report line.
        service.migration_errors = errors
        service.preserved = False
        print(TAB + format_service(service, True) + suffix)

    try:
        component = _transform_service(service)

        # An existing component with the same name is kept untouched.
        existing = client.get_component(component["metadata"]["name"])
        if existing:
            print(TAB + format_service(service, True) + " (preserved)")
            service.preserved = True
            service.migration_errors = None
            return existing

        errors = _validate_component(component)
        if errors:
            _report(errors)
            return None

        if dry_run:
            _report(None, " (would create)")
            return component

        created = client.create_component(component)
        _report(None, " (created)")
        return created
    except Exception as e:
        _report(str(e))
        return None
def _migrate_business_service(
    client: ServiceModelClient, service: BusinessService, dry_run: bool = False
) -> Optional[Dict[str, Any]]:
    """
    Migrate one business service into Grafana's service model.

    The outcome is recorded on the service via ``migration_errors`` and
    ``preserved`` and one report line is printed. A component that already
    exists under the same name is returned as-is and never re-created.

    Args:
        client: The ServiceModelClient to use
        service: The business service to migrate
        dry_run: If True, only validate and log what would be done
    Returns:
        The existing, would-be or newly created component on success;
        None when validation fails or an exception is raised
    """
    try:
        component = _transform_service(service)

        # An existing component with the same name is kept untouched.
        existing = client.get_component(component["metadata"]["name"])
        if existing:
            print(TAB + format_service(service, True) + " (preserved)")
            service.preserved = True
            service.migration_errors = None
            return existing

        errors = _validate_component(component)
        if errors:
            service.migration_errors, service.preserved = errors, False
            print(TAB + format_service(service, True))
            return None

        # Both success paths differ only in the component they return and in
        # the suffix of the report line.
        if dry_run:
            result, suffix = component, " (would create)"
        else:
            result, suffix = client.create_component(component), " (created)"

        service.migration_errors = None
        service.preserved = False
        print(TAB + format_service(service, True) + suffix)
        return result
    except Exception as e:
        service.migration_errors, service.preserved = str(e), False
        print(TAB + format_service(service, True))
        return None
def migrate_all_services(
    client: ServiceModelClient,
    technical_services: List[TechnicalService],
    business_services: List[BusinessService],
    dry_run: bool = False,
) -> None:
    """
    Migrate all PagerDuty services to Grafana's service model.

    Technical services are migrated first, then business services; afterwards
    the dependency references between the resulting components are updated.

    Args:
        client: The ServiceModelClient to use
        technical_services: List of technical services to migrate
        business_services: List of business services to migrate
        dry_run: If True, only validate and log what would be done
    Returns:
        None. Outcomes are recorded on the service objects and printed.
    """
    tech_components = _migrate_service_batch(
        client, technical_services, _migrate_technical_service, dry_run
    )
    bus_components = _migrate_service_batch(
        client, business_services, _migrate_business_service, dry_run
    )

    # Merge both batches; business entries win if an id appears in both.
    created_components = dict(tech_components)
    created_components.update(bus_components)

    every_service = technical_services + business_services
    _update_service_dependencies(client, every_service, created_components, dry_run)