diff --git a/tools/scripts/oncall_reports.py b/tools/scripts/oncall_reports.py index 1c908ce8..0084bff8 100644 --- a/tools/scripts/oncall_reports.py +++ b/tools/scripts/oncall_reports.py @@ -1,14 +1,16 @@ # requires requests (pip install requests) -# This script will output 3 .csv files: +# This script will output 4 .csv files: # - oncall.escalation_chains.csv: escalation chains names and their respective serialized steps # - oncall.orphaned_schedules.csv: schedules ID and name for schedules not linked to any escalation chain +# - oncall.teams.csv: teams alert groups count, mean time to acknowledge and mean time to resolve # - oncall.users.csv: users information in the speficied period -# (team, notification policies, hours on-call, # acknowledged, # resolved) +# (team, notification policies, hours on-call, has shifts scheduled, # acknowledged, # resolved) # You can run it like this: -# $ ONCALL_API_TOKEN= DAYS=7 python oncall.reports.py +# $ ONCALL_API_TOKEN= DAYS=7 python oncall_reports.py +import collections import csv import os @@ -24,10 +26,12 @@ ONCALL_API_TOKEN = os.environ.get("ONCALL_API_TOKEN") # number of days to consider (default: last 30 days) NUM_LAST_DAYS = int(os.environ.get("DAYS", 30)) +REQUIRED_PERSONAL_NOTIFICATION_METHODS = ["phone_call", "mobile_app"] # output CSV filenames with the data ESCALATION_CHAINS_OUTPUT_FILE_NAME = "oncall.escalation_chains.csv" ORPHANED_SCHEDULES_OUTPUT_FILE_NAME = "oncall.orphaned_schedules.csv" +TEAMS_OUTPUT_FILE_NAME = "oncall.teams.csv" USERS_OUTPUT_FILE_NAME = "oncall.users.csv" @@ -38,7 +42,11 @@ headers = { users = {} teams = {} escalation_chains = {} +integrations = {} schedules = {} +ag_per_team = collections.defaultdict(int) +ttr_acc = collections.defaultdict(int) +tta_acc = collections.defaultdict(int) end_date = datetime.now(timezone.utc).replace(hour=0, minute=0, microsecond=0) start_date = end_date - timedelta(days=NUM_LAST_DAYS) @@ -99,6 +107,7 @@ while True: "acknowledged_count": 0, "resolved_count": 0, hours_field_name: 0, + "shifts_scheduled": False, } page += 1 total_pages = int(response_data.get("total_pages")) @@ -121,6 +130,18 @@ for u in users: users[u][key] = policy +# fetch integrations +# https://grafana.com/docs/grafana-cloud/alerting-and-irm/oncall/oncall-api-reference/integrations/#list-integrations +# GET {{API_URL}}/api/v1/integrations/ +print("Fetching integrations data...") +url = ONCALL_API_BASE_URL + "/api/v1/integrations/" +r = requests.get(url, params={"perpage": 100}, headers=headers) # TODO: handle pagination +r.raise_for_status() +results = r.json().get("results") +for i in results: + integrations[i["id"]] = i + + # get on-call schedule time # https://grafana.com/docs/grafana-cloud/alerting-and-irm/oncall/oncall-api-reference/schedules/#export-a-schedules-final-shifts @@ -173,6 +194,14 @@ while in_range: users[ack_by]["acknowledged_count"] += 1 if resolved_by: users[resolved_by]["resolved_count"] += 1 + team_id = integrations.get(ag["integration_id"], {}).get("team_id", None) + ag_per_team[team_id] += 1 + if ag["acknowledged_at"]: + acknowledged_at = datetime.fromisoformat(ag["acknowledged_at"].replace('Z', '+00:00')) + tta_acc[team_id] += (acknowledged_at - created_at).total_seconds() + if ag["resolved_at"]: + resolved_at = datetime.fromisoformat(ag["resolved_at"].replace('Z', '+00:00')) + ttr_acc[team_id] += (resolved_at - created_at).total_seconds() page += 1 @@ -204,6 +233,27 @@ for chain in results: orphaned_schedules.remove(schedule_id) +# check shifts from non-orphaned schedules, flag users shifts +# https://grafana.com/docs/grafana-cloud/alerting-and-irm/oncall/oncall-api-reference/on_call_shifts/#list-oncall-shifts +# GET {{API_URL}}/api/v1/on_call_shifts/?schedule_id= + +print("Checking shifts from non-orphaned schedules...") +for schedule_id in schedules: + if schedule_id in orphaned_schedules: + continue + url = ONCALL_API_BASE_URL + "/api/v1/on_call_shifts/" + r = requests.get(url, params={"schedule_id": schedule_id}, headers=headers) + r.raise_for_status() + results = r.json().get("results") + for shift in results: + on_call_users = shift.get("users", []) + list({u for r in shift.get("rolling_users", []) for u in r}) + for user_id in on_call_users: + if user_id not in users: + print("Warning: user {} from schedule {} not found".format(user_id, schedule_id)) + else: + users[user_id]["shifts_scheduled"] = True + + # write orphaned schedules report with open(ORPHANED_SCHEDULES_OUTPUT_FILE_NAME, "w") as fp: fieldnames = ["schedule_id", "name"] @@ -223,10 +273,31 @@ with open(ESCALATION_CHAINS_OUTPUT_FILE_NAME, "w") as fp: csv_writer.writerow(chain_info) +# write teams report +with open(TEAMS_OUTPUT_FILE_NAME, "w") as fp: + fieldnames = ["team", "alert_group_count", "mtta", "mttr"] + csv_writer = csv.DictWriter(fp, fieldnames) + csv_writer.writeheader() + for team_id, ag_count in ag_per_team.items(): + team_name = teams[team_id] if team_id else "(None)" + csv_writer.writerow({ + "team": team_name, + "alert_group_count": ag_count, + "mtta": tta_acc[team_id] / ag_count, + "mttr": ttr_acc[team_id] / ag_count, + }) + + # write users report with open(USERS_OUTPUT_FILE_NAME, "w") as fp: - fieldnames = ["username", "email", "teams", "important", "default", hours_field_name, "acknowledged_count", "resolved_count"] + fieldnames = ["username", "email", "teams", "important", "default", "warning", hours_field_name, "shifts_scheduled", "acknowledged_count", "resolved_count"] csv_writer = csv.DictWriter(fp, fieldnames) csv_writer.writeheader() for user_info in users.values(): + warnings = [] + for method in REQUIRED_PERSONAL_NOTIFICATION_METHODS: + expected = "notify_by_{}".format(method) + if expected not in user_info["important"] and method not in user_info["default"]: + warnings.append("Missing {}".format(method)) + user_info["warning"] = ','.join(warnings) csv_writer.writerow(user_info)