Splunk OnCall migration tool (#4267)

# What this PR does

Refactors the PagerDuty migration script to be more generic and adds a
migration script for migrating from Splunk OnCall (VictorOps).

tl;dr:
```bash
❯ docker build -t oncall-migrator .
[+] Building 0.4s (10/10) FINISHED
❯ docker run --rm \
-e MIGRATING_FROM="pagerduty" \
-e MODE="plan" \
-e ONCALL_API_URL="http://localhost:8080" \
-e ONCALL_API_TOKEN="<ONCALL_API_TOKEN>" \
-e PAGERDUTY_API_TOKEN="<PAGERDUTY_API_TOKEN>" \
oncall-migrator
running pagerduty migration script...

❯ docker run --rm \
-e MIGRATING_FROM="splunk" \
-e MODE="plan" \
-e ONCALL_API_URL="http://localhost:8080" \
-e ONCALL_API_TOKEN="<ONCALL_API_TOKEN>" \
-e SPLUNK_API_ID="<SPLUNK_API_ID>" \
-e SPLUNK_API_KEY="<SPLUNK_API_KEY>" \
oncall-migrator
migrating from splunk oncall...
```

https://www.loom.com/share/a855062d436a4ef79f030e22528d8c71

## Checklist

- [x] Unit, integration, and e2e (if applicable) tests updated
- [x] Documentation added (or `pr:no public docs` PR label added if not required)
- [x] Added the relevant release notes label (see labels prefixed w/ `release:`).
  These labels dictate how your PR will show up in the autogenerated release notes.
Joey Orlando 2024-05-14 09:53:59 -04:00 committed by GitHub
parent 978d7c526f
commit c46dff09d9
61 changed files with 3344 additions and 384 deletions


@@ -279,8 +279,8 @@ jobs:
uv pip sync --system requirements.txt requirements-dev.txt
pytest -x
unit-test-pd-migrator:
name: "Unit tests - PagerDuty Migrator"
unit-test-migrators:
name: "Unit tests - Migrators"
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
@@ -288,9 +288,9 @@ jobs:
with:
python-version: "3.11.4"
cache: "pip"
cache-dependency-path: tools/pagerduty-migrator/requirements.txt
- name: Unit Test PD Migrator
working-directory: tools/pagerduty-migrator
cache-dependency-path: tools/migrators/requirements.txt
- name: Unit Test Migrators
working-directory: tools/migrators
run: |
pip install uv
uv pip sync --system requirements.txt


@@ -6,10 +6,9 @@ repos:
files: ^engine
args: [--settings-file=engine/pyproject.toml, --filter-files]
- id: isort
name: isort - pd-migrator
files: ^tools/pagerduty-migrator
args:
[--settings-file=tools/pagerduty-migrator/.isort.cfg, --filter-files]
name: isort - migrators
files: ^tools/migrators
args: [--settings-file=tools/migrators/.isort.cfg, --filter-files]
- id: isort
name: isort - dev/scripts
files: ^dev/scripts
@@ -22,8 +21,8 @@ repos:
files: ^engine
args: [--config=engine/pyproject.toml]
- id: black
name: black - pd-migrator
files: ^tools/pagerduty-migrator
name: black - migrators
files: ^tools/migrators
- id: black
name: black - dev/scripts
files: ^dev/scripts
@@ -38,8 +37,8 @@ repos:
- flake8-bugbear
- flake8-tidy-imports
- id: flake8
name: flake8 - pd-migrator
files: ^tools/pagerduty-migrator
name: flake8 - migrators
files: ^tools/migrators
# Make sure config is compatible with black
# https://black.readthedocs.io/en/stable/guides/using_black_with_other_tools.html#flake8
args: ["--max-line-length=88", "--extend-ignore=E203,E501"]


@@ -123,7 +123,7 @@ Have a question, comment or feedback? Don't be afraid to [open an issue](https:/
## Further Reading
- _Migration from PagerDuty_ - [Migrator](https://github.com/grafana/oncall/tree/dev/tools/pagerduty-migrator)
- _Automated migration from other on-call tools_ - [Migrator](https://github.com/grafana/oncall/tree/dev/tools/migrators)
- _Documentation_ - [Grafana OnCall](https://grafana.com/docs/oncall/latest/)
- _Overview Webinar_ - [YouTube](https://www.youtube.com/watch?v=7uSe1pulgs8)
- _How To Add Integration_ - [How to Add Integration](https://github.com/grafana/oncall/tree/dev/engine/config_integrations/README.md)


@@ -7,6 +7,8 @@ keywords:
- OnCall
- Migration
- Pagerduty
- Splunk OnCall
- VictorOps
- on-call tools
canonical: https://grafana.com/docs/oncall/latest/set-up/migration-from-other-tools/
aliases:
@@ -17,7 +19,9 @@ aliases:
# Migration from other tools
## Migration from PagerDuty to Grafana OnCall
We currently support automated migration from the following on-call tools:
Migration from PagerDuty to Grafana OnCall could be performed in automated way using
[OSS Migrator](https://github.com/grafana/oncall/tree/dev/tools/pagerduty-migrator).
- PagerDuty
- Splunk OnCall (VictorOps)
See our [OSS Migrator](https://github.com/grafana/oncall/tree/dev/tools/migrators) for more details.


@@ -7,4 +7,4 @@ COPY requirements.txt requirements.txt
RUN python3 -m pip install -r requirements.txt
COPY . .
CMD ["python3", "-m" , "migrator"]
CMD ["python3", "main.py"]


@@ -1,48 +1,54 @@
# PagerDuty to Grafana OnCall migrator tool
# Grafana OnCall migrator tools
This tool helps to migrate your PagerDuty configuration to Grafana OnCall.
These tools will help you to migrate from various on-call tools to Grafana OnCall.
## Overview
Currently the migration tool supports migrating from:
Resources that can be migrated using this tool:
- PagerDuty
- Splunk OnCall (VictorOps)
- User notification rules
- On-call schedules
- Escalation policies
- Services (integrations)
- Event rules (experimental, only works with global event rulesets)
## Getting Started
## Limitations
1. Make sure you have `docker` installed and running
2. Build the docker image: `docker build -t oncall-migrator .`
3. Obtain a Grafana OnCall API token and API URL on the "Settings" page of your Grafana OnCall instance
4. Depending on which tool you are migrating from, see more specific instructions there:
- [PagerDuty](#prerequisites)
- [Splunk OnCall](#prerequisites-1)
5. Run a [migration plan](#migration-plan)
6. If you are pleased with the results of the migration plan, run the tool in [migrate mode](#migration)
- Not all integration types are supported
- Delays between migrated notification/escalation rules could be slightly different from original.
E.g. if you have a 4-minute delay between rules in PagerDuty, the resulting delay in Grafana OnCall will be 5 minutes
- Manual changes to PD configuration may be required to migrate some resources
## Prerequisites
1. Make sure you have `docker` installed
2. Build the docker image: `docker build -t pd-oncall-migrator .`
3. Obtain a PagerDuty API **user token**: <https://support.pagerduty.com/docs/api-access-keys#generate-a-user-token-rest-api-key>
4. Obtain a Grafana OnCall API token and API URL on the "Settings" page of your Grafana OnCall instance
## Migration plan
### Migration Plan
Before starting the migration process, it's useful to see a migration plan by running the tool in `plan` mode:
#### PagerDuty
```shell
docker run --rm \
-e PAGERDUTY_API_TOKEN="<PAGERDUTY_API_TOKEN>" \
-e MIGRATING_FROM="pagerduty" \
-e MODE="plan" \
-e ONCALL_API_URL="<ONCALL_API_URL>" \
-e ONCALL_API_TOKEN="<ONCALL_API_TOKEN>" \
-e MODE="plan" \
pd-oncall-migrator
-e PAGERDUTY_API_TOKEN="<PAGERDUTY_API_TOKEN>" \
oncall-migrator
```
Please read the generated report carefully since, depending on its content, some PagerDuty resources
might not be migrated and some existing Grafana OnCall resources could be deleted.
#### Splunk OnCall
### Example migration plan
```shell
docker run --rm \
-e MIGRATING_FROM="splunk" \
-e MODE="plan" \
-e ONCALL_API_URL="<ONCALL_API_URL>" \
-e ONCALL_API_TOKEN="<ONCALL_API_TOKEN>" \
-e SPLUNK_API_ID="<SPLUNK_API_ID>" \
-e SPLUNK_API_KEY="<SPLUNK_API_KEY>" \
oncall-migrator
```
Please read the generated report carefully since, depending on its content, some resources
might not be migrated and some existing Grafana OnCall resources could be deleted.
```text
User notification rules report:
@@ -68,23 +74,63 @@ Integration report:
❌ DevOps - Email — cannot find appropriate Grafana OnCall integration type
```
## Migration
### Migration
Once you are happy with the migration report, start the migration by setting the `MODE` environment variable to `migrate`:
#### PagerDuty
```shell
docker run --rm \
-e PAGERDUTY_API_TOKEN="<PAGERDUTY_API_TOKEN>" \
-e MIGRATING_FROM="pagerduty" \
-e MODE="migrate" \
-e ONCALL_API_URL="<ONCALL_API_URL>" \
-e ONCALL_API_TOKEN="<ONCALL_API_TOKEN>" \
-e PAGERDUTY_API_TOKEN="<PAGERDUTY_API_TOKEN>" \
oncall-migrator
```
#### Splunk OnCall
```shell
docker run --rm \
-e MIGRATING_FROM="splunk" \
-e MODE="migrate" \
pd-oncall-migrator
-e ONCALL_API_URL="<ONCALL_API_URL>" \
-e ONCALL_API_TOKEN="<ONCALL_API_TOKEN>" \
-e GRAFANA_PASSWORD="<GRAFANA_PASSWORD>" \
-e SPLUNK_API_ID="<SPLUNK_API_ID>" \
-e SPLUNK_API_KEY="<SPLUNK_API_KEY>" \
oncall-migrator
```
When performing a migration, only resources that are marked with ✅ or ⚠️ during the plan stage will be migrated.
The migrator is designed to be idempotent, so it's safe to run it multiple times. On every migration run, the tool will
check if the resource already exists in Grafana OnCall and will delete it before creating a new one.
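A minimal sketch of this delete-then-create idempotency strategy (the names and callback shape here are illustrative, not the tool's actual internals):

```python
from typing import Callable

def idempotently_migrate(
    existing: list[dict],
    name: str,
    delete: Callable[[str], None],
    create: Callable[[str], dict],
) -> dict:
    # Delete any Grafana OnCall resource that already has this name...
    for resource in existing:
        if resource["name"] == name:
            delete(resource["id"])
    # ...then create a fresh copy, so repeated runs converge to the same state.
    return create(name)
```

Because each run deletes and recreates the resource, re-running after a partial failure is safe.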
## PagerDuty
### Overview
Resources that can be migrated using this tool:
- User notification rules
- On-call schedules
- Escalation policies
- Services (integrations)
- Event rules (experimental, only works with global event rulesets)
### Limitations
- Not all integration types are supported
- Delays between migrated notification/escalation rules could differ slightly from the original.
  E.g. a 4-minute delay between rules in PagerDuty becomes a 5-minute delay in Grafana OnCall
- Manual changes to PD configuration may be required to migrate some resources
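The delay rounding above follows from Grafana OnCall only accepting a fixed set of delays (`ONCALL_DELAY_OPTIONS = [1, 5, 15, 30, 60]` in the shared config). A plausible sketch of the snapping, assuming nearest-option rounding (which matches both documented examples):

```python
ONCALL_DELAY_OPTIONS = [1, 5, 15, 30, 60]  # minutes, from the shared base config

def snap_delay(minutes: int) -> int:
    # Pick the closest delay Grafana OnCall supports.
    return min(ONCALL_DELAY_OPTIONS, key=lambda option: abs(option - minutes))

# A 4-minute PagerDuty delay snaps to 5; a 20-minute Splunk OnCall delay snaps to 15.
```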
### Prerequisites
- Obtain a PagerDuty API **user token**: <https://support.pagerduty.com/docs/api-access-keys#generate-a-user-token-rest-api-key>
### Migrate unsupported integration types
It's possible to migrate unsupported integration types to [Grafana OnCall incoming webhooks](https://grafana.com/docs/oncall/latest/integrations/available-integrations/configure-webhook/).
@@ -92,23 +138,25 @@ To enable this feature, set env variable `UNSUPPORTED_INTEGRATION_TO_WEBHOOKS` t
```shell
docker run --rm \
-e PAGERDUTY_API_TOKEN="<PAGERDUTY_API_TOKEN>" \
-e MIGRATING_FROM="pagerduty" \
-e MODE="migrate" \
-e ONCALL_API_URL="<ONCALL_API_URL>" \
-e ONCALL_API_TOKEN="<ONCALL_API_TOKEN>" \
-e PAGERDUTY_API_TOKEN="<PAGERDUTY_API_TOKEN>" \
-e UNSUPPORTED_INTEGRATION_TO_WEBHOOKS="true" \
-e MODE="migrate" \
pd-oncall-migrator
oncall-migrator
```
Consider modifying [alert templates](https://grafana.com/docs/oncall/latest/alert-behavior/alert-templates/) of the created
webhook integrations to adjust them for incoming payloads.
## Configuration
### Configuration
Configuration is done via environment variables passed to the docker container.
| Name | Description | Type | Default |
| --------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ | ----------------------------------- | ------- |
| `MIGRATING_FROM` | Set to `pagerduty` | String | N/A |
| `PAGERDUTY_API_TOKEN` | PagerDuty API **user token**. To create a token, refer to [PagerDuty docs](https://support.pagerduty.com/docs/api-access-keys#generate-a-user-token-rest-api-key). | String | N/A |
| `ONCALL_API_URL` | Grafana OnCall API URL. This can be found on the "Settings" page of your Grafana OnCall instance. | String | N/A |
| `ONCALL_API_TOKEN` | Grafana OnCall API Token. To create a token, navigate to the "Settings" page of your Grafana OnCall instance. | String | N/A |
@@ -118,9 +166,9 @@ Configuration is done via environment variables passed to the docker container.
| `EXPERIMENTAL_MIGRATE_EVENT_RULES` | Migrate global event rulesets to Grafana OnCall integrations. | Boolean | `false` |
| `EXPERIMENTAL_MIGRATE_EVENT_RULES_LONG_NAMES` | Include service & integrations names from PD in migrated integrations (only effective when `EXPERIMENTAL_MIGRATE_EVENT_RULES` is `true`). | Boolean | `false` |
## Resources
### Resources
### User notification rules
#### User notification rules
The tool is capable of migrating user notification rules from PagerDuty to Grafana OnCall.
Notification rules from the `"When a high-urgency incident is assigned to me..."` section in PagerDuty settings are
@@ -129,12 +177,9 @@ between notification rules may be slightly different in Grafana OnCall, see [Lim
When running the migration, existing notification rules in Grafana OnCall will be deleted for every affected user.
Note that users are matched by email, so if there are users in the report with "no Grafana OnCall user found with
this email" error, it's possible to fix it by adding these users to your Grafana organization.
If there is a large number of unmatched users, please also [see the script](scripts/README.md) that can automatically
create missing Grafana users via Grafana HTTP API.
See [Migrating Users](#migrating-users) for some more information on how users are migrated.
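Matching boils down to a case-insensitive comparison of email addresses, along these lines (a sketch mirroring the matching helpers added in this PR):

```python
from typing import Optional

def find_oncall_user(email: str, oncall_users: list[dict]) -> Optional[dict]:
    # Users are matched purely by email, ignoring case; any user without
    # a match is reported and skipped during migration.
    for candidate in oncall_users:
        if candidate["email"].lower() == email.lower():
            return candidate
    return None
```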
### On-call schedules
#### On-call schedules
The tool is capable of migrating on-call schedules from PagerDuty to Grafana OnCall.
There are two ways to migrate on-call schedules:
@@ -155,7 +200,7 @@ These errors are expected and are caused by the fact that the tool can't always
due to differences in scheduling systems in PD and Grafana OnCall. To fix these errors, you need to manually change
on-call shifts in PD and re-run the migration.
### Escalation policies
#### Escalation policies
The tool is capable of migrating escalation policies from PagerDuty to Grafana OnCall.
Every escalation policy will be migrated to a new Grafana OnCall escalation chain with the same name.
@@ -166,7 +211,7 @@ unmatched users or schedules that cannot be migrated won't be migrated as well.
Note that delays between escalation steps may be slightly different in Grafana OnCall,
see [Limitations](#limitations) for more info.
### Services (integrations)
#### Services (integrations)
The tool is capable of migrating services (integrations) from PagerDuty to Grafana OnCall.
For every service in PD, the tool will migrate all integrations to Grafana OnCall integrations.
@@ -174,7 +219,7 @@ For every service in PD, the tool will migrate all integrations to Grafana OnCal
Any services that reference escalation policies that cannot be migrated won't be migrated either.
Any integrations with unsupported type won't be migrated unless `UNSUPPORTED_INTEGRATION_TO_WEBHOOKS` is set to `true`.
### Event rules (global event rulesets)
#### Event rules (global event rulesets)
The tool is capable of migrating global event rulesets from PagerDuty to Grafana OnCall integrations. This feature is
experimental and disabled by default. To enable it, set `EXPERIMENTAL_MIGRATE_EVENT_RULES` to `true`.
@@ -188,9 +233,128 @@ If you want to include service & integration names in the names of migrated inte
`EXPERIMENTAL_MIGRATE_EVENT_RULES` is `true`). This can make searching for integrations easier,
but it can also make the names of integrations too long.
## After migration
### After migration
- Connect integrations (press the "How to connect" button on the integration page)
- Make sure users connect their phone numbers, Slack accounts, etc. in their user settings
- When using `SCHEDULE_MIGRATION_MODE=ical`, at some point you would probably want to recreate schedules using
Google Calendar or Terraform to be able to modify migrated on-call schedules in Grafana OnCall
## Splunk OnCall
### Overview
Resources that can be migrated using this tool:
- Escalation Policies
- On-Call Schedules (including Rotations + Scheduled Overrides)
- Teams + team memberships
- User Paging Policies
### Limitations
- Only each user's Primary Paging Policy is migrated; Custom Paging Policies are not migrated
- Not all Splunk escalation step types are supported
- Delays between migrated notification/escalation rules could differ slightly from the original.
  E.g. a 20-minute delay between rules in Splunk OnCall becomes a 15-minute delay in Grafana OnCall
### Prerequisites
- Obtain your Splunk API ID and an API token: <https://help.victorops.com/knowledge-base/api/#:~:text=currently%20in%20place.-,API%20Configuration%20in%20Splunk%20On%2DCall,-To%20access%20the>
### Configuration
Configuration is done via environment variables passed to the docker container.
| Name | Description | Type | Default |
| --------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ | ----------------------------------- | ------- |
| `MIGRATING_FROM` | Set to `splunk` | String | N/A |
| `SPLUNK_API_KEY` | Splunk API **key**. To create an API Key, refer to [Splunk OnCall docs](https://help.victorops.com/knowledge-base/api/#:~:text=currently%20in%20place.-,API%20Configuration%20in%20Splunk%20On%2DCall,-To%20access%20the). | String | N/A |
| `SPLUNK_API_ID` | Splunk API **ID**. To retrieve this ID, refer to [Splunk OnCall docs](https://help.victorops.com/knowledge-base/api/#:~:text=currently%20in%20place.-,API%20Configuration%20in%20Splunk%20On%2DCall,-To%20access%20the). | String | N/A |
| `ONCALL_API_URL` | Grafana OnCall API URL. This can be found on the "Settings" page of your Grafana OnCall instance. | String | N/A |
| `ONCALL_API_TOKEN` | Grafana OnCall API Token. To create a token, navigate to the "Settings" page of your Grafana OnCall instance. | String | N/A |
| `MODE` | Migration mode (plan vs actual migration). | String (choices: `plan`, `migrate`) | `plan` |
### Resources
#### Escalation Policies
The tool is capable of migrating escalation policies from Splunk OnCall to Grafana OnCall.
Every escalation policy will be migrated to a new Grafana OnCall escalation chain with the same name.
Any existing escalation chains with the same name will be deleted before migration. Any escalation policies that reference
unmatched users or schedules that cannot be migrated won't be migrated either.
##### Caveats
- Delays between escalation steps may be slightly different in Grafana OnCall, see [Limitations](#limitations-1) for
  more info.
- The following Splunk OnCall escalation step types are not supported and will not be migrated:
- "Notify the next user(s) in the current on-duty shift"
- "Notify the previous user(s) in the current on-duty shift"
- "Notify every member of this team"
- "Send an email to email address"
- "Execute webhook" (as Splunk OnCall webhooks are currently not migrated to Grafana OnCall webhooks)
#### On-call schedules
The tool is capable of migrating on-call schedules from Splunk OnCall to Grafana OnCall. Every Splunk OnCall schedule
will be migrated to a new Grafana OnCall schedule named after the Splunk team's name + `schedule`
(ex. `Infra Team schedule`).
Any existing Grafana OnCall schedules with the same name will be deleted before migration.
##### Caveats
We don't currently support multi-day shifts which have a "hand-off" period set to greater than one week.
#### User Paging Policies
The tool is capable of migrating paging policies from Splunk OnCall to Grafana OnCall.
Each user's **Primary** paging policy will be migrated to a new Grafana OnCall user notification policy with the same name.
Any existing personal notification policies for these users will be deleted before migration.
See [Migrating Users](#migrating-users) for some more information on how users are migrated.
##### Caveats
- The WhatsApp escalation type is not supported and will not be migrated to the Grafana OnCall
user's personal notification policy
- Note that delays between escalation steps may be slightly different in Grafana OnCall,
see [Limitations](#limitations-1) for more info.
## Migrating Users
Note that users are matched by email, so if there are users in the report with "no Grafana OnCall user found with
this email" error, it's possible to fix it by adding these users to your Grafana organization.
If there is a large number of unmatched users, you can use the following script to automatically create missing
Grafana users via the Grafana HTTP API.
**NOTE**: The script will create users with random passwords, so they will need to reset their passwords later in Grafana.
### PagerDuty
```bash
docker run --rm \
-e MIGRATING_FROM="pagerduty" \
-e GRAFANA_URL="http://localhost:3000" \
-e GRAFANA_USERNAME="admin" \
-e GRAFANA_PASSWORD="admin" \
-e PAGERDUTY_API_TOKEN="<PAGERDUTY_API_TOKEN>" \
oncall-migrator python /app/add_users_to_grafana.py
```
### Splunk OnCall (VictorOps)
```bash
docker run --rm \
-e MIGRATING_FROM="splunk" \
-e GRAFANA_URL="http://localhost:3000" \
-e GRAFANA_USERNAME="admin" \
-e GRAFANA_PASSWORD="admin" \
-e SPLUNK_API_ID="<SPLUNK_API_ID>" \
-e SPLUNK_API_KEY="<SPLUNK_API_KEY>" \
oncall-migrator python /app/add_users_to_grafana.py
```


@@ -0,0 +1,58 @@
import os
import sys
from pdpyras import APISession
from lib.grafana.api_client import GrafanaAPIClient
from lib.splunk.api_client import SplunkOnCallAPIClient
MIGRATING_FROM = os.environ["MIGRATING_FROM"]
PAGERDUTY = "pagerduty"
SPLUNK = "splunk"
PAGERDUTY_API_TOKEN = os.environ.get("PAGERDUTY_API_TOKEN")
SPLUNK_API_ID = os.environ.get("SPLUNK_API_ID")
SPLUNK_API_KEY = os.environ.get("SPLUNK_API_KEY")
GRAFANA_URL = os.environ["GRAFANA_URL"] # Example: http://localhost:3000
GRAFANA_USERNAME = os.environ["GRAFANA_USERNAME"]
GRAFANA_PASSWORD = os.environ["GRAFANA_PASSWORD"]
SUCCESS_SIGN = "✅"
ERROR_SIGN = "❌"
grafana_client = GrafanaAPIClient(GRAFANA_URL, GRAFANA_USERNAME, GRAFANA_PASSWORD)
def migrate_pagerduty_users():
session = APISession(PAGERDUTY_API_TOKEN)
for user in session.list_all("users"):
create_grafana_user(user["name"], user["email"])
def migrate_splunk_users():
client = SplunkOnCallAPIClient(SPLUNK_API_ID, SPLUNK_API_KEY)
for user in client.fetch_users(include_paging_policies=False):
create_grafana_user(f"{user['firstName']} {user['lastName']}", user["email"])
def create_grafana_user(name: str, email: str):
response = grafana_client.create_user_with_random_password(name, email)
if response.status_code == 200:
print(SUCCESS_SIGN + " User created: " + email)
elif response.status_code == 401:
sys.exit(ERROR_SIGN + " Invalid username or password.")
elif response.status_code == 412:
print(ERROR_SIGN + " User " + email + " already exists.")
else:
print("{} {}".format(ERROR_SIGN, response.text))
if __name__ == "__main__":
if MIGRATING_FROM == PAGERDUTY:
migrate_pagerduty_users()
elif MIGRATING_FROM == SPLUNK:
migrate_splunk_users()
else:
raise ValueError("Invalid value for MIGRATING_FROM")


@@ -0,0 +1,25 @@
import os
from urllib.parse import urljoin
PAGERDUTY = "pagerduty"
SPLUNK = "splunk"
MIGRATING_FROM = os.getenv("MIGRATING_FROM")
assert MIGRATING_FROM in (PAGERDUTY, SPLUNK)
MODE_PLAN = "plan"
MODE_MIGRATE = "migrate"
MODE = os.getenv("MODE", default=MODE_PLAN)
assert MODE in (MODE_PLAN, MODE_MIGRATE)
ONCALL_API_TOKEN = os.environ["ONCALL_API_TOKEN"]
ONCALL_API_URL = urljoin(
os.environ["ONCALL_API_URL"].removesuffix("/") + "/",
"api/v1/",
)
ONCALL_DELAY_OPTIONS = [1, 5, 15, 30, 60]
SCHEDULE_MIGRATION_MODE_ICAL = "ical"
SCHEDULE_MIGRATION_MODE_WEB = "web"
SCHEDULE_MIGRATION_MODE = os.getenv(
"SCHEDULE_MIGRATION_MODE", SCHEDULE_MIGRATION_MODE_ICAL
)


@@ -0,0 +1,4 @@
TAB = " " * 4
SUCCESS_SIGN = "✅"
ERROR_SIGN = "❌"
WARNING_SIGN = "⚠️"  # TODO: warning sign does not render properly


@@ -0,0 +1,16 @@
import typing
class MatchTeam(typing.TypedDict):
name: str
oncall_team: typing.Optional[typing.Dict[str, typing.Any]]
def match_team(team: MatchTeam, oncall_teams: typing.List[MatchTeam]) -> None:
oncall_team = None
for candidate_team in oncall_teams:
if team["name"].lower() == candidate_team["name"].lower():
oncall_team = candidate_team
break
team["oncall_team"] = oncall_team


@@ -0,0 +1,16 @@
import typing
class MatchUser(typing.TypedDict):
email: str
oncall_user: typing.Optional[typing.Dict[str, typing.Any]]
def match_user(user: MatchUser, oncall_users: typing.List[MatchUser]) -> None:
oncall_user = None
for candidate_user in oncall_users:
if user["email"].lower() == candidate_user["email"].lower():
oncall_user = candidate_user
break
user["oncall_user"] = oncall_user


@@ -0,0 +1,83 @@
import secrets
from urllib.parse import urljoin
import requests
class GrafanaAPIClient:
def __init__(self, base_url, username, password):
self.base_url = base_url
self.username = username
self.password = password
def _api_call(self, method: str, path: str, **kwargs):
return requests.request(
method,
urljoin(self.base_url, path),
auth=(self.username, self.password),
**kwargs,
)
def create_user_with_random_password(self, name: str, email: str):
return self._api_call(
"POST",
"/api/admin/users",
json={
"name": name,
"email": email,
"login": email.split("@")[0],
"password": secrets.token_urlsafe(15),
},
)
def get_all_users(self):
"""
https://grafana.com/docs/grafana/v10.3/developers/http_api/user/#search-users
"""
return self._api_call("GET", "/api/users").json()
def idempotently_create_team_and_add_users(
self, team_name: str, user_emails: list[str]
) -> int:
"""
Get team by name
https://grafana.com/docs/grafana/v10.3/developers/http_api/team/#using-the-name-parameter
Create team
https://grafana.com/docs/grafana/v10.3/developers/http_api/team/#add-team
Add team members
https://grafana.com/docs/grafana/v10.3/developers/http_api/team/#add-team-member
"""
existing_team = self._api_call(
"GET", "/api/teams/search", params={"name": team_name}
).json()
if existing_team["teams"]:
# team already exists
team_id = existing_team["teams"][0]["id"]
else:
# team doesn't exist, create it
response = self._api_call("POST", "/api/teams", json={"name": team_name})
if response.status_code == 200:
team_id = response.json()["teamId"]
else:
raise Exception(f"Failed to fetch/create Grafana team '{team_name}'")
grafana_users = self.get_all_users()
grafana_user_id_to_email_map = {}
for user_email in user_emails:
for grafana_user in grafana_users:
if grafana_user["email"] == user_email:
grafana_user_id_to_email_map[grafana_user["id"]] = user_email
break
for user_id in grafana_user_id_to_email_map.keys():
self._api_call(
"POST", f"/api/teams/{team_id}/members", json={"userId": user_id}
)
return team_id


@@ -6,11 +6,9 @@ import requests
from requests import HTTPError
from requests.adapters import HTTPAdapter, Retry
from migrator.config import ONCALL_API_TOKEN, ONCALL_API_URL
def api_call(method: str, path: str, **kwargs) -> requests.Response:
url = urljoin(ONCALL_API_URL, path)
def api_call(method: str, base_url: str, path: str, **kwargs) -> requests.Response:
url = urljoin(base_url, path)
# Retry on network errors
session = requests.Session()
@@ -18,9 +16,7 @@ def api_call(method: str, path: str, **kwargs) -> requests.Response:
session.mount("http://", HTTPAdapter(max_retries=retries))
session.mount("https://", HTTPAdapter(max_retries=retries))
response = session.request(
method, url, headers={"Authorization": ONCALL_API_TOKEN}, **kwargs
)
response = session.request(method, url, **kwargs)
try:
response.raise_for_status()
@@ -50,37 +46,3 @@ def api_call(method: str, path: str, **kwargs) -> requests.Response:
raise
return response
def list_all(path: str) -> list[dict]:
response = api_call("get", path)
data = response.json()
results = data["results"]
while data["next"]:
response = api_call("get", data["next"])
data = response.json()
results += data["results"]
return results
def create(path: str, payload: dict) -> dict:
response = api_call("post", path, json=payload)
return response.json()
def delete(path: str) -> None:
try:
api_call("delete", path)
except requests.exceptions.HTTPError as e:
# ignore 404s on delete so deleting resources manually while running the script doesn't break it
if e.response.status_code != 404:
raise
def update(path: str, payload: dict) -> dict:
response = api_call("put", path, json=payload)
return response.json()


@@ -0,0 +1,66 @@
import requests
from lib.base_config import ONCALL_API_TOKEN, ONCALL_API_URL
from lib.network import api_call as _api_call
class OnCallAPIClient:
@classmethod
def api_call(cls, method: str, path: str, **kwargs) -> requests.Response:
return _api_call(
method,
ONCALL_API_URL,
path,
headers={"Authorization": ONCALL_API_TOKEN},
**kwargs
)
@classmethod
def list_all(cls, path: str) -> list[dict]:
response = cls.api_call("get", path)
data = response.json()
results = data["results"]
while data["next"]:
response = cls.api_call("get", data["next"])
data = response.json()
results += data["results"]
return results
@classmethod
def create(cls, path: str, payload: dict) -> dict:
response = cls.api_call("post", path, json=payload)
return response.json()
@classmethod
def delete(cls, path: str) -> None:
try:
cls.api_call("delete", path)
except requests.exceptions.HTTPError as e:
# ignore 404s on delete so deleting resources manually while running the script doesn't break it
if e.response.status_code != 404:
raise
@classmethod
def update(cls, path: str, payload: dict) -> dict:
response = cls.api_call("put", path, json=payload)
return response.json()
@classmethod
def list_users_with_notification_rules(cls):
oncall_users = cls.list_all("users")
oncall_notification_rules = cls.list_all(
"personal_notification_rules/?important=false"
)
for user in oncall_users:
user["notification_rules"] = [
rule
for rule in oncall_notification_rules
if rule["user_id"] == user["id"]
]
return oncall_users


@@ -0,0 +1,33 @@
import typing
class OnCallUserNotificationRule(typing.TypedDict):
position: int
id: str
user_id: str
important: bool
type: str
class OnCallUser(typing.TypedDict):
id: str
email: str
slack: typing.Optional[str]
username: str
role: str
is_phone_number_verified: bool
timezone: str
teams: typing.List[str]
notification_rules: typing.List[OnCallUserNotificationRule]
class OnCallSchedule(typing.TypedDict):
pass
class OnCallEscalationChain(typing.TypedDict):
id: str
class OnCallEscalationPolicyCreatePayload(typing.TypedDict):
pass


@@ -1,19 +1,8 @@
import os
from urllib.parse import urljoin
MODE_PLAN = "plan"
MODE_MIGRATE = "migrate"
MODE = os.getenv("MODE", default=MODE_PLAN)
assert MODE in (MODE_PLAN, MODE_MIGRATE)
from lib.base_config import * # noqa: F401,F403
PAGERDUTY_API_TOKEN = os.environ["PAGERDUTY_API_TOKEN"]
ONCALL_API_TOKEN = os.environ["ONCALL_API_TOKEN"]
ONCALL_API_URL = urljoin(
os.environ["ONCALL_API_URL"].removesuffix("/") + "/",
"api/v1/",
)
ONCALL_DELAY_OPTIONS = [1, 5, 15, 30, 60]
PAGERDUTY_TO_ONCALL_CONTACT_METHOD_MAP = {
"sms_contact_method": "notify_by_sms",
"phone_contact_method": "notify_by_phone_call",
@@ -33,12 +22,6 @@ PAGERDUTY_TO_ONCALL_VENDOR_MAP = {
"Firebase": "fabric",
}
SCHEDULE_MIGRATION_MODE_ICAL = "ical"
SCHEDULE_MIGRATION_MODE_WEB = "web"
SCHEDULE_MIGRATION_MODE = os.getenv(
"SCHEDULE_MIGRATION_MODE", SCHEDULE_MIGRATION_MODE_ICAL
)
# Experimental feature to migrate PD rulesets to OnCall integrations
EXPERIMENTAL_MIGRATE_EVENT_RULES = (
os.getenv("EXPERIMENTAL_MIGRATE_EVENT_RULES", "false").lower() == "true"


@@ -2,15 +2,16 @@ import datetime
from pdpyras import APISession
from migrator import oncall_api_client
from migrator.config import (
from lib.common.report import TAB
from lib.common.resources.users import match_user
from lib.oncall.api_client import OnCallAPIClient
from lib.pagerduty.config import (
EXPERIMENTAL_MIGRATE_EVENT_RULES,
MODE,
MODE_PLAN,
PAGERDUTY_API_TOKEN,
)
from migrator.report import (
TAB,
from lib.pagerduty.report import (
escalation_policy_report,
format_escalation_policy,
format_integration,
@@ -22,41 +23,33 @@ from migrator.report (
schedule_report,
user_report,
)
from migrator.resources.escalation_policies import (
from lib.pagerduty.resources.escalation_policies import (
match_escalation_policy,
match_escalation_policy_for_integration,
migrate_escalation_policy,
)
from migrator.resources.integrations import (
from lib.pagerduty.resources.integrations import (
match_integration,
match_integration_type,
migrate_integration,
)
from migrator.resources.notification_rules import migrate_notification_rules
from migrator.resources.rulesets import match_ruleset, migrate_ruleset
from migrator.resources.schedules import match_schedule, migrate_schedule
from migrator.resources.users import (
match_user,
from lib.pagerduty.resources.notification_rules import migrate_notification_rules
from lib.pagerduty.resources.rulesets import match_ruleset, migrate_ruleset
from lib.pagerduty.resources.schedules import match_schedule, migrate_schedule
from lib.pagerduty.resources.users import (
match_users_and_schedules_for_escalation_policy,
match_users_for_schedule,
)
def main() -> None:
def migrate() -> None:
session = APISession(PAGERDUTY_API_TOKEN)
session.timeout = 20
print("▶ Fetching users...")
users = session.list_all("users", params={"include[]": "notification_rules"})
oncall_users = oncall_api_client.list_all("users")
oncall_notification_rules = oncall_api_client.list_all(
"personal_notification_rules/?important=false"
)
for user in oncall_users:
user["notification_rules"] = [
rule for rule in oncall_notification_rules if rule["user_id"] == user["id"]
]
oncall_users = OnCallAPIClient.list_users_with_notification_rules()
print("▶ Fetching schedules...")
# Fetch schedules from PagerDuty
@ -77,11 +70,11 @@ def main() -> None:
schedule["overrides"] = response["overrides"]
# Fetch schedules from OnCall
oncall_schedules = oncall_api_client.list_all("schedules")
oncall_schedules = OnCallAPIClient.list_all("schedules")
print("▶ Fetching escalation policies...")
escalation_policies = session.list_all("escalation_policies")
oncall_escalation_chains = oncall_api_client.list_all("escalation_chains")
oncall_escalation_chains = OnCallAPIClient.list_all("escalation_chains")
print("▶ Fetching integrations...")
services = session.list_all("services", params={"include[]": "integrations"})
@ -94,7 +87,7 @@ def main() -> None:
integration["service"] = service
integrations.append(integration)
oncall_integrations = oncall_api_client.list_all("integrations")
oncall_integrations = OnCallAPIClient.list_all("integrations")
rulesets = None
if EXPERIMENTAL_MIGRATE_EVENT_RULES:
@ -178,7 +171,3 @@ def main() -> None:
if not ruleset["flawed_escalation_policies"]:
migrate_ruleset(ruleset, escalation_policies, services)
print(TAB + format_ruleset(ruleset))
if __name__ == "__main__":
main()

View file

@ -1,7 +1,4 @@
TAB = " " * 4
SUCCESS_SIGN = "✅"
ERROR_SIGN = "❌"
WARNING_SIGN = "⚠️" # TODO: warning sign does not render properly
from lib.common.report import ERROR_SIGN, SUCCESS_SIGN, TAB, WARNING_SIGN
def format_user(user: dict) -> str:

View file

@ -1,5 +1,5 @@
from migrator import oncall_api_client
from migrator.utils import find_by_id, transform_wait_delay
from lib.oncall.api_client import OnCallAPIClient
from lib.utils import find_by_id, transform_wait_delay
def match_escalation_policy(policy: dict, oncall_escalation_chains: list[dict]) -> None:
@ -30,14 +30,14 @@ def migrate_escalation_policy(
num_loops = escalation_policy["num_loops"]
if escalation_policy["oncall_escalation_chain"]:
oncall_api_client.delete(
OnCallAPIClient.delete(
"escalation_chains/{}".format(
escalation_policy["oncall_escalation_chain"]["id"]
)
)
oncall_escalation_chain_payload = {"name": name, "team_id": None}
oncall_escalation_chain = oncall_api_client.create(
oncall_escalation_chain = OnCallAPIClient.create(
"escalation_chains", oncall_escalation_chain_payload
)
@ -47,7 +47,7 @@ def migrate_escalation_policy(
rules, oncall_escalation_chain["id"], users, schedules, num_loops
)
for policy in oncall_escalation_policies:
oncall_api_client.create("escalation_policies", policy)
OnCallAPIClient.create("escalation_policies", policy)
def transform_rules(

View file

@ -1,9 +1,9 @@
from migrator import oncall_api_client
from migrator.config import (
from lib.oncall.api_client import OnCallAPIClient
from lib.pagerduty.config import (
PAGERDUTY_TO_ONCALL_VENDOR_MAP,
UNSUPPORTED_INTEGRATION_TO_WEBHOOKS,
)
from migrator.utils import find_by_id
from lib.utils import find_by_id
def match_integration(integration: dict, oncall_integrations: list[dict]) -> None:
@ -55,7 +55,7 @@ def migrate_integration(integration: dict, escalation_policies: list[dict]) -> N
oncall_escalation_chain = escalation_policy["oncall_escalation_chain"]
if integration["oncall_integration"]:
oncall_api_client.delete(
OnCallAPIClient.delete(
"integrations/{}".format(integration["oncall_integration"]["id"])
)
@ -73,13 +73,13 @@ def create_integration(
) -> None:
payload = {"name": name, "type": integration_type, "team_id": None}
integration = oncall_api_client.create("integrations", payload)
integration = OnCallAPIClient.create("integrations", payload)
routes = oncall_api_client.list_all(
routes = OnCallAPIClient.list_all(
"routes/?integration_id={}".format(integration["id"])
)
default_route_id = routes[0]["id"]
oncall_api_client.update(
OnCallAPIClient.update(
f"routes/{default_route_id}", {"escalation_chain_id": escalation_chain_id}
)

View file

@ -1,8 +1,8 @@
import copy
from migrator import oncall_api_client
from migrator.config import PAGERDUTY_TO_ONCALL_CONTACT_METHOD_MAP
from migrator.utils import remove_duplicates, transform_wait_delay
from lib.oncall.api_client import OnCallAPIClient
from lib.pagerduty.config import PAGERDUTY_TO_ONCALL_CONTACT_METHOD_MAP
from lib.utils import remove_duplicates, transform_wait_delay
def remove_duplicate_rules_between_waits(rules: list[dict]) -> list[dict]:
@ -32,14 +32,12 @@ def migrate_notification_rules(user: dict) -> None:
)
for rule in oncall_rules:
oncall_api_client.create("personal_notification_rules", rule)
OnCallAPIClient.create("personal_notification_rules", rule)
if oncall_rules:
# delete old notification rules if any new rules were created
for rule in user["oncall_user"]["notification_rules"]:
oncall_api_client.delete(
"personal_notification_rules/{}".format(rule["id"])
)
OnCallAPIClient.delete("personal_notification_rules/{}".format(rule["id"]))
def transform_notification_rules(

View file

@ -1,6 +1,6 @@
from migrator import oncall_api_client
from migrator.config import EXPERIMENTAL_MIGRATE_EVENT_RULES_LONG_NAMES
from migrator.utils import find_by_id
from lib.oncall.api_client import OnCallAPIClient
from lib.pagerduty.config import EXPERIMENTAL_MIGRATE_EVENT_RULES_LONG_NAMES
from lib.utils import find_by_id
def match_ruleset(
@ -49,7 +49,7 @@ def migrate_ruleset(
) -> None:
# Delete existing integration with the same name
if ruleset["oncall_integration"]:
oncall_api_client.delete(
OnCallAPIClient.delete(
"integrations/{}".format(ruleset["oncall_integration"]["id"])
)
@ -59,7 +59,7 @@ def migrate_ruleset(
"type": "webhook",
"team_id": None,
}
integration = oncall_api_client.create("integrations", integration_payload)
integration = OnCallAPIClient.create("integrations", integration_payload)
# Migrate rules that are not disabled and not catch-all
rules = [r for r in ruleset["rules"] if not r["disabled"] and not r["catch_all"]]
@ -78,7 +78,7 @@ def migrate_ruleset(
"integration_id": integration["id"],
"escalation_chain_id": escalation_chain_id,
}
oncall_api_client.create("routes", route_payload)
OnCallAPIClient.create("routes", route_payload)
# Migrate catch-all rule
catch_all_rule = [r for r in ruleset["rules"] if r["catch_all"]][0]
@ -93,11 +93,11 @@ def migrate_ruleset(
if catch_all_escalation_chain_id:
# Get the default route and update it to use appropriate escalation chain
routes = oncall_api_client.list_all(
routes = OnCallAPIClient.list_all(
"routes/?integration_id={}".format(integration["id"])
)
default_route_id = routes[-1]["id"]
oncall_api_client.update(
OnCallAPIClient.update(
f"routes/{default_route_id}",
{"escalation_chain_id": catch_all_escalation_chain_id},
)

View file

@ -4,12 +4,13 @@ from enum import Enum
from typing import Optional
from uuid import uuid4
from migrator import oncall_api_client
from migrator.config import (
from lib.oncall.api_client import OnCallAPIClient
from lib.pagerduty.config import (
SCHEDULE_MIGRATION_MODE,
SCHEDULE_MIGRATION_MODE_ICAL,
SCHEDULE_MIGRATION_MODE_WEB,
)
from lib.utils import dt_to_oncall_datetime, duration_to_frequency_and_interval
def match_schedule(
@ -30,9 +31,7 @@ def match_schedule(
def migrate_schedule(schedule: dict, user_id_map: dict[str, str]) -> None:
if schedule["oncall_schedule"]:
oncall_api_client.delete(
"schedules/{}".format(schedule["oncall_schedule"]["id"])
)
OnCallAPIClient.delete("schedules/{}".format(schedule["oncall_schedule"]["id"]))
if SCHEDULE_MIGRATION_MODE == SCHEDULE_MIGRATION_MODE_WEB:
# Migrate shifts
@ -45,34 +44,13 @@ def migrate_schedule(schedule: dict, user_id_map: dict[str, str]) -> None:
"ical_url_primary": schedule["http_cal_url"],
"team_id": None,
}
oncall_schedule = oncall_api_client.create("schedules", payload)
oncall_schedule = OnCallAPIClient.create("schedules", payload)
else:
raise ValueError("Invalid schedule migration mode")
schedule["oncall_schedule"] = oncall_schedule
def duration_to_frequency_and_interval(duration: datetime.timedelta) -> tuple[str, int]:
"""
Convert a duration to shift frequency and interval.
For example, a 1-day duration returns ("daily", 1) and 14 days returns ("weekly", 2).
"""
seconds = int(duration.total_seconds())
assert seconds >= 3600, "Rotation must be at least 1 hour"
hours = seconds // 3600
if hours >= 24 and hours % 24 == 0:
days = hours // 24
if days >= 7 and days % 7 == 0:
weeks = days // 7
return "weekly", weeks
else:
return "daily", days
else:
return "hourly", hours
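Although the helper now lives in `lib.utils`, its behavior is easy to check in isolation (function body reproduced from the removed code above):

```python
import datetime

def duration_to_frequency_and_interval(duration: datetime.timedelta) -> tuple[str, int]:
    # Reproduced for illustration: pick the coarsest frequency that
    # divides the rotation length evenly.
    seconds = int(duration.total_seconds())
    assert seconds >= 3600, "Rotation must be at least 1 hour"
    hours = seconds // 3600
    if hours >= 24 and hours % 24 == 0:
        days = hours // 24
        if days >= 7 and days % 7 == 0:
            return "weekly", days // 7
        return "daily", days
    return "hourly", hours

duration_to_frequency_and_interval(datetime.timedelta(days=14))   # -> ("weekly", 2)
duration_to_frequency_and_interval(datetime.timedelta(days=3))    # -> ("daily", 3)
# 36 hours is not a whole number of days, so it falls back to hourly:
duration_to_frequency_and_interval(datetime.timedelta(hours=36))  # -> ("hourly", 36)
```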
def _pd_datetime_to_dt(text: str) -> datetime.datetime:
"""
Convert a PagerDuty datetime string to a datetime object.
@ -81,13 +59,6 @@ def _pd_datetime_to_dt(text: str) -> datetime.datetime:
return dt.replace(tzinfo=datetime.timezone.utc)
def _dt_to_oncall_datetime(dt: datetime.datetime) -> str:
"""
Convert a datetime object to an OnCall datetime string.
"""
return dt.strftime("%Y-%m-%dT%H:%M:%S")
@dataclass
class Schedule:
"""
@ -198,12 +169,12 @@ class Schedule:
# Create shifts in OnCall
shift_ids = []
for shift in schedule["shifts"]:
created_shift = oncall_api_client.create("on_call_shifts", shift)
created_shift = OnCallAPIClient.create("on_call_shifts", shift)
shift_ids.append(created_shift["id"])
# Create schedule in OnCall with shift IDs provided
schedule["shifts"] = shift_ids
new_schedule = oncall_api_client.create("schedules", schedule)
new_schedule = OnCallAPIClient.create("schedules", schedule)
return new_schedule
@ -262,9 +233,9 @@ class Layer:
"name": uuid4().hex,
"level": self.level,
"type": "rolling_users",
"rotation_start": _dt_to_oncall_datetime(self.start),
"until": _dt_to_oncall_datetime(self.end) if self.end else None,
"start": _dt_to_oncall_datetime(self.rotation_virtual_start),
"rotation_start": dt_to_oncall_datetime(self.start),
"until": dt_to_oncall_datetime(self.end) if self.end else None,
"start": dt_to_oncall_datetime(self.rotation_virtual_start),
"duration": int(self.rotation_turn_length.total_seconds()),
"frequency": frequency,
"interval": interval,
@ -381,9 +352,9 @@ class Layer:
"name": uuid4().hex,
"level": self.level,
"type": "rolling_users",
"rotation_start": _dt_to_oncall_datetime(self.start),
"until": _dt_to_oncall_datetime(self.end) if self.end else None,
"start": _dt_to_oncall_datetime(shift[0]),
"rotation_start": dt_to_oncall_datetime(self.start),
"until": dt_to_oncall_datetime(self.end) if self.end else None,
"start": dt_to_oncall_datetime(shift[0]),
"duration": int((shift[1] - shift[0]).total_seconds()),
"frequency": frequency,
"interval": interval,
@ -610,7 +581,7 @@ class Override:
return cls(start=start, end=end, user_id=override["user"]["id"])
def to_oncall_shift(self, user_id_map: dict[str, str]) -> dict:
start = _dt_to_oncall_datetime(self.start)
start = dt_to_oncall_datetime(self.start)
duration = int((self.end - self.start).total_seconds())
user_id = user_id_map[self.user_id]

View file

@ -1,14 +1,4 @@
from migrator.utils import find_by_id
def match_user(user: dict, oncall_users: list[dict]) -> None:
oncall_user = None
for candidate_user in oncall_users:
if user["email"].lower() == candidate_user["email"].lower():
oncall_user = candidate_user
break
user["oncall_user"] = oncall_user
from lib.utils import find_by_id
def match_users_for_schedule(schedule: dict, users: list[dict]) -> None:

View file

View file

@ -0,0 +1,130 @@
import time
import typing
from lib.network import api_call as _api_call
from lib.splunk import types
class SplunkOnCallAPIClient:
"""
https://portal.victorops.com/public/api-docs.html
"""
PUBLIC_API_BASE_URL = "https://api.victorops.com/api-public/"
def __init__(self, api_id: str, api_key: str):
self.api_id = api_id
self.api_key = api_key
def _api_call(
self,
method: str,
path: str,
response_key: typing.Optional[str] = None,
**kwargs,
):
"""
According to the docs, most API endpoints may only be called a maximum of 2 times per second
(hence the built-in `time.sleep`)
"""
time.sleep(0.5)
response = _api_call(
method,
self.PUBLIC_API_BASE_URL,
path,
headers={
"X-VO-Api-Id": self.api_id,
"X-VO-Api-Key": self.api_key,
},
**kwargs,
)
return response.json()[response_key] if response_key else response.json()
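The half-second sleep before every request caps the client at 2 calls per second, matching the documented limit. The same idea as a standalone wrapper (a sketch, not part of the tool):

```python
import time

def throttled(fn, min_interval: float = 0.5):
    """Wrap fn so each call sleeps min_interval seconds first,
    capping the call rate at 1/min_interval calls per second."""
    def wrapper(*args, **kwargs):
        time.sleep(min_interval)
        return fn(*args, **kwargs)
    return wrapper
```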
def fetch_user_paging_policies(
self, user_id: str
) -> typing.List[types.SplunkUserPagingPolicy]:
"""
https://portal.victorops.com/public/api-docs.html#!/User32Paging32Policies/get_api_public_v1_user_user_policies
"""
return self._api_call("GET", f"v1/user/{user_id}/policies", "policies")
def fetch_users(
self, include_paging_policies=True
) -> typing.List[types.SplunkUserWithPagingPolicies]:
"""
https://portal.victorops.com/public/api-docs.html#!/Users/get_api_public_v2_user
"""
users: typing.List[types.SplunkUserWithPagingPolicies] = self._api_call(
"GET", "v2/user", "users"
)
if include_paging_policies:
for user in users:
user["pagingPolicies"] = self.fetch_user_paging_policies(
user["username"]
)
return users
def fetch_team_members(self, team_slug: str) -> typing.List[types.SplunkTeamMember]:
"""
https://portal.victorops.com/public/api-docs.html#!/Teams/get_api_public_v1_team_team_members
"""
return self._api_call("GET", f"v1/team/{team_slug}/members", "members")
def fetch_teams(self, include_members=False) -> typing.List[types.SplunkTeam]:
"""
https://portal.victorops.com/public/api-docs.html#!/Teams/get_api_public_v1_team
"""
teams = self._api_call("GET", "v1/team")
if include_members:
for team in teams:
team["members"] = self.fetch_team_members(team["slug"])
return teams
def fetch_rotations(self, team_slug: str) -> typing.List[types.SplunkRotation]:
"""
https://portal.victorops.com/public/api-docs.html#!/Rotations/get_api_public_v2_team_team_rotations
"""
return self._api_call("GET", f"v2/team/{team_slug}/rotations", "rotations")
def fetch_schedules(self) -> typing.List[types.SplunkScheduleWithTeamAndRotations]:
"""
Schedules in Splunk must be fetched via a team; there is no
way to list all schedules.
https://portal.victorops.com/public/api-docs.html#!/On45call/get_api_public_v2_team_team_oncall_schedule
"""
schedules: typing.List[types.SplunkScheduleWithTeamAndRotations] = []
for team in self.fetch_teams():
team_slug = team["slug"]
team_rotations = self.fetch_rotations(team_slug)
for team_schedule in self._api_call(
"GET", f"v2/team/{team_slug}/oncall/schedule", "schedules"
):
team_schedule["team"] = team
team_schedule["rotations"] = team_rotations
schedules.append(team_schedule)
return schedules
def fetch_escalation_policy(self, policy_id: str) -> types.SplunkEscalationPolicy:
"""
Fetch more detailed info about a specific escalation policy
https://portal.victorops.com/public/api-docs.html#!/Escalation32Policies/get_api_public_v1_policies_policy
"""
return self._api_call("GET", f"v1/policies/{policy_id}")
def fetch_escalation_policies(self) -> typing.List[types.SplunkEscalationPolicy]:
"""
https://portal.victorops.com/public/api-docs.html#!/Escalation32Policies/get_api_public_v1_policies
"""
return [
self.fetch_escalation_policy(policy["policy"]["slug"])
for policy in self._api_call("GET", "v1/policies", "policies")
]

View file

@ -0,0 +1,23 @@
import os
from lib.base_config import * # noqa: F401,F403
SPLUNK_API_ID = os.environ["SPLUNK_API_ID"]
SPLUNK_API_KEY = os.environ["SPLUNK_API_KEY"]
SPLUNK_TO_ONCALL_CONTACT_METHOD_MAP = {
"sms": "notify_by_sms",
"phone": "notify_by_phone_call",
"email": "notify_by_email",
"push": "notify_by_mobile_app",
}
# NOTE: currently we only support `rotation_group` and `user`
UNSUPPORTED_ESCALATION_POLICY_EXECUTION_TYPES = [
"email",
"webhook",
"policy_routing",
"rotation_group_next",
"rotation_group_previous",
"team_page",
]

View file

@ -0,0 +1,128 @@
from lib.common.report import TAB, WARNING_SIGN
from lib.common.resources.users import match_user
from lib.oncall.api_client import OnCallAPIClient
from lib.splunk.api_client import SplunkOnCallAPIClient
from lib.splunk.config import MODE, MODE_PLAN, SPLUNK_API_ID, SPLUNK_API_KEY
from lib.splunk.report import (
escalation_policy_report,
format_escalation_policy,
format_schedule,
format_user,
schedule_report,
user_report,
)
from lib.splunk.resources.escalation_policies import (
match_escalation_policy,
match_users_and_schedules_for_escalation_policy,
migrate_escalation_policy,
)
from lib.splunk.resources.paging_policies import migrate_paging_policies
from lib.splunk.resources.schedules import match_schedule, migrate_schedule
def migrate():
# NOTE: uncomment the following code if we consider auto-migration of teams
# grafana_api_client = GrafanaAPIClient(
# GRAFANA_URL, GRAFANA_USERNAME, GRAFANA_PASSWORD
# )
splunk_client = SplunkOnCallAPIClient(SPLUNK_API_ID, SPLUNK_API_KEY)
print("▶ Fetching users...")
oncall_users = OnCallAPIClient.list_users_with_notification_rules()
splunk_users = splunk_client.fetch_users()
# NOTE: uncomment the following code if we consider auto-migration of teams
# print("▶ Fetching teams...")
# splunk_teams = splunk_client.fetch_teams(include_members=True)
# oncall_teams = OnCallAPIClient.list_all("teams")
print("▶ Fetching schedules...")
oncall_schedules = OnCallAPIClient.list_all("schedules")
splunk_schedules = splunk_client.fetch_schedules()
print("▶ Fetching escalation policies...")
splunk_escalation_policies = splunk_client.fetch_escalation_policies()
oncall_escalation_chains = OnCallAPIClient.list_all("escalation_chains")
for splunk_user in splunk_users:
match_user(splunk_user, oncall_users)
splunk_username_to_oncall_user_id_map = {
u["username"]: u["oncall_user"]["id"] if u["oncall_user"] else None
for u in splunk_users
}
# NOTE: uncomment the following code if we consider auto-migration of teams
# splunk_username_to_email_map = {
# user["username"]: user["email"] for user in splunk_users
# }
# for splunk_team in splunk_teams:
# match_team(splunk_team, oncall_teams)
# oncall_team_name_to_id_map = {team["name"]: team["id"] for team in oncall_teams}
# splunk_team_slug_to_grafana_team_id_map: typing.Dict[str, int] = {}
# NOTE: this team mapping won't quite work... this creates and returns a mapping of
# Splunk team slugs to Grafana team IDs... however, we actually need to map Splunk team
# slugs to OnCall team public primary keys (IDs)
#
# NOTE: we need to map this beforehand so that we can build the Splunk team slug to Grafana team id mapping
# print("▶ Migrating teams and team members...")
# for splunk_team in splunk_teams:
# member_emails = [
# splunk_username_to_email_map[member["username"]]
# for member in splunk_team["members"]
# if member["username"] in splunk_username_to_email_map
# ]
# grafana_team_id = grafana_api_client.idemopotently_create_team_and_add_users(splunk_team["name"], member_emails)
# print(TAB + format_team(splunk_team))
# splunk_team_slug_to_grafana_team_id_map[splunk_team["slug"]] = grafana_team_id
for splunk_schedule in splunk_schedules:
match_schedule(
splunk_schedule, oncall_schedules, splunk_username_to_oncall_user_id_map
)
for splunk_escalation_policy in splunk_escalation_policies:
match_escalation_policy(splunk_escalation_policy, oncall_escalation_chains)
match_users_and_schedules_for_escalation_policy(
splunk_escalation_policy, splunk_users, splunk_schedules
)
if MODE == MODE_PLAN:
print(user_report(splunk_users), end="\n\n")
print(schedule_report(splunk_schedules), end="\n\n")
print(escalation_policy_report(splunk_escalation_policies), end="\n\n")
return
print("▶ Migrating user paging policies...")
for splunk_user in splunk_users:
if splunk_user["oncall_user"]:
migrate_paging_policies(splunk_user)
print(TAB + format_user(splunk_user))
print("▶ Migrating schedules...")
for splunk_schedule in splunk_schedules:
if not splunk_schedule["migration_errors"]:
migrate_schedule(splunk_schedule, splunk_username_to_oncall_user_id_map)
print(TAB + format_schedule(splunk_schedule))
else:
print(
TAB
+ WARNING_SIGN
+ f" skipping {splunk_schedule['name']} due to migration errors; see `plan` output for more details"
)
print("▶ Migrating escalation policies...")
for splunk_escalation_policy in splunk_escalation_policies:
if (
not splunk_escalation_policy["unmatched_users"]
and not splunk_escalation_policy["flawed_schedules"]
):
migrate_escalation_policy(
splunk_escalation_policy, splunk_users, splunk_schedules
)
print(TAB + format_escalation_policy(splunk_escalation_policy))

View file

@ -0,0 +1,105 @@
import typing
from lib.common.report import ERROR_SIGN, SUCCESS_SIGN, TAB, WARNING_SIGN
from lib.splunk import types
def format_user(user: types.SplunkUserWithPagingPolicies) -> str:
result = f"{user['firstName']} {user['lastName']} ({user['email']})"
if user["oncall_user"]:
result = f"{SUCCESS_SIGN} {result}"
else:
result = f"{ERROR_SIGN} {result} — no Grafana OnCall user found with this email"
return result
def format_team(team: types.SplunkTeam) -> str:
return f"{SUCCESS_SIGN} {team['name']} ({team['slug']})"
def format_schedule(schedule: types.SplunkScheduleWithTeamAndRotations) -> str:
schedule_name = schedule["name"]
if schedule["migration_errors"]:
result = f"{ERROR_SIGN} {schedule_name} — some layers cannot be migrated"
else:
result = f"{SUCCESS_SIGN} {schedule_name}"
return result
def format_escalation_policy(policy: types.SplunkEscalationPolicy) -> str:
policy_name = policy["name"]
unmatched_users = policy["unmatched_users"]
flawed_schedules = policy["flawed_schedules"]
if unmatched_users and flawed_schedules:
result = f"{ERROR_SIGN} {policy_name} — policy references unmatched users and schedules that cannot be migrated"
elif unmatched_users:
result = f"{ERROR_SIGN} {policy_name} — policy references unmatched users"
elif flawed_schedules:
result = f"{ERROR_SIGN} {policy_name} — policy references schedules that cannot be migrated"
else:
result = f"{SUCCESS_SIGN} {policy_name}"
return result
def user_report(users: typing.List[types.SplunkUserWithPagingPolicies]) -> str:
result = "User notification rules report:"
for user in sorted(users, key=lambda u: bool(u["oncall_user"]), reverse=True):
result += f"\n{TAB}{format_user(user)}"
if user["oncall_user"] and user["pagingPolicies"]:
result += " (existing notification rules will be deleted)"
return result
def schedule_report(schedules: list[types.SplunkScheduleWithTeamAndRotations]) -> str:
result = "Schedule report:"
for schedule in sorted(schedules, key=lambda s: s["migration_errors"]):
result += "\n" + TAB + format_schedule(schedule)
if schedule["oncall_schedule"] and not schedule["migration_errors"]:
result += " (existing schedule with name '{}' will be deleted)".format(
schedule["oncall_schedule"]["name"]
)
for error in schedule["migration_errors"]:
result += "\n" + TAB * 2 + "{} {}".format(ERROR_SIGN, error)
return result
def escalation_policy_report(policies: list[types.SplunkEscalationPolicy]) -> str:
result = "Escalation policy report: "
for policy in sorted(
policies, key=lambda p: bool(p["unmatched_users"] or p["flawed_schedules"])
):
unmatched_users = policy["unmatched_users"]
flawed_schedules = policy["flawed_schedules"]
unsupported_escalation_entry_types = policy[
"unsupported_escalation_entry_types"
]
result += f"\n{TAB}{format_escalation_policy(policy)}"
if (
not unmatched_users
and not flawed_schedules
and policy["oncall_escalation_chain"]
):
result += f" (existing escalation chain with name '{policy['oncall_escalation_chain']['name']}' will be deleted)"
for user in unmatched_users:
result += f"\n{TAB * 2}{format_user(user)}"
for schedule in policy["flawed_schedules"]:
result += f"\n{TAB * 2}{format_schedule(schedule)}"
for entry_type in unsupported_escalation_entry_types:
result += f"\n{TAB * 2}{WARNING_SIGN} unsupported escalation entry type: {entry_type}"
return result

View file

@ -0,0 +1,150 @@
import typing
from lib.oncall import types as oncall_types
from lib.oncall.api_client import OnCallAPIClient
from lib.splunk import config, types
from lib.utils import find_by_id, transform_wait_delay
def match_escalation_policy(
policy: types.SplunkEscalationPolicy,
oncall_escalation_chains: typing.List[oncall_types.OnCallEscalationChain],
) -> None:
oncall_escalation_chain = None
for candidate in oncall_escalation_chains:
if candidate["name"].lower().strip() == policy["name"].lower().strip():
oncall_escalation_chain = candidate
policy["oncall_escalation_chain"] = oncall_escalation_chain
def match_users_and_schedules_for_escalation_policy(
policy: types.SplunkEscalationPolicy,
users: list[types.SplunkUserWithPagingPolicies],
schedules: list[types.SplunkScheduleWithTeamAndRotations],
) -> None:
unmatched_user_ids = set()
flawed_schedule_team_slugs = set()
unsupported_escalation_entry_types = set()
policy_team_slug = policy["slug"]
def _find_schedule(team_slug: str):
return find_by_id(schedules, team_slug, "team.slug")
for step in policy["steps"]:
for entry in step["entries"]:
execution_type = entry["executionType"]
if execution_type in config.UNSUPPORTED_ESCALATION_POLICY_EXECUTION_TYPES:
unsupported_escalation_entry_types.add(execution_type)
elif execution_type == "rotation_group":
if (schedule := _find_schedule(policy_team_slug)) is None:
continue
elif schedule["migration_errors"]:
flawed_schedule_team_slugs.add(policy_team_slug)
elif execution_type == "user":
target_id = entry["user"]["username"]
if (user := find_by_id(users, target_id, "username")) is None:
continue
elif not user["oncall_user"]:
unmatched_user_ids.add(target_id)
policy["unsupported_escalation_entry_types"] = list(
unsupported_escalation_entry_types
)
policy["unmatched_users"] = [
find_by_id(users, user_id, "username") for user_id in unmatched_user_ids
]
policy["flawed_schedules"] = [
_find_schedule(team_slug) for team_slug in flawed_schedule_team_slugs
]
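`find_by_id` is used here with a dotted key path (`"team.slug"`). Its actual implementation lives in `lib.utils`; a minimal version consistent with this usage might look like the following (an assumption for illustration, not the real helper):

```python
import typing

def find_by_id(
    items: list[dict], value: typing.Any, key: str = "id"
) -> typing.Optional[dict]:
    # Walk a dotted key path ("team.slug" -> item["team"]["slug"])
    # and return the first matching item, or None if nothing matches.
    for item in items:
        current = item
        for part in key.split("."):
            current = current[part]
        if current == value:
            return item
    return None

schedules = [
    {"name": "a", "team": {"slug": "team-a"}},
    {"name": "b", "team": {"slug": "team-b"}},
]
find_by_id(schedules, "team-b", "team.slug")  # -> the second schedule
```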
def migrate_escalation_policy(
escalation_policy: types.SplunkEscalationPolicy,
users: typing.List[types.SplunkUserWithPagingPolicies],
schedules: typing.List[types.SplunkScheduleWithTeamAndRotations],
) -> None:
name = escalation_policy["name"]
team_slug = escalation_policy["slug"]
if (
oncall_escalation_chain := escalation_policy["oncall_escalation_chain"]
) is not None:
OnCallAPIClient.delete(f"escalation_chains/{oncall_escalation_chain['id']}")
oncall_escalation_chain: oncall_types.OnCallEscalationChain = (
OnCallAPIClient.create("escalation_chains", {"name": name, "team_id": None})
)
oncall_escalation_chain_id = oncall_escalation_chain["id"]
escalation_policy["oncall_escalation_chain"] = oncall_escalation_chain
oncall_escalation_policies: typing.List[
oncall_types.OnCallEscalationPolicyCreatePayload
] = []
for step in escalation_policy["steps"]:
oncall_escalation_policies.extend(
transform_step(
step, team_slug, oncall_escalation_chain_id, users, schedules
)
)
for policy in oncall_escalation_policies:
OnCallAPIClient.create("escalation_policies", policy)
def transform_step(
step: types.SplunkEscalationPolicyStep,
team_slug: str,
escalation_chain_id: str,
users: typing.List[types.SplunkUserWithPagingPolicies],
schedules: typing.List[types.SplunkScheduleWithTeamAndRotations],
) -> typing.List[oncall_types.OnCallEscalationPolicyCreatePayload]:
escalation_policies: typing.List[
oncall_types.OnCallEscalationPolicyCreatePayload
] = []
for entry in step["entries"]:
execution_type = entry["executionType"]
if execution_type in config.UNSUPPORTED_ESCALATION_POLICY_EXECUTION_TYPES:
continue
elif execution_type == "rotation_group":
schedule = find_by_id(schedules, team_slug, "team.slug")
if schedule is None:
continue
escalation_policies.append(
{
"escalation_chain_id": escalation_chain_id,
"type": "notify_on_call_from_schedule",
"notify_on_call_from_schedule": schedule["oncall_schedule"]["id"],
}
)
continue
elif execution_type == "user":
user = find_by_id(users, entry["user"]["username"], "username")
if user is None or not user["oncall_user"]:
continue
escalation_policies.append(
{
"escalation_chain_id": escalation_chain_id,
"type": "notify_persons",
"persons_to_notify": [user["oncall_user"]["id"]],
}
)
if (timeout := step["timeout"]) > 0 and escalation_policies:
escalation_policies.insert(
0,
{
"escalation_chain_id": escalation_chain_id,
"type": "wait",
"duration": transform_wait_delay(timeout),
},
)
return escalation_policies

View file

@ -0,0 +1,60 @@
import typing
from lib.oncall.api_client import OnCallAPIClient
from lib.splunk.config import SPLUNK_TO_ONCALL_CONTACT_METHOD_MAP
from lib.splunk.types import SplunkUserPagingPolicy, SplunkUserWithPagingPolicies
from lib.utils import transform_wait_delay
def migrate_paging_policies(user: SplunkUserWithPagingPolicies) -> None:
paging_policies = user["pagingPolicies"]
oncall_rules = transform_paging_policies(paging_policies, user["oncall_user"]["id"])
for rule in oncall_rules:
OnCallAPIClient.create("personal_notification_rules", rule)
if oncall_rules:
# delete old notification rules if any new rules were created
for rule in user["oncall_user"]["notification_rules"]:
OnCallAPIClient.delete("personal_notification_rules/{}".format(rule["id"]))
def transform_paging_policies(
paging_policies: typing.List[SplunkUserPagingPolicy], user_id: str
) -> typing.List[SplunkUserPagingPolicy]:
"""
Transform Splunk user paging policies to Grafana OnCall personal notification rules.
"""
paging_policies = sorted(paging_policies, key=lambda rule: rule["order"])
oncall_notification_rules = []
for idx, paging_policy in enumerate(paging_policies):
# don't add a delay at the end
if idx == len(paging_policies) - 1:
delay = None
else:
delay = paging_policy["timeout"]
oncall_notification_rules += transform_paging_policy(
paging_policy, delay, user_id
)
return oncall_notification_rules
def transform_paging_policy(
paging_policy: SplunkUserPagingPolicy, delay: typing.Optional[int], user_id: str
) -> list[dict]:
oncall_type = SPLUNK_TO_ONCALL_CONTACT_METHOD_MAP[paging_policy["contactType"]]
notify_rule = {"user_id": user_id, "type": oncall_type, "important": False}
if not delay:
return [notify_rule]
wait_rule = {
"user_id": user_id,
"type": "wait",
"duration": transform_wait_delay(delay),
"important": False,
}
return [notify_rule, wait_rule]
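The wait-rule placement above is easiest to see with concrete data: each policy's own `timeout` becomes the wait *after* its notification, and the final notification gets no trailing wait. A simplified sketch (contact-type mapping and `transform_wait_delay` omitted; the rule dicts are illustrative):

```python
def interleave_waits(policies: list[dict]) -> list[dict]:
    # Sort by Splunk's "order", emit a notify rule per policy, and put a
    # wait rule between consecutive notifications -- but not after the last.
    policies = sorted(policies, key=lambda p: p["order"])
    rules: list[dict] = []
    for idx, policy in enumerate(policies):
        rules.append({"type": "notify", "contact": policy["contactType"]})
        if idx < len(policies) - 1:
            rules.append({"type": "wait", "duration": policy["timeout"]})
    return rules

interleave_waits(
    [
        {"order": 2, "contactType": "sms", "timeout": 300},
        {"order": 1, "contactType": "push", "timeout": 60},
    ]
)
# -> notify push, wait 60, notify sms (no trailing wait)
```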

View file

@ -0,0 +1,393 @@
import datetime
import typing
from dataclasses import dataclass
from typing import Optional
from uuid import uuid4
from lib.oncall import types as oncall_types
from lib.oncall.api_client import OnCallAPIClient
from lib.splunk import types
from lib.utils import dt_to_oncall_datetime, duration_to_frequency_and_interval
TIME_ZONE = "UTC"
"""
Note: The Splunk schedule rotations do return a `timezone` attribute, but I don't think
we need to worry about this, as all of the timestamps that we touch are in UTC.
"""
ONCALL_SHIFT_WEB_SOURCE = 0 # alias for "web"
def generate_splunk_schedule_name(
schedule: types.SplunkScheduleWithTeamAndRotations,
) -> str:
return f"{schedule['policy']['name']} schedule"
def match_schedule(
schedule: types.SplunkScheduleWithTeamAndRotations,
oncall_schedules: list[oncall_types.OnCallSchedule],
user_id_map: dict[str, str],
) -> None:
schedule_name = generate_splunk_schedule_name(schedule)
schedule["name"] = schedule_name
oncall_schedule = None
for candidate in oncall_schedules:
if schedule_name.lower().strip() == candidate["name"].lower().strip():
oncall_schedule = candidate
_, errors = Schedule.from_dict(schedule).to_oncall_schedule(user_id_map)
schedule["migration_errors"] = errors
schedule["oncall_schedule"] = oncall_schedule
def migrate_schedule(
schedule: types.SplunkScheduleWithTeamAndRotations,
user_id_map: dict[str, str],
) -> None:
if schedule["oncall_schedule"]:
OnCallAPIClient.delete("schedules/{}".format(schedule["oncall_schedule"]["id"]))
schedule["oncall_schedule"] = Schedule.from_dict(schedule).migrate(user_id_map)
def _splunk_datetime_to_dt(text: str) -> datetime.datetime:
"""
Convert a Splunk datetime string to a datetime object.
"""
return datetime.datetime.strptime(text, "%Y-%m-%dT%H:%M:%SZ")
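As a quick sanity check, the format string above parses Splunk's ISO-like timestamps into naive datetimes (the helper is restated here so the snippet runs on its own):

```python
import datetime

# Same format string as _splunk_datetime_to_dt above, restated so the
# snippet is self-contained.
def splunk_datetime_to_dt(text: str) -> datetime.datetime:
    return datetime.datetime.strptime(text, "%Y-%m-%dT%H:%M:%SZ")

dt = splunk_datetime_to_dt("2024-04-23T13:00:00Z")
assert dt == datetime.datetime(2024, 4, 23, 13, 0, 0)
# strptime treats the trailing "Z" as a literal character, so the result is
# naive; the surrounding code interprets it as UTC.
assert dt.tzinfo is None
```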
@dataclass
class Schedule:
"""
Utility class for converting a Splunk schedule to an OnCall schedule.
"""
name: str
rotation_shifts: list["RotationShift"]
overrides: list["Override"]
@classmethod
def from_dict(
cls, schedule: types.SplunkScheduleWithTeamAndRotations
) -> "Schedule":
"""
Create a Schedule object from a Splunk API response for a schedule.
"""
rotation_shifts = []
num_oncall_shift_layers = len(schedule["rotations"])
for idx, rotation in enumerate(schedule["rotations"]):
for shift in rotation["shifts"]:
rotation_shifts.append(
RotationShift.from_dict(shift, num_oncall_shift_layers - idx)
)
return cls(
name=generate_splunk_schedule_name(schedule),
rotation_shifts=rotation_shifts,
overrides=[
Override.from_dict(override) for override in schedule["overrides"]
],
)
def to_oncall_schedule(
self, user_id_map: dict[str, str]
) -> tuple[Optional[dict], list[str]]:
"""
Convert a Schedule object to an OnCall schedule.
Note that it also returns shifts, but these are not created at the same time as the schedule (see migrate method for more info).
"""
shifts = []
errors = []
for rotation_shift in self.rotation_shifts:
# Check if all users in the rotation exist in OnCall
missing_user_ids = [
user_id
for user_id in rotation_shift.user_ids
if user_id_map.get(user_id) is None
]
if missing_user_ids:
errors.append(
f"{rotation_shift.name}: Users with IDs {missing_user_ids} not found. The user(s) don't seem to exist in Grafana."
)
continue
shifts.append(rotation_shift.to_oncall_shift(user_id_map))
for override in self.overrides:
user_id = override.user_id
if user_id_map.get(user_id) is None:
errors.append(
f"Override: User with ID '{user_id}' not found. The user doesn't seem to exist in Grafana."
)
continue
shifts.append(override.to_oncall_shift(user_id_map))
if errors:
return None, errors
return {
"name": self.name,
"type": "web",
"team_id": None,
"time_zone": TIME_ZONE,
"shifts": shifts,
}, []
def migrate(self, user_id_map: dict[str, str]) -> dict:
"""
Create an OnCall schedule and its shifts.
First create the shifts, then create a schedule with shift IDs provided.
"""
schedule, errors = self.to_oncall_schedule(user_id_map)
assert not errors, "Unexpected errors: {}".format(errors)
# Create shifts in OnCall
shift_ids = [
OnCallAPIClient.create("on_call_shifts", shift)["id"]
for shift in schedule["shifts"]
]
# Create schedule in OnCall with shift IDs provided
schedule["shifts"] = shift_ids
new_schedule = OnCallAPIClient.create("schedules", schedule)
return new_schedule
@dataclass
class RotationShift:
"""
Utility class for converting a Splunk schedule rotation layer to OnCall shifts.
"""
name: str
level: int
shift_type: typing.Literal["std", "pho", "cstm"]
start: datetime.datetime
duration: datetime.timedelta
mask: types.SplunkRotationShiftMask
mask2: typing.Optional[types.SplunkRotationShiftMask]
mask3: typing.Optional[types.SplunkRotationShiftMask]
user_ids: list[str]
MONDAY = "m"
TUESDAY = "t"
WEDNESDAY = "w"
THURSDAY = "th"
FRIDAY = "f"
SATURDAY = "sa"
SUNDAY = "su"
SPLUNK_TO_ONCALL_DAY_MASK_MAP = {
SUNDAY: "SU",
MONDAY: "MO",
TUESDAY: "TU",
WEDNESDAY: "WE",
THURSDAY: "TH",
FRIDAY: "FR",
SATURDAY: "SA",
}
@classmethod
def from_dict(
cls, rotation_shift: types.SplunkRotationShift, level: int
) -> "RotationShift":
"""
Create a RotationShift object from a Splunk API response for a rotation.
Converts Splunk datetime strings to datetime objects for easier manipulation.
"""
return cls(
name=rotation_shift["label"],
level=level,
shift_type=rotation_shift["shifttype"],
start=_splunk_datetime_to_dt(rotation_shift["start"]),
duration=datetime.timedelta(days=rotation_shift["duration"]),
mask=rotation_shift["mask"],
mask2=rotation_shift.get("mask2"),
mask3=rotation_shift.get("mask3"),
user_ids=[u["username"] for u in rotation_shift["shiftMembers"]],
)
def _construct_datetime_from_date_and_mask_time(
self,
date: datetime.date,
mask: types.SplunkRotationShiftMask,
mask_key: typing.Literal["start", "end"],
) -> datetime.datetime:
mask_time = mask["time"][0][mask_key]
return datetime.datetime.combine(
date,
datetime.time(hour=mask_time["hour"], minute=mask_time["minute"]),
)
def _calculate_partial_day_duration_from_mask(self) -> datetime.timedelta:
"""
Calculate the duration of the shift based on the mask.
"""
today = datetime.date.today()
start_dt = self._construct_datetime_from_date_and_mask_time(
today, self.mask, "start"
)
end_dt = self._construct_datetime_from_date_and_mask_time(
today, self.mask, "end"
)
return end_dt - start_dt
def _calculate_by_days_from_partial_day_shift_mask(self) -> list[str]:
"""
Calculate the days of the week the shift occurs based on the mask.
"""
return [
self.SPLUNK_TO_ONCALL_DAY_MASK_MAP[day]
for day, is_active in self.mask["day"].items()
if is_active
]
def _next_day_of_week(
self, starting_date: datetime.date, day_of_week: str
) -> datetime.date:
# Define a mapping of day abbreviations to their corresponding datetime weekday values
SPLUNK_DAY_ABBREVIATION_TO_DATETIME_WEEKDAY_IDX_MAP = {
self.MONDAY: 0,
self.TUESDAY: 1,
self.WEDNESDAY: 2,
self.THURSDAY: 3,
self.FRIDAY: 4,
self.SATURDAY: 5,
self.SUNDAY: 6,
}
# Calculate the difference between starting_date's weekday and the desired weekday
days_until_next_day = (
SPLUNK_DAY_ABBREVIATION_TO_DATETIME_WEEKDAY_IDX_MAP[day_of_week]
- starting_date.weekday()
+ 7
) % 7
# Calculate the date of the next desired day of the week
return starting_date + datetime.timedelta(days=days_until_next_day)
def _get_sole_active_day_from_mask(
self, mask: types.SplunkRotationShiftMask
) -> str:
"""
Making a big assumption here, but it looks like for multi-day shifts, `mask` and
`mask3` each have only one active day.
"""
return [day for day, is_active in mask["day"].items() if is_active][0]
def _calculate_multi_day_duration_from_masks(self) -> datetime.timedelta:
start_mask = self.mask
end_mask = self.mask3
today = datetime.date.today()
shift_start_date = self._next_day_of_week(
today, self._get_sole_active_day_from_mask(start_mask)
)
shift_end_date = self._next_day_of_week(
shift_start_date, self._get_sole_active_day_from_mask(end_mask)
)
shift_start_dt = self._construct_datetime_from_date_and_mask_time(
shift_start_date, start_mask, "start"
)
shift_end_dt = self._construct_datetime_from_date_and_mask_time(
shift_end_date, end_mask, "end"
)
return shift_end_dt - shift_start_dt
def to_oncall_shift(self, user_id_map: dict[str, str]) -> typing.Dict:
frequency, interval = duration_to_frequency_and_interval(self.duration)
start = dt_to_oncall_datetime(self.start)
duration: datetime.timedelta
extra_kwargs = {}
if self.shift_type == "std":
duration = self.duration
elif self.shift_type == "pho":
duration = self._calculate_partial_day_duration_from_mask()
extra_kwargs[
"by_day"
] = self._calculate_by_days_from_partial_day_shift_mask()
elif self.shift_type == "cstm":
num_days = self.duration.days
if num_days != 7:
# NOTE: we don't currently support multi-day Splunk shifts with a "hand-off" period other than one week
# https://raintank-corp.slack.com/archives/C04JCU51NF8/p1714581046981109?thread_ts=1714580582.883559&cid=C04JCU51NF8
raise ValueError(
f"Multi-day shifts with a duration greater than 7 days are not supported: {num_days} days"
)
duration = self._calculate_multi_day_duration_from_masks()
else:
raise ValueError(f"Unknown shift type: {self.shift_type}")
return {
"name": self.name,
"team_id": None,
"level": self.level,
"type": "rolling_users",
"rotation_start": start,
"start": start,
"until": None,
"duration": int(duration.total_seconds()),
"frequency": frequency,
"interval": interval,
"rolling_users": [[user_id_map[user_id]] for user_id in self.user_ids],
"start_rotation_from_user_index": 0,
"week_start": "MO",
"time_zone": TIME_ZONE,
"source": ONCALL_SHIFT_WEB_SOURCE,
**extra_kwargs,
}
@dataclass
class Override:
start: datetime.datetime
end: datetime.datetime
user_id: str
@classmethod
def from_dict(cls, override: types.SplunkScheduleOverride) -> "Override":
# convert start and end to datetime objects in UTC
return cls(
start=datetime.datetime.fromisoformat(override["start"]).astimezone(
datetime.timezone.utc
),
end=datetime.datetime.fromisoformat(override["end"]).astimezone(
datetime.timezone.utc
),
user_id=override["overrideOnCallUser"]["username"],
)
def to_oncall_shift(self, user_id_map: dict[str, str]) -> dict:
start = dt_to_oncall_datetime(self.start)
duration = int((self.end - self.start).total_seconds())
user_id = user_id_map[self.user_id]
return {
"name": uuid4().hex,
"team_id": None,
"type": "override",
"time_zone": TIME_ZONE,
"start": start,
"duration": duration,
"rotation_start": start,
"users": [user_id],
"source": ONCALL_SHIFT_WEB_SOURCE,
}
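The modulo arithmetic in `_next_day_of_week` above can be checked in isolation; this sketch restates the day-index map so it runs standalone:

```python
import datetime

# Day abbreviations -> datetime.weekday() indices, restated from the class above.
WEEKDAY_IDX = {"m": 0, "t": 1, "w": 2, "th": 3, "f": 4, "sa": 5, "su": 6}

def next_day_of_week(starting_date: datetime.date, day: str) -> datetime.date:
    # the "+ 7) % 7" keeps the offset in [0, 6], so asking for the current
    # weekday returns the same date rather than one week later
    days_ahead = (WEEKDAY_IDX[day] - starting_date.weekday() + 7) % 7
    return starting_date + datetime.timedelta(days=days_ahead)

monday = datetime.date(2024, 5, 13)  # a Monday
assert next_day_of_week(monday, "th") == datetime.date(2024, 5, 16)
assert next_day_of_week(monday, "m") == monday
```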


@@ -0,0 +1,232 @@
import typing
from lib.oncall import types as oncall_types
class SplunkUserPagingPolicy(typing.TypedDict):
order: int
timeout: int
contactType: typing.Literal["sms", "phone", "email", "push"]
extId: str
class SplunkUserWithPagingPolicies(typing.TypedDict):
firstName: str
lastName: str
displayName: str
username: str
email: str
createdAt: str
pagingPolicies: typing.NotRequired[typing.List[SplunkUserPagingPolicy]]
oncall_user: typing.NotRequired[oncall_types.OnCallUser]
class SplunkTeamMember(typing.TypedDict):
username: str
firstName: str
lastName: str
displayName: str
version: int
verified: bool
class SplunkTeam(typing.TypedDict):
name: str
slug: str
memberCount: int
version: int
isDefaultTeam: bool
description: str
members: typing.NotRequired[typing.List[SplunkTeamMember]]
class SplunkSchedulePolicy(typing.TypedDict):
name: str
slug: str
class _SplunkScheduleOnCallUser(typing.TypedDict):
username: str
class SplunkRotationShiftMask(typing.TypedDict):
class SplunkRotationShiftMaskDay(typing.TypedDict):
m: bool
t: bool
w: bool
th: bool
f: bool
sa: bool
su: bool
class SplunkRotationShiftMaskTime(typing.TypedDict):
# NOTE: the nested class below intentionally shadows the outer name;
# `start` and `end` refer to this inner hour/minute pair
class SplunkRotationShiftMaskTime(typing.TypedDict):
hour: int
minute: int
start: SplunkRotationShiftMaskTime
end: SplunkRotationShiftMaskTime
day: SplunkRotationShiftMaskDay
time: typing.List[SplunkRotationShiftMaskTime]
class SplunkRotationShiftPeriod(typing.TypedDict):
start: str
end: str
username: str
isRoll: bool
memberSlug: str
class SplunkRotationShiftMember(typing.TypedDict):
username: str
slug: str
class SplunkRotationShift(typing.TypedDict):
label: str
timezone: str
start: str
duration: int
shifttype: typing.Literal["std", "pho", "cstm"]
"""
- `std`: 24/7 shift
- `pho`: partial day shift
- `cstm`: multi-day shift
"""
mask: SplunkRotationShiftMask
mask2: typing.NotRequired[SplunkRotationShiftMask]
mask3: typing.NotRequired[SplunkRotationShiftMask]
periods: typing.List[SplunkRotationShiftPeriod]
current: SplunkRotationShiftPeriod
next: SplunkRotationShiftPeriod
shiftMembers: typing.List[SplunkRotationShiftMember]
class SplunkRotation(typing.TypedDict):
label: str
totalMembersInRotation: int
shifts: typing.List[SplunkRotationShift]
class SplunkScheduleOverride(typing.TypedDict):
origOnCallUser: _SplunkScheduleOnCallUser
overrideOnCallUser: _SplunkScheduleOnCallUser
start: str
end: str
policy: SplunkSchedulePolicy
class SplunkSchedule(typing.TypedDict):
class _SplunkSchedule(typing.TypedDict):
start: str
end: str
onCallUser: _SplunkScheduleOnCallUser
onCallType: str
rolls: typing.List[typing.Any]
name: typing.NotRequired[str]
policy: SplunkSchedulePolicy
schedule: typing.List[_SplunkSchedule]
overrides: typing.List[SplunkScheduleOverride]
oncall_schedule: typing.NotRequired[oncall_types.OnCallSchedule]
migration_errors: typing.NotRequired[typing.List[str]]
class SplunkScheduleWithTeamAndRotations(SplunkSchedule):
team: SplunkTeam
rotations: typing.List[SplunkRotation]
class SplunkEscalationPolicyStepUser(typing.TypedDict):
class _SplunkEscalationPolicyStepUser(typing.TypedDict):
username: str
firstName: str
lastName: str
executionType: typing.Literal["user"]
user: _SplunkEscalationPolicyStepUser
class SplunkEscalationPolicyStepTeamPage(typing.TypedDict):
executionType: typing.Literal["team_page"]
class SplunkEscalationPolicyStepRotationGroup(typing.TypedDict):
"""
NOTE: we don't support migrating `rotation_group_next` and `rotation_group_previous` policy step types
"""
class _SplunkEscalationPolicyStepRotationGroup(typing.TypedDict):
slug: str
label: str
executionType: typing.Literal[
"rotation_group", "rotation_group_next", "rotation_group_previous"
]
rotationGroup: _SplunkEscalationPolicyStepRotationGroup
class SplunkEscalationPolicyStepEmail(typing.TypedDict):
"""
NOTE: we don't support migrating this type of escalation policy step
"""
class _SplunkEscalationPolicyStepEmail(typing.TypedDict):
address: str
executionType: typing.Literal["email"]
email: _SplunkEscalationPolicyStepEmail
class SplunkEscalationPolicyStepWebhook(typing.TypedDict):
"""
NOTE: we don't support migrating this type of escalation policy step
"""
class _SplunkEscalationPolicyStepWebhook(typing.TypedDict):
slug: str
label: str
executionType: typing.Literal["webhook"]
webhook: _SplunkEscalationPolicyStepWebhook
class SplunkEscalationPolicyStepPolicyRouting(typing.TypedDict):
"""
NOTE: we don't support migrating this type of escalation policy step
"""
class _SplunkEscalationPolicyStepPolicyRouting(typing.TypedDict):
policySlug: str
teamSlug: str
executionType: typing.Literal["policy_routing"]
targetPolicy: _SplunkEscalationPolicyStepPolicyRouting
SplunkEscalationPolicyStepEntry = typing.Union[
SplunkEscalationPolicyStepUser,
SplunkEscalationPolicyStepTeamPage,
SplunkEscalationPolicyStepRotationGroup,
SplunkEscalationPolicyStepEmail,
SplunkEscalationPolicyStepWebhook,
SplunkEscalationPolicyStepPolicyRouting,
]
class SplunkEscalationPolicyStep(typing.TypedDict):
timeout: int
entries: typing.List[SplunkEscalationPolicyStepEntry]
class SplunkEscalationPolicy(typing.TypedDict):
name: str
slug: str
steps: typing.List[SplunkEscalationPolicyStep]
ignoreCustomPagingPolicies: bool
oncall_escalation_chain: typing.NotRequired[oncall_types.OnCallEscalationChain]
unmatched_users: typing.NotRequired[typing.List[str]]
flawed_schedules: typing.NotRequired[typing.List[str]]
unsupported_escalation_entry_types: typing.NotRequired[typing.List[str]]
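A hypothetical payload shaped like `SplunkRotationShiftMask` above, for a partial-day (`pho`) shift covering weekdays 09:00-17:00 (all values invented for illustration):

```python
mask = {
    "day": {"m": True, "t": True, "w": True, "th": True, "f": True,
            "sa": False, "su": False},
    "time": [{"start": {"hour": 9, "minute": 0},
              "end": {"hour": 17, "minute": 0}}],
}
# the migrator derives the active by-day list and the shift duration from this shape
active_days = [day for day, is_active in mask["day"].items() if is_active]
assert active_days == ["m", "t", "w", "th", "f"]
assert mask["time"][0]["end"]["hour"] - mask["time"][0]["start"]["hour"] == 8
```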


@@ -1,11 +1,14 @@
-from migrator.resources.escalation_policies import (
+from lib.common.resources.users import match_user
+from lib.pagerduty.resources.escalation_policies import (
match_escalation_policy,
match_escalation_policy_for_integration,
)
-from migrator.resources.integrations import match_integration, match_integration_type
-from migrator.resources.schedules import match_schedule
-from migrator.resources.users import (
-    match_user,
+from lib.pagerduty.resources.integrations import (
+    match_integration,
+    match_integration_type,
+)
+from lib.pagerduty.resources.schedules import match_schedule
+from lib.pagerduty.resources.users import (
match_users_and_schedules_for_escalation_policy,
match_users_for_schedule,
)


@@ -1,7 +1,7 @@
-from migrator.resources.escalation_policies import match_escalation_policy
-from migrator.resources.integrations import match_integration
-from migrator.resources.schedules import match_schedule
-from migrator.resources.users import match_user
+from lib.common.resources.users import match_user
+from lib.pagerduty.resources.escalation_policies import match_escalation_policy
+from lib.pagerduty.resources.integrations import match_integration
+from lib.pagerduty.resources.schedules import match_schedule
def test_match_user_email_case_insensitive():


@@ -1,6 +1,6 @@
-from migrator.resources.escalation_policies import match_escalation_policy
-from migrator.resources.integrations import match_integration
-from migrator.resources.schedules import match_schedule
+from lib.pagerduty.resources.escalation_policies import match_escalation_policy
+from lib.pagerduty.resources.integrations import match_integration
+from lib.pagerduty.resources.schedules import match_schedule
def test_match_schedule_name_extra_spaces():


@@ -1,6 +1,6 @@
import datetime
-from migrator.resources.schedules import Restriction, Schedule
+from lib.pagerduty.resources.schedules import Restriction, Schedule
user_id_map = {
"USER_ID_1": "USER_ID_1",


@@ -0,0 +1,281 @@
import typing
from unittest import mock
import pytest
from lib.splunk.resources import escalation_policies
def _create_escalation_policy_step_entry(execution_type, data):
return {
"executionType": execution_type,
**data,
}
def _create_user_execution_type_entry(username):
return _create_escalation_policy_step_entry(
"user",
{
"user": {
"username": username,
},
},
)
def _create_oncall_escalation_chain(id: typing.Optional[str]):
return {
"id": id,
}
def _create_escalation_policy(
team_slug,
entries,
name="my escalation policy",
timeout=0,
oncall_escalation_chain_id=None,
other_data=None,
):
return {
"name": name,
"slug": team_slug,
"steps": [
{
"timeout": timeout,
"entries": entries,
}
],
"oncall_escalation_chain": _create_oncall_escalation_chain(
oncall_escalation_chain_id
)
if oncall_escalation_chain_id is not None
else None,
**(other_data or {}),
}
def _generate_oncall_escalation_policy_create_api_payload(
type, escalation_chain_id, data
):
return {"escalation_chain_id": escalation_chain_id, "type": type, **data}
def _generate_oncall_notify_persons_escalation_policy_create_api_payload(
escalation_chain_id, persons_to_notify
):
return _generate_oncall_escalation_policy_create_api_payload(
"notify_persons",
escalation_chain_id,
{
"persons_to_notify": persons_to_notify,
},
)
@pytest.mark.parametrize(
"oncall_escalation_chains,expected",
[
([], None),
(
[
{
"id": 1,
"name": "foo",
},
],
{
"id": 1,
"name": "foo",
},
),
],
)
def test_match_escalation_policy(oncall_escalation_chains, expected):
policy = {"name": " FOO "}
escalation_policies.match_escalation_policy(policy, oncall_escalation_chains)
assert policy["oncall_escalation_chain"] == expected
def test_match_users_and_schedules_for_escalation_policy_unmatched_users():
policy = _create_escalation_policy(
"asdfasdf",
[
_create_user_execution_type_entry("foo"),
],
)
users = [
{
"username": "foo",
"oncall_user": {
"id": 1,
},
},
{"username": "bar", "oncall_user": None},
]
escalation_policies.match_users_and_schedules_for_escalation_policy(
policy, users, []
)
assert policy["unmatched_users"] == []
policy = _create_escalation_policy(
"asdasdf",
[
_create_user_execution_type_entry("foo"),
_create_user_execution_type_entry("bar"),
],
)
escalation_policies.match_users_and_schedules_for_escalation_policy(
policy, users, []
)
assert policy["unmatched_users"] == [{"username": "bar", "oncall_user": None}]
@pytest.mark.parametrize(
"execution_type,supported",
[
("rotation_group", True),
("user", True),
("email", False),
("webhook", False),
("policy_routing", False),
("rotation_group_next", False),
("rotation_group_previous", False),
("team_page", False),
],
)
def test_match_users_and_schedules_for_escalation_policy_unsupported_escalation_entry_types(
execution_type, supported
):
policy = _create_escalation_policy(
"asdfasdf",
[
_create_escalation_policy_step_entry(
execution_type, {"user": {"username": "foo"}}
),
],
)
escalation_policies.match_users_and_schedules_for_escalation_policy(policy, [], [])
expected_entry_types = [] if supported else [execution_type]
assert policy["unsupported_escalation_entry_types"] == expected_entry_types
def test_match_users_and_schedules_for_escalation_policy_flawed_schedules():
flawed_schedule_team_slug = "zxcvzxcv"
flawed_schedule = {
"team": {
"slug": flawed_schedule_team_slug,
},
"migration_errors": ["blahblahblah"],
}
policy = _create_escalation_policy(
flawed_schedule_team_slug,
[
_create_escalation_policy_step_entry("rotation_group", {}),
],
)
schedules = [
{
"team": {
"slug": "asdfasdf",
},
"migration_errors": False,
},
{
"team": {
"slug": "qwerqwer",
},
"migration_errors": False,
},
flawed_schedule,
]
escalation_policies.match_users_and_schedules_for_escalation_policy(
policy, [], schedules
)
assert policy["flawed_schedules"] == [flawed_schedule]
@pytest.mark.parametrize(
"policy,delete_called,expected_oncall_escalation_policy_create_calls",
[
(
_create_escalation_policy(
"asdfasdf",
[
_create_user_execution_type_entry("foo"),
],
name="hello",
),
False,
[
_generate_oncall_notify_persons_escalation_policy_create_api_payload(
1, [1]
)
],
),
(
_create_escalation_policy(
"asdfasdf",
[
_create_user_execution_type_entry("foo"),
],
name="hello",
oncall_escalation_chain_id="1234",
),
True,
[
_generate_oncall_notify_persons_escalation_policy_create_api_payload(
1, [1]
)
],
),
],
)
@mock.patch("lib.splunk.resources.escalation_policies.OnCallAPIClient")
def test_migrate_escalation_policy(
mock_oncall_client,
policy,
delete_called,
expected_oncall_escalation_policy_create_calls,
):
mock_oncall_client.create.return_value = {"id": 1}
users = [
{
"username": "foo",
"oncall_user": {
"id": 1,
},
},
]
schedules = []
escalation_policies.migrate_escalation_policy(policy, users, schedules)
assert policy["oncall_escalation_chain"] == {"id": 1}
if delete_called:
mock_oncall_client.delete.assert_called_once_with("escalation_chains/1234")
else:
mock_oncall_client.delete.assert_not_called()
expected_oncall_api_create_calls_args = [
("escalation_policies", policy)
for policy in expected_oncall_escalation_policy_create_calls
]
expected_oncall_api_create_calls_args.append(
("escalation_chains", {"name": "hello", "team_id": None})
)
for expected_call_args in expected_oncall_api_create_calls_args:
mock_oncall_client.create.assert_any_call(*expected_call_args)


@@ -0,0 +1,142 @@
from unittest import mock
import pytest
from lib.splunk.resources.paging_policies import migrate_paging_policies
ONCALL_USER_ID = "UABCD12345"
ONCALL_NOTIFICATION_POLICY_ID = "UNP12345"
def _generate_splunk_paging_policy(order: int, contactType: str, timeout: int):
return {
"order": order,
"timeout": timeout,
"contactType": contactType,
"extId": "splunk",
}
def _generate_oncall_notification_rule(id: str, user_id: str, type: str, duration=None):
data = {
"id": id,
"user_id": user_id,
"type": type,
"important": False,
}
if duration:
data["duration"] = duration
return data
def _generate_create_oncall_notification_rule_payload(
user_id: str, type: str, duration=None
):
data = {
"user_id": user_id,
"type": type,
"important": False,
}
if duration:
data["duration"] = duration
return data
@pytest.mark.parametrize(
"splunk_paging_policies,existing_oncall_notification_rules,expected_oncall_notification_rules",
[
([], [], []),
(
[
_generate_splunk_paging_policy(0, "sms", 60),
],
[],
[
_generate_create_oncall_notification_rule_payload(
ONCALL_USER_ID,
"notify_by_sms",
),
],
),
(
[
_generate_splunk_paging_policy(0, "sms", 60),
],
[
_generate_oncall_notification_rule(
ONCALL_NOTIFICATION_POLICY_ID,
ONCALL_USER_ID,
"notify_by_sms",
),
],
[
_generate_create_oncall_notification_rule_payload(
ONCALL_USER_ID,
"notify_by_sms",
),
],
),
(
[
_generate_splunk_paging_policy(0, "sms", 60),
_generate_splunk_paging_policy(0, "sms", 60),
],
[
_generate_oncall_notification_rule(
ONCALL_NOTIFICATION_POLICY_ID,
ONCALL_USER_ID,
"notify_by_sms",
),
],
[
_generate_create_oncall_notification_rule_payload(
ONCALL_USER_ID,
"notify_by_sms",
),
_generate_create_oncall_notification_rule_payload(
ONCALL_USER_ID,
"wait",
duration=3600,
),
_generate_create_oncall_notification_rule_payload(
ONCALL_USER_ID,
"notify_by_sms",
),
],
),
],
)
@mock.patch("lib.splunk.resources.paging_policies.OnCallAPIClient")
def test_migrate_paging_policies(
mock_oncall_api_client,
splunk_paging_policies,
existing_oncall_notification_rules,
expected_oncall_notification_rules,
):
migrate_paging_policies(
{
"pagingPolicies": splunk_paging_policies,
"oncall_user": {
"id": ONCALL_USER_ID,
"notification_rules": existing_oncall_notification_rules,
},
}
)
mock_oncall_api_client.create.assert_has_calls(
[
mock.call("personal_notification_rules", payload)
for payload in expected_oncall_notification_rules
]
)
mock_oncall_api_client.delete.assert_has_calls(
[
mock.call(f"personal_notification_rules/{oncall_notification_rule['id']}")
for oncall_notification_rule in existing_oncall_notification_rules
]
)


@@ -0,0 +1,900 @@
from unittest import mock
import pytest
from lib.splunk.resources import schedules
SPLUNK_USER1_ID = "joeyorlando"
SPLUNK_USER2_ID = "joeyorlando1"
ONCALL_USER1_ID = "UABCD12345"
ONCALL_USER2_ID = "UGEF903940"
DEFAULT_SPLUNK_USERNAME_TO_ONCALL_USER_ID_MAP = {
SPLUNK_USER1_ID: ONCALL_USER1_ID,
SPLUNK_USER2_ID: ONCALL_USER2_ID,
}
ESCALATION_POLICY_NAME = "Example"
ROTATION_SHIFT_NAME = "simple rotation shift"
ONCALL_SCHEDULE_ID = "SABCD12345"
WEB_SOURCE = 0
def _generate_splunk_schedule_rotation_shift(
shift_type="std",
shift_name=ROTATION_SHIFT_NAME,
start="2024-04-23T13:00:00Z",
duration=7,
mask=None,
shift_members=None,
**kwargs,
):
return {
"label": shift_name,
"timezone": "America/Toronto",
"start": start,
"duration": duration,
"shifttype": shift_type,
"mask": mask,
"periods": [],
"current": {},
"next": {},
"shiftMembers": shift_members
or [
{
"username": SPLUNK_USER1_ID,
"slug": "rtm-YZTYP1lUogCUvftpIEpC",
},
{
"username": SPLUNK_USER2_ID,
"slug": "rtm-U8v2awNBaDTFlTavX86p",
},
],
**kwargs,
}
def _generate_splunk_schedule_rotation_shift_mask(
off_days=None, start_hour=0, start_minute=0, end_hour=0, end_minute=0
):
off_days = off_days or []
return {
"day": {
day: (day not in off_days) for day in ["m", "t", "w", "th", "f", "sa", "su"]
},
"time": [
{
"start": {
"hour": start_hour,
"minute": start_minute,
},
"end": {
"hour": end_hour,
"minute": end_minute,
},
},
],
}
def _generate_full_day_splunk_schedule_rotation_shift(**kwargs):
return _generate_splunk_schedule_rotation_shift(
shift_type="std",
mask=_generate_splunk_schedule_rotation_shift_mask(),
**kwargs,
)
def _generate_partial_day_splunk_schedule_rotation_shift(
mask_off_days=None,
mask_start_hour=0,
mask_start_minute=0,
mask_end_hour=0,
mask_end_minute=0,
duration=1,
**kwargs,
):
return _generate_splunk_schedule_rotation_shift(
shift_type="pho",
duration=duration,
mask=_generate_splunk_schedule_rotation_shift_mask(
off_days=mask_off_days,
start_hour=mask_start_hour,
start_minute=mask_start_minute,
end_hour=mask_end_hour,
end_minute=mask_end_minute,
),
**kwargs,
)
def _generate_multi_day_splunk_schedule_rotation_shift(mask, duration=7, **kwargs):
return _generate_splunk_schedule_rotation_shift(
shift_type="cstm",
duration=duration,
mask=mask,
**kwargs,
)
def _generate_splunk_schedule_rotation(shifts=None):
return {
"label": "abcdeg",
"totalMembersInRotation": 2,
"shifts": shifts or [_generate_full_day_splunk_schedule_rotation_shift()],
}
def _generate_splunk_schedule_override(
start="2024-05-01T15:00:00Z",
end="2024-05-01T21:00:00Z",
orig_oncall_user=SPLUNK_USER1_ID,
override_oncall_user=SPLUNK_USER2_ID,
):
return {
"origOnCallUser": {
"username": orig_oncall_user,
},
"overrideOnCallUser": {
"username": override_oncall_user,
},
"start": start,
"end": end,
"policy": {
"name": ESCALATION_POLICY_NAME,
"slug": "pol-GiTwwwVXzUDtJbPu",
},
}
def _generate_schedule_name(name=ESCALATION_POLICY_NAME):
return f"{name} schedule"
def _generate_splunk_schedule(rotations=None, overrides=None, oncall_schedule=None):
team_name = "First Team"
team_slug = "team-YVFyvc0gxEhVXEFj"
schedule = {
"name": _generate_schedule_name(),
"policy": {
"name": ESCALATION_POLICY_NAME,
"slug": team_slug,
},
"schedule": [
{
"onCallUser": {
"username": SPLUNK_USER1_ID,
},
"onCallType": "rotation_group",
"rotationName": "simple rotation",
"shiftName": "simple rotation shift",
"rolls": [],
},
],
"team": {
"_selfUrl": f"/api-public/v1/team/{team_slug}",
"_membersUrl": f"/api-public/v1/team/{team_slug}/members",
"_policiesUrl": f"/api-public/v1/team/{team_slug}/policies",
"_adminsUrl": f"/api-public/v1/team/{team_slug}/admins",
"name": team_name,
"slug": team_slug,
"memberCount": 2,
"version": 3,
"isDefaultTeam": False,
"description": "this is a description",
},
"rotations": rotations or [],
"overrides": overrides or [],
}
if oncall_schedule:
schedule["oncall_schedule"] = oncall_schedule
return schedule
def _generate_oncall_schedule(id=ONCALL_SCHEDULE_ID, name=ESCALATION_POLICY_NAME):
return {
"id": id,
"name": _generate_schedule_name(name),
}
def _generate_rotation_missing_user_error_msg(
user_id, rotation_name=ROTATION_SHIFT_NAME
):
return f"{rotation_name}: Users with IDs ['{user_id}'] not found. The user(s) don't seem to exist in Grafana."
def _generate_override_missing_user_error_msg(user_id):
return f"Override: User with ID '{user_id}' not found. The user doesn't seem to exist in Grafana."
def _generate_oncall_shift_create_api_payload(data):
shift_type = data["type"]
shift_base = {
"type": shift_type,
"team_id": None,
"time_zone": "UTC",
"source": WEB_SOURCE,
}
if shift_type == "rolling_users":
shift_base.update(
{
"start_rotation_from_user_index": 0,
"week_start": "MO",
"until": None,
}
)
return {**shift_base, **data}
def _generate_oncall_schedule_create_api_payload(name, num_expected_shifts):
return {
"name": name,
"type": "web",
"team_id": None,
"time_zone": "UTC",
# these would be the string IDs of the created OnCall shifts; we'll just expect any value
"shifts": [mock.ANY for _ in range(num_expected_shifts)],
}
@pytest.mark.parametrize(
"splunk_schedule,oncall_schedules,user_id_map,expected_oncall_schedule_match,expected_errors",
[
# oncall schedule matched, all user IDs matched, no errors
(
_generate_splunk_schedule(
rotations=[_generate_splunk_schedule_rotation()],
overrides=[_generate_splunk_schedule_override()],
),
[_generate_oncall_schedule()],
DEFAULT_SPLUNK_USERNAME_TO_ONCALL_USER_ID_MAP,
_generate_oncall_schedule(),
[],
),
# no oncall schedule matched
(
_generate_splunk_schedule(
rotations=[_generate_splunk_schedule_rotation()],
overrides=[_generate_splunk_schedule_override()],
),
[_generate_oncall_schedule(name="some other random name")],
DEFAULT_SPLUNK_USERNAME_TO_ONCALL_USER_ID_MAP,
None,
[],
),
# missing user ID in a shift
(
_generate_splunk_schedule(
rotations=[_generate_splunk_schedule_rotation()],
),
[_generate_oncall_schedule()],
{
SPLUNK_USER1_ID: "user1",
},
_generate_oncall_schedule(),
[_generate_rotation_missing_user_error_msg(SPLUNK_USER2_ID)],
),
# override with a missing user ID
(
_generate_splunk_schedule(
rotations=[],
overrides=[
_generate_splunk_schedule_override(
override_oncall_user=SPLUNK_USER2_ID
)
],
),
[_generate_oncall_schedule()],
{
SPLUNK_USER1_ID: "user1",
},
_generate_oncall_schedule(),
[_generate_override_missing_user_error_msg(SPLUNK_USER2_ID)],
),
],
)
def test_match_schedule(
splunk_schedule,
oncall_schedules,
user_id_map,
expected_oncall_schedule_match,
expected_errors,
):
schedules.match_schedule(splunk_schedule, oncall_schedules, user_id_map)
assert splunk_schedule["oncall_schedule"] == expected_oncall_schedule_match
assert splunk_schedule["migration_errors"] == expected_errors
@mock.patch("lib.splunk.resources.schedules.OnCallAPIClient")
@pytest.mark.parametrize(
"splunk_schedule,user_id_map,expected_oncall_schedule_id_to_be_deleted,expected_oncall_shift_create_calls,expected_oncall_schedule_create_call",
[
# matched oncall schedule, should be deleted
# w/ a basic rotation shift and an override
(
_generate_splunk_schedule(
rotations=[_generate_splunk_schedule_rotation()],
overrides=[_generate_splunk_schedule_override()],
oncall_schedule=_generate_oncall_schedule(id=ONCALL_SCHEDULE_ID),
),
DEFAULT_SPLUNK_USERNAME_TO_ONCALL_USER_ID_MAP,
ONCALL_SCHEDULE_ID,
[
# rotation on-call shift
_generate_oncall_shift_create_api_payload(
{
"name": ROTATION_SHIFT_NAME,
"level": 1,
"type": "rolling_users",
"rotation_start": "2024-04-23T13:00:00",
"start": "2024-04-23T13:00:00",
"duration": 604800,
"frequency": "weekly",
"interval": 1,
"rolling_users": [[ONCALL_USER1_ID], [ONCALL_USER2_ID]],
}
),
# override shift
_generate_oncall_shift_create_api_payload(
{
"name": mock.ANY,
"type": "override",
"rotation_start": "2024-05-01T15:00:00",
"start": "2024-05-01T15:00:00",
"duration": 21600,
"users": [ONCALL_USER2_ID],
}
),
],
_generate_oncall_schedule_create_api_payload(_generate_schedule_name(), 2),
),
# schedule w/ one rotation which has two shift layers
(
_generate_splunk_schedule(
rotations=[
_generate_splunk_schedule_rotation(
shifts=[
_generate_full_day_splunk_schedule_rotation_shift(
shift_name="shift1",
start="2024-04-23T13:00:00Z",
duration=7,
),
_generate_full_day_splunk_schedule_rotation_shift(
shift_name="shift2",
start="2024-04-29T13:00:00Z",
duration=2,
),
]
),
],
overrides=[_generate_splunk_schedule_override()],
oncall_schedule=_generate_oncall_schedule(id=ONCALL_SCHEDULE_ID),
),
DEFAULT_SPLUNK_USERNAME_TO_ONCALL_USER_ID_MAP,
ONCALL_SCHEDULE_ID,
[
# 7 day shift
_generate_oncall_shift_create_api_payload(
{
"name": "shift1",
"level": 1,
"type": "rolling_users",
"rotation_start": "2024-04-23T13:00:00",
"start": "2024-04-23T13:00:00",
"duration": 604800,
"frequency": "weekly",
"interval": 1,
"rolling_users": [[ONCALL_USER1_ID], [ONCALL_USER2_ID]],
}
),
# 2 day shift in same rotation as shift above
_generate_oncall_shift_create_api_payload(
{
"name": "shift2",
"level": 1,
"type": "rolling_users",
"rotation_start": "2024-04-29T13:00:00",
"start": "2024-04-29T13:00:00",
"duration": 172800,
"frequency": "daily",
"interval": 2,
"rolling_users": [[ONCALL_USER1_ID], [ONCALL_USER2_ID]],
}
),
# override shift
_generate_oncall_shift_create_api_payload(
{
"name": mock.ANY,
"type": "override",
"rotation_start": "2024-05-01T15:00:00",
"start": "2024-05-01T15:00:00",
"duration": 21600,
"users": [ONCALL_USER2_ID],
}
),
],
_generate_oncall_schedule_create_api_payload(_generate_schedule_name(), 3),
),
# schedule w/ one rotation which has a partial day shift layer
(
_generate_splunk_schedule(
rotations=[
_generate_splunk_schedule_rotation(
shifts=[
_generate_partial_day_splunk_schedule_rotation_shift(
shift_name="shift1",
start="2024-04-29T13:00:00Z",
mask_off_days=["sa", "su"],
mask_start_hour=9,
mask_start_minute=30,
mask_end_hour=16,
mask_end_minute=30,
),
]
),
],
overrides=[_generate_splunk_schedule_override()],
oncall_schedule=_generate_oncall_schedule(id=ONCALL_SCHEDULE_ID),
),
DEFAULT_SPLUNK_USERNAME_TO_ONCALL_USER_ID_MAP,
ONCALL_SCHEDULE_ID,
[
# monday to friday 9h30 - 16h30 shifts
_generate_oncall_shift_create_api_payload(
{
"name": "shift1",
"level": 1,
"type": "rolling_users",
"rotation_start": "2024-04-29T13:00:00",
"start": "2024-04-29T13:00:00",
"duration": 60 * 60 * 7, # 7 hours
"frequency": "daily",
"interval": 1,
"by_day": ["MO", "TU", "WE", "TH", "FR"],
"rolling_users": [[ONCALL_USER1_ID], [ONCALL_USER2_ID]],
}
),
_generate_oncall_shift_create_api_payload(
{
"name": mock.ANY,
"type": "override",
"rotation_start": "2024-05-01T15:00:00",
"start": "2024-05-01T15:00:00",
"duration": 21600,
"users": [ONCALL_USER2_ID],
}
),
],
_generate_oncall_schedule_create_api_payload(_generate_schedule_name(), 2),
),
# schedule w/ one rotation which has two partial day shift layers
(
_generate_splunk_schedule(
rotations=[
_generate_splunk_schedule_rotation(
shifts=[
_generate_partial_day_splunk_schedule_rotation_shift(
shift_name="shift1",
start="2024-04-29T13:00:00Z",
mask_off_days=["sa", "su"],
mask_start_hour=9,
mask_start_minute=30,
mask_end_hour=16,
mask_end_minute=30,
),
_generate_partial_day_splunk_schedule_rotation_shift(
shift_name="shift2",
start="2024-05-01T00:30:00Z",
mask_off_days=["m", "t", "f"],
mask_start_hour=20,
mask_start_minute=30,
mask_end_hour=23,
mask_end_minute=0,
),
]
),
],
overrides=[_generate_splunk_schedule_override()],
oncall_schedule=_generate_oncall_schedule(id=ONCALL_SCHEDULE_ID),
),
DEFAULT_SPLUNK_USERNAME_TO_ONCALL_USER_ID_MAP,
ONCALL_SCHEDULE_ID,
[
# monday to friday 9h30 - 16h30 shifts
_generate_oncall_shift_create_api_payload(
{
"name": "shift1",
"level": 1,
"type": "rolling_users",
"rotation_start": "2024-04-29T13:00:00",
"start": "2024-04-29T13:00:00",
"duration": 60 * 60 * 7, # 7 hours
"frequency": "daily",
"interval": 1,
"by_day": ["MO", "TU", "WE", "TH", "FR"],
"rolling_users": [[ONCALL_USER1_ID], [ONCALL_USER2_ID]],
}
),
# wed, thu, sat, sun 20h30 - 23h shifts
_generate_oncall_shift_create_api_payload(
{
"name": "shift2",
"level": 1,
"type": "rolling_users",
"rotation_start": "2024-05-01T00:30:00",
"start": "2024-05-01T00:30:00",
"duration": int(60 * 60 * 2.5), # 2.5 hours
"frequency": "daily",
"interval": 1,
"by_day": ["WE", "TH", "SA", "SU"],
"rolling_users": [[ONCALL_USER1_ID], [ONCALL_USER2_ID]],
}
),
_generate_oncall_shift_create_api_payload(
{
"name": mock.ANY,
"type": "override",
"rotation_start": "2024-05-01T15:00:00",
"start": "2024-05-01T15:00:00",
"duration": 21600,
"users": [ONCALL_USER2_ID],
}
),
],
_generate_oncall_schedule_create_api_payload(_generate_schedule_name(), 3),
),
# schedule w/ one rotation which has one partial-day shift w/ handoff every 3 days
(
_generate_splunk_schedule(
rotations=[
_generate_splunk_schedule_rotation(
shifts=[
_generate_partial_day_splunk_schedule_rotation_shift(
shift_name="partial day 3 day handoff",
start="2024-04-29T13:00:00Z",
mask_off_days=["sa", "su"],
mask_start_hour=9,
mask_start_minute=0,
mask_end_hour=17,
mask_end_minute=0,
duration=3,
),
]
),
],
overrides=[_generate_splunk_schedule_override()],
oncall_schedule=_generate_oncall_schedule(id=ONCALL_SCHEDULE_ID),
),
DEFAULT_SPLUNK_USERNAME_TO_ONCALL_USER_ID_MAP,
ONCALL_SCHEDULE_ID,
[
_generate_oncall_shift_create_api_payload(
{
"name": "partial day 3 day handoff",
"level": 1,
"type": "rolling_users",
"rotation_start": "2024-04-29T13:00:00",
"start": "2024-04-29T13:00:00",
"duration": 60 * 60 * 8, # 8 hours
"frequency": "daily",
"interval": 3,
"by_day": ["MO", "TU", "WE", "TH", "FR"],
"rolling_users": [[ONCALL_USER1_ID], [ONCALL_USER2_ID]],
}
),
_generate_oncall_shift_create_api_payload(
{
"name": mock.ANY,
"type": "override",
"rotation_start": "2024-05-01T15:00:00",
"start": "2024-05-01T15:00:00",
"duration": 21600,
"users": [ONCALL_USER2_ID],
}
),
],
_generate_oncall_schedule_create_api_payload(_generate_schedule_name(), 2),
),
# schedule w/ one rotation which has multiple multi-day shifts
(
_generate_splunk_schedule(
rotations=[
_generate_splunk_schedule_rotation(
shifts=[
_generate_multi_day_splunk_schedule_rotation_shift(
shift_name="multi day shift1",
start="2024-04-29T13:00:00Z",
mask=_generate_splunk_schedule_rotation_shift_mask(
off_days=["m", "t", "th", "f", "sa", "su"],
start_hour=17,
start_minute=0,
end_hour=0,
end_minute=0,
),
mask2=_generate_splunk_schedule_rotation_shift_mask(
off_days=["m", "t", "w", "sa", "su"],
start_hour=0,
start_minute=0,
end_hour=0,
end_minute=0,
),
mask3=_generate_splunk_schedule_rotation_shift_mask(
off_days=["m", "t", "w", "th", "f", "su"],
start_hour=0,
start_minute=0,
end_hour=9,
end_minute=0,
),
),
_generate_multi_day_splunk_schedule_rotation_shift(
shift_name="multi day shift2",
start="2024-04-29T13:00:00Z",
mask=_generate_splunk_schedule_rotation_shift_mask(
off_days=["m", "t", "th", "f", "sa", "su"],
start_hour=17,
start_minute=0,
end_hour=0,
end_minute=0,
),
mask2=_generate_splunk_schedule_rotation_shift_mask(
off_days=["m", "t", "w", "th", "f", "sa", "su"],
start_hour=0,
start_minute=0,
end_hour=0,
end_minute=0,
),
mask3=_generate_splunk_schedule_rotation_shift_mask(
off_days=["m", "t", "w", "f", "sa", "su"],
start_hour=0,
start_minute=0,
end_hour=9,
end_minute=0,
),
),
]
),
],
overrides=[],
oncall_schedule=_generate_oncall_schedule(id=ONCALL_SCHEDULE_ID),
),
DEFAULT_SPLUNK_USERNAME_TO_ONCALL_USER_ID_MAP,
ONCALL_SCHEDULE_ID,
[
_generate_oncall_shift_create_api_payload(
{
"name": "multi day shift1",
"level": 1,
"type": "rolling_users",
"rotation_start": "2024-04-29T13:00:00",
"start": "2024-04-29T13:00:00",
"duration": 60 * 60 * 64, # 64 hours
"frequency": "weekly",
"interval": 1,
"rolling_users": [[ONCALL_USER1_ID], [ONCALL_USER2_ID]],
}
),
_generate_oncall_shift_create_api_payload(
{
"name": "multi day shift2",
"level": 1,
"type": "rolling_users",
"rotation_start": "2024-04-29T13:00:00",
"start": "2024-04-29T13:00:00",
"duration": 60 * 60 * 16, # 16 hours
"frequency": "weekly",
"interval": 1,
"rolling_users": [[ONCALL_USER1_ID], [ONCALL_USER2_ID]],
}
),
],
_generate_oncall_schedule_create_api_payload(_generate_schedule_name(), 2),
),
],
)
def test_migrate_schedule(
mock_oncall_client,
splunk_schedule,
user_id_map,
expected_oncall_schedule_id_to_be_deleted,
expected_oncall_shift_create_calls,
expected_oncall_schedule_create_call,
):
schedules.migrate_schedule(splunk_schedule, user_id_map)
if expected_oncall_schedule_id_to_be_deleted is not None:
mock_oncall_client.delete.assert_called_once_with(
f"schedules/{expected_oncall_schedule_id_to_be_deleted}"
)
expected_oncall_api_create_calls_args = [
("on_call_shifts", shift) for shift in expected_oncall_shift_create_calls
]
expected_oncall_api_create_calls_args.append(
("schedules", expected_oncall_schedule_create_call)
)
for expected_call_args in expected_oncall_api_create_calls_args:
mock_oncall_client.create.assert_any_call(*expected_call_args)
@pytest.mark.parametrize(
"rotation_shift_duration_days,is_allowed",
[
# handoff every week, allowed
(7, True),
# handoff every two weeks, not currently supported
(14, False),
],
)
def test_migrate_schedule_multi_day_shift_with_non_weekly_handoff_not_supported(
rotation_shift_duration_days, is_allowed
):
shift_name = "test shift name"
multi_day_rotation_shift = schedules.RotationShift.from_dict(
_generate_multi_day_splunk_schedule_rotation_shift(
shift_name=shift_name,
start="2024-04-29T13:00:00Z",
mask=_generate_splunk_schedule_rotation_shift_mask(
off_days=["m", "t", "th", "f", "sa", "su"],
start_hour=17,
start_minute=0,
end_hour=0,
end_minute=0,
),
mask2=_generate_splunk_schedule_rotation_shift_mask(
off_days=["m", "t", "w", "sa", "su"],
start_hour=0,
start_minute=0,
end_hour=0,
end_minute=0,
),
mask3=_generate_splunk_schedule_rotation_shift_mask(
off_days=["m", "t", "w", "th", "f", "su"],
start_hour=0,
start_minute=0,
end_hour=9,
end_minute=0,
),
duration=rotation_shift_duration_days,
),
1,
)
if is_allowed:
try:
oncall_shift = multi_day_rotation_shift.to_oncall_shift(
DEFAULT_SPLUNK_USERNAME_TO_ONCALL_USER_ID_MAP
)
assert oncall_shift == _generate_oncall_shift_create_api_payload(
{
"name": shift_name,
"level": 1,
"type": "rolling_users",
"rotation_start": "2024-04-29T13:00:00",
"start": "2024-04-29T13:00:00",
"duration": 60 * 60 * 64, # 64 hours
"frequency": "weekly",
"interval": 1,
"rolling_users": [[ONCALL_USER1_ID], [ONCALL_USER2_ID]],
}
)
except: # noqa: E722
pytest.fail(
f"Multi-day rotation shift with handoff every {rotation_shift_duration_days} days should be allowed"
)
else:
with pytest.raises(ValueError) as e:
multi_day_rotation_shift.to_oncall_shift(
DEFAULT_SPLUNK_USERNAME_TO_ONCALL_USER_ID_MAP
)
assert (
str(e.value)
== f"Multi-day shifts with a duration greater than 7 days are not supported: {rotation_shift_duration_days} days"
)
@pytest.mark.parametrize(
"mask,mask2,mask3,expected_duration_seconds",
[
# wednesday 17h to saturday 9h
(
_generate_splunk_schedule_rotation_shift_mask(
off_days=["m", "t", "th", "f", "sa", "su"],
start_hour=17,
start_minute=0,
end_hour=0,
end_minute=0,
),
_generate_splunk_schedule_rotation_shift_mask(
off_days=["m", "t", "w", "sa", "su"],
start_hour=0,
start_minute=0,
end_hour=0,
end_minute=0,
),
_generate_splunk_schedule_rotation_shift_mask(
off_days=["m", "t", "w", "th", "f", "su"],
start_hour=0,
start_minute=0,
end_hour=9,
end_minute=0,
),
60 * 60 * 64, # 64 hours, in seconds
),
# wednesday 17h to thursday 9h
(
_generate_splunk_schedule_rotation_shift_mask(
off_days=["m", "t", "th", "f", "sa", "su"],
start_hour=17,
start_minute=0,
end_hour=0,
end_minute=0,
),
_generate_splunk_schedule_rotation_shift_mask(
off_days=["m", "t", "w", "th", "f", "sa", "su"],
start_hour=0,
start_minute=0,
end_hour=0,
end_minute=0,
),
_generate_splunk_schedule_rotation_shift_mask(
off_days=["m", "t", "w", "f", "sa", "su"],
start_hour=0,
start_minute=0,
end_hour=9,
end_minute=0,
),
60 * 60 * 16, # 16 hours, in seconds
),
# friday 17h to monday 9h
(
_generate_splunk_schedule_rotation_shift_mask(
off_days=["m", "t", "w", "th", "sa", "su"],
start_hour=17,
start_minute=0,
end_hour=0,
end_minute=0,
),
_generate_splunk_schedule_rotation_shift_mask(
off_days=["m", "t", "w", "th", "f", "sa", "su"],
start_hour=0,
start_minute=0,
end_hour=0,
end_minute=0,
),
_generate_splunk_schedule_rotation_shift_mask(
off_days=["t", "w", "th", "f", "sa", "su"],
start_hour=0,
start_minute=0,
end_hour=9,
end_minute=0,
),
60 * 60 * 64, # 64 hours, in seconds
),
],
)
def test_calculate_multi_day_duration_from_masks_for_multi_day_rotation_shift(
mask, mask2, mask3, expected_duration_seconds
):
rotation_shift = schedules.RotationShift.from_dict(
_generate_multi_day_splunk_schedule_rotation_shift(
shift_name="asdfasdf",
start="2024-04-29T13:00:00Z",
mask=mask,
mask2=mask2,
mask3=mask3,
),
1,
)
calculated_duration = rotation_shift._calculate_multi_day_duration_from_masks()
assert int(calculated_duration.total_seconds()) == expected_duration_seconds
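The 64-hour expectation in the first mask case above can be sanity-checked with plain `datetime` arithmetic: the first mask covers Wednesday 17:00 to midnight, the second all of Thursday and Friday, and the third Saturday until 09:00. A standalone sketch (not part of the test suite, dates chosen only to land on those weekdays):

```python
from datetime import datetime

# Wednesday 17:00 through Saturday 09:00 spans 7 + 48 + 9 = 64 hours.
start = datetime(2024, 5, 1, 17, 0)  # 2024-05-01 is a Wednesday
end = datetime(2024, 5, 4, 9, 0)     # the following Saturday
assert (end - start).total_seconds() == 60 * 60 * 64
```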


@ -0,0 +1,43 @@
import pytest
from lib import utils
def test_find_by_id():
data = [
{"id": "1", "name": "Alice", "details": {"age": 30, "location": "USA"}},
{"id": "2", "name": "Bob", "details": {"age": 40, "location": "UK"}},
{"id": "3", "name": "Charlie", "details": {"age": 50, "location": "Canada"}},
]
# Test case: id exists in the data
result = utils.find_by_id(data, "1")
assert result == {
"id": "1",
"name": "Alice",
"details": {"age": 30, "location": "USA"},
}
# Test case: id does not exist in the data
result = utils.find_by_id(data, "4")
assert result is None
# Test case: data is empty
result = utils.find_by_id([], "1")
assert result is None
# Test case: nested key exists
result = utils.find_by_id(data, "USA", "details.location")
assert result == {
"id": "1",
"name": "Alice",
"details": {"age": 30, "location": "USA"},
}
# Test case: nested key does not exist
result = utils.find_by_id(data, "Australia", "details.location")
assert result is None
# Test case: data is None
with pytest.raises(TypeError):
utils.find_by_id(None, "1")
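The tests above do not exercise `find_by_id`'s list branch, where an intermediate value is a list of dicts and the first element containing the key wins. A minimal self-contained sketch of that behavior (the function body is copied here for illustration; the `teams` data is hypothetical):

```python
def find_by_id(objects, value, key="id"):
    # Walk dotted keys; when a value is a list of dicts, take the first
    # element that contains the key.
    for obj in objects:
        current = obj
        for k in key.split("."):
            if isinstance(current, dict) and k in current:
                current = current[k]
            elif isinstance(current, list):
                nested = [i[k] for i in current if isinstance(i, dict) and k in i]
                current = nested[0] if nested else None
            else:
                current = None
                break
        if current == value:
            return obj
    return None

teams = [
    {"id": "t1", "members": [{"email": "alice@example.com"}]},
    {"id": "t2", "members": [{"email": "bob@example.com"}]},
]
assert find_by_id(teams, "bob@example.com", "members.email")["id"] == "t2"
assert find_by_id(teams, "carol@example.com", "members.email") is None
```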


@ -0,0 +1,131 @@
import datetime
import typing
from lib.base_config import ONCALL_DELAY_OPTIONS
T = typing.TypeVar("T")
def find(
lst: list[T], cond: typing.Callable[[T], bool], reverse: bool = False
) -> typing.Optional[int]:
indices = range(len(lst))
if reverse:
indices = indices[::-1]
for idx in indices:
if cond(lst[idx]):
return idx
return None
def split(lst: list[T], cond: typing.Callable[[T], bool]) -> list[list[T]]:
idx = find(lst, cond)
if idx is None:
return [lst]
return [lst[: idx + 1]] + split(lst[idx + 1 :], cond)
def remove_duplicates(
lst: list[T],
split_condition: typing.Callable[[T], bool],
duplicate_condition: typing.Callable[[T], bool],
) -> list[T]:
result = []
chunks = split(lst, split_condition)
for chunk in chunks:
count = len([element for element in chunk if duplicate_condition(element)])
if count > 1:
for _ in range(count - 1):
idx = find(chunk, duplicate_condition, reverse=True)
del chunk[idx]
result += chunk
return result
def find_by_id(
objects: typing.List[T], value: typing.Any, key="id"
) -> typing.Optional[T]:
"""
Allows finding an object in a list of objects.
Returns the first object whose value for `key` matches the given `value`. Supports
nested keys by using '.' as a separator.
"""
for obj in objects:
# Split the key by '.' to handle nested keys
keys = key.split(".")
# Initialize current_value to the current object
current_value = obj
# Iterate through the keys to access nested values
for k in keys:
# If the current value is a dictionary and the key exists, update current_value
if isinstance(current_value, dict) and k in current_value:
current_value = current_value[k]
# If the current value is a list, search each element for the key
elif isinstance(current_value, list):
nested_objs = [
item[k]
for item in current_value
if isinstance(item, dict) and k in item
]
if nested_objs:
current_value = nested_objs[0]
else:
current_value = None
# If the key doesn't exist or the current value is not a dictionary, break the loop
else:
current_value = None
break
# If the current value matches the given value, return the object
if current_value == value:
return obj
# If no object matches, return None
return None
def find_closest_value(lst: list[int], value: int) -> int:
return min(lst, key=lambda v: abs(v - value))
def transform_wait_delay(delay: int) -> int:
return find_closest_value(ONCALL_DELAY_OPTIONS, delay) * 60
def duration_to_frequency_and_interval(duration: datetime.timedelta) -> tuple[str, int]:
"""
Convert a duration to shift frequency and interval.
For example, a 1-day duration returns ("daily", 1), and a 14-day duration returns ("weekly", 2).
"""
seconds = int(duration.total_seconds())
assert seconds >= 3600, "Rotation must be at least 1 hour"
hours = seconds // 3600
if hours >= 24 and hours % 24 == 0:
days = hours // 24
if days >= 7 and days % 7 == 0:
weeks = days // 7
return "weekly", weeks
else:
return "daily", days
else:
return "hourly", hours
def dt_to_oncall_datetime(dt: datetime.datetime) -> str:
"""
Convert a datetime object to an OnCall datetime string.
"""
return dt.strftime("%Y-%m-%dT%H:%M:%S")
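To illustrate the frequency/interval mapping, here is a self-contained copy of `duration_to_frequency_and_interval` with a few worked cases (copied from the function above so the sketch runs on its own):

```python
import datetime


def duration_to_frequency_and_interval(duration: datetime.timedelta) -> tuple[str, int]:
    # Whole weeks -> weekly, whole days -> daily, anything else -> hourly.
    seconds = int(duration.total_seconds())
    assert seconds >= 3600, "Rotation must be at least 1 hour"
    hours = seconds // 3600
    if hours >= 24 and hours % 24 == 0:
        days = hours // 24
        if days >= 7 and days % 7 == 0:
            return "weekly", days // 7
        return "daily", days
    return "hourly", hours


assert duration_to_frequency_and_interval(datetime.timedelta(days=1)) == ("daily", 1)
assert duration_to_frequency_and_interval(datetime.timedelta(days=14)) == ("weekly", 2)
assert duration_to_frequency_and_interval(datetime.timedelta(hours=36)) == ("hourly", 36)
```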

tools/migrators/main.py Normal file

@ -0,0 +1,13 @@
from lib.base_config import MIGRATING_FROM, PAGERDUTY, SPLUNK
if __name__ == "__main__":
if MIGRATING_FROM == PAGERDUTY:
from lib.pagerduty.migrate import migrate
migrate()
elif MIGRATING_FROM == SPLUNK:
from lib.splunk.migrate import migrate
migrate()
else:
raise ValueError("Invalid MIGRATING_FROM value")


@ -3,3 +3,6 @@ env =
D:PAGERDUTY_API_TOKEN=test
D:ONCALL_API_TOKEN=test
D:ONCALL_API_URL=test
D:MIGRATING_FROM=pagerduty
D:SPLUNK_API_ID=abcd
D:SPLUNK_API_KEY=abcd


@ -1,65 +0,0 @@
from typing import Callable, Optional, TypeVar
from migrator.config import ONCALL_DELAY_OPTIONS
T = TypeVar("T")
def find(
lst: list[T], cond: Callable[[T], bool], reverse: bool = False
) -> Optional[int]:
indices = range(len(lst))
if reverse:
indices = indices[::-1]
for idx in indices:
if cond(lst[idx]):
return idx
return None
def split(lst: list[T], cond: Callable[[T], bool]) -> list[list[T]]:
idx = find(lst, cond)
if idx is None:
return [lst]
return [lst[: idx + 1]] + split(lst[idx + 1 :], cond)
def remove_duplicates(
lst: list[T],
split_condition: Callable[[T], bool],
duplicate_condition: Callable[[T], bool],
) -> list[T]:
result = []
chunks = split(lst, split_condition)
for chunk in chunks:
count = len([element for element in chunk if duplicate_condition(element)])
if count > 1:
for _ in range(count - 1):
idx = find(chunk, duplicate_condition, reverse=True)
del chunk[idx]
result += chunk
return result
def find_by_id(resources: list[dict], resource_id: str) -> Optional[dict]:
for resource in resources:
if resource["id"] == resource_id:
return resource
return None
def find_closest_value(lst: list[int], value: int) -> int:
return min(lst, key=lambda v: abs(v - value))
def transform_wait_delay(delay: int) -> int:
return find_closest_value(ONCALL_DELAY_OPTIONS, delay) * 60


@ -1,18 +0,0 @@
# PagerDuty migrator scripts
When running the migrator in `plan` mode, it can potentially show that some users cannot be matched
(meaning that there are no users in Grafana with the same email as in PagerDuty).
If there is a large number of unmatched users, it can be easier to use the following script that
automatically creates missing Grafana users:
```bash
docker run --rm \
-e PAGERDUTY_API_TOKEN="<PAGERDUTY_API_TOKEN>" \
-e GRAFANA_URL="http://localhost:3000" \
-e GRAFANA_USERNAME="admin" \
-e GRAFANA_PASSWORD="admin" \
pd-oncall-migrator python /app/scripts/add_users_pagerduty_to_grafana.py
```
The script will create users with random passwords, so they will need to reset their passwords later in Grafana.


@ -1,52 +0,0 @@
import os
import secrets
import sys
from urllib.parse import urljoin
import requests
from pdpyras import APISession
PAGERDUTY_API_TOKEN = os.environ["PAGERDUTY_API_TOKEN"]
PATH_USERS_GRAFANA = "/api/admin/users"
GRAFANA_URL = os.environ["GRAFANA_URL"] # Example: http://localhost:3000
GRAFANA_USERNAME = os.environ["GRAFANA_USERNAME"]
GRAFANA_PASSWORD = os.environ["GRAFANA_PASSWORD"]
SUCCESS_SIGN = "✅"
ERROR_SIGN = "❌"
def list_pagerduty_users():
session = APISession(PAGERDUTY_API_TOKEN)
users = session.list_all("users")
for user in users:
password = secrets.token_urlsafe(15)
username = user["email"].split("@")[0]
json = {
"name": user["name"],
"email": user["email"],
"login": username,
"password": password,
}
create_grafana_user(json)
def create_grafana_user(data):
url = urljoin(GRAFANA_URL, PATH_USERS_GRAFANA)
response = requests.request(
"POST", url, auth=(GRAFANA_USERNAME, GRAFANA_PASSWORD), json=data
)
if response.status_code == 200:
print(SUCCESS_SIGN + " User created: " + data["login"])
elif response.status_code == 401:
sys.exit(ERROR_SIGN + " Invalid username or password.")
elif response.status_code == 412:
print(ERROR_SIGN + " User " + data["login"] + " already exists.")
else:
print("{} {}".format(ERROR_SIGN, response.text))
if __name__ == "__main__":
list_pagerduty_users()