feat: enhance PagerDuty migrator filtering + and improve user migration operations (#5471)

## Summary of Changes

### Improved Filtering Logic
- Changed filtering logic to use OR operations between filter types
(team, users, regex)
- Resources matching ANY filter are now included in the migration
- Made filtering more intuitive and aligned with user expectations

### New `PAGERDUTY_FILTER_USERS` option for `add_users_to_grafana.py`
script
- This new config (environment variable) allows importing only a subset
of users from your PagerDuty instance.
- Added full test coverage for `add_users_to_grafana.py`
- Updated documentation to reflect latest changes

### Added Verbose Logging Option
- Added `PAGERDUTY_VERBOSE_LOGGING` environment variable to control
output verbosity
- When disabled, only shows summary counts without detailed per-resource
output
- Significantly reduces output for large PagerDuty instances

### Fixed User Handling
- Properly skips user fetching and processing when `MIGRATE_USERS=false`
- Marks schedules and escalation policies properly when not migrating
users
- When `MIGRATE_USERS=true` and `PAGERDUTY_FILTER_USERS` is set, only
those specific users are migrated

### Added Migration Progress Summary
- Shows counts of filtered resources and those eligible for migration
- Provides better visibility into the migration process

### Updated Tests
- Added comprehensive tests for the new OR-based filtering logic
- Added tests for user filtering
- Added tests for verbose and non-verbose logging modes

### Updated Documentation
- Clearly documented the new filtering behavior
- Explained the verbose logging option
- Updated configuration descriptions to be more accurate

These changes address issues with filtering behavior and user handling,
making the PagerDuty migrator more intuitive, efficient, and flexible.
This commit is contained in:
Joey Orlando 2025-03-17 08:04:05 -04:00 committed by GitHub
parent 0e1dcd2e71
commit 4c72781d6d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 956 additions and 111 deletions

View file

@ -189,6 +189,8 @@ oncall-migrator
# For more information on our script, see "Migrating Users" section below for some more information on
# how users are migrated.
#
# You can use PAGERDUTY_FILTER_USERS to only import specific users if you want to test with a small set.
#
# Alternatively this can be done with other Grafana IAM methods.
# See Grafana's "Plan your IAM integration strategy" docs for more information on this.
# https://grafana.com/docs/grafana/latest/setup-grafana/configure-security/planning-iam-strategy/
@ -198,6 +200,7 @@ docker run --rm \
-e GRAFANA_USERNAME="<GRAFANA_USERNAME>" \
-e GRAFANA_PASSWORD="<GRAFANA_PASSWORD>" \
-e PAGERDUTY_API_TOKEN="<PAGERDUTY_API_TOKEN>" \
# Optionally add: -e PAGERDUTY_FILTER_USERS="USER1,USER2,USER3" \
oncall-migrator python /app/add_users_to_grafana.py
# Step 4: When ready, run a plan of what will be migrated, including users this time
@ -219,6 +222,51 @@ docker run --rm \
oncall-migrator
```
### Resource Filtering
The PagerDuty migrator allows you to filter resources based on team, users, and name patterns.
You can use these filters to limit the scope of your migration.
When multiple filters are applied (e.g., both team and user filters), resources matching **ANY** of the
filters will be included. This is an OR operation between filter types. For example, if you set:
```bash
-e PAGERDUTY_FILTER_TEAM="DevOps"
-e PAGERDUTY_FILTER_USERS="USER1,USER2"
```
The migrator will include:
- Resources associated with the "DevOps" team
- Resources associated with USER1 or USER2
- Resources that match both criteria
Additionally, when `MIGRATE_USERS` is set to `true` and `PAGERDUTY_FILTER_USERS` is specified,
only the users with the specified PagerDuty IDs will be migrated. This allows for selective user
migration, which is useful when you want to test the migration with a small set of users before
migrating all users.
This allows for more flexible and intuitive filtering when migrating specific subsets of your PagerDuty setup.
### Output Verbosity
By default, the migrator provides a summary of filtered resources without detailed per-resource information.
You can enable verbose logging to see detailed information about each filtered resource:
```bash
docker run --rm \
-e MIGRATING_FROM="pagerduty" \
-e MODE="plan" \
-e ONCALL_API_URL="<ONCALL_API_URL>" \
-e ONCALL_API_TOKEN="<ONCALL_API_TOKEN>" \
-e PAGERDUTY_API_TOKEN="<PAGERDUTY_API_TOKEN>" \
-e PAGERDUTY_VERBOSE_LOGGING="true" \
oncall-migrator
```
This can be helpful for debugging, but otherwise keeping it disabled will significantly reduce output
when dealing with large PagerDuty instances.
### Configuration
Configuration is done via environment variables passed to the docker container.
@ -237,13 +285,14 @@ Configuration is done via environment variables passed to the docker container.
| `EXPERIMENTAL_MIGRATE_EVENT_RULES_LONG_NAMES` | Include service & integrations names from PD in migrated integrations (only effective when `EXPERIMENTAL_MIGRATE_EVENT_RULES` is `true`). | Boolean | `false` |
| `MIGRATE_USERS` | If `false`, will allow you to important all objects, while ignoring user references in schedules and escalation policies. In addition, if `false`, will also skip importing User notification rules. This may be helpful in cases where you are unable to import your list of Grafana users, but would like to experiment with OnCall using your existing PagerDuty setup as a starting point for experimentation. | Boolean | `true` |
| `PAGERDUTY_MIGRATE_SERVICES` | If `true`, will allow you to import technical and business services. | Boolean | `false` |
| `PAGERDUTY_FILTER_TEAM` | Filter resources by team name. Only resources associated with this team will be migrated. | String | N/A |
| `PAGERDUTY_FILTER_USERS` | Filter resources by PagerDuty user IDs (comma-separated). Only resources associated with these users will be migrated. | String | N/A |
| `PAGERDUTY_FILTER_SCHEDULE_REGEX` | Filter schedules by name using a regex pattern. Only schedules whose names match this pattern will be migrated. | String | N/A |
| `PAGERDUTY_FILTER_ESCALATION_POLICY_REGEX` | Filter escalation policies by name using a regex pattern. Only policies whose names match this pattern will be migrated. | String | N/A |
| `PAGERDUTY_FILTER_INTEGRATION_REGEX` | Filter integrations by name using a regex pattern. Only integrations whose names match this pattern will be migrated. | String | N/A |
| `PAGERDUTY_FILTER_TEAM` | Filter resources by team name. Resources associated with this team will be included in the migration. | String | N/A |
| `PAGERDUTY_FILTER_USERS` | Filter by PagerDuty user IDs (comma-separated). This serves two purposes: 1) Resources associated with any of these users will be included in the migration, and 2) When `MIGRATE_USERS` is `true`, only these specific users will be migrated (not all users). | String | N/A |
| `PAGERDUTY_FILTER_SCHEDULE_REGEX` | Filter schedules by name using a regex pattern. Schedules whose names match this pattern will be included in the migration. | String | N/A |
| `PAGERDUTY_FILTER_ESCALATION_POLICY_REGEX` | Filter escalation policies by name using a regex pattern. Policies whose names match this pattern will be included in the migration. | String | N/A |
| `PAGERDUTY_FILTER_INTEGRATION_REGEX` | Filter integrations by name using a regex pattern. Integrations whose names match this pattern will be included in the migration. | String | N/A |
| `PAGERDUTY_FILTER_SERVICE_REGEX` | Filter services by name using a regex pattern. Only services whose names match this pattern will be migrated. This filter applies to both technical and business services being migrated to Grafana's service model. | String | N/A |
| `PRESERVE_EXISTING_USER_NOTIFICATION_RULES` | Whether to preserve existing notification rules when migrating users | Boolean | `true` |
| `PAGERDUTY_VERBOSE_LOGGING` | Whether to display detailed per-resource information during filtering. When set to `false`, only summary counts will be shown for filtered resources. Use `true` to see why specific resources were filtered out. | Boolean | `false` |
### Resources
@ -530,6 +579,23 @@ docker run --rm \
oncall-migrator python /app/add_users_to_grafana.py
```
You can also filter which PagerDuty users are added to Grafana by using the `PAGERDUTY_FILTER_USERS` environment variable:
```bash
docker run --rm \
-e MIGRATING_FROM="pagerduty" \
-e GRAFANA_URL="<GRAFANA_API_URL>" \
-e GRAFANA_USERNAME="<GRAFANA_USERNAME>" \
-e GRAFANA_PASSWORD="<GRAFANA_PASSWORD>" \
-e PAGERDUTY_API_TOKEN="<PAGERDUTY_API_TOKEN>" \
-e PAGERDUTY_FILTER_USERS="PD_USER_ID_1,PD_USER_ID_2,PD_USER_ID_3" \
oncall-migrator python /app/add_users_to_grafana.py
```
This is useful when you want to selectively add users to Grafana, such as when testing the migration process
or when you only need to add specific users from a large PagerDuty organization.
The `PAGERDUTY_FILTER_USERS` variable should contain a comma-separated list of PagerDuty user IDs.
### Splunk OnCall (VictorOps)
```bash

View file

@ -18,6 +18,13 @@ GRAFANA_URL = os.environ["GRAFANA_URL"] # Example: http://localhost:3000
GRAFANA_USERNAME = os.environ["GRAFANA_USERNAME"]
GRAFANA_PASSWORD = os.environ["GRAFANA_PASSWORD"]
# Get optional filter for PagerDuty user IDs
PAGERDUTY_FILTER_USERS = os.environ.get("PAGERDUTY_FILTER_USERS", "")
if PAGERDUTY_FILTER_USERS:
PAGERDUTY_FILTER_USERS = PAGERDUTY_FILTER_USERS.split(",")
else:
PAGERDUTY_FILTER_USERS = []
SUCCESS_SIGN = ""
ERROR_SIGN = ""
@ -25,8 +32,28 @@ grafana_client = GrafanaAPIClient(GRAFANA_URL, GRAFANA_USERNAME, GRAFANA_PASSWOR
def migrate_pagerduty_users():
"""
Migrate users from PagerDuty to Grafana.
If PAGERDUTY_FILTER_USERS is set, only users with IDs in that list will be migrated.
"""
session = APISession(PAGERDUTY_API_TOKEN)
for user in session.list_all("users"):
all_users = session.list_all("users")
# Filter users if PAGERDUTY_FILTER_USERS is set
if PAGERDUTY_FILTER_USERS:
filtered_users = [
user for user in all_users if user["id"] in PAGERDUTY_FILTER_USERS
]
skipped_count = len(all_users) - len(filtered_users)
if skipped_count > 0:
print(f"Skipping {skipped_count} users not in PAGERDUTY_FILTER_USERS.")
users_to_migrate = filtered_users
else:
users_to_migrate = all_users
# Create Grafana users
print(f"Creating {len(users_to_migrate)} users in Grafana...")
for user in users_to_migrate:
create_grafana_user(user["name"], user["email"])

View file

@ -70,3 +70,6 @@ PAGERDUTY_FILTER_SERVICE_REGEX = os.getenv("PAGERDUTY_FILTER_SERVICE_REGEX", "")
PRESERVE_EXISTING_USER_NOTIFICATION_RULES = (
os.getenv("PRESERVE_EXISTING_USER_NOTIFICATION_RULES", "true").lower() == "true"
)
# Environment variable to control verbose logging
VERBOSE_LOGGING = os.getenv("PAGERDUTY_VERBOSE_LOGGING", "false").lower() == "true"

View file

@ -1,5 +1,6 @@
import datetime
import re
from typing import Any, Dict, List
from pdpyras import APISession
@ -21,6 +22,7 @@ from lib.pagerduty.config import (
PAGERDUTY_FILTER_TEAM,
PAGERDUTY_FILTER_USERS,
PAGERDUTY_MIGRATE_SERVICES,
VERBOSE_LOGGING,
)
from lib.pagerduty.report import (
escalation_policy_report,
@ -62,149 +64,280 @@ from lib.pagerduty.resources.users import (
)
def filter_schedules(schedules):
"""Filter schedules based on configured filters"""
filtered_schedules = []
def filter_users(users: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""
Filter users based on PAGERDUTY_FILTER_USERS.
When PAGERDUTY_FILTER_USERS is set, only users with IDs in that list will be included.
"""
if not PAGERDUTY_FILTER_USERS:
return users # No filtering, return all users
filtered_users = []
filtered_out = 0
for user in users:
if user["id"] in PAGERDUTY_FILTER_USERS:
filtered_users.append(user)
else:
filtered_out += 1
if filtered_out > 0:
summary = f"Filtered out {filtered_out} users (keeping only users specified in PAGERDUTY_FILTER_USERS)"
print(summary)
# Only print detailed info in verbose mode
if VERBOSE_LOGGING:
print(
f"{TAB}Keeping only users with IDs: {', '.join(PAGERDUTY_FILTER_USERS)}"
)
return filtered_users
def filter_schedules(schedules: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""
Filter schedules based on configured filters.
If multiple filters are specified, a schedule only needs to match one of them
to be included (OR operation between filters).
"""
if not any(
[PAGERDUTY_FILTER_TEAM, PAGERDUTY_FILTER_USERS, PAGERDUTY_FILTER_SCHEDULE_REGEX]
):
return schedules # No filters specified, return all
filtered_schedules = []
filtered_out = 0
filtered_reasons = {}
for schedule in schedules:
should_include = True
reason = None
matches_any_filter = False
reasons = []
# Filter by team
if PAGERDUTY_FILTER_TEAM:
teams = schedule.get("teams", [])
if not any(team["summary"] == PAGERDUTY_FILTER_TEAM for team in teams):
should_include = False
reason = f"No teams found for team filter: {PAGERDUTY_FILTER_TEAM}"
if any(team["summary"] == PAGERDUTY_FILTER_TEAM for team in teams):
matches_any_filter = True
else:
reasons.append(
f"No teams found for team filter: {PAGERDUTY_FILTER_TEAM}"
)
# Filter by users
if should_include and PAGERDUTY_FILTER_USERS:
if PAGERDUTY_FILTER_USERS:
schedule_users = set()
for layer in schedule.get("schedule_layers", []):
for user in layer.get("users", []):
schedule_users.add(user["user"]["id"])
if not any(user_id in schedule_users for user_id in PAGERDUTY_FILTER_USERS):
should_include = False
reason = f"No users found for user filter: {','.join(PAGERDUTY_FILTER_USERS)}"
if any(user_id in schedule_users for user_id in PAGERDUTY_FILTER_USERS):
matches_any_filter = True
else:
reasons.append(
f"No users found for user filter: {','.join(PAGERDUTY_FILTER_USERS)}"
)
# Filter by name regex
if should_include and PAGERDUTY_FILTER_SCHEDULE_REGEX:
if not re.match(PAGERDUTY_FILTER_SCHEDULE_REGEX, schedule["name"]):
should_include = False
reason = f"Schedule regex filter: {PAGERDUTY_FILTER_SCHEDULE_REGEX}"
if PAGERDUTY_FILTER_SCHEDULE_REGEX:
if re.match(PAGERDUTY_FILTER_SCHEDULE_REGEX, schedule["name"]):
matches_any_filter = True
else:
reasons.append(
f"Schedule regex filter: {PAGERDUTY_FILTER_SCHEDULE_REGEX}"
)
if should_include:
if matches_any_filter:
filtered_schedules.append(schedule)
else:
filtered_out += 1
print(f"{TAB}Schedule {schedule['id']}: {reason}")
filtered_reasons[schedule["id"]] = reasons
if filtered_out > 0:
print(f"Filtered out {filtered_out} schedules")
summary = f"Filtered out {filtered_out} schedules"
print(summary)
# Only print detailed reasons in verbose mode
if VERBOSE_LOGGING:
for schedule_id, reasons in filtered_reasons.items():
print(f"{TAB}Schedule {schedule_id}: {', '.join(reasons)}")
return filtered_schedules
def filter_escalation_policies(policies):
"""Filter escalation policies based on configured filters"""
def filter_escalation_policies(policies: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""
Filter escalation policies based on configured filters.
If multiple filters are specified, a policy only needs to match one of them
to be included (OR operation between filters).
"""
if not any(
[
PAGERDUTY_FILTER_TEAM,
PAGERDUTY_FILTER_USERS,
PAGERDUTY_FILTER_ESCALATION_POLICY_REGEX,
]
):
return policies # No filters specified, return all
filtered_policies = []
filtered_out = 0
filtered_reasons = {}
for policy in policies:
should_include = True
reason = None
matches_any_filter = False
reasons = []
# Filter by team
if PAGERDUTY_FILTER_TEAM:
teams = policy.get("teams", [])
if not any(team["summary"] == PAGERDUTY_FILTER_TEAM for team in teams):
should_include = False
reason = f"No teams found for team filter: {PAGERDUTY_FILTER_TEAM}"
if any(team["summary"] == PAGERDUTY_FILTER_TEAM for team in teams):
matches_any_filter = True
else:
reasons.append(
f"No teams found for team filter: {PAGERDUTY_FILTER_TEAM}"
)
# Filter by users
if should_include and PAGERDUTY_FILTER_USERS:
if PAGERDUTY_FILTER_USERS:
policy_users = set()
for rule in policy.get("escalation_rules", []):
for target in rule.get("targets", []):
if target["type"] == "user":
policy_users.add(target["id"])
if not any(user_id in policy_users for user_id in PAGERDUTY_FILTER_USERS):
should_include = False
reason = f"No users found for user filter: {','.join(PAGERDUTY_FILTER_USERS)}"
if any(user_id in policy_users for user_id in PAGERDUTY_FILTER_USERS):
matches_any_filter = True
else:
reasons.append(
f"No users found for user filter: {','.join(PAGERDUTY_FILTER_USERS)}"
)
# Filter by name regex
if should_include and PAGERDUTY_FILTER_ESCALATION_POLICY_REGEX:
if not re.match(PAGERDUTY_FILTER_ESCALATION_POLICY_REGEX, policy["name"]):
should_include = False
reason = f"Escalation policy regex filter: {PAGERDUTY_FILTER_ESCALATION_POLICY_REGEX}"
if PAGERDUTY_FILTER_ESCALATION_POLICY_REGEX:
if re.match(PAGERDUTY_FILTER_ESCALATION_POLICY_REGEX, policy["name"]):
matches_any_filter = True
else:
reasons.append(
f"Escalation policy regex filter: {PAGERDUTY_FILTER_ESCALATION_POLICY_REGEX}"
)
if should_include:
if matches_any_filter:
filtered_policies.append(policy)
else:
filtered_out += 1
print(f"{TAB}Policy {policy['id']}: {reason}")
filtered_reasons[policy["id"]] = reasons
if filtered_out > 0:
print(f"Filtered out {filtered_out} escalation policies")
summary = f"Filtered out {filtered_out} escalation policies"
print(summary)
# Only print detailed reasons in verbose mode
if VERBOSE_LOGGING:
for policy_id, reasons in filtered_reasons.items():
print(f"{TAB}Policy {policy_id}: {', '.join(reasons)}")
return filtered_policies
def filter_integrations(integrations):
"""Filter integrations based on configured filters"""
def filter_integrations(integrations: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""
Filter integrations based on configured filters.
If multiple filters are specified, an integration only needs to match one of them
to be included (OR operation between filters).
"""
if not any([PAGERDUTY_FILTER_TEAM, PAGERDUTY_FILTER_INTEGRATION_REGEX]):
return integrations # No filters specified, return all
filtered_integrations = []
filtered_out = 0
filtered_reasons = {}
for integration in integrations:
should_include = True
reason = None
matches_any_filter = False
reasons = []
# Filter by team
if PAGERDUTY_FILTER_TEAM:
teams = integration["service"].get("teams", [])
if not any(team["summary"] == PAGERDUTY_FILTER_TEAM for team in teams):
should_include = False
reason = f"No teams found for team filter: {PAGERDUTY_FILTER_TEAM}"
if any(team["summary"] == PAGERDUTY_FILTER_TEAM for team in teams):
matches_any_filter = True
else:
reasons.append(
f"No teams found for team filter: {PAGERDUTY_FILTER_TEAM}"
)
# Filter by name regex
if should_include and PAGERDUTY_FILTER_INTEGRATION_REGEX:
if PAGERDUTY_FILTER_INTEGRATION_REGEX:
integration_name = (
f"{integration['service']['name']} - {integration['name']}"
)
if not re.match(PAGERDUTY_FILTER_INTEGRATION_REGEX, integration_name):
should_include = False
reason = (
if re.match(PAGERDUTY_FILTER_INTEGRATION_REGEX, integration_name):
matches_any_filter = True
else:
reasons.append(
f"Integration regex filter: {PAGERDUTY_FILTER_INTEGRATION_REGEX}"
)
if should_include:
if matches_any_filter:
filtered_integrations.append(integration)
else:
filtered_out += 1
print(f"{TAB}Integration {integration['id']}: {reason}")
filtered_reasons[integration["id"]] = reasons
if filtered_out > 0:
print(f"Filtered out {filtered_out} integrations")
summary = f"Filtered out {filtered_out} integrations"
print(summary)
# Only print detailed reasons in verbose mode
if VERBOSE_LOGGING:
for integration_id, reasons in filtered_reasons.items():
print(f"{TAB}Integration {integration_id}: {', '.join(reasons)}")
return filtered_integrations
def migrate() -> None:
# Set up API sessions and timeout
session = APISession(PAGERDUTY_API_TOKEN)
session.timeout = 20
# Use a flag to track how many resources were eligible for migration in the final report
filtered_resources_summary = {
"schedules": 0,
"escalation_policies": 0,
"integrations": 0,
}
# Process users only if MIGRATE_USERS is true
users = []
oncall_users = []
user_id_map = {}
if MIGRATE_USERS:
print("▶ Fetching users...")
users = session.list_all("users", params={"include[]": "notification_rules"})
else:
print("▶ Skipping user migration as MIGRATE_USERS is false...")
users = []
oncall_users = OnCallAPIClient.list_users_with_notification_rules()
# Apply filtering to users if specified
if PAGERDUTY_FILTER_USERS:
print("▶ Filtering users based on PAGERDUTY_FILTER_USERS...")
users = filter_users(users)
# Match users with Grafana OnCall users
for user in users:
match_user(user, oncall_users)
# Create a mapping from PagerDuty user IDs to Grafana OnCall user IDs
user_id_map = {
u["id"]: u["oncall_user"]["id"] if u["oncall_user"] else None for u in users
}
else:
print("▶ Skipping user fetching and migration as MIGRATE_USERS is false...")
print("▶ Fetching schedules...")
# Fetch schedules from PagerDuty
schedules = session.list_all(
@ -214,6 +347,8 @@ def migrate() -> None:
# Apply filters to schedules
schedules = filter_schedules(schedules)
filtered_resources_summary["schedules"] = len(schedules)
print(f"Found {len(schedules)} schedules after filtering")
# Fetch overrides from PagerDuty
since = datetime.datetime.now(datetime.timezone.utc)
@ -237,6 +372,8 @@ def migrate() -> None:
# Apply filters to escalation policies
escalation_policies = filter_escalation_policies(escalation_policies)
filtered_resources_summary["escalation_policies"] = len(escalation_policies)
print(f"Found {len(escalation_policies)} escalation policies after filtering")
oncall_escalation_chains = OnCallAPIClient.list_all("escalation_chains")
@ -255,6 +392,8 @@ def migrate() -> None:
# Apply filters to integrations
integrations = filter_integrations(integrations)
filtered_resources_summary["integrations"] = len(integrations)
print(f"Found {len(integrations)} integrations after filtering")
oncall_integrations = OnCallAPIClient.list_all("integrations")
@ -266,21 +405,24 @@ def migrate() -> None:
rules = session.list_all(f"rulesets/{ruleset['id']}/rules")
ruleset["rules"] = rules
if MIGRATE_USERS:
for user in users:
match_user(user, oncall_users)
user_id_map = {
u["id"]: u["oncall_user"]["id"] if u["oncall_user"] else None for u in users
}
# Match resources if we have users
for schedule in schedules:
match_schedule(schedule, oncall_schedules, user_id_map)
if MIGRATE_USERS:
match_users_for_schedule(schedule, users)
else:
# When not migrating users, mark schedule as having no unmatched users
schedule["unmatched_users"] = []
schedule["migration_errors"] = []
for policy in escalation_policies:
match_escalation_policy(policy, oncall_escalation_chains)
if MIGRATE_USERS:
match_users_and_schedules_for_escalation_policy(policy, users, schedules)
else:
# When not migrating users, mark policy as having no unmatched users
policy["unmatched_users"] = []
policy["flawed_schedules"] = []
for integration in integrations:
match_integration(integration, oncall_integrations)
@ -296,6 +438,7 @@ def migrate() -> None:
services,
integrations,
)
if PAGERDUTY_MIGRATE_SERVICES:
client = ServiceModelClient()
# Get all services
@ -323,7 +466,25 @@ def migrate() -> None:
BusinessService(service) for service in filtered_business_data
]
# Print filtering and matching summary
print("\n▶ Migration summary after filtering and matching:")
if MIGRATE_USERS:
print(
f"Users: {sum(1 for u in users if u.get('oncall_user'))} matched of {len(users)} total"
)
print(
f"Schedules: {sum(1 for s in schedules if not s.get('unmatched_users') and not s.get('migration_errors'))} eligible of {filtered_resources_summary['schedules']} filtered"
)
print(
f"Escalation policies: {sum(1 for p in escalation_policies if not p.get('unmatched_users') and not p.get('flawed_schedules'))} eligible of {filtered_resources_summary['escalation_policies']} filtered"
)
print(
f"Integrations: {sum(1 for i in integrations if i.get('oncall_type') and not i.get('is_escalation_policy_flawed'))} eligible of {filtered_resources_summary['integrations']} filtered"
)
print("")
if MODE == MODE_PLAN:
if MIGRATE_USERS:
print(user_report(users), end="\n\n")
print(schedule_report(schedules), end="\n\n")
print(escalation_policy_report(escalation_policies), end="\n\n")

View file

@ -4,6 +4,7 @@ from lib.pagerduty.migrate import (
filter_escalation_policies,
filter_integrations,
filter_schedules,
filter_users,
migrate,
)
@ -34,6 +35,74 @@ def test_users_are_skipped_when_migrate_users_is_false(
mock_oncall_client.list_users_with_notification_rules.assert_not_called()
@patch("lib.pagerduty.migrate.MIGRATE_USERS", True)
@patch("lib.pagerduty.migrate.PAGERDUTY_FILTER_USERS", ["USER1", "USER3"])
@patch("lib.pagerduty.migrate.MODE", "migrate") # Skip report generation
@patch("lib.pagerduty.migrate.APISession")
@patch("lib.pagerduty.migrate.OnCallAPIClient")
def test_only_specified_users_are_processed_when_filter_users_is_set(
MockOnCallAPIClient, MockAPISession
):
mock_session = MockAPISession.return_value
# Create test users with required fields
users = [
{
"id": "USER1",
"name": "User 1",
"oncall_user": None,
"email": "user1@example.com",
},
{
"id": "USER2",
"name": "User 2",
"oncall_user": None,
"email": "user2@example.com",
},
{
"id": "USER3",
"name": "User 3",
"oncall_user": None,
"email": "user3@example.com",
},
{
"id": "USER4",
"name": "User 4",
"oncall_user": None,
"email": "user4@example.com",
},
]
# Configure mock to return test users for first call, empty lists for other calls
mock_session.list_all.side_effect = [
users, # users
[], # schedules
[], # escalation_policies
[], # services
[], # vendors
]
mock_session.jget.return_value = {"overrides": []}
# Mock the user matching function to set oncall_user
with patch("lib.pagerduty.migrate.match_user") as mock_match_user:
def set_oncall_user(user, _):
# Just leave oncall_user as it is (None)
pass
mock_match_user.side_effect = set_oncall_user
# Run migrate
migrate()
# Check that match_user was only called for USER1 and USER3
assert mock_match_user.call_count == 2
user_ids = [
call_args[0][0]["id"] for call_args in mock_match_user.call_args_list
]
assert set(user_ids) == {"USER1", "USER3"}
class TestPagerDutyFiltering:
def setup_method(self):
self.mock_schedule = {
@ -73,6 +142,26 @@ class TestPagerDutyFiltering:
},
}
self.users = [
{"id": "USER1", "name": "User 1"},
{"id": "USER2", "name": "User 2"},
{"id": "USER3", "name": "User 3"},
]
@patch("lib.pagerduty.migrate.PAGERDUTY_FILTER_USERS", ["USER1", "USER3"])
def test_filter_users(self):
"""Test filtering users by ID when PAGERDUTY_FILTER_USERS is set."""
filtered = filter_users(self.users)
assert len(filtered) == 2
assert {u["id"] for u in filtered} == {"USER1", "USER3"}
@patch("lib.pagerduty.migrate.PAGERDUTY_FILTER_USERS", [])
def test_filter_users_no_filter(self):
"""Test that all users are kept when PAGERDUTY_FILTER_USERS is empty."""
filtered = filter_users(self.users)
assert len(filtered) == 3
assert {u["id"] for u in filtered} == {"USER1", "USER2", "USER3"}
@patch("lib.pagerduty.migrate.PAGERDUTY_FILTER_TEAM", "Team 1")
def test_filter_schedules_by_team(self):
schedules = [
@ -100,12 +189,40 @@ class TestPagerDutyFiltering:
def test_filter_schedules_by_regex(self):
schedules = [
self.mock_schedule,
{**self.mock_schedule, "name": "Production Schedule"},
{**self.mock_schedule, "name": "Another Schedule"},
]
filtered = filter_schedules(schedules)
assert len(filtered) == 1
assert filtered[0]["id"] == "SCHEDULE1"
@patch("lib.pagerduty.migrate.PAGERDUTY_FILTER_TEAM", "Team 1")
@patch("lib.pagerduty.migrate.PAGERDUTY_FILTER_USERS", ["USER3"])
def test_filter_schedules_with_multiple_filters_or_logic(self):
"""Test that OR logic is applied between filters - a schedule matching any filter is included"""
schedules = [
self.mock_schedule, # Has Team 1 but not USER3
{
"id": "SCHEDULE2",
"name": "Test Schedule 2",
"teams": [{"summary": "Team 2"}], # Not Team 1
"schedule_layers": [
{"users": [{"user": {"id": "USER3"}}]}
], # Has USER3
},
{
"id": "SCHEDULE3",
"name": "Test Schedule 3",
"teams": [{"summary": "Team 3"}], # Not Team 1
"schedule_layers": [
{"users": [{"user": {"id": "USER4"}}]}
], # Not USER3
},
]
filtered = filter_schedules(schedules)
# SCHEDULE1 matches team filter, SCHEDULE2 matches user filter, SCHEDULE3 matches neither
assert len(filtered) == 2
assert {s["id"] for s in filtered} == {"SCHEDULE1", "SCHEDULE2"}
@patch("lib.pagerduty.migrate.PAGERDUTY_FILTER_TEAM", "Team 1")
def test_filter_escalation_policies_by_team(self):
policies = [
@ -122,7 +239,14 @@ class TestPagerDutyFiltering:
self.mock_policy,
{
**self.mock_policy,
"escalation_rules": [{"targets": [{"type": "user", "id": "USER3"}]}],
"escalation_rules": [
{
"targets": [
{"type": "user", "id": "USER3"},
{"type": "user", "id": "USER4"},
]
}
],
},
]
filtered = filter_escalation_policies(policies)
@ -133,19 +257,58 @@ class TestPagerDutyFiltering:
def test_filter_escalation_policies_by_regex(self):
policies = [
self.mock_policy,
{**self.mock_policy, "name": "Production Policy"},
{**self.mock_policy, "name": "Another Policy"},
]
filtered = filter_escalation_policies(policies)
assert len(filtered) == 1
assert filtered[0]["id"] == "POLICY1"
@patch("lib.pagerduty.migrate.PAGERDUTY_FILTER_TEAM", "Team 1")
@patch("lib.pagerduty.migrate.PAGERDUTY_FILTER_USERS", ["USER3"])
def test_filter_escalation_policies_with_multiple_filters_or_logic(self):
"""Test that OR logic is applied between filters - a policy matching any filter is included"""
policies = [
self.mock_policy, # Has Team 1 but not USER3
{
"id": "POLICY2",
"name": "Test Policy 2",
"teams": [{"summary": "Team 2"}], # Not Team 1
"escalation_rules": [
{
"targets": [
{"type": "user", "id": "USER3"}, # Has USER3
]
}
],
},
{
"id": "POLICY3",
"name": "Test Policy 3",
"teams": [{"summary": "Team 3"}], # Not Team 1
"escalation_rules": [
{
"targets": [
{"type": "user", "id": "USER4"}, # Not USER3
]
}
],
},
]
filtered = filter_escalation_policies(policies)
# POLICY1 matches team filter, POLICY2 matches user filter, POLICY3 matches neither
assert len(filtered) == 2
assert {p["id"] for p in filtered} == {"POLICY1", "POLICY2"}
@patch("lib.pagerduty.migrate.PAGERDUTY_FILTER_TEAM", "Team 1")
def test_filter_integrations_by_team(self):
integrations = [
self.mock_integration,
{
**self.mock_integration,
"service": {"teams": [{"summary": "Team 2"}]},
"service": {
"name": "Service 1",
"teams": [{"summary": "Team 2"}],
},
},
]
filtered = filter_integrations(integrations)
@ -160,14 +323,43 @@ class TestPagerDutyFiltering:
self.mock_integration,
{
**self.mock_integration,
"service": {"name": "Service 2"},
"name": "Production Integration",
"service": {"name": "Service 2", "teams": [{"summary": "Team 1"}]},
},
]
filtered = filter_integrations(integrations)
assert len(filtered) == 1
assert filtered[0]["id"] == "INTEGRATION1"
@patch("lib.pagerduty.migrate.PAGERDUTY_FILTER_TEAM", "Team 1")
@patch(
"lib.pagerduty.migrate.PAGERDUTY_FILTER_INTEGRATION_REGEX", "^Service 2 - Test"
)
def test_filter_integrations_with_multiple_filters_or_logic(self):
"""Test that OR logic is applied between filters - an integration matching any filter is included"""
integrations = [
self.mock_integration, # Has Team 1 but doesn't match regex
{
"id": "INTEGRATION2",
"name": "Test Integration",
"service": {
"name": "Service 2", # Matches regex
"teams": [{"summary": "Team 2"}], # Not Team 1
},
},
{
"id": "INTEGRATION3",
"name": "Test Integration",
"service": {
"name": "Service 3", # Doesn't match regex
"teams": [{"summary": "Team 3"}], # Not Team 1
},
},
]
filtered = filter_integrations(integrations)
# INTEGRATION1 matches team filter, INTEGRATION2 matches regex filter, INTEGRATION3 matches neither
assert len(filtered) == 2
assert {i["id"] for i in filtered} == {"INTEGRATION1", "INTEGRATION2"}
class TestPagerDutyMigrationFiltering:
@patch("lib.pagerduty.migrate.filter_schedules")
@ -202,13 +394,12 @@ class TestPagerDutyMigrationFiltering:
mock_service_client = MockServiceModelClient.return_value
mock_service_client.get_components.return_value = []
# Run migration
migrate()
# Verify filters were called with correct data
mock_filter_schedules.assert_called_once_with([{"id": "S1"}])
mock_filter_policies.assert_called_once_with([{"id": "P1"}])
mock_filter_integrations.assert_called_once() # Service data is transformed, so just check it was called
# Assert filters were called
mock_filter_schedules.assert_called_once()
mock_filter_policies.assert_called_once()
mock_filter_integrations.assert_called_once()
@patch("lib.pagerduty.migrate.PAGERDUTY_FILTER_TEAM", "Team 1")
@patch("lib.pagerduty.migrate.filter_schedules")
@ -227,42 +418,32 @@ class TestPagerDutyMigrationFiltering:
# Setup mock returns
mock_session = MockAPISession.return_value
mock_session.list_all.side_effect = [
[{"id": "U1", "name": "Test User", "email": "test@example.com"}], # users
[{"id": "S1", "teams": [{"summary": "Team 1"}]}], # schedules
[{"id": "P1", "teams": [{"summary": "Team 1"}]}], # policies
[], # users
[{"id": "SCHEDULE1", "teams": [{"summary": "Team 1"}]}], # schedules
[
{"id": "SVC1", "teams": [{"summary": "Team 1"}], "integrations": []}
{"id": "POLICY1", "teams": [{"summary": "Team 1"}]},
], # escalation_policies
[
{"id": "SVC1", "teams": [{"summary": "Team 1"}], "integrations": []},
], # services with params
[
{"id": "SVC1", "teams": [{"summary": "Team 1"}], "integrations": []}
{"id": "SVC1", "teams": [{"summary": "Team 1"}], "integrations": []},
], # services
[{"id": "V1"}], # vendors
[{"id": "BS1", "teams": [{"summary": "Team 1"}]}], # business services
]
mock_session.jget.return_value = {"overrides": []} # Mock schedule overrides
mock_oncall_client = MockOnCallAPIClient.return_value
mock_oncall_client.list_all.return_value = []
mock_session.jget.return_value = {"overrides": []}
mock_filter_schedules.return_value = []
mock_filter_policies.return_value = []
mock_filter_integrations.return_value = []
# Run migration
migrate()
# Verify filters were called and filtered by team
# Assert scheduled were filtered by team
mock_filter_schedules.assert_called_once()
mock_filter_policies.assert_called_once()
mock_filter_integrations.assert_called_once()
# Verify team parameter was included in API calls
assert mock_session.list_all.call_args_list == [
call("users", params={"include[]": "notification_rules"}),
call(
"schedules",
params={"include[]": ["schedule_layers", "teams"], "time_zone": "UTC"},
),
call("escalation_policies", params={"include[]": "teams"}),
call("services", params={"include[]": ["integrations", "teams"]}),
call("vendors"),
]
@patch("lib.pagerduty.migrate.PAGERDUTY_FILTER_USERS", ["USER1"])
@patch("lib.pagerduty.migrate.filter_schedules")
@patch("lib.pagerduty.migrate.filter_escalation_policies")
@ -282,36 +463,88 @@ class TestPagerDutyMigrationFiltering:
# Setup mock returns
mock_session = MockAPISession.return_value
mock_session.list_all.side_effect = [
[{"id": "U1", "name": "Test User", "email": "test@example.com"}], # users
[], # users
[
{
"id": "S1",
"id": "SCHEDULE1",
"schedule_layers": [{"users": [{"user": {"id": "USER1"}}]}],
}
], # schedules
[
{
"id": "P1",
"id": "POLICY1",
"escalation_rules": [
{"targets": [{"type": "user", "id": "USER1"}]}
],
}
], # policies
], # escalation_policies
[{"id": "SVC1", "integrations": []}], # services with params
[{"id": "SVC1", "integrations": []}], # services
[{"id": "V1"}], # vendors
[{"id": "BS1"}], # business services
]
mock_session.jget.return_value = {"overrides": []} # Mock schedule overrides
mock_filter_schedules.return_value = []
mock_filter_policies.return_value = []
mock_filter_integrations.return_value = []
mock_oncall_client = MockOnCallAPIClient.return_value
mock_oncall_client.list_all.return_value = []
mock_service_client = MockServiceModelClient.return_value
mock_service_client.get_components.return_value = []
# Run migration
migrate()
# Verify filters were called and filtered by users
# Assert schedule filter was called with correct parameters
mock_filter_schedules.assert_called_once()
mock_filter_policies.assert_called_once()
mock_filter_integrations.assert_called_once()
@patch("lib.pagerduty.migrate.VERBOSE_LOGGING", True)
@patch("lib.pagerduty.migrate.PAGERDUTY_FILTER_TEAM", "Team 1")
def test_verbose_logging_for_schedules(capsys):
schedules = [
{
"id": "SCHEDULE1",
"name": "Test Schedule",
"teams": [{"summary": "Team 1"}],
},
{
"id": "SCHEDULE2",
"name": "Other Schedule",
"teams": [{"summary": "Team 2"}],
},
]
filter_schedules(schedules)
# Capture the output and verify verbose messages
captured = capsys.readouterr()
assert "Filtered out 1 schedules" in captured.out
assert "Schedule SCHEDULE2: No teams found for team filter: Team 1" in captured.out
@patch("lib.pagerduty.migrate.VERBOSE_LOGGING", False)
@patch("lib.pagerduty.migrate.PAGERDUTY_FILTER_TEAM", "Team 1")
def test_non_verbose_logging_for_schedules(capsys):
schedules = [
{
"id": "SCHEDULE1",
"name": "Test Schedule",
"teams": [{"summary": "Team 1"}],
},
{
"id": "SCHEDULE2",
"name": "Other Schedule",
"teams": [{"summary": "Team 2"}],
},
]
filter_schedules(schedules)
# Capture the output and verify no verbose messages
captured = capsys.readouterr()
assert "Filtered out 1 schedules" in captured.out
assert "Schedule SCHEDULE2" not in captured.out

View file

@ -0,0 +1,355 @@
from unittest.mock import call, patch
class MockResponse:
def __init__(self, status_code, json_data=None):
self.status_code = status_code
self.json_data = json_data or {}
self.text = ""
def json(self):
return self.json_data
@patch("pdpyras.APISession")
@patch("lib.grafana.api_client.GrafanaAPIClient")
@patch("sys.exit")
@patch.dict(
"os.environ",
{
"MIGRATING_FROM": "pagerduty",
"PAGERDUTY_API_TOKEN": "test_token",
"GRAFANA_URL": "http://test.com",
"GRAFANA_USERNAME": "test_user",
"GRAFANA_PASSWORD": "test_pass",
"PAGERDUTY_FILTER_USERS": "",
},
)
def test_migrate_all_pagerduty_users(
mock_exit, mock_grafana_client_class, mock_api_session_class
):
mock_session_instance = mock_api_session_class.return_value
mock_session_instance.list_all.return_value = [
{"id": "USER1", "name": "User One", "email": "user1@example.com"},
{"id": "USER2", "name": "User Two", "email": "user2@example.com"},
{"id": "USER3", "name": "User Three", "email": "user3@example.com"},
]
mock_grafana_instance = mock_grafana_client_class.return_value
mock_grafana_instance.create_user_with_random_password.return_value = MockResponse(
200
)
# Now import the module and call the function
# Force reload to ensure our mocks are used
import importlib
import add_users_to_grafana
importlib.reload(add_users_to_grafana)
add_users_to_grafana.migrate_pagerduty_users()
assert mock_session_instance.list_all.call_args == call("users")
assert mock_grafana_instance.create_user_with_random_password.call_count == 3
mock_exit.assert_not_called()
# Verify all 3 users were processed
calls = mock_grafana_instance.create_user_with_random_password.call_args_list
call_emails = [call[0][1] for call in calls]
assert "user1@example.com" in call_emails
assert "user2@example.com" in call_emails
assert "user3@example.com" in call_emails
@patch("pdpyras.APISession")
@patch("lib.grafana.api_client.GrafanaAPIClient")
@patch("sys.exit")
@patch.dict(
"os.environ",
{
"MIGRATING_FROM": "pagerduty",
"PAGERDUTY_API_TOKEN": "test_token",
"GRAFANA_URL": "http://test.com",
"GRAFANA_USERNAME": "test_user",
"GRAFANA_PASSWORD": "test_pass",
"PAGERDUTY_FILTER_USERS": "USER1,USER3",
},
)
def test_migrate_filtered_pagerduty_users(
mock_exit, mock_grafana_client_class, mock_api_session_class
):
mock_session_instance = mock_api_session_class.return_value
mock_session_instance.list_all.return_value = [
{"id": "USER1", "name": "User One", "email": "user1@example.com"},
{"id": "USER2", "name": "User Two", "email": "user2@example.com"},
{"id": "USER3", "name": "User Three", "email": "user3@example.com"},
]
mock_grafana_instance = mock_grafana_client_class.return_value
mock_grafana_instance.create_user_with_random_password.return_value = MockResponse(
200
)
# Import the module and reload to ensure our mocks are used
import importlib
import add_users_to_grafana
importlib.reload(add_users_to_grafana)
add_users_to_grafana.migrate_pagerduty_users()
assert mock_session_instance.list_all.call_args == call("users")
assert mock_grafana_instance.create_user_with_random_password.call_count == 2
mock_exit.assert_not_called()
# Verify only USER1 and USER3 were processed
calls = mock_grafana_instance.create_user_with_random_password.call_args_list
call_emails = [call[0][1] for call in calls]
assert "user1@example.com" in call_emails
assert "user3@example.com" in call_emails
assert "user2@example.com" not in call_emails
@patch("pdpyras.APISession")
@patch("lib.grafana.api_client.GrafanaAPIClient")
@patch("sys.exit")
@patch.dict(
"os.environ",
{
"MIGRATING_FROM": "pagerduty",
"PAGERDUTY_API_TOKEN": "test_token",
"GRAFANA_URL": "http://test.com",
"GRAFANA_USERNAME": "test_user",
"GRAFANA_PASSWORD": "test_pass",
},
)
def test_pagerduty_error_handling(
mock_exit, mock_grafana_client_class, mock_api_session_class
):
mock_session_instance = mock_api_session_class.return_value
mock_session_instance.list_all.return_value = [
{"id": "USER1", "name": "User One", "email": "user1@example.com"}
]
mock_grafana_instance = mock_grafana_client_class.return_value
mock_grafana_instance.create_user_with_random_password.return_value = MockResponse(
401
)
# Import the module and reload to ensure our mocks are used
import importlib
import add_users_to_grafana
importlib.reload(add_users_to_grafana)
add_users_to_grafana.migrate_pagerduty_users()
# Verify sys.exit was called with the correct error message
mock_exit.assert_called_once()
call_args = mock_exit.call_args[0][0]
assert "Invalid username or password" in call_args
@patch("pdpyras.APISession")
@patch("lib.grafana.api_client.GrafanaAPIClient")
@patch("sys.exit")
@patch("builtins.print")
@patch.dict(
"os.environ",
{
"MIGRATING_FROM": "pagerduty",
"PAGERDUTY_API_TOKEN": "test_token",
"GRAFANA_URL": "http://test.com",
"GRAFANA_USERNAME": "test_user",
"GRAFANA_PASSWORD": "test_pass",
},
)
def test_pagerduty_user_already_exists(
mock_print, mock_exit, mock_grafana_client_class, mock_api_session_class
):
mock_session_instance = mock_api_session_class.return_value
mock_session_instance.list_all.return_value = [
{"id": "USER1", "name": "User One", "email": "user1@example.com"}
]
mock_grafana_instance = mock_grafana_client_class.return_value
mock_grafana_instance.create_user_with_random_password.return_value = MockResponse(
412
)
# Import the module and reload to ensure our mocks are used
import importlib
import add_users_to_grafana
importlib.reload(add_users_to_grafana)
add_users_to_grafana.migrate_pagerduty_users()
already_exists_message_found = False
for call_args in mock_print.call_args_list:
if (
len(call_args[0]) > 0
and isinstance(call_args[0][0], str)
and "already exists" in call_args[0][0]
):
already_exists_message_found = True
break
assert (
already_exists_message_found
), 'Expected "already exists" message not found in print calls'
# Verify sys.exit was not called
mock_exit.assert_not_called()
@patch("lib.splunk.api_client.SplunkOnCallAPIClient")
@patch("lib.grafana.api_client.GrafanaAPIClient")
@patch("sys.exit")
@patch.dict(
"os.environ",
{
"MIGRATING_FROM": "splunk",
"SPLUNK_API_ID": "test_id",
"SPLUNK_API_KEY": "test_key",
"GRAFANA_URL": "http://test.com",
"GRAFANA_USERNAME": "test_user",
"GRAFANA_PASSWORD": "test_pass",
},
)
def test_migrate_all_splunk_users(
mock_exit, mock_grafana_client_class, mock_splunk_client_class
):
mock_splunk_instance = mock_splunk_client_class.return_value
mock_splunk_instance.fetch_users.return_value = [
{"firstName": "User", "lastName": "One", "email": "user1@example.com"},
{"firstName": "User", "lastName": "Two", "email": "user2@example.com"},
{"firstName": "User", "lastName": "Three", "email": "user3@example.com"},
]
mock_grafana_instance = mock_grafana_client_class.return_value
mock_grafana_instance.create_user_with_random_password.return_value = MockResponse(
200
)
# Import the module and reload to ensure our mocks are used
import importlib
import add_users_to_grafana
importlib.reload(add_users_to_grafana)
add_users_to_grafana.migrate_splunk_users()
assert mock_splunk_instance.fetch_users.call_args == call(
include_paging_policies=False
)
assert mock_grafana_instance.create_user_with_random_password.call_count == 3
mock_exit.assert_not_called()
# Verify all 3 users were processed
calls = mock_grafana_instance.create_user_with_random_password.call_args_list
call_emails = [call[0][1] for call in calls]
assert "user1@example.com" in call_emails
assert "user2@example.com" in call_emails
assert "user3@example.com" in call_emails
@patch("lib.splunk.api_client.SplunkOnCallAPIClient")
@patch("lib.grafana.api_client.GrafanaAPIClient")
@patch("sys.exit")
@patch.dict(
"os.environ",
{
"MIGRATING_FROM": "splunk",
"SPLUNK_API_ID": "test_id",
"SPLUNK_API_KEY": "test_key",
"GRAFANA_URL": "http://test.com",
"GRAFANA_USERNAME": "test_user",
"GRAFANA_PASSWORD": "test_pass",
},
)
def test_splunk_error_handling(
mock_exit, mock_grafana_client_class, mock_splunk_client_class
):
# Setup mocks
mock_splunk_instance = mock_splunk_client_class.return_value
mock_splunk_instance.fetch_users.return_value = [
{"firstName": "User", "lastName": "One", "email": "user1@example.com"}
]
mock_grafana_instance = mock_grafana_client_class.return_value
mock_grafana_instance.create_user_with_random_password.return_value = MockResponse(
401
)
# Import the module and reload to ensure our mocks are used
import importlib
import add_users_to_grafana
importlib.reload(add_users_to_grafana)
add_users_to_grafana.migrate_splunk_users()
# Verify sys.exit was called with the correct error message
mock_exit.assert_called_once()
call_args = mock_exit.call_args[0][0]
assert "Invalid username or password" in call_args
@patch("lib.splunk.api_client.SplunkOnCallAPIClient")
@patch("lib.grafana.api_client.GrafanaAPIClient")
@patch("sys.exit")
@patch("builtins.print")
@patch.dict(
"os.environ",
{
"MIGRATING_FROM": "splunk",
"SPLUNK_API_ID": "test_id",
"SPLUNK_API_KEY": "test_key",
"GRAFANA_URL": "http://test.com",
"GRAFANA_USERNAME": "test_user",
"GRAFANA_PASSWORD": "test_pass",
},
)
def test_splunk_user_already_exists(
mock_print, mock_exit, mock_grafana_client_class, mock_splunk_client_class
):
mock_splunk_instance = mock_splunk_client_class.return_value
mock_splunk_instance.fetch_users.return_value = [
{"firstName": "User", "lastName": "One", "email": "user1@example.com"}
]
mock_grafana_instance = mock_grafana_client_class.return_value
mock_grafana_instance.create_user_with_random_password.return_value = MockResponse(
412
)
# Import the module and reload to ensure our mocks are used
import importlib
import add_users_to_grafana
importlib.reload(add_users_to_grafana)
add_users_to_grafana.migrate_splunk_users()
already_exists_message_found = False
for call_args in mock_print.call_args_list:
if (
len(call_args[0]) > 0
and isinstance(call_args[0][0], str)
and "already exists" in call_args[0][0]
):
already_exists_message_found = True
break
assert (
already_exists_message_found
), 'Expected "already exists" message not found in print calls'
# Verify sys.exit was not called
mock_exit.assert_not_called()