Add misc useful scripts using public API (#4636)

A few useful Python scripts using the public API that I had lying around for miscellaneous tasks and reporting.
Matias Bordese 2024-07-09 12:31:47 -03:00 committed by GitHub
parent 34a90134fb
commit b67e6e1aa6
7 changed files with 681 additions and 0 deletions

tools/scripts/discord_webhooks.py Normal file

@@ -0,0 +1,80 @@
import json
import requests
# having set up a Discord webhook for a channel, this script will
# set up OnCall webhooks to send and update notifications for alert group created/updated triggers
# Configuration
ONCALL_API_BASE_URL = "https://oncall-prod-us-central-0.grafana.net/oncall"
ONCALL_TOKEN = "<oncall API token>"
ONCALL_WEBHOOK_PREFIX = "discord" # prefix for webhooks naming
DISCORD_WEBHOOK_URL = "<discord webhook URL>"
NOTIFICATION_TEMPLATE = """
{% if alert_group.state == 'acknowledged'%}:orange_circle:{% elif alert_group.state == 'resolved'%}:green_circle:{% elif alert_group.state == 'silenced'%}:white_circle:{% else %}:red_circle:{% endif %} **{{ alert_group.title }}**
*{{ alert_group.state }}*
{{ alert_payload.message }}
*{{ integration.name }}*
{% if event.type == 'acknowledge' %}**Acknowledged by: {{ user.username }}**{% endif %}{% if event.type == 'resolve' %}**Resolved by: {{ user.username }}**{% endif %}{% if event.type == 'silence' %}**Silenced by: {{ user.username }} (until {{ event.until }})**{% endif %}
[View in Grafana OnCall]({{ alert_group.permalinks.web }})
"""
# --- Do not edit below this line ---
def get_oncall_webhook(name):
    webhook_uid = None
    oncall_url = "{}/api/v1/webhooks/?name={}".format(ONCALL_API_BASE_URL, name)
    oncall_api_headers = {
        "Authorization": ONCALL_TOKEN
    }
    r = requests.get(oncall_url, headers=oncall_api_headers)
    r.raise_for_status()
    results = r.json().get("results", [])
    if results:
        webhook_uid = results[0]["id"]
    return webhook_uid
def setup_oncall_webhook(name, trigger, http_method, endpoint, additional_data=None):
    url = "{}{}".format(DISCORD_WEBHOOK_URL, endpoint)
    data = {"content": NOTIFICATION_TEMPLATE}
    if additional_data is not None:
        data.update(additional_data)
    webhook_name = "{}-{}".format(ONCALL_WEBHOOK_PREFIX, name)
    # check if the webhook already exists
    webhook_uid = get_oncall_webhook(webhook_name)
    # create (or update) the webhook via the OnCall API
    oncall_url = "{}/api/v1/webhooks/".format(ONCALL_API_BASE_URL)
    oncall_api_headers = {
        "Authorization": ONCALL_TOKEN
    }
    oncall_http_method = "POST"
    webhook_data = {
        "name": webhook_name,
        "url": url,
        "http_method": http_method,
        "trigger_type": trigger,
        "forward_all": False,
        "data": json.dumps(data),
    }
    if webhook_uid:
        webhook_data["id"] = webhook_uid
        oncall_url += webhook_uid
        oncall_http_method = "PUT"
    r = requests.request(
        oncall_http_method, oncall_url, headers=oncall_api_headers, json=webhook_data
    )
    r.raise_for_status()
    return r
# setup webhook for new alert group
endpoint = "?wait=true"
new_ag_webhook = setup_oncall_webhook("new", "alert group created", "POST", endpoint)
# setup webhook for status changes
webhook_create_id = new_ag_webhook.json()["id"]
update_endpoint = "/messages/{{{{ responses.{}.id }}}}".format(webhook_create_id)
update_ag_webhook = setup_oncall_webhook("update", "status change", "PATCH", update_endpoint)
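
A quick sanity check after running the script is to list the webhooks back through the same OnCall endpoint the script already queries; a minimal sketch, reusing the configuration placeholders above:

import requests

ONCALL_API_BASE_URL = "https://oncall-prod-us-central-0.grafana.net/oncall"
ONCALL_TOKEN = "<oncall API token>"
ONCALL_WEBHOOK_PREFIX = "discord"

# list OnCall webhooks and print the ones created by the script above
r = requests.get(
    "{}/api/v1/webhooks/".format(ONCALL_API_BASE_URL),
    headers={"Authorization": ONCALL_TOKEN},
)
r.raise_for_status()
for webhook in r.json().get("results", []):
    if webhook["name"].startswith(ONCALL_WEBHOOK_PREFIX):
        print(webhook["id"], webhook["name"])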

tools/scripts/mattermost_webhooks.py Normal file

@@ -0,0 +1,114 @@
import json
import requests
# this script will get the mattermost channel ID using the mattermost API and
# set up OnCall webhooks to send and update notifications for alert group created/updated triggers
# Configuration
ONCALL_API_BASE_URL = "https://oncall-prod-us-central-0.grafana.net/oncall"
ONCALL_TOKEN = "<oncall API token>"
ONCALL_WEBHOOK_PREFIX = "mattermost" # prefix for webhooks naming
MATTERMOST_API_BASE_URL = "http://localhost:8065"
MATTERMOST_BOT_TOKEN = "<mattermost bot user token>"
MATTERMOST_TEAM_NAME = "testing" # mattermost team to which the bot belongs
MATTERMOST_CHANNEL_NAME = "testing" # mattermost channel where the bot will post notifications (the bot should be a member)
NOTIFICATION_TEMPLATE = """
{% if alert_group.state == 'acknowledged'%}:large_orange_circle:{% elif alert_group.state == 'resolved'%}:large_green_circle:{% elif alert_group.state == 'silenced'%}:white_circle:{% else %}:red_circle:{% endif %} **{{ alert_group.title }}**
*{{ alert_group.state }}*
{{ alert_payload.message }}
*{{ integration.name }}*
{% if event.type == 'acknowledge' %}**Acknowledged by: {{ user.username }}**{% endif %}{% if event.type == 'resolve' %}**Resolved by: {{ user.username }}**{% endif %}{% if event.type == 'silence' %}**Silenced by: {{ user.username }} (until {{ event.until }})**{% endif %}
[View in Grafana OnCall]({{ alert_group.permalinks.web }})
"""
# --- Do not edit below this line ---
MATTERMOST_API_HEADERS = {
    "Authorization": "Bearer {}".format(MATTERMOST_BOT_TOKEN),
}
def get_mattermost_channel_id():
    url = "{}/api/v4/teams/name/{}/channels/name/{}".format(
        MATTERMOST_API_BASE_URL, MATTERMOST_TEAM_NAME, MATTERMOST_CHANNEL_NAME
    )
    r = requests.get(url, headers=MATTERMOST_API_HEADERS)
    r.raise_for_status()
    return r.json()["id"]
def get_oncall_webhook(name):
    webhook_uid = None
    oncall_url = "{}/api/v1/webhooks/?name={}".format(ONCALL_API_BASE_URL, name)
    oncall_api_headers = {
        "Authorization": ONCALL_TOKEN
    }
    r = requests.get(oncall_url, headers=oncall_api_headers)
    r.raise_for_status()
    results = r.json().get("results", [])
    if results:
        webhook_uid = results[0]["id"]
    return webhook_uid
def setup_oncall_webhook(name, trigger, http_method, endpoint, additional_data=None):
    url = "{}{}".format(MATTERMOST_API_BASE_URL, endpoint)
    data = {"message": NOTIFICATION_TEMPLATE}
    if additional_data is not None:
        data.update(additional_data)
    webhook_name = "{}-{}".format(ONCALL_WEBHOOK_PREFIX, name)
    # check if the webhook already exists
    webhook_uid = get_oncall_webhook(webhook_name)
    # create (or update) the webhook via the OnCall API
    oncall_url = "{}/api/v1/webhooks/".format(ONCALL_API_BASE_URL)
    oncall_api_headers = {
        "Authorization": ONCALL_TOKEN
    }
    oncall_http_method = "POST"
    webhook_data = {
        "name": webhook_name,
        "url": url,
        "http_method": http_method,
        "trigger_type": trigger,
        "forward_all": False,
        "data": json.dumps(data),
        "authorization_header": MATTERMOST_API_HEADERS["Authorization"],
    }
    if webhook_uid:
        webhook_data["id"] = webhook_uid
        oncall_url += webhook_uid
        oncall_http_method = "PUT"
    r = requests.request(
        oncall_http_method, oncall_url, headers=oncall_api_headers, json=webhook_data
    )
    r.raise_for_status()
    return r
# get mattermost channel id from name
channel_id = get_mattermost_channel_id()
# setup webhook for new alert group
endpoint = "/api/v4/posts"
new_ag_webhook = setup_oncall_webhook(
    "new", "alert group created", "POST", endpoint,
    additional_data={
        "channel_id": channel_id,
        "metadata": {
            "alert_group_id": "{{ alert_group.id }}"
        }
    }
)
# setup webhook for status changes
webhook_create_id = new_ag_webhook.json()["id"]
update_endpoint = "/api/v4/posts/{{{{ responses.{}.id }}}}".format(webhook_create_id)
update_ag_webhook = setup_oncall_webhook(
    "update", "status change", "PUT", update_endpoint,
    additional_data={
        "id": "{{{{ responses.{}.id }}}}".format(webhook_create_id),
    }
)
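
Before running it, it can help to confirm the bot token authenticates as the expected user; a small sketch using the standard Mattermost /users/me endpoint (assumed here, not used by the script itself):

import requests

MATTERMOST_API_BASE_URL = "http://localhost:8065"
MATTERMOST_BOT_TOKEN = "<mattermost bot user token>"

# verify the token resolves to the expected bot user
r = requests.get(
    "{}/api/v4/users/me".format(MATTERMOST_API_BASE_URL),
    headers={"Authorization": "Bearer {}".format(MATTERMOST_BOT_TOKEN)},
)
r.raise_for_status()
print("Bot user:", r.json()["username"])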

tools/scripts/oncall_hours_reports.py Normal file

@@ -0,0 +1,81 @@
import csv
import requests
from datetime import datetime, timedelta
# CUSTOMIZE THE FOLLOWING VARIABLES
START_DATE = "2023-09-01"
END_DATE = "2023-09-30"
# time outside this range (or during weekends) will be considered non-working hours
WORKING_HOURS_START_TIME = timedelta(hours=0, minutes=0, seconds=0)
WORKING_HOURS_END_TIME = timedelta(hours=23, minutes=59, seconds=59)
MY_ONCALL_API_BASE_URL = "https://oncall-prod-us-central-0.grafana.net/oncall/api/v1/schedules"
MY_ONCALL_API_KEY = "<oncall API token>"
OUTPUT_FILE_NAME = f"oncall-report-{START_DATE}-to-{END_DATE}.csv"
clamp = lambda t, start, end: max(start, min(end, t))
day_delta = lambda t: t - t.replace(hour=0, minute=0, second=0)
def working_hours_between(a, b):
    zero = timedelta(0)
    start = WORKING_HOURS_START_TIME
    end = WORKING_HOURS_END_TIME
    assert zero <= start <= end <= timedelta(1)
    working_day = end - start
    days = (b - a).days + 1
    weeks = days // 7
    # exclude weekends: count weekdays left over after the whole weeks
    if a.weekday() == 0 and (b.weekday() == 4 or b.weekday() == 5):
        extra = 5
    else:
        extra = (max(0, 5 - a.weekday()) + min(5, 1 + b.weekday())) % 5
    weekdays = weeks * 5 + extra
    total = working_day * weekdays
    # trim the partial working day at each end of the range
    if a.weekday() < 5:
        total -= clamp(day_delta(a) - start, zero, working_day)
    if b.weekday() < 5:
        total -= clamp(end - day_delta(b), zero, working_day)
    return total
headers = {"Authorization": MY_ONCALL_API_KEY}
# note: pagination is not handled; this assumes all schedules fit in a single page
schedule_ids = [schedule["id"] for schedule in requests.get(MY_ONCALL_API_BASE_URL, headers=headers).json()["results"]]
user_on_call_hours = {}
for schedule_id in schedule_ids:
    response = requests.get(
        f"{MY_ONCALL_API_BASE_URL}/{schedule_id}/final_shifts?start_date={START_DATE}&end_date={END_DATE}",
        headers=headers)
    for final_shift in response.json()["results"]:
        user_pk = final_shift["user_pk"]
        # tolerate a trailing Z in timestamps (fromisoformat rejects it before Python 3.11)
        end = datetime.fromisoformat(final_shift["shift_end"].replace('Z', '+00:00'))
        start = datetime.fromisoformat(final_shift["shift_start"].replace('Z', '+00:00'))
        shift_time_in_seconds = (end - start).total_seconds()
        shift_time_in_hours = shift_time_in_seconds / (60 * 60)
        working_hours_time = working_hours_between(start, end)
        working_hours_time_in_hours = working_hours_time.total_seconds() / (60 * 60)
        if user_pk in user_on_call_hours:
            user_on_call_hours[user_pk]["hours_on_call"] += shift_time_in_hours
            user_on_call_hours[user_pk]["working_hours_time"] += working_hours_time_in_hours
        else:
            user_on_call_hours[user_pk] = {
                "email": final_shift["user_email"],
                "hours_on_call": shift_time_in_hours,
                "working_hours_time": working_hours_time_in_hours,
            }
with open(OUTPUT_FILE_NAME, "w") as fp:
    csv_writer = csv.DictWriter(fp, ["user_pk", "user_email", "hours_on_call", "non_working_hours_on_call"])
    csv_writer.writeheader()
    for user_pk, user_info in user_on_call_hours.items():
        csv_writer.writerow({
            "user_pk": user_pk,
            "user_email": user_info["email"],
            "hours_on_call": user_info["hours_on_call"],
            "non_working_hours_on_call": user_info["hours_on_call"] - user_info["working_hours_time"],
        })
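
The generated CSV can be post-processed directly; for example, a short sketch (assuming the default OUTPUT_FILE_NAME above) that ranks users by time on call outside working hours:

import csv

# rank users by on-call hours that fell outside working hours
with open("oncall-report-2023-09-01-to-2023-09-30.csv") as fp:
    rows = list(csv.DictReader(fp))
rows.sort(key=lambda row: float(row["non_working_hours_on_call"]), reverse=True)
for row in rows:
    print(row["user_email"], round(float(row["non_working_hours_on_call"]), 2))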

tools/scripts/oncall_reports.py Normal file

@@ -0,0 +1,232 @@
# requires requests (pip install requests)
# This script will output 3 .csv files:
# - oncall.escalation_chains.csv: escalation chain names and their respective serialized steps
# - oncall.orphaned_schedules.csv: schedule ID and name for schedules not linked to any escalation chain
# - oncall.users.csv: user information for the specified period
#   (team, notification policies, hours on-call, # acknowledged, # resolved)
# You can run it like this:
# $ ONCALL_API_TOKEN=<api-token> DAYS=7 python oncall_reports.py
import csv
import os
from datetime import datetime, timedelta, timezone
import requests
ONCALL_API_BASE_URL = os.environ.get(
    "ONCALL_API_BASE_URL",
    "https://oncall-prod-us-central-0.grafana.net/oncall",
)
ONCALL_API_TOKEN = os.environ.get("ONCALL_API_TOKEN")
# number of days to consider (default: last 30 days)
NUM_LAST_DAYS = int(os.environ.get("DAYS", 30))
# output CSV filenames with the data
ESCALATION_CHAINS_OUTPUT_FILE_NAME = "oncall.escalation_chains.csv"
ORPHANED_SCHEDULES_OUTPUT_FILE_NAME = "oncall.orphaned_schedules.csv"
USERS_OUTPUT_FILE_NAME = "oncall.users.csv"
headers = {
    "Authorization": ONCALL_API_TOKEN,
}
users = {}
teams = {}
escalation_chains = {}
schedules = {}
end_date = datetime.now(timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0)
start_date = end_date - timedelta(days=NUM_LAST_DAYS)
hours_field_name = "hours_on_call_last_{}d".format(NUM_LAST_DAYS)
def _serialize_step(p):
    step = p["type"]
    if step == "wait":
        step = "{}({})".format(p["type"], p["duration"])
    elif step == "trigger_webhook":
        step = "{}({})".format(p["type"], p["action_to_trigger"])
    elif step == "notify_user_group":
        step = "{}({})".format(p["type"], p["group_to_notify"])
    elif step == "notify_persons":
        step = "{}({})".format(
            p["type"],
            ','.join(users[u_id]["username"] for u_id in p["persons_to_notify"]) if p["persons_to_notify"] else '',
        )
    elif step == "notify_on_call_from_schedule":
        schedule_id = p["notify_on_call_from_schedule"]
        step = "{}({})".format(
            p["type"],
            schedules.get(schedule_id, "missing") if schedule_id else '',
        )
    elif step == "notify_if_time_from_to":
        step = "{}({}-{})".format(p["type"], p["notify_if_time_from"], p["notify_if_time_to"])
    return step
# fetch teams
# GET {{API_URL}}/api/v1/teams/
print("Fetching teams data...")
url = ONCALL_API_BASE_URL + "/api/v1/teams/"
r = requests.get(url, params={"perpage": 100}, headers=headers) # TODO: handle pagination
r.raise_for_status()
results = r.json().get("results")
for t in results:
    teams[t["id"]] = t["name"]
# fetch users (paginated)
# https://grafana.com/docs/grafana-cloud/alerting-and-irm/oncall/oncall-api-reference/users/#list-users
# GET {{API_URL}}/api/v1/users/
print("Fetching users data...")
page = 1
while True:
    url = ONCALL_API_BASE_URL + "/api/v1/users/"
    r = requests.get(url, params={"page": page}, headers=headers)
    r.raise_for_status()
    response_data = r.json()
    results = response_data.get("results")
    for u in results:
        users[u["id"]] = {
            "username": u["username"],
            "email": u["email"],
            "teams": ",".join([teams[t] for t in u["teams"]]),
            "acknowledged_count": 0,
            "resolved_count": 0,
            hours_field_name: 0,
        }
    page += 1
    total_pages = int(response_data.get("total_pages"))
    if page > total_pages:
        break
# fetch policies
# https://grafana.com/docs/grafana-cloud/alerting-and-irm/oncall/oncall-api-reference/personal_notification_rules/#list-personal-notification-rules
# {{API_URL}}/api/v1/personal_notification_rules/ ?user_id= & important=
print("Fetching users notification policies...")
url = ONCALL_API_BASE_URL + "/api/v1/personal_notification_rules/"
for u in users:
    for important in ("true", "false"):
        r = requests.get(url, params={"user_id": u, "important": important}, headers=headers)
        r.raise_for_status()
        results = r.json().get("results")
        policy = ",".join(_serialize_step(p) for p in results)
        key = "important" if important == "true" else "default"
        users[u][key] = policy
# get on-call schedule time
# https://grafana.com/docs/grafana-cloud/alerting-and-irm/oncall/oncall-api-reference/schedules/#export-a-schedules-final-shifts
print("Fetching schedules/shifts data...")
url = ONCALL_API_BASE_URL + "/api/v1/schedules"
r = requests.get(url, headers=headers)
r.raise_for_status()
results = r.json().get("results")
for schedule in results:
    schedules[schedule["id"]] = schedule["name"]
    schedule_id = schedule["id"]
    url = ONCALL_API_BASE_URL + "/api/v1/schedules/{}/final_shifts".format(schedule_id)
    params = {
        "start_date": start_date.strftime("%Y-%m-%d"),
        "end_date": end_date.strftime("%Y-%m-%d"),
    }
    r = requests.get(url, params=params, headers=headers)
    r.raise_for_status()
    shifts = r.json().get("results")
    for final_shift in shifts:
        user_pk = final_shift["user_pk"]
        end = datetime.fromisoformat(final_shift["shift_end"].replace('Z', '+00:00'))
        start = datetime.fromisoformat(final_shift["shift_start"].replace('Z', '+00:00'))
        shift_time_in_seconds = (end - start).total_seconds()
        shift_time_in_hours = shift_time_in_seconds / (60 * 60)
        on_call_hours = users.get(user_pk, {}).get(hours_field_name, 0)
        users[user_pk][hours_field_name] = on_call_hours + shift_time_in_hours
# fetch alert groups
# https://grafana.com/docs/grafana-cloud/alerting-and-irm/oncall/oncall-api-reference/alertgroups/#list-alert-groups
# GET {{API_URL}}/api/v1/alert_groups/
print("Fetching alert groups data...")
page = 1
in_range = True
while in_range:
    url = ONCALL_API_BASE_URL + "/api/v1/alert_groups"
    r = requests.get(url, params={"page": page}, headers=headers)
    r.raise_for_status()
    response_data = r.json()
    results = response_data.get("results")
    for ag in results:
        created_at = datetime.fromisoformat(ag["created_at"].replace('Z', '+00:00'))
        if created_at < start_date:
            in_range = False
            break
        ack_by = ag["acknowledged_by"]
        resolved_by = ag["resolved_by"]
        if ack_by:
            users[ack_by]["acknowledged_count"] += 1
        if resolved_by:
            users[resolved_by]["resolved_count"] += 1
    # also stop at the last page, so the loop terminates when every alert group is in range
    if page >= int(response_data.get("total_pages") or page):
        break
    page += 1
# fetch escalation chains
# https://grafana.com/docs/grafana-cloud/alerting-and-irm/oncall/oncall-api-reference/escalation_chains/#list-escalation-chains
# GET {{API_URL}}/api/v1/escalation_chains/
print("Fetching escalation chains data...")
url = ONCALL_API_BASE_URL + "/api/v1/escalation_chains/"
r = requests.get(url, params={"perpage": 100}, headers=headers)
r.raise_for_status()
results = r.json().get("results")
orphaned_schedules = set(schedules.keys())
for chain in results:
    chain_id = chain["id"]
    # fetch policies for escalation chain
    # https://grafana.com/docs/grafana-cloud/alerting-and-irm/oncall/oncall-api-reference/escalation_policies/#list-escalation-policies
    # GET {{API_URL}}/api/v1/escalation_policies/
    url = ONCALL_API_BASE_URL + "/api/v1/escalation_policies/"
    r = requests.get(url, params={"escalation_chain_id": chain_id}, headers=headers)
    r.raise_for_status()
    policies = r.json().get("results")
    steps = ",".join(_serialize_step(p) for p in policies)
    escalation_chains[chain_id] = {"name": chain["name"], "steps": steps}
    notify_schedules = [p for p in policies if p["type"] == "notify_on_call_from_schedule"]
    for p in notify_schedules:
        # remove schedule from potential orphaned schedules
        schedule_id = p["notify_on_call_from_schedule"]
        orphaned_schedules.discard(schedule_id)  # discard: the same schedule may appear in several chains
# write orphaned schedules report
with open(ORPHANED_SCHEDULES_OUTPUT_FILE_NAME, "w") as fp:
fieldnames = ["schedule_id", "name"]
csv_writer = csv.DictWriter(fp, fieldnames)
csv_writer.writeheader()
for s_id in orphaned_schedules:
row = {"schedule_id": s_id, "name": schedules[s_id]}
csv_writer.writerow(row)
# write escalation chains report
with open(ESCALATION_CHAINS_OUTPUT_FILE_NAME, "w") as fp:
fieldnames = ["name", "steps"]
csv_writer = csv.DictWriter(fp, fieldnames)
csv_writer.writeheader()
for chain_info in escalation_chains.values():
csv_writer.writerow(chain_info)
# write users report
with open(USERS_OUTPUT_FILE_NAME, "w") as fp:
fieldnames = ["username", "email", "teams", "important", "default", hours_field_name, "acknowledged_count", "resolved_count"]
csv_writer = csv.DictWriter(fp, fieldnames)
csv_writer.writeheader()
for user_info in users.values():
csv_writer.writerow(user_info)
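
The teams and escalation chain fetches above cap at perpage=100 and leave pagination as a TODO; a generic helper in the style of the users loop (assuming those list endpoints also return a total_pages field, as the users endpoint does) could replace them:

def fetch_all(url, params=None):
    # collect results across pages, following the page/total_pages convention used above
    items, page = [], 1
    while True:
        page_params = dict(params or {}, page=page)
        r = requests.get(url, params=page_params, headers=headers)
        r.raise_for_status()
        response_data = r.json()
        items.extend(response_data.get("results") or [])
        if page >= int(response_data.get("total_pages") or page):
            break
        page += 1
    return items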

tools/scripts/readme.md Normal file

@@ -0,0 +1,19 @@
# Sample scripts using the public API
- [oncall_hours_reports.py](oncall_hours_reports.py)
Generate per-user on-call hours report
- [oncall_reports.py](oncall_reports.py)
Generate CSV reports (users, escalation chains, orphaned schedules) using the public API
- [shift_shifts.py](shift_shifts.py)
Shift schedule shifts by a given delta
- [mattermost_webhooks.py](mattermost_webhooks.py)
Setup Mattermost webhooks for alert group notifications
- [discord_webhooks.py](discord_webhooks.py)
Setup Discord webhooks for alert group notifications
- [swap_requests_workday.py](swap_requests_workday.py)
Create shift swap requests using Workday absences information

tools/scripts/shift_shifts.py Normal file

@@ -0,0 +1,51 @@
# requires: requests
import requests
from datetime import datetime, timedelta
HOURS_DELTA = -1 # delta in hours to shift rotations
ONCALL_API_BASE_URL = "https://oncall-prod-us-central-0.grafana.net/oncall"
ONCALL_API_TOKEN = "<oncall API token>"
# update only a specific schedule, by id (e.g. "SVVGWD8W1Q38A")
# if set to None, will update all your schedules
SCHEDULE_ID = None
headers = {
    "Authorization": ONCALL_API_TOKEN,
}
if SCHEDULE_ID is not None:
    # filter shifts by schedule
    url = f"{ONCALL_API_BASE_URL}/api/v1/on_call_shifts/?schedule_id={SCHEDULE_ID}"
else:
    # assuming there are at most 100 shifts (max page size)
    url = f"{ONCALL_API_BASE_URL}/api/v1/on_call_shifts/?perpage=100"
# note: overrides are not included
r = requests.get(url, headers=headers)
r.raise_for_status()
now = datetime.utcnow()
shift_delta = timedelta(hours=HOURS_DELTA)
shifts = r.json()["results"]
for shift in shifts:
    # get shift information
    shift_id = shift["id"]
    shift_start = datetime.strptime(shift["start"], "%Y-%m-%dT%H:%M:%S")
    until = shift.get("until")
    if until is not None:
        until = datetime.strptime(until, "%Y-%m-%dT%H:%M:%S")
        if until < now:
            # skip finished rotation
            continue
    # update shift start by delta
    updated_start = shift_start + shift_delta
    update_data = {"start": updated_start.isoformat()}
    shift_url = f"{ONCALL_API_BASE_URL}/api/v1/on_call_shifts/{shift_id}"
    r = requests.put(shift_url, json=update_data, headers=headers)
    if not r.ok:
        print(f"Failed to update shift {shift_id}")
    else:
        print(f"Shift {shift_id} updated")

tools/scripts/swap_requests_workday.py Normal file

@@ -0,0 +1,104 @@
# pip install openpyxl pytz requests
# ONCALL_API_TOKEN="<YOUR-TOKEN>" python swap_requests_workday.py -u <USER_ID> -s <SCHEDULE_ID> <workday-exported-file.xlsx> -t <TIMEZONE>
# e.g. ONCALL_API_TOKEN="the-token" python swap_requests_workday.py -u UCGEIXI1MR1NZ -s SF1R2ZQZKJNLK workday.xlsx -t "America/Montevideo" -d
# TODO: handle specific events (public holidays, vacation, sick leave, etc)
import argparse
import datetime
import os
import openpyxl
import pytz
import requests
ONCALL_API_TOKEN = os.environ.get("ONCALL_API_TOKEN", "")
ONCALL_API_BASE_URL = os.environ.get(
    "ONCALL_API_BASE_URL", "https://oncall-prod-us-central-0.grafana.net/oncall"
)
parser = argparse.ArgumentParser(
    description="Create shift swap requests from a Workday absences exported file"
)
parser.add_argument("-d", "--dry-run", action="store_true", help="Dry run")
parser.add_argument("-u", "--user", required=True, help="User ID, swap beneficiary")
parser.add_argument("-s", "--schedule", required=True, help="Schedule ID")
parser.add_argument(
    "-t", "--timezone", required=False, default="UTC", help="User timezone"
)
parser.add_argument("filename", help="Workday export (.xlsx)")
# Read arguments from command line
args = parser.parse_args()
# pytz raises UnknownTimeZoneError for an invalid timezone name
tz = pytz.timezone(args.timezone)
# shift swaps API
path = "/api/v1/shift_swaps/"
url = ONCALL_API_BASE_URL + path
# required auth
headers = {
    "Authorization": ONCALL_API_TOKEN,
}
now = datetime.datetime.now(datetime.timezone.utc)
excel = openpyxl.load_workbook(args.filename)
sheet = excel.active
for row in list(sheet.rows)[2:]:
    starting_datetime, _, absence_type, duration, unit, comment, status, _ = [
        cell.value for cell in row
    ]
    starting_datetime = tz.localize(starting_datetime).astimezone(pytz.UTC)
    if starting_datetime <= now:
        # ignore past absences
        continue
    if duration <= 0:
        # ignore corrections
        continue
    if status != "Approved":
        # only consider approved requests
        continue
    # check whether the request already exists
    params = {
        "schedule_id": args.schedule,
        "beneficiary": args.user,
        "starting_after": starting_datetime.strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
    }
    r = requests.get(url, params=params, headers=headers)
    r.raise_for_status()
    results = r.json().get("results")
    if results and results[0]["swap_start"] == params["starting_after"]:
        print("Swap request already exists for {}".format(params["starting_after"]))
        continue
    # durations are assumed to be expressed in days (unit == "Days")
    ending_datetime = starting_datetime + datetime.timedelta(hours=24 * duration)
    description = "{}: {}".format(absence_type, comment or "I will be off")
    # create swap request
    data = {
        "schedule": args.schedule,
        "swap_start": starting_datetime.strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
        "swap_end": ending_datetime.strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
        "description": description,
        "beneficiary": args.user,
    }
    if args.dry_run:
        print("Swap request payload would be:")
        print(data)
    else:
        r = requests.post(url, json=data, headers=headers)
        r.raise_for_status()
        print(
            "Swap request created for {} ({})".format(
                params["starting_after"], absence_type
            )
        )
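
For testing without a real Workday export, a fixture with the column order the script unpacks (start datetime, a skipped column, absence type, duration, unit, comment, status, one trailing column, after two header rows) can be generated with openpyxl; a hypothetical example:

import datetime
import openpyxl

# build a minimal .xlsx fixture matching the layout the script expects
wb = openpyxl.Workbook()
ws = wb.active
ws.append(["Absences report"])  # title row, skipped by the script
ws.append(["Start", "", "Type", "Duration", "Unit", "Comment", "Status", ""])  # header row, skipped
ws.append([
    datetime.datetime.now() + datetime.timedelta(days=7),  # starting datetime (naive; the script localizes it)
    None,
    "Vacation",
    2,  # duration
    "Days",
    "Out of office",
    "Approved",
    None,
])
wb.save("workday-test.xlsx")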