# What this PR does Refactors the PagerDuty migration script to be a bit more generic + adds a migration script to migrate from Splunk OnCall (VictorOps) tldr; ```bash ❯ docker build -t oncall-migrator . [+] Building 0.4s (10/10) FINISHED ❯ docker run --rm \ -e MIGRATING_FROM="pagerduty" \ -e MODE="plan" \ -e ONCALL_API_URL="http://localhost:8080" \ -e ONCALL_API_TOKEN="<ONCALL_API_TOKEN>" \ -e PAGERDUTY_API_TOKEN="<PAGERDUTY_API_TOKEN>" \ oncall-migrator running pagerduty migration script... ❯ docker run --rm \ -e MIGRATING_FROM="splunk" \ -e MODE="plan" \ -e ONCALL_API_URL="http://localhost:8080" \ -e ONCALL_API_TOKEN="<ONCALL_API_TOKEN>" \ -e SPLUNK_API_ID="<SPLUNK_API_ID>" \ -e SPLUNK_API_KEY="<SPLUNK_API_KEY>" \ oncall-migrator migrating from splunk oncall... ``` https://www.loom.com/share/a855062d436a4ef79f030e22528d8c71 ## Checklist - [x] Unit, integration, and e2e (if applicable) tests updated - [x] Documentation added (or `pr:no public docs` PR label added if not required) - [x] Added the relevant release notes label (see labels prefixed w/ `release:`). These labels dictate how your PR will show up in the autogenerated release notes.
131 lines
3.7 KiB
Python
131 lines
3.7 KiB
Python
import datetime
|
|
import typing
|
|
|
|
from lib.base_config import ONCALL_DELAY_OPTIONS
|
|
|
|
T = typing.TypeVar("T")
|
|
|
|
|
|
def find(
    lst: list[T], cond: typing.Callable[[T], bool], reverse: bool = False
) -> typing.Optional[int]:
    """
    Return the index of the first element of `lst` for which `cond` is true.

    When `reverse` is true the list is scanned from the end, so the index of the
    last matching element is returned instead. Returns None if nothing matches
    (including when `lst` is empty).
    """
    positions = range(len(lst))
    scan_order = reversed(positions) if reverse else positions

    # The generator is lazy, so `cond` stops being called at the first match.
    return next((pos for pos in scan_order if cond(lst[pos])), None)
|
|
|
|
|
|
def split(lst: list[T], cond: typing.Callable[[T], bool]) -> list[list[T]]:
    """
    Split `lst` into consecutive chunks, closing a chunk after every element
    for which `cond` is true (the matching element is the last item of its chunk).

    The final chunk holds whatever follows the last matching element; it is an
    empty list when the last element matches or when `lst` itself is empty, so
    the result always contains at least one chunk.

    The chunks are always newly created lists; `lst` is never modified or
    returned as-is. The list is walked iteratively, so the number of split
    points is not limited by the interpreter's recursion depth.
    """
    chunks: list[list[T]] = []
    current: list[T] = []

    for element in lst:
        current.append(element)
        if cond(element):
            chunks.append(current)
            current = []

    # Trailing chunk (possibly empty) with the elements after the last match.
    chunks.append(current)
    return chunks
|
|
|
|
|
|
def remove_duplicates(
    lst: list[T],
    split_condition: typing.Callable[[T], bool],
    duplicate_condition: typing.Callable[[T], bool],
) -> list[T]:
    """
    Return a copy of `lst` in which, inside every chunk, only the first element
    matching `duplicate_condition` is kept.

    Chunks are consecutive runs of elements; a chunk ends right after an element
    for which `split_condition` is true (that element belongs to the chunk it
    closes). Elements that do not match `duplicate_condition` are always kept,
    and the relative order of the kept elements is preserved.

    The input list is never modified; a new list is always returned.
    """
    result: list[T] = []
    seen_in_chunk = False

    for element in lst:
        if not duplicate_condition(element):
            result.append(element)
        elif not seen_in_chunk:
            # First match in the current chunk is the one that survives.
            result.append(element)
            seen_in_chunk = True

        # A split element closes the chunk even if it was itself dropped as a
        # duplicate, so the next chunk starts with a clean slate.
        if split_condition(element):
            seen_in_chunk = False

    return result
|
|
|
|
|
|
# Marks "the key path does not exist on this object", which must be
# distinguishable from a key that exists and holds None.
_MISSING = object()


def _resolve_key_path(obj: typing.Any, keys: list[str]) -> typing.Any:
    """
    Follow `keys` through nested dicts/lists starting at `obj`.

    Returns the value found at the end of the path, or `_MISSING` if the path
    cannot be followed all the way.
    """
    current = obj
    for k in keys:
        if isinstance(current, dict) and k in current:
            current = current[k]
        elif isinstance(current, list):
            # Only the first dict in the list that carries the key is used.
            current = next(
                (item[k] for item in current if isinstance(item, dict) and k in item),
                _MISSING,
            )
        else:
            return _MISSING
    return current


def find_by_id(
    objects: typing.List[T], value: typing.Any, key: str = "id"
) -> typing.Optional[T]:
    """
    Allows finding an object in a list of objects.

    Returns the first object whose value for `key` matches the given `value`, or
    None if no object matches. Supports nested keys by using '.' as a separator.

    When a step of the path lands on a list, the key is looked up in the list's
    dict elements and the value of the first element that has the key is used
    (later elements are not considered).

    An object on which the key path does not exist never matches, even when
    `value` is None; an object that explicitly stores None under `key` does
    match `value=None`.
    """
    # The path is the same for every object, so split it only once.
    keys = key.split(".")

    for obj in objects:
        found = _resolve_key_path(obj, keys)
        if found is not _MISSING and found == value:
            return obj

    return None
|
|
|
|
|
|
def find_closest_value(lst: list[int], value: int) -> int:
    """
    Return the element of `lst` with the smallest absolute difference to `value`.

    If several elements are equally close, the one that appears first in `lst`
    is returned. Raises ValueError (from `min`) when `lst` is empty.
    """

    def distance(candidate: int) -> int:
        return abs(candidate - value)

    return min(lst, key=distance)
|
|
|
|
|
|
def transform_wait_delay(delay: int) -> int:
    """
    Snap `delay` to the closest entry of ONCALL_DELAY_OPTIONS and multiply it by 60.

    NOTE(review): the `* 60` suggests the options and `delay` are in minutes and
    the result is in seconds - confirm against ONCALL_DELAY_OPTIONS and callers.
    """
    closest_option = find_closest_value(ONCALL_DELAY_OPTIONS, delay)
    return closest_option * 60
|
|
|
|
|
|
def duration_to_frequency_and_interval(duration: datetime.timedelta) -> tuple[str, int]:
    """
    Convert a duration to shift frequency and interval.

    For example, 1 day duration returns ("daily", 1), 14 days returns ("weekly", 2),
    and 12 hours returns ("hourly", 12).

    The largest unit that divides the duration evenly is chosen: whole weeks are
    "weekly", otherwise whole days are "daily", otherwise "hourly". The duration
    is truncated to whole hours first, so e.g. 90 minutes yields ("hourly", 1).

    Raises AssertionError if the duration is shorter than one hour.
    """
    seconds = int(duration.total_seconds())

    # Raised explicitly instead of using an `assert` statement so the check is
    # not stripped when Python runs with -O; the exception type and message are
    # kept the same for existing callers.
    if seconds < 3600:
        raise AssertionError("Rotation must be at least 1 hour")

    hours = seconds // 3600

    days, leftover_hours = divmod(hours, 24)
    if leftover_hours:
        return "hourly", hours

    weeks, leftover_days = divmod(days, 7)
    if leftover_days:
        return "daily", days

    return "weekly", weeks
|
|
|
|
|
|
def dt_to_oncall_datetime(dt: datetime.datetime) -> str:
    """
    Format a datetime object as an OnCall datetime string.

    The output has the shape YYYY-MM-DDTHH:MM:SS; microseconds and any timezone
    information on `dt` are not included in the string.
    """
    oncall_format = "%Y-%m-%dT%H:%M:%S"
    return dt.strftime(oncall_format)
|