Add additional information to the reports script (#4665)

- Flag users missing a required notification method
- Check that users have scheduled shifts in non-orphaned (web or Terraform)
schedules
- Generate a `teams.csv` file with information about alert groups per
team, and their MTTA and MTTR (in the specified period)
This commit is contained in:
Matias Bordese 2024-07-15 16:36:41 -03:00 committed by GitHub
parent 7500ff0a8b
commit da67c97b8c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -1,14 +1,16 @@
# requires requests (pip install requests)
# This script will output 3 .csv files:
# This script will output 4 .csv files:
# - oncall.escalation_chains.csv: escalation chains names and their respective serialized steps
# - oncall.orphaned_schedules.csv: schedules ID and name for schedules not linked to any escalation chain
# - oncall.teams.csv: teams alert groups count, mean time to acknowledge and mean time to resolve
# - oncall.users.csv: users information in the specified period
# (team, notification policies, hours on-call, # acknowledged, # resolved)
# (team, notification policies, hours on-call, has shifts scheduled, # acknowledged, # resolved)
# You can run it like this:
# $ ONCALL_API_TOKEN=<api-token> DAYS=7 python oncall.reports.py
# $ ONCALL_API_TOKEN=<api-token> DAYS=7 python oncall_reports.py
import collections
import csv
import os
@ -24,10 +26,12 @@ ONCALL_API_TOKEN = os.environ.get("ONCALL_API_TOKEN")
# number of days to consider (default: last 30 days)
NUM_LAST_DAYS = int(os.environ.get("DAYS", 30))
REQUIRED_PERSONAL_NOTIFICATION_METHODS = ["phone_call", "mobile_app"]
# output CSV filenames with the data
ESCALATION_CHAINS_OUTPUT_FILE_NAME = "oncall.escalation_chains.csv"
ORPHANED_SCHEDULES_OUTPUT_FILE_NAME = "oncall.orphaned_schedules.csv"
TEAMS_OUTPUT_FILE_NAME = "oncall.teams.csv"
USERS_OUTPUT_FILE_NAME = "oncall.users.csv"
@ -38,7 +42,11 @@ headers = {
users = {}
teams = {}
escalation_chains = {}
integrations = {}
schedules = {}
ag_per_team = collections.defaultdict(int)
ttr_acc = collections.defaultdict(int)
tta_acc = collections.defaultdict(int)
end_date = datetime.now(timezone.utc).replace(hour=0, minute=0, microsecond=0)
start_date = end_date - timedelta(days=NUM_LAST_DAYS)
@ -99,6 +107,7 @@ while True:
"acknowledged_count": 0,
"resolved_count": 0,
hours_field_name: 0,
"shifts_scheduled": False,
}
page += 1
total_pages = int(response_data.get("total_pages"))
@ -121,6 +130,18 @@ for u in users:
users[u][key] = policy
# fetch integrations
# https://grafana.com/docs/grafana-cloud/alerting-and-irm/oncall/oncall-api-reference/integrations/#list-integrations
# GET {{API_URL}}/api/v1/integrations/
print("Fetching integrations data...")
url = ONCALL_API_BASE_URL + "/api/v1/integrations/"
r = requests.get(url, params={"perpage": 100}, headers=headers) # TODO: handle pagination
r.raise_for_status()
results = r.json().get("results")
for i in results:
integrations[i["id"]] = i
# get on-call schedule time
# https://grafana.com/docs/grafana-cloud/alerting-and-irm/oncall/oncall-api-reference/schedules/#export-a-schedules-final-shifts
@ -173,6 +194,14 @@ while in_range:
users[ack_by]["acknowledged_count"] += 1
if resolved_by:
users[resolved_by]["resolved_count"] += 1
team_id = integrations.get(ag["integration_id"], {}).get("team_id", None)
ag_per_team[team_id] += 1
if ag["acknowledged_at"]:
acknowledged_at = datetime.fromisoformat(ag["acknowledged_at"].replace('Z', '+00:00'))
tta_acc[team_id] += (acknowledged_at - created_at).total_seconds()
if ag["resolved_at"]:
resolved_at = datetime.fromisoformat(ag["resolved_at"].replace('Z', '+00:00'))
ttr_acc[team_id] += (resolved_at - created_at).total_seconds()
page += 1
@ -204,6 +233,27 @@ for chain in results:
orphaned_schedules.remove(schedule_id)
# check shifts from non-orphaned schedules, flag users shifts
# https://grafana.com/docs/grafana-cloud/alerting-and-irm/oncall/oncall-api-reference/on_call_shifts/#list-oncall-shifts
# GET {{API_URL}}/api/v1/on_call_shifts/?schedule_id=
print("Checking shifts from non-orphaned schedules...")
for schedule_id in schedules:
if schedule_id in orphaned_schedules:
continue
url = ONCALL_API_BASE_URL + "/api/v1/on_call_shifts/"
r = requests.get(url, params={"schedule_id": schedule_id}, headers=headers)
r.raise_for_status()
results = r.json().get("results")
for shift in results:
on_call_users = shift.get("users", []) + list({u for r in shift.get("rolling_users", []) for u in r})
for user_id in on_call_users:
if user_id not in users:
print("Warning: user {} from schedule {} not found".format(user_id, schedule_id))
else:
users[user_id]["shifts_scheduled"] = True
# write orphaned schedules report
with open(ORPHANED_SCHEDULES_OUTPUT_FILE_NAME, "w") as fp:
fieldnames = ["schedule_id", "name"]
@ -223,10 +273,31 @@ with open(ESCALATION_CHAINS_OUTPUT_FILE_NAME, "w") as fp:
csv_writer.writerow(chain_info)
# write teams report
with open(TEAMS_OUTPUT_FILE_NAME, "w") as fp:
fieldnames = ["team", "alert_group_count", "mtta", "mttr"]
csv_writer = csv.DictWriter(fp, fieldnames)
csv_writer.writeheader()
for team_id, ag_count in ag_per_team.items():
team_name = teams[team_id] if team_id else "(None)"
csv_writer.writerow({
"team": team_name,
"alert_group_count": ag_count,
"mtta": tta_acc[team_id] / ag_count,
"mttr": ttr_acc[team_id] / ag_count,
})
# write users report
with open(USERS_OUTPUT_FILE_NAME, "w") as fp:
fieldnames = ["username", "email", "teams", "important", "default", hours_field_name, "acknowledged_count", "resolved_count"]
fieldnames = ["username", "email", "teams", "important", "default", "warning", hours_field_name, "shifts_scheduled", "acknowledged_count", "resolved_count"]
csv_writer = csv.DictWriter(fp, fieldnames)
csv_writer.writeheader()
for user_info in users.values():
warnings = []
for method in REQUIRED_PERSONAL_NOTIFICATION_METHODS:
expected = "notify_by_{}".format(method)
if expected not in user_info["important"] and method not in user_info["default"]:
warnings.append("Missing {}".format(method))
user_info["warning"] = ','.join(warnings)
csv_writer.writerow(user_info)