Service to service model migration (#5485)

# What this PR does

Adds Service and Business Service migration to the PagerDuty Migrator.

To test, in addition to the OnCall configs, you need to create a Grafana
Service Account with `Admin` permission and generate a token. You will
set `GRAFANA_SERVICE_ACCOUNT_URL`, per the README, to
`https://<namespace>:<token>@<server>`. The namespace is the stack id, in
the format `stacks-<stack id>`.
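
For illustration, the value can be assembled from its parts. All values below are placeholders, not real credentials:

```python
# Hypothetical placeholder values; substitute your own stack id, token, and server.
stack_id = "stacks-12345"
token = "glsa_xxxxxxxxxxxx"
server = "my-company.grafana.net"

# Compose the URL in the https://<namespace>:<token>@<server> shape the README describes.
grafana_service_account_url = f"https://{stack_id}:{token}@{server}"
print(grafana_service_account_url)
# https://stacks-12345:glsa_xxxxxxxxxxxx@my-company.grafana.net
```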

Service migration is configurable, filterable, and idempotent.

## Which issue(s) this PR closes

Related to [issue link here]

<!--
*Note*: If you want the issue to be auto-closed once the PR is merged,
change "Related to" to "Closes" in the line above.
If you have more than one GitHub issue that this PR closes, be sure to
preface
each issue link with a [closing
keyword](https://docs.github.com/en/get-started/writing-on-github/working-with-advanced-formatting/using-keywords-in-issues-and-pull-requests#linking-a-pull-request-to-an-issue).
This ensures that the issue(s) are auto-closed once the PR has been
merged.
-->

## Checklist

- [x] Unit, integration, and e2e (if applicable) tests updated
- [x] Documentation added (or `pr:no public docs` PR label added if not
required)
- [ ] Added the relevant release notes label (see labels prefixed w/
`release:`). These labels dictate how your PR will
    show up in the autogenerated release notes.

---------

Co-authored-by: Joey Orlando <joey.orlando@grafana.com>
Co-authored-by: GitHub Actions <actions@github.com>
Co-authored-by: grafana-irm-app[bot] <165293418+grafana-irm-app[bot]@users.noreply.github.com>
Co-authored-by: Joey Orlando <joseph.t.orlando@gmail.com>
Bob Cotton 2025-03-15 19:07:59 -06:00 committed by GitHub
parent bd5cd31c89
commit 0e1dcd2e71
20 changed files with 1585 additions and 15 deletions


@@ -2,8 +2,8 @@ apiVersion: v2
name: oncall
description: Developer-friendly incident response with brilliant Slack integration
type: application
-version: 1.14.4
-appVersion: v1.14.4
+version: 1.15.0
+appVersion: v1.15.0
dependencies:
- name: cert-manager
version: v1.8.0


@@ -229,17 +229,20 @@ Configuration is done via environment variables passed to the docker container.
| `PAGERDUTY_API_TOKEN` | PagerDuty API **user token**. To create a token, refer to [PagerDuty docs](https://support.pagerduty.com/docs/api-access-keys#generate-a-user-token-rest-api-key). | String | N/A |
| `ONCALL_API_URL` | Grafana OnCall API URL. This can be found on the "Settings" page of your Grafana OnCall instance. | String | N/A |
| `ONCALL_API_TOKEN` | Grafana OnCall API Token. To create a token, navigate to the "Settings" page of your Grafana OnCall instance. | String | N/A |
| `GRAFANA_SERVICE_ACCOUNT_URL` | A URL containing your tenant name (e.g. `stacks-xxx`) and Service Account Token, of the form `https://<stackid>:<token>@<server>`, e.g. `https://stacks-12345:xxxxxx@my-company.grafana.net/`. Your stack id can be found at [grafana.com](https://grafana.com). | String | N/A |
| `MODE` | Migration mode (plan vs actual migration). | String (choices: `plan`, `migrate`) | `plan` |
| `SCHEDULE_MIGRATION_MODE` | Determines how on-call schedules are migrated. | String (choices: `ical`, `web`) | `ical` |
| `UNSUPPORTED_INTEGRATION_TO_WEBHOOKS` | When set to `true`, integrations with unsupported type will be migrated to Grafana OnCall integrations with type "webhook". When set to `false`, integrations with unsupported type won't be migrated. | Boolean | `false` |
| `EXPERIMENTAL_MIGRATE_EVENT_RULES` | Migrate global event rulesets to Grafana OnCall integrations. | Boolean | `false` |
| `EXPERIMENTAL_MIGRATE_EVENT_RULES_LONG_NAMES` | Include service & integrations names from PD in migrated integrations (only effective when `EXPERIMENTAL_MIGRATE_EVENT_RULES` is `true`). | Boolean | `false` |
| `MIGRATE_USERS` | If `false`, will allow you to import all objects, while ignoring user references in schedules and escalation policies. In addition, if `false`, will also skip importing User notification rules. This may be helpful in cases where you are unable to import your list of Grafana users, but would like to experiment with OnCall using your existing PagerDuty setup as a starting point. | Boolean | `true` |
| `PAGERDUTY_MIGRATE_SERVICES` | If `true`, will allow you to import technical and business services. | Boolean | `false` |
| `PAGERDUTY_FILTER_TEAM` | Filter resources by team name. Only resources associated with this team will be migrated. | String | N/A |
| `PAGERDUTY_FILTER_USERS` | Filter resources by PagerDuty user IDs (comma-separated). Only resources associated with these users will be migrated. | String | N/A |
| `PAGERDUTY_FILTER_SCHEDULE_REGEX` | Filter schedules by name using a regex pattern. Only schedules whose names match this pattern will be migrated. | String | N/A |
| `PAGERDUTY_FILTER_ESCALATION_POLICY_REGEX` | Filter escalation policies by name using a regex pattern. Only policies whose names match this pattern will be migrated. | String | N/A |
| `PAGERDUTY_FILTER_INTEGRATION_REGEX` | Filter integrations by name using a regex pattern. Only integrations whose names match this pattern will be migrated. | String | N/A |
| `PAGERDUTY_FILTER_SERVICE_REGEX` | Filter services by name using a regex pattern. Only services whose names match this pattern will be migrated. This filter applies to both technical and business services being migrated to Grafana's service model. | String | N/A |
| `PRESERVE_EXISTING_USER_NOTIFICATION_RULES` | Whether to preserve existing notification rules when migrating users. | Boolean | `true` |
### Resources
@@ -328,6 +331,92 @@ If you want to include service & integration names in the names of migrated inte
`EXPERIMENTAL_MIGRATE_EVENT_RULES` is `true`). This can make searching for integrations easier,
but it can also make the names of integrations too long.
#### Services and Business Services
The tool is capable of migrating both technical services and business services from PagerDuty to
Grafana's service model. This feature is disabled by default and can be enabled by setting
`PAGERDUTY_MIGRATE_SERVICES` to `true`.
Set `GRAFANA_SERVICE_ACCOUNT_URL` to a URL embedding the token of a Grafana service account with Admin
permission, in the form `https://<namespace>:<token>@<server>`.
When enabled, the tool will:
1. **Technical Services**:
- Migrate PagerDuty technical services to Grafana Components with type "service"
- Preserve service metadata and relationships
- Map escalation policies to appropriate escalation chains
- Maintain service dependencies and relationships
2. **Business Services**:
- Migrate PagerDuty business services to Grafana Components with type "business_service"
- Preserve business service hierarchy and relationships
- Map technical service dependencies to appropriate Components
- Maintain business impact relationships
The migration process ensures that:
- Service hierarchies are preserved
- Dependencies between services are maintained
- Escalation policies are properly mapped
- Service metadata and annotations are preserved
- Business impact relationships are maintained
Example:
```bash
docker run --rm \
-e MIGRATING_FROM="pagerduty" \
-e MODE="migrate" \
-e GRAFANA_SERVICE_ACCOUNT_URL="<GRAFANA_SERVICE_ACCOUNT_URL>" \
-e ONCALL_API_URL="<ONCALL_API_URL>" \
-e ONCALL_API_TOKEN="<ONCALL_API_TOKEN>" \
-e PAGERDUTY_API_TOKEN="<PAGERDUTY_API_TOKEN>" \
-e PAGERDUTY_MIGRATE_SERVICES="true" \
oncall-migrator
```
#### Service Filtering
The tool provides several ways to filter which services are migrated:
1. **Team-based filtering** (`PAGERDUTY_FILTER_TEAM`):
- Only services associated with the specified team will be migrated
- Applies to both technical and business services
2. **User-based filtering** (`PAGERDUTY_FILTER_USERS`):
- For technical services: only services with the specified users in their escalation policies will be migrated
- Business services are not affected by user filters
- Multiple user IDs can be specified as a comma-separated list
3. **Name-based filtering** (`PAGERDUTY_FILTER_SERVICE_REGEX`):
- Only services whose names match the specified regex pattern will be migrated
- Applies to both technical and business services
These filters can be used individually or combined. When multiple filters are applied, a service must match all
active filters to be included in the migration.
Example:
```bash
docker run --rm \
-e MIGRATING_FROM="pagerduty" \
-e MODE="migrate" \
-e ONCALL_API_URL="<ONCALL_API_URL>" \
-e ONCALL_API_TOKEN="<ONCALL_API_TOKEN>" \
-e PAGERDUTY_API_TOKEN="<PAGERDUTY_API_TOKEN>" \
-e PAGERDUTY_FILTER_TEAM="Platform Team" \
-e PAGERDUTY_FILTER_USERS="U123,U456" \
-e PAGERDUTY_FILTER_SERVICE_REGEX="Prod.*" \
oncall-migrator
```
This example will only migrate services that:
- Belong to the "Platform Team"
- Have either user U123 or U456 in their escalation policy (for technical services)
- Have a name starting with "Prod"
### After migration
- Connect integrations (press the "How to connect" button on the integration page)


@@ -23,3 +23,7 @@ SCHEDULE_MIGRATION_MODE_WEB = "web"
SCHEDULE_MIGRATION_MODE = os.getenv(
    "SCHEDULE_MIGRATION_MODE", SCHEDULE_MIGRATION_MODE_ICAL
)
+# GRAFANA_SERVICE_ACCOUNT_URL is the URL of a service account with
+# Admin permission, of the form: https://<namespace>:<token>@<server>
+GRAFANA_SERVICE_ACCOUNT_URL = os.getenv("GRAFANA_SERVICE_ACCOUNT_URL", "")


@@ -0,0 +1,75 @@
"""
Common service filtering functionality.
"""
import re
from typing import Any, Dict, List
from lib.pagerduty.config import (
PAGERDUTY_FILTER_SERVICE_REGEX,
PAGERDUTY_FILTER_TEAM,
PAGERDUTY_FILTER_USERS,
)
def filter_services(
services: List[Dict[str, Any]], tab: str = ""
) -> List[Dict[str, Any]]:
"""
Filter services based on configured filters.
Args:
services: List of service dictionaries to filter
tab: Optional indentation prefix for logging
Returns:
List of filtered services
"""
filtered_services = []
filtered_out = 0
for service in services:
should_include = True
reason = None
# Filter by team
if PAGERDUTY_FILTER_TEAM:
teams = service.get("teams", [])
if not any(team["summary"] == PAGERDUTY_FILTER_TEAM for team in teams):
should_include = False
reason = f"No teams found for team filter: {PAGERDUTY_FILTER_TEAM}"
# Filter by users (for technical services)
if (
should_include
and PAGERDUTY_FILTER_USERS
and service.get("type") != "business_service"
):
service_users = set()
# Get users from escalation policy if present
if service.get("escalation_policy"):
for rule in service["escalation_policy"].get("escalation_rules", []):
for target in rule.get("targets", []):
if target["type"] == "user":
service_users.add(target["id"])
if not any(user_id in service_users for user_id in PAGERDUTY_FILTER_USERS):
should_include = False
reason = f"No users found for user filter: {','.join(PAGERDUTY_FILTER_USERS)}"
# Filter by name regex
if should_include and PAGERDUTY_FILTER_SERVICE_REGEX:
if not re.match(PAGERDUTY_FILTER_SERVICE_REGEX, service["name"]):
should_include = False
reason = f"Service name does not match regex: {PAGERDUTY_FILTER_SERVICE_REGEX}"
if should_include:
filtered_services.append(service)
else:
filtered_out += 1
print(f"{tab}Service {service['id']}: {reason}")
if filtered_out > 0:
print(f"Filtered out {filtered_out} services")
return filtered_services
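
As a rough illustration of the combined filter semantics in `filter_services` (a service must pass every active filter), here is a minimal sketch using simplified service dicts and hypothetical filter values:

```python
import re

# Hypothetical stand-ins for PAGERDUTY_FILTER_TEAM / PAGERDUTY_FILTER_SERVICE_REGEX.
FILTER_TEAM = "Platform Team"
FILTER_SERVICE_REGEX = "Prod.*"

services = [
    {"id": "S1", "name": "Prod API", "teams": [{"summary": "Platform Team"}]},
    {"id": "S2", "name": "Staging API", "teams": [{"summary": "Platform Team"}]},
    {"id": "S3", "name": "Prod DB", "teams": [{"summary": "Data Team"}]},
]

# A service is kept only when all active filters match, mirroring the AND logic above.
kept = [
    s
    for s in services
    if any(t["summary"] == FILTER_TEAM for t in s["teams"])
    and re.match(FILTER_SERVICE_REGEX, s["name"])
]
print([s["id"] for s in kept])
# ['S1']
```

S2 is dropped by the name regex and S3 by the team filter, leaving only S1.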


@@ -0,0 +1,242 @@
"""
Migration logic for converting PagerDuty services to Grafana's service model.
This module provides functions to migrate PagerDuty services to Grafana's service model,
including creating the required 'pagerduty' Group and handling both individual and batch migrations.
"""
import json
import logging
from typing import Any, Dict, List, Optional
from lib.common.report import TAB
from lib.grafana.service_model_client import ServiceModelClient
from lib.grafana.transform import transform_service, validate_component
from lib.pagerduty.report import format_service
from lib.pagerduty.resources.business_service import BusinessService
from lib.pagerduty.resources.services import TechnicalService
# Configure logging
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
def migrate_technical_service(
client: ServiceModelClient, service: TechnicalService, dry_run: bool = False
) -> Optional[Dict[str, Any]]:
"""
Migrate a single technical service to Grafana's service model.
Args:
client: The ServiceModelClient to use
service: The technical service to migrate
dry_run: If True, only validate and log what would be done
Returns:
The created component if successful, None otherwise
"""
try:
# Transform the service
component = transform_service(service)
# Check if component already exists
existing = client.get_component(component["metadata"]["name"])
if existing:
print(TAB + format_service(service, True) + " (preserved)")
service.preserved = True
service.migration_errors = None
return existing
# Validate the transformed component
errors = validate_component(component)
if errors:
service.migration_errors = errors
service.preserved = False
print(TAB + format_service(service, False))
return None
if dry_run:
service.migration_errors = None
service.preserved = False
print(TAB + format_service(service, True) + " (would create)")
return component
# Create the component
created = client.create_component(component)
service.migration_errors = None
service.preserved = False
print(TAB + format_service(service, True) + " (created)")
return created
except Exception as e:
service.migration_errors = str(e)
service.preserved = False
print(TAB + format_service(service, False))
return None
def migrate_business_service(
client: ServiceModelClient, service: BusinessService, dry_run: bool = False
) -> Optional[Dict[str, Any]]:
"""
Migrate a single business service to Grafana's service model.
Args:
client: The ServiceModelClient to use
service: The business service to migrate
dry_run: If True, only validate and log what would be done
Returns:
The created component if successful, None otherwise
"""
try:
# Transform the service
component = transform_service(service)
# Check if component already exists
existing = client.get_component(component["metadata"]["name"])
if existing:
print(TAB + format_service(service, True) + " (preserved)")
service.preserved = True
service.migration_errors = None
return existing
# Validate the transformed component
errors = validate_component(component)
if errors:
service.migration_errors = errors
service.preserved = False
print(TAB + format_service(service, False))
return None
if dry_run:
service.migration_errors = None
service.preserved = False
print(TAB + format_service(service, True) + " (would create)")
return component
# Create the component
created = client.create_component(component)
service.migration_errors = None
service.preserved = False
print(TAB + format_service(service, True) + " (created)")
return created
except Exception as e:
service.migration_errors = str(e)
service.preserved = False
print(TAB + format_service(service, False))
return None
def _migrate_service_batch(
client: ServiceModelClient,
services: List[Any],
migrate_func: callable,
dry_run: bool = False,
) -> Dict[str, Any]:
"""
Migrate a batch of services using the provided migration function.
Args:
client: The ServiceModelClient to use
services: List of services to migrate
migrate_func: Function to use for migrating each service
dry_run: If True, only validate and log what would be done
Returns:
Dictionary containing migration statistics and created components
"""
created_components = {}
for service in services:
component = migrate_func(client, service, dry_run)
if component:
created_components[service.id] = component
return created_components
def _update_service_dependencies(
client: ServiceModelClient,
services: List[Any],
created_components: Dict[str, Any],
dry_run: bool = False,
) -> None:
"""
Update dependencies for all services with proper refs.
Args:
client: The ServiceModelClient to use
services: List of services to update
created_components: Dictionary of created components by service ID
dry_run: If True, only validate and log what would be done
"""
for service in services:
if service.id in created_components and service.dependencies:
component_name = created_components[service.id]["metadata"]["name"]
depends_on_refs = [
{
"apiVersion": "servicemodel.ext.grafana.com/v1alpha1",
"kind": "Component",
"name": created_components[dep.id]["metadata"]["name"],
}
for dep in service.dependencies
if dep.id in created_components
]
if depends_on_refs:
# Create patch payload with only the dependsOnRefs field
patch_payload = {"spec": {"dependsOnRefs": depends_on_refs}}
if not dry_run:
try:
client.patch_component(component_name, patch_payload)
print(f"Updated dependencies for service: {service.name}")
except Exception as e:
print(
f"Failed to update dependencies for service {service.name}: {e}"
)
# Log the full error details for debugging
print(f"Patch payload: {json.dumps(patch_payload, indent=2)}")
def migrate_all_services(
client: ServiceModelClient,
technical_services: List[TechnicalService],
business_services: List[BusinessService],
dry_run: bool = False,
) -> None:
"""
Migrate all PagerDuty services to Grafana's service model.
Args:
client: The ServiceModelClient to use
technical_services: List of technical services to migrate
business_services: List of business services to migrate
dry_run: If True, only validate and log what would be done
"""
# Migrate technical services
tech_components = _migrate_service_batch(
client, technical_services, migrate_technical_service, dry_run
)
# Migrate business services
bus_components = _migrate_service_batch(
client, business_services, migrate_business_service, dry_run
)
# Update dependencies
created_components = {**tech_components, **bus_components}
_update_service_dependencies(
client, technical_services + business_services, created_components, dry_run
)
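
The `dependsOnRefs` patch built in `_update_service_dependencies` has the following shape; a minimal sketch with hypothetical component names:

```python
# Hypothetical dependency component names for illustration.
dep_names = ["checkout-db", "payments-gateway"]

# Each dependency becomes a typed object reference to another Component.
depends_on_refs = [
    {
        "apiVersion": "servicemodel.ext.grafana.com/v1alpha1",
        "kind": "Component",
        "name": name,
    }
    for name in dep_names
]

# The patch touches only spec.dependsOnRefs, leaving the rest of the Component intact.
patch_payload = {"spec": {"dependsOnRefs": depends_on_refs}}
print([ref["name"] for ref in patch_payload["spec"]["dependsOnRefs"]])
# ['checkout-db', 'payments-gateway']
```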


@@ -0,0 +1,194 @@
from urllib.parse import urlparse
import kubernetes
from kubernetes import client
from lib.base_config import GRAFANA_SERVICE_ACCOUNT_URL
SERVICE_MODEL_API_GROUP = "servicemodel.ext.grafana.com"
SERVICE_MODEL_API_VERSION = "v1alpha1"
class ServiceModelClient:
"""
Client for interacting with Grafana's Service Model API using the Kubernetes client.
This uses the k8s API to interact with the service model which is implemented
as a Kubernetes ApiServer embedded within Grafana.
"""
@staticmethod
def parse_k8s_url(url: str) -> tuple:
"""
Parse a kubernetes URL of the format https://<namespace>:<token>@<server>
Returns tuple of (server_url, namespace, token)
"""
parsed = urlparse(url)
if not all([parsed.scheme, parsed.netloc]):
raise ValueError(
"Invalid URL format. Expected: https://<namespace>:<token>@<server>"
)
# Split username (namespace) and password (token)
if "@" not in parsed.netloc:
raise ValueError(
"URL must contain credentials in the format namespace:token@server"
)
auth, server = parsed.netloc.rsplit("@", 1)
if ":" not in auth:
raise ValueError("Credentials must be in the format namespace:token")
namespace, token = auth.split(":", 1)
# Reconstruct server URL with scheme
server_url = f"{parsed.scheme}://{server}{parsed.path}"
return server_url, namespace, token
def __init__(self):
"""
Initialize the ServiceModelClient.
Configures the client using a URL-based format or falls back to legacy configuration.
"""
if GRAFANA_SERVICE_ACCOUNT_URL:
try:
server_url, namespace, token = self.parse_k8s_url(
GRAFANA_SERVICE_ACCOUNT_URL
)
# Configure client using parsed parameters
configuration = client.Configuration()
configuration.host = server_url
configuration.api_key = {"authorization": f"Bearer {token}"}
# configuration.verify_ssl = False # Note: In production, you should handle SSL verification properly
# Set the default namespace
self.default_namespace = namespace
# Create API client with custom configuration
client.Configuration.set_default(configuration)
self.api_client = client.ApiClient(configuration)
except ValueError as e:
raise ValueError(
f"Failed to parse GRAFANA_SERVICE_ACCOUNT_URL: {str(e)}"
)
else:
raise ValueError(
"Unable to configure Kubernetes client. Please set: "
"GRAFANA_SERVICE_ACCOUNT_URL (format: https://<namespace>:<token>@<server>) "
)
# Base API group and version for service model resources
self.api_group = SERVICE_MODEL_API_GROUP
self.api_version = SERVICE_MODEL_API_VERSION
# Initialize the CustomObjectsApi for interacting with custom resources
self.custom_api = client.CustomObjectsApi(self.api_client)
def get_components(self, namespace=None):
"""
Get all Component resources from the service model.
Args:
namespace: The namespace to list components from. Defaults to the namespace from the URL.
Returns:
List of Component resources.
"""
namespace = namespace or self.default_namespace
return self.custom_api.list_namespaced_custom_object(
group=self.api_group,
version=self.api_version,
namespace=namespace,
plural="components",
)
def get_component(self, name, namespace=None):
"""
Get a specific Component resource by name.
Args:
name: The name of the component.
namespace: The namespace of the component.
Returns:
The Component resource if found, None otherwise.
"""
namespace = namespace or self.default_namespace
try:
return self.custom_api.get_namespaced_custom_object(
group=self.api_group,
version=self.api_version,
namespace=namespace,
plural="components",
name=name,
)
except kubernetes.client.rest.ApiException as e:
if e.status == 404:
return None
raise
def create_component(self, component_data, namespace=None):
"""
Create a new Component resource.
Args:
component_data: The Component resource data.
namespace: The namespace to create the component in.
Returns:
The created Component resource.
"""
namespace = namespace or self.default_namespace
return self.custom_api.create_namespaced_custom_object(
group=self.api_group,
version=self.api_version,
namespace=namespace,
plural="components",
body=component_data,
)
def update_component(self, name, component_data, namespace=None):
"""
Update an existing Component resource.
Args:
name: The name of the component to update.
component_data: The updated Component resource data.
namespace: The namespace of the component.
Returns:
The updated Component resource.
"""
namespace = namespace or self.default_namespace
return self.custom_api.replace_namespaced_custom_object(
group=self.api_group,
version=self.api_version,
namespace=namespace,
plural="components",
name=name,
body=component_data,
)
def patch_component(self, name, patch_data, namespace=None):
"""
Patch an existing Component resource.
Args:
name: The name of the component to patch.
patch_data: The patch data to apply.
namespace: The namespace of the component.
Returns:
The patched Component resource.
"""
namespace = namespace or self.default_namespace
return self.custom_api.patch_namespaced_custom_object(
group=self.api_group,
version=self.api_version,
namespace=namespace,
plural="components",
name=name,
body=patch_data,
)
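
The URL parsing in `parse_k8s_url` can be exercised in isolation; a self-contained sketch of the same split logic, using a made-up token:

```python
from urllib.parse import urlparse


def parse_k8s_url(url: str) -> tuple:
    # Mirrors ServiceModelClient.parse_k8s_url: https://<namespace>:<token>@<server>
    parsed = urlparse(url)
    # netloc holds "namespace:token@server"; split from the right so the
    # token may itself contain "@" without breaking parsing.
    auth, server = parsed.netloc.rsplit("@", 1)
    namespace, token = auth.split(":", 1)
    return f"{parsed.scheme}://{server}{parsed.path}", namespace, token


server_url, namespace, token = parse_k8s_url(
    "https://stacks-12345:glsa_abc123@my-company.grafana.net"
)
print(server_url, namespace, token)
# https://my-company.grafana.net stacks-12345 glsa_abc123
```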


@@ -0,0 +1,117 @@
"""
Transformation logic for converting PagerDuty services to Grafana Service Model format.
This module provides functions to transform PagerDuty technical and business services
into the Backstage Catalog format used by Grafana's Service Model.
"""
from typing import Any, Dict, List, Union
from lib.pagerduty.resources.business_service import BusinessService
from lib.pagerduty.resources.services import TechnicalService
def transform_service(
service: Union[TechnicalService, BusinessService]
) -> Dict[str, Any]:
"""
Transform a PagerDuty service (technical or business) into a Backstage Component.
Args:
service: The PagerDuty service to transform (either TechnicalService or BusinessService)
Returns:
A dictionary containing the transformed service in Backstage Component format
"""
# Determine service type and required fields
is_technical = isinstance(service, TechnicalService)
service_type = "service" if is_technical else "business_service"
# Create the base component structure
component = {
"apiVersion": "servicemodel.ext.grafana.com/v1alpha1",
"kind": "Component",
"metadata": {
"name": service.name.lower().replace(
" ", "-"
), # Convert to k8s-friendly name
"annotations": {"pagerduty.com/service-id": service.id},
},
"spec": {"type": service_type, "description": service.description},
}
# Add status annotation for technical services
if is_technical and hasattr(service, "status"):
component["metadata"]["annotations"]["pagerduty.com/status"] = service.status
# Add PagerDuty URLs to annotations
if service.html_url:
component["metadata"]["annotations"][
"pagerduty.com/html-url"
] = service.html_url
if service.self_url:
component["metadata"]["annotations"]["pagerduty.com/api-url"] = service.self_url
return component
def validate_component(component: Dict[str, Any]) -> List[str]:
"""
Validate a transformed Component resource.
Args:
component: The Component resource to validate
Returns:
List of validation errors. Empty list means valid.
"""
errors = []
# Check required fields
required_fields = [
("apiVersion", str),
("kind", str),
("metadata", dict),
("spec", dict),
]
for field, field_type in required_fields:
if field not in component:
errors.append(f"Missing required field: {field}")
elif not isinstance(component[field], field_type):
errors.append(f"Field {field} must be of type {field_type.__name__}")
# If we're missing required fields, don't continue with deeper validation
if errors:
return errors
# Check metadata requirements
metadata = component["metadata"]
if "name" not in metadata:
errors.append("metadata.name is required")
elif not isinstance(metadata["name"], str):
errors.append("metadata.name must be a string")
# Check required annotations
if "annotations" not in metadata:
errors.append("metadata.annotations is required")
else:
annotations = metadata["annotations"]
if "pagerduty.com/service-id" not in annotations:
errors.append("Required annotation missing: pagerduty.com/service-id")
if (
component["spec"]["type"] == "service"
and "pagerduty.com/status" not in annotations
):
errors.append("Required annotation missing: pagerduty.com/status")
# Check spec requirements
spec = component["spec"]
if "type" not in spec:
errors.append("spec.type is required")
elif not isinstance(spec["type"], str):
errors.append("spec.type must be a string")
elif spec["type"] not in ["service", "business_service"]:
errors.append("spec.type must be either 'service' or 'business_service'")
return errors
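
For a concrete picture of what `transform_service` emits and `validate_component` checks, here is a hand-built Component for a hypothetical technical service (all field values invented for illustration):

```python
# Hypothetical PagerDuty service attributes.
name, service_id, status = "Checkout API", "PXXXXXX", "active"

component = {
    "apiVersion": "servicemodel.ext.grafana.com/v1alpha1",
    "kind": "Component",
    "metadata": {
        # Spaces become dashes to produce a k8s-friendly name.
        "name": name.lower().replace(" ", "-"),
        "annotations": {
            "pagerduty.com/service-id": service_id,
            # Required for spec.type == "service" per validate_component.
            "pagerduty.com/status": status,
        },
    },
    "spec": {"type": "service", "description": "Handles checkout traffic"},
}
print(component["metadata"]["name"])
# checkout-api
```

A component shaped like this passes every check in `validate_component`: all four top-level fields are present with the right types, and both required annotations exist.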


@@ -12,7 +12,7 @@ class OnCallAPIClient:
ONCALL_API_URL,
path,
headers={"Authorization": ONCALL_API_TOKEN},
-**kwargs
+**kwargs,
)
@classmethod


@@ -40,22 +40,31 @@ UNSUPPORTED_INTEGRATION_TO_WEBHOOKS = (
MIGRATE_USERS = os.getenv("MIGRATE_USERS", "true").lower() == "true"
+# Whether to migrate PagerDuty services to Grafana's service model
+PAGERDUTY_MIGRATE_SERVICES = (
+    os.getenv("PAGERDUTY_MIGRATE_SERVICES", "false").lower() == "true"
+)
# Filter resources by team
-PAGERDUTY_FILTER_TEAM = os.getenv("PAGERDUTY_FILTER_TEAM")
+PAGERDUTY_FILTER_TEAM = os.getenv("PAGERDUTY_FILTER_TEAM", "")
# Filter resources by users (comma-separated list of PagerDuty user IDs)
-PAGERDUTY_FILTER_USERS = [
-    user_id.strip()
-    for user_id in os.getenv("PAGERDUTY_FILTER_USERS", "").split(",")
-    if user_id.strip()
-]
+PAGERDUTY_FILTER_USERS = (
+    os.getenv("PAGERDUTY_FILTER_USERS", "").split(",")
+    if os.getenv("PAGERDUTY_FILTER_USERS")
+    else []
+)
# Filter resources by name regex patterns
-PAGERDUTY_FILTER_SCHEDULE_REGEX = os.getenv("PAGERDUTY_FILTER_SCHEDULE_REGEX")
+PAGERDUTY_FILTER_SCHEDULE_REGEX = os.getenv("PAGERDUTY_FILTER_SCHEDULE_REGEX", "")
PAGERDUTY_FILTER_ESCALATION_POLICY_REGEX = os.getenv(
-    "PAGERDUTY_FILTER_ESCALATION_POLICY_REGEX"
+    "PAGERDUTY_FILTER_ESCALATION_POLICY_REGEX", ""
)
-PAGERDUTY_FILTER_INTEGRATION_REGEX = os.getenv("PAGERDUTY_FILTER_INTEGRATION_REGEX")
+PAGERDUTY_FILTER_INTEGRATION_REGEX = os.getenv("PAGERDUTY_FILTER_INTEGRATION_REGEX", "")
+# Filter services by name regex pattern. Only applies to services being migrated to Grafana's service model.
+# This filter can be used to selectively migrate specific services based on their names.
+PAGERDUTY_FILTER_SERVICE_REGEX = os.getenv("PAGERDUTY_FILTER_SERVICE_REGEX", "")
# Whether to preserve existing notification rules when migrating users
PRESERVE_EXISTING_USER_NOTIFICATION_RULES = (


@@ -4,7 +4,10 @@ import re
from pdpyras import APISession
from lib.common.report import TAB
from lib.common.resources.services import filter_services
from lib.common.resources.users import match_user
from lib.grafana.service_migrate import migrate_all_services
from lib.grafana.service_model_client import ServiceModelClient
from lib.oncall.api_client import OnCallAPIClient
from lib.pagerduty.config import (
EXPERIMENTAL_MIGRATE_EVENT_RULES,
@@ -17,6 +20,7 @@ from lib.pagerduty.config import (
PAGERDUTY_FILTER_SCHEDULE_REGEX,
PAGERDUTY_FILTER_TEAM,
PAGERDUTY_FILTER_USERS,
PAGERDUTY_MIGRATE_SERVICES,
)
from lib.pagerduty.report import (
escalation_policy_report,
@@ -28,8 +32,13 @@ from lib.pagerduty.report import (
integration_report,
ruleset_report,
schedule_report,
services_report,
user_report,
)
from lib.pagerduty.resources.business_service import (
BusinessService,
get_all_business_services_with_metadata,
)
from lib.pagerduty.resources.escalation_policies import (
match_escalation_policy,
match_escalation_policy_for_integration,
@@ -43,6 +52,10 @@ from lib.pagerduty.resources.integrations import (
from lib.pagerduty.resources.notification_rules import migrate_notification_rules
from lib.pagerduty.resources.rulesets import match_ruleset, migrate_ruleset
from lib.pagerduty.resources.schedules import match_schedule, migrate_schedule
from lib.pagerduty.resources.services import (
TechnicalService,
get_all_technical_services_with_metadata,
)
from lib.pagerduty.resources.users import (
match_users_and_schedules_for_escalation_policy,
match_users_for_schedule,
@@ -283,6 +296,32 @@ def migrate() -> None:
services,
integrations,
)
if PAGERDUTY_MIGRATE_SERVICES:
client = ServiceModelClient()
# Get all services
all_technical_services = get_all_technical_services_with_metadata(session)
technical_service_map = {
service.id: service for service in all_technical_services
}
all_business_services = get_all_business_services_with_metadata(
session, technical_service_map
)
# Apply filters to services
filtered_technical_data = filter_services(
[service.raw_data for service in all_technical_services], TAB
)
filtered_business_data = filter_services(
[service.raw_data for service in all_business_services], TAB
)
# Convert filtered data back to service objects
technical_services = [
TechnicalService(service) for service in filtered_technical_data
]
business_services = [
BusinessService(service) for service in filtered_business_data
]
if MODE == MODE_PLAN:
print(user_report(users), end="\n\n")
@@ -293,6 +332,19 @@ def migrate() -> None:
if rulesets is not None:
print(ruleset_report(rulesets), end="\n\n")
if PAGERDUTY_MIGRATE_SERVICES:
print(
services_report(
all_technical_services,
all_business_services,
technical_services,
business_services,
),
end="\n\n",
)
return
return
if MIGRATE_USERS:
@@ -333,3 +385,11 @@ def migrate() -> None:
if not ruleset["flawed_escalation_policies"]:
migrate_ruleset(ruleset, escalation_policies, services)
print(TAB + format_ruleset(ruleset))
if PAGERDUTY_MIGRATE_SERVICES:
print("▶ Migrating services to Grafana's service model...")
migrate_all_services(
client, technical_services, business_services, dry_run=False
)
else:
print("▶ Skipping service migration as PAGERDUTY_MIGRATE_SERVICES is false...")


@@ -1,3 +1,5 @@
from typing import Any, List
from lib.common.report import ERROR_SIGN, SUCCESS_SIGN, TAB, WARNING_SIGN
from lib.pagerduty.config import PRESERVE_EXISTING_USER_NOTIFICATION_RULES
@@ -213,3 +215,52 @@ def ruleset_report(rulesets: list[dict]) -> str:
)
return result
def format_service(service: Any, will_be_migrated: bool = True) -> str:
"""Format a service for reporting."""
# Only BusinessService defines point_of_contact; neither class has a
# "business_service" attribute, so check for point_of_contact instead.
service_type = (
"Business Service"
if hasattr(service, "point_of_contact")
else "Technical Service"
)
result = f"{service.name} ({service_type})"
if not will_be_migrated:
result = f"{ERROR_SIGN} {result} — Service was filtered out"
elif hasattr(service, "migration_errors") and service.migration_errors:
result = f"{ERROR_SIGN} {result}: {service.migration_errors}"
else:
result = f"{SUCCESS_SIGN} {result}"
return result
def services_report(
all_technical_services: List[Any],
all_business_services: List[Any],
filtered_technical_services: List[Any],
filtered_business_services: List[Any],
) -> str:
"""Generate a report of services to be migrated."""
result = "Services migration report:"
# Create sets of service IDs that will be migrated
technical_ids = {s.id for s in filtered_technical_services}
business_ids = {s.id for s in filtered_business_services}
# Report technical services
result += "\n" + TAB + "Technical Services:"
for service in sorted(
all_technical_services, key=lambda service: service.id not in technical_ids
):
result += "\n" + TAB * 2 + format_service(service, service.id in technical_ids)
# Report business services
result += "\n" + TAB + "Business Services:"
for service in sorted(
all_business_services, key=lambda service: service.id not in business_ids
):
result += "\n" + TAB * 2 + format_service(service, service.id in business_ids)
return result
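The `key=lambda service: service.id not in technical_ids` trick above relies on `False` sorting before `True`, so services that will be migrated are listed first; Python's stable sort keeps the original order within each group. A small standalone illustration:

```python
from dataclasses import dataclass

@dataclass
class Svc:
    id: str

services = [Svc("P2"), Svc("P1"), Svc("P3")]
migrated_ids = {"P1", "P3"}

# False < True, so ids in migrated_ids sort to the front; the sort is
# stable, so relative order inside each group is preserved.
ordered = sorted(services, key=lambda s: s.id not in migrated_ids)
print([s.id for s in ordered])  # ['P1', 'P3', 'P2']
```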


@ -0,0 +1,145 @@
"""
PagerDuty business service resources.
This module provides classes and functions for interacting with PagerDuty business services.
"""
from typing import Any, Dict, List
from pdpyras import APISession
class BusinessService:
"""Class representing a PagerDuty business service with all necessary metadata."""
def __init__(self, service_data: Dict[str, Any]):
"""
Initialize a PagerDuty business service from API data.
Args:
service_data: Raw business service data from the PagerDuty API
"""
self.id = service_data.get("id")
self.name = service_data.get("name", "")
self.description = service_data.get("description", "")
self.point_of_contact = service_data.get("point_of_contact", "")
self.created_at = service_data.get("created_at")
self.updated_at = service_data.get("updated_at")
# URLs and permalinks
self.html_url = service_data.get("html_url")
self.self_url = service_data.get("self")
# Related entities
self.teams = service_data.get("teams", [])
# Dependencies - will be populated separately
self.dependencies = []
# Store raw data for access to any fields we might need later
self.raw_data = service_data
def __str__(self) -> str:
return f"BusinessService(id={self.id}, name={self.name})"
def fetch_business_services(session: APISession) -> List[BusinessService]:
"""
Fetch all PagerDuty business services with their metadata.
Args:
session: Authenticated PagerDuty API session
Returns:
List of BusinessService objects
"""
# Fetch all business services
services_data = session.list_all("business_services")
# Convert to BusinessService objects
services = [BusinessService(service) for service in services_data]
return services
def fetch_business_service_dependencies(
session: APISession,
business_services: List[BusinessService],
technical_services: Dict[str, Any],
) -> None:
"""
Fetch and populate business service dependencies on technical services.
This function modifies the provided business services list in-place by populating
the dependencies field for each service.
Args:
session: Authenticated PagerDuty API session
business_services: List of BusinessService objects to update with dependencies
technical_services: Dictionary mapping service IDs to technical service objects
"""
print("Fetching business service dependencies...")
# Process each business service to find its dependencies
for service in business_services:
try:
# Use the business service dependencies endpoint
response = session.get(
f"service_dependencies/business_services/{service.id}"
)
# Parse the response
dependencies_data = response
if hasattr(response, "json"):
dependencies_data = response.json()
# Extract relationships from the response
if (
dependencies_data
and isinstance(dependencies_data, dict)
and "relationships" in dependencies_data
):
for relationship in dependencies_data["relationships"]:
# A dependency relationship has a supporting_service that the business service depends on
if "supporting_service" in relationship:
dep_id = relationship["supporting_service"]["id"]
if (
dep_id in technical_services
): # Only add if it's a technical service
service.dependencies.append(technical_services[dep_id])
else:
print(
f"No valid relationship data found for business service {service.name} (ID: {service.id})"
)
except Exception as e:
# Log but continue if we can't fetch dependencies for a service
print(
f"Error fetching dependencies for business service {service.name}: {e}"
)
print(
f"Completed fetching dependencies for {len(business_services)} business services."
)
def get_all_business_services_with_metadata(
session: APISession, technical_services: Dict[str, Any]
) -> List[BusinessService]:
"""
Fetch all PagerDuty business services with complete metadata including dependencies.
Args:
session: Authenticated PagerDuty API session
technical_services: Dictionary mapping service IDs to technical service objects
Returns:
List of BusinessService objects with all required metadata
"""
# Fetch business services with their basic metadata
business_services = fetch_business_services(session)
# Fetch and populate dependencies
fetch_business_service_dependencies(session, business_services, technical_services)
return business_services
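The relationship-walking above can be distilled into a pure helper. This is a sketch with a made-up function name, showing the shape of a `service_dependencies` payload and the filtering down to known technical-service ids:

```python
def supporting_ids(payload, known_ids):
    """Return supporting-service ids from a service_dependencies response,
    keeping only ids present in known_ids (unknown ids are dropped, as in
    the `dep_id in technical_services` check)."""
    if not isinstance(payload, dict) or "relationships" not in payload:
        return []
    return [
        rel["supporting_service"]["id"]
        for rel in payload["relationships"]
        if "supporting_service" in rel
        and rel["supporting_service"]["id"] in known_ids
    ]

payload = {
    "relationships": [
        {"supporting_service": {"id": "SVC1"}},
        {"supporting_service": {"id": "UNKNOWN"}},
        {"dependent_service": {"id": "SVC9"}},  # no supporting_service key
    ]
}
print(supporting_ids(payload, {"SVC1", "SVC2"}))  # ['SVC1']
```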


@ -0,0 +1,160 @@
"""
PagerDuty services resource module.
This module provides functions for fetching PagerDuty services and extracting
relevant metadata for migration to Grafana's service model.
"""
from typing import Any, Dict, List
from pdpyras import APISession
class TechnicalService:
"""Class representing a PagerDuty technical service with all necessary metadata for migration."""
def __init__(self, service_data: Dict[str, Any]):
"""
Initialize a PagerDuty technical service from API data.
Args:
service_data: Raw service data from the PagerDuty API
"""
self.id = service_data.get("id")
self.name = service_data.get("name", "")
self.description = service_data.get("description", "")
self.status = service_data.get("status", "")
self.created_at = service_data.get("created_at")
self.updated_at = service_data.get("updated_at")
# URLs and permalinks
self.html_url = service_data.get("html_url")
self.self_url = service_data.get("self")
# Related entities
self.escalation_policy = service_data.get("escalation_policy", {})
self.teams = service_data.get("teams", [])
# Dependencies - will be populated separately
self.dependencies = []
# Store raw data for access to any fields we might need later
self.raw_data = service_data
def __str__(self) -> str:
return f"TechnicalService(id={self.id}, name={self.name})"
def fetch_services(
session: APISession, include_integrations: bool = True, include_teams: bool = True
) -> List[TechnicalService]:
"""
Fetch all PagerDuty technical services with their metadata.
Args:
session: Authenticated PagerDuty API session
include_integrations: Whether to include integrations data
include_teams: Whether to include teams data
Returns:
List of TechnicalService objects
"""
include_params = []
if include_integrations:
include_params.append("integrations")
if include_teams:
include_params.append("teams")
params = {}
if include_params:
params["include[]"] = include_params
# Fetch all services with the specified includes
services_data = session.list_all("services", params=params)
# Convert to TechnicalService objects
services = [TechnicalService(service) for service in services_data]
return services
def fetch_service_dependencies(
session: APISession, services: List[TechnicalService]
) -> None:
"""
Fetch and populate service dependencies using PagerDuty's service dependencies API.
This function modifies the provided services list in-place by populating
the dependencies field for each service.
Args:
session: Authenticated PagerDuty API session
services: List of TechnicalService objects to update with dependencies
"""
# Create a mapping of service_id to service for efficient lookup
service_map = {service.id: service for service in services}
print("Fetching service dependencies...")
# Process each service to find its dependencies
for service in services:
try:
# Use the service dependencies endpoint for technical services
# Format: https://api.pagerduty.com/service_dependencies/technical_services/{id}
response = session.get(
f"service_dependencies/technical_services/{service.id}"
)
# pdpyras may hand back either an already-parsed dict or a
# requests.Response object; accept both.
dependencies_data = response
if hasattr(response, "json"):
dependencies_data = response.json()
# Extract relationships from the response
if (
dependencies_data
and isinstance(dependencies_data, dict)
and "relationships" in dependencies_data
):
for relationship in dependencies_data["relationships"]:
# A dependency relationship has a supporting_service that the current service depends on
if "supporting_service" in relationship:
dep_id = relationship["supporting_service"]["id"]
if (
dep_id in service_map and dep_id != service.id
): # Avoid self-references
service.dependencies.append(service_map[dep_id])
else:
print(
f"No valid relationship data found for service {service.name} (ID: {service.id})"
)
except Exception as e:
# Log but continue if we can't fetch dependencies for a service
print(f"Error fetching dependencies for service {service.name}: {e}")
print(f"Completed fetching dependencies for {len(services)} services.")
def get_all_technical_services_with_metadata(
session: APISession,
) -> List[TechnicalService]:
"""
Fetch all PagerDuty technical services with complete metadata including dependencies.
This is the main function that should be used by the migration process.
Args:
session: Authenticated PagerDuty API session
Returns:
List of TechnicalService objects with all required metadata
"""
# Fetch services with their basic metadata
services = fetch_services(session)
# Fetch and populate dependencies
fetch_service_dependencies(session, services)
return services
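The `include[]` handling in `fetch_services` reduces to a small pure function; a sketch that can be checked in isolation:

```python
def build_include_params(include_integrations: bool = True,
                         include_teams: bool = True) -> dict:
    """Build the params dict passed to session.list_all("services", ...).

    PagerDuty's list endpoints accept repeated include[] query
    parameters; an empty dict means no includes at all.
    """
    include = []
    if include_integrations:
        include.append("integrations")
    if include_teams:
        include.append("teams")
    return {"include[]": include} if include else {}

print(build_include_params())              # {'include[]': ['integrations', 'teams']}
print(build_include_params(False, False))  # {}
```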


@ -0,0 +1,110 @@
"""
Unit tests for the Grafana Service Model transformation logic.
"""
from unittest.mock import Mock
import pytest
from lib.grafana.transform import transform_service, validate_component
from lib.pagerduty.resources.business_service import BusinessService
from lib.pagerduty.resources.services import TechnicalService
@pytest.fixture
def technical_service():
"""Create a mock technical service for testing."""
service = Mock(spec=TechnicalService)
service.name = "Test Service"
service.description = "A test service"
service.id = "P123456"
service.status = "active"
service.html_url = "https://pagerduty.com/services/P123456"
service.self_url = "https://api.pagerduty.com/services/P123456"
return service
@pytest.fixture
def business_service():
"""Create a mock business service for testing."""
service = Mock(spec=BusinessService)
service.name = "Test Business Service"
service.description = "A test business service"
service.id = "P789012"
service.html_url = "https://pagerduty.com/services/P789012"
service.self_url = "https://api.pagerduty.com/services/P789012"
return service
def test_transform_technical_service(technical_service):
"""Test transforming a technical service."""
component = transform_service(technical_service)
# Verify the component structure
assert component["apiVersion"] == "servicemodel.ext.grafana.com/v1alpha1"
assert component["kind"] == "Component"
assert component["metadata"]["name"] == "test-service"
assert component["spec"]["type"] == "service"
assert component["spec"]["description"] == "A test service"
# Verify annotations
annotations = component["metadata"]["annotations"]
assert annotations["pagerduty.com/service-id"] == "P123456"
assert annotations["pagerduty.com/status"] == "active"
assert (
annotations["pagerduty.com/html-url"]
== "https://pagerduty.com/services/P123456"
)
assert (
annotations["pagerduty.com/api-url"]
== "https://api.pagerduty.com/services/P123456"
)
def test_transform_business_service(business_service):
"""Test transforming a business service."""
component = transform_service(business_service)
# Verify the component structure
assert component["apiVersion"] == "servicemodel.ext.grafana.com/v1alpha1"
assert component["kind"] == "Component"
assert component["metadata"]["name"] == "test-business-service"
assert component["spec"]["type"] == "business_service"
assert component["spec"]["description"] == "A test business service"
# Verify annotations
annotations = component["metadata"]["annotations"]
assert annotations["pagerduty.com/service-id"] == "P789012"
assert (
annotations["pagerduty.com/html-url"]
== "https://pagerduty.com/services/P789012"
)
assert (
annotations["pagerduty.com/api-url"]
== "https://api.pagerduty.com/services/P789012"
)
def test_validate_component():
"""Test component validation."""
# Test valid component
valid_component = {
"apiVersion": "servicemodel.ext.grafana.com/v1alpha1",
"kind": "Component",
"metadata": {
"name": "test-service",
"annotations": {
"pagerduty.com/service-id": "P123456",
"pagerduty.com/status": "active",
},
},
"spec": {"type": "service", "description": "A test service"},
}
errors = validate_component(valid_component)
assert errors == []
# Test missing required field
invalid_component = valid_component.copy()
del invalid_component["spec"]
errors = validate_component(invalid_component)
assert "Missing required field: spec" in errors
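The tests above expect `metadata.name` to be a kebab-cased version of the service name ("Test Service" becomes "test-service"). A minimal sketch of such a slug helper; the actual `transform_service` implementation may normalise differently:

```python
import re

def slugify(name: str) -> str:
    """Lower-case the name and collapse runs of non-alphanumerics into
    single hyphens, yielding a Kubernetes-style resource name."""
    return re.sub(r"[^a-z0-9]+", "-", name.lower()).strip("-")

print(slugify("Test Service"))           # test-service
print(slugify("Test Business Service"))  # test-business-service
```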


@ -29,7 +29,6 @@ def test_users_are_skipped_when_migrate_users_is_false(
call("escalation_policies", params={"include[]": "teams"}),
call("services", params={"include[]": ["integrations", "teams"]}),
call("vendors"),
# no user notification rules fetching
]
mock_oncall_client.list_users_with_notification_rules.assert_not_called()
@ -176,8 +175,10 @@ class TestPagerDutyMigrationFiltering:
@patch("lib.pagerduty.migrate.filter_integrations")
@patch("lib.pagerduty.migrate.APISession")
@patch("lib.pagerduty.migrate.OnCallAPIClient")
@patch("lib.pagerduty.migrate.ServiceModelClient")
def test_migrate_calls_filters(
self,
MockServiceModelClient,
MockOnCallAPIClient,
MockAPISession,
mock_filter_integrations,
@ -190,12 +191,16 @@ class TestPagerDutyMigrationFiltering:
[{"id": "U1", "name": "Test User", "email": "test@example.com"}], # users
[{"id": "S1"}], # schedules
[{"id": "P1"}], # policies
[{"id": "SVC1", "integrations": []}], # services with params
[{"id": "SVC1", "integrations": []}], # services
[{"id": "V1"}], # vendors
[{"id": "BS1"}], # business services
]
mock_session.jget.return_value = {"overrides": []} # Mock schedule overrides
mock_oncall_client = MockOnCallAPIClient.return_value
mock_oncall_client.list_all.return_value = []
mock_service_client = MockServiceModelClient.return_value
mock_service_client.get_components.return_value = []
# Run migration
migrate()
@ -225,10 +230,14 @@ class TestPagerDutyMigrationFiltering:
[{"id": "U1", "name": "Test User", "email": "test@example.com"}], # users
[{"id": "S1", "teams": [{"summary": "Team 1"}]}], # schedules
[{"id": "P1", "teams": [{"summary": "Team 1"}]}], # policies
[
{"id": "SVC1", "teams": [{"summary": "Team 1"}], "integrations": []}
], # services with params
[
{"id": "SVC1", "teams": [{"summary": "Team 1"}], "integrations": []}
], # services
[{"id": "V1"}], # vendors
[{"id": "BS1", "teams": [{"summary": "Team 1"}]}], # business services
]
mock_session.jget.return_value = {"overrides": []} # Mock schedule overrides
mock_oncall_client = MockOnCallAPIClient.return_value
@ -260,8 +269,10 @@ class TestPagerDutyMigrationFiltering:
@patch("lib.pagerduty.migrate.filter_integrations")
@patch("lib.pagerduty.migrate.APISession")
@patch("lib.pagerduty.migrate.OnCallAPIClient")
@patch("lib.pagerduty.migrate.ServiceModelClient")
def test_migrate_with_users_filter(
self,
MockServiceModelClient,
MockOnCallAPIClient,
MockAPISession,
mock_filter_integrations,
@ -286,12 +297,16 @@ class TestPagerDutyMigrationFiltering:
],
}
], # policies
[{"id": "SVC1", "integrations": []}], # services with params
[{"id": "SVC1", "integrations": []}], # services
[{"id": "V1"}], # vendors
[{"id": "BS1"}], # business services
]
mock_session.jget.return_value = {"overrides": []} # Mock schedule overrides
mock_oncall_client = MockOnCallAPIClient.return_value
mock_oncall_client.list_all.return_value = []
mock_service_client = MockServiceModelClient.return_value
mock_service_client.get_components.return_value = []
# Run migration
migrate()
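These tests lean on `side_effect` yielding successive return values in call order, so the list must line up exactly with the sequence of `list_all` calls `migrate()` makes (users, schedules, escalation policies, services, vendors, business services). A tiny illustration of that consumption order:

```python
from unittest.mock import MagicMock

session = MagicMock()
# Each call to list_all pops the next item, regardless of arguments, so
# this list's order must match the order of calls in the code under test.
session.list_all.side_effect = [["users"], ["schedules"], ["policies"]]

print(session.list_all("users"))      # ['users']
print(session.list_all("schedules"))  # ['schedules']
```

Adding a new `list_all` call to `migrate()` (as this PR does for business services) therefore requires appending a matching entry to every test's `side_effect` list.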


@ -0,0 +1,110 @@
"""
Tests for service filtering functionality.
"""
from unittest.mock import patch
import pytest
from lib.common.resources.services import filter_services
@pytest.fixture
def sample_services():
"""Sample service data for testing."""
return [
{
"id": "P123",
"name": "Production Service",
"type": "service",
"teams": [{"summary": "Platform Team"}],
"escalation_policy": {
"escalation_rules": [
{
"targets": [
{"type": "user", "id": "U123"},
{"type": "user", "id": "U456"},
]
}
]
},
},
{
"id": "P456",
"name": "Staging Service",
"type": "service",
"teams": [{"summary": "DevOps Team"}],
"escalation_policy": {
"escalation_rules": [{"targets": [{"type": "user", "id": "U789"}]}]
},
},
{
"id": "B123",
"name": "Business Service",
"type": "business_service",
"teams": [{"summary": "Platform Team"}],
},
]
def test_filter_services_by_team(sample_services):
"""Test filtering services by team."""
with patch("lib.common.resources.services.PAGERDUTY_FILTER_TEAM", "Platform Team"):
filtered = filter_services(sample_services)
assert len(filtered) == 2
assert all(
service["teams"][0]["summary"] == "Platform Team" for service in filtered
)
def test_filter_services_by_users(sample_services):
"""Test filtering services by users in escalation policy."""
with patch("lib.common.resources.services.PAGERDUTY_FILTER_USERS", ["U123"]):
filtered = filter_services(sample_services)
# Should include both the matching technical service and the business service
assert len(filtered) == 2
# Verify the technical service with matching user is included
assert any(service["id"] == "P123" for service in filtered)
# Verify the business service is included (not filtered by users)
assert any(service["type"] == "business_service" for service in filtered)
def test_filter_services_by_regex(sample_services):
"""Test filtering services by name regex pattern."""
with patch(
"lib.common.resources.services.PAGERDUTY_FILTER_SERVICE_REGEX", "Prod.*"
):
filtered = filter_services(sample_services)
assert len(filtered) == 1
assert filtered[0]["name"] == "Production Service"
def test_filter_services_no_filters(sample_services):
"""Test that no filters returns all services."""
with patch("lib.common.resources.services.PAGERDUTY_FILTER_TEAM", ""), patch(
"lib.common.resources.services.PAGERDUTY_FILTER_USERS", []
), patch("lib.common.resources.services.PAGERDUTY_FILTER_SERVICE_REGEX", ""):
filtered = filter_services(sample_services)
assert len(filtered) == len(sample_services)
def test_filter_services_multiple_filters(sample_services):
"""Test applying multiple filters together."""
with patch(
"lib.common.resources.services.PAGERDUTY_FILTER_TEAM", "Platform Team"
), patch("lib.common.resources.services.PAGERDUTY_FILTER_USERS", ["U123"]), patch(
"lib.common.resources.services.PAGERDUTY_FILTER_SERVICE_REGEX", "Prod.*"
):
filtered = filter_services(sample_services)
assert len(filtered) == 1
assert filtered[0]["id"] == "P123"
assert filtered[0]["teams"][0]["summary"] == "Platform Team"
assert filtered[0]["name"] == "Production Service"
def test_filter_services_business_services(sample_services):
"""Test that business services are not filtered by user assignments."""
with patch("lib.common.resources.services.PAGERDUTY_FILTER_USERS", ["U123"]):
filtered = filter_services(sample_services)
assert len(filtered) == 2
assert any(service["type"] == "business_service" for service in filtered)
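The semantics these tests pin down can be summarised in one pure function. This sketch takes the filters as arguments (the real `filter_services` reads them from module-level config) and shows why business services pass the user filter:

```python
import re

def filter_services_sketch(services, team="", users=(), name_regex=""):
    out = []
    for svc in services:
        if team and not any(t.get("summary") == team for t in svc.get("teams", [])):
            continue
        # User filtering only applies to technical services: business
        # services carry no escalation policy, so they always pass.
        if users and svc.get("type") != "business_service":
            targets = {
                t["id"]
                for rule in svc.get("escalation_policy", {}).get("escalation_rules", [])
                for t in rule.get("targets", [])
                if t.get("type") == "user"
            }
            if not targets & set(users):
                continue
        if name_regex and not re.match(name_regex, svc.get("name", "")):
            continue
        out.append(svc)
    return out

svcs = [
    {"id": "P123", "name": "Production Service", "type": "service",
     "teams": [{"summary": "Platform Team"}],
     "escalation_policy": {"escalation_rules": [
         {"targets": [{"type": "user", "id": "U123"}]}]}},
    {"id": "B123", "name": "Business Service", "type": "business_service",
     "teams": [{"summary": "Platform Team"}]},
]
print([s["id"] for s in filter_services_sketch(svcs, users=("U123",))])  # ['P123', 'B123']
```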


@ -0,0 +1,153 @@
"""
Tests for the PagerDuty services module.
"""
from unittest.mock import MagicMock, patch
import pytest
from lib.pagerduty.resources.services import (
TechnicalService,
fetch_service_dependencies,
fetch_services,
get_all_technical_services_with_metadata,
)
@pytest.fixture
def service_data():
"""Basic service data fixture."""
return {
"id": "SERVICE123",
"name": "Test Service",
"description": "A test service",
"status": "active",
"created_at": "2023-01-01T00:00:00Z",
"updated_at": "2023-01-02T00:00:00Z",
"html_url": "https://example.pagerduty.com/service/SERVICE123",
"self": "https://api.pagerduty.com/services/SERVICE123",
"escalation_policy": {"id": "EP123", "name": "Test Policy"},
"teams": [{"id": "TEAM1", "summary": "Team 1"}],
}
def test_technical_service_init(service_data):
"""Test TechnicalService initialization with basic fields."""
service = TechnicalService(service_data)
assert service.id == "SERVICE123"
assert service.name == "Test Service"
assert service.description == "A test service"
assert service.status == "active"
assert service.created_at == "2023-01-01T00:00:00Z"
assert service.updated_at == "2023-01-02T00:00:00Z"
assert service.html_url == "https://example.pagerduty.com/service/SERVICE123"
assert service.self_url == "https://api.pagerduty.com/services/SERVICE123"
assert service.escalation_policy == {"id": "EP123", "name": "Test Policy"}
assert service.teams == [{"id": "TEAM1", "summary": "Team 1"}]
assert service.dependencies == []
assert service.raw_data == service_data
def test_technical_service_str():
"""Test string representation of the service."""
service = TechnicalService({"id": "SERVICE123", "name": "Test Service"})
assert str(service) == "TechnicalService(id=SERVICE123, name=Test Service)"
@pytest.fixture
def mock_session():
"""Create a mock API session."""
return MagicMock()
def test_fetch_services(mock_session):
"""Test fetching services from PagerDuty API."""
mock_session.list_all.return_value = [
{"id": "SERVICE1", "name": "Service 1"},
{"id": "SERVICE2", "name": "Service 2"},
]
services = fetch_services(mock_session)
# Verify API call
mock_session.list_all.assert_called_once_with(
"services", params={"include[]": ["integrations", "teams"]}
)
# Verify results
assert len(services) == 2
assert isinstance(services[0], TechnicalService)
assert services[0].id == "SERVICE1"
assert services[1].id == "SERVICE2"
def test_fetch_services_without_includes(mock_session):
"""Test fetching services without including integrations or teams."""
mock_session.list_all.return_value = [{"id": "SERVICE1"}]
services = fetch_services(
mock_session, include_integrations=False, include_teams=False
)
# Verify API call with no includes
mock_session.list_all.assert_called_once_with("services", params={})
# Verify results
assert len(services) == 1
assert isinstance(services[0], TechnicalService)
@pytest.fixture
def mock_services():
"""Create mock services for dependency testing."""
service1 = TechnicalService({"id": "SERVICE1", "name": "Service 1"})
service2 = TechnicalService({"id": "SERVICE2", "name": "Service 2"})
return [service1, service2]
def test_fetch_service_dependencies(mock_session, mock_services):
"""Test fetching service dependencies."""
# Mock the dependencies API call - only mock for the first service to simplify
mock_session.get.side_effect = [
{
"relationships": [{"supporting_service": {"id": "SERVICE2"}}]
}, # First call returns SERVICE2 as a dependency
{"relationships": []}, # Second call returns no dependencies
]
fetch_service_dependencies(mock_session, mock_services)
# Verify API calls - should be called for each service
assert mock_session.get.call_count == 2
mock_session.get.assert_any_call("service_dependencies/technical_services/SERVICE1")
mock_session.get.assert_any_call("service_dependencies/technical_services/SERVICE2")
# Verify that service1 now has service2 as a dependency
assert len(mock_services[0].dependencies) == 1
assert mock_services[0].dependencies[0] == mock_services[1]
# Service2 should have no dependencies since the mock returned empty list
assert len(mock_services[1].dependencies) == 0
def test_get_all_technical_services_with_metadata():
"""Test getting all services with their metadata."""
mock_session = MagicMock()
mock_services = [MagicMock(), MagicMock()]
with patch(
"lib.pagerduty.resources.services.fetch_services"
) as mock_fetch_services:
with patch(
"lib.pagerduty.resources.services.fetch_service_dependencies"
) as mock_fetch_deps:
mock_fetch_services.return_value = mock_services
result = get_all_technical_services_with_metadata(mock_session)
# Verify calls
mock_fetch_services.assert_called_once_with(mock_session)
mock_fetch_deps.assert_called_once_with(mock_session, mock_services)
# Verify result
assert result == mock_services


@ -2,3 +2,4 @@ requests==2.32.3
pdpyras==4.5.0
pytest==8.2.2
pytest-env==0.6.2
kubernetes==29.0.0


@ -1,34 +1,69 @@
#
# This file is autogenerated by pip-compile with Python 3.11
# This file is autogenerated by pip-compile with Python 3.12
# by the following command:
#
# pip-compile requirements.in
#
cachetools==5.5.2
# via google-auth
certifi==2024.7.4
# via requests
# via
# kubernetes
# requests
charset-normalizer==3.3.2
# via requests
google-auth==2.38.0
# via kubernetes
idna==3.7
# via requests
iniconfig==2.0.0
# via pytest
kubernetes==29.0.0
# via -r requirements.in
oauthlib==3.2.2
# via
# kubernetes
# requests-oauthlib
packaging==23.2
# via pytest
pdpyras==4.5.0
# via -r requirements.in
pluggy==1.5.0
# via pytest
pyasn1==0.6.1
# via
# pyasn1-modules
# rsa
pyasn1-modules==0.4.1
# via google-auth
pytest==8.2.2
# via
# -r requirements.in
# pytest-env
pytest-env==0.6.2
# via -r requirements.in
python-dateutil==2.9.0.post0
# via kubernetes
pyyaml==6.0.2
# via kubernetes
requests==2.32.3
# via
# -r requirements.in
# kubernetes
# pdpyras
# requests-oauthlib
requests-oauthlib==2.0.0
# via kubernetes
rsa==4.9
# via google-auth
six==1.17.0
# via
# kubernetes
# python-dateutil
urllib3==2.2.2
# via
# kubernetes
# pdpyras
# requests
websocket-client==1.8.0
# via kubernetes