oncall-engine/tools/migrators/lib/grafana/api_client.py

84 lines
2.6 KiB
Python
Raw Permalink Normal View History

import secrets
from urllib.parse import urljoin
import requests
class GrafanaAPIClient:
    """Small Grafana HTTP API client that authenticates with basic auth.

    Every request goes through ``_api_call``, which joins the requested path
    onto ``base_url`` and attaches the stored username/password.
    """

    # Applied to every request unless the caller passes its own ``timeout``
    # (``timeout=None`` restores the wait-forever behaviour of ``requests``).
    DEFAULT_TIMEOUT_SECONDS = 30

    def __init__(self, base_url, username, password):
        """Store the Grafana base URL and the basic-auth credentials."""
        self.base_url = base_url
        self.username = username
        self.password = password

    def _api_call(self, method: str, path: str, **kwargs):
        """Send one HTTP request to Grafana and return the response object.

        Args:
            method: HTTP verb, e.g. ``"GET"`` or ``"POST"``.
            path: API path, joined onto ``base_url`` with ``urljoin``.
            **kwargs: Forwarded to ``requests.request``. A ``timeout`` given
                here takes precedence over ``DEFAULT_TIMEOUT_SECONDS``.

        Returns:
            The value returned by ``requests.request``. The HTTP status is
            not inspected here; callers decide how to treat failures.
        """
        # Without a timeout ``requests`` blocks indefinitely on a server that
        # accepts the connection but never answers.
        kwargs.setdefault("timeout", self.DEFAULT_TIMEOUT_SECONDS)

        # NOTE(review): ``urljoin`` drops any path component of ``base_url``
        # when ``path`` starts with "/", so a Grafana served under a sub-path
        # would lose that prefix - confirm base_url is always a root URL.
        return requests.request(
            method,
            urljoin(self.base_url, path),
            auth=(self.username, self.password),
            **kwargs,
        )

    def create_user_with_random_password(self, name: str, email: str):
        """Create a Grafana user through the admin API.

        The login is the part of ``email`` before the first ``"@"``. The
        password is generated with ``secrets.token_urlsafe`` and is only sent
        in the request payload; this method neither stores nor returns it.

        Returns:
            The response object of the ``POST /api/admin/users`` call.
        """
        return self._api_call(
            "POST",
            "/api/admin/users",
            json={
                "name": name,
                "email": email,
                "login": email.split("@")[0],
                "password": secrets.token_urlsafe(15),
            },
        )

    def get_all_users(self):
        """Return the decoded JSON body of ``GET /api/users``.

        https://grafana.com/docs/grafana/v10.3/developers/http_api/user/#search-users

        NOTE(review): a single request is made with no paging parameters; if
        the endpoint pages its results, only the first page is returned -
        confirm this is acceptable for large installations.
        """
        return self._api_call("GET", "/api/users").json()

    def idemopotently_create_team_and_add_users(
        self, team_name: str, user_emails: list[str]
    ) -> int:
        """Ensure a team named ``team_name`` exists and add users to it.

        The (misspelled) method name is kept as-is for existing callers.

        Get team by name
        https://grafana.com/docs/grafana/v10.3/developers/http_api/team/#using-the-name-parameter
        Create team
        https://grafana.com/docs/grafana/v10.3/developers/http_api/team/#add-team
        Add team members
        https://grafana.com/docs/grafana/v10.3/developers/http_api/team/#add-team-member

        Args:
            team_name: Name used both to search for and to create the team.
            user_emails: Emails of the users to add. Emails with no matching
                entry in ``get_all_users()`` are skipped.

        Returns:
            The id of the existing or newly created team.

        Raises:
            Exception: If the team had to be created and the create request
                did not answer with status 200.
        """
        search_result = self._api_call(
            "GET", "/api/teams/search", params={"name": team_name}
        ).json()

        if search_result["teams"]:
            # team already exists
            team_id = search_result["teams"][0]["id"]
        else:
            # team doesn't exist create it
            response = self._api_call("POST", "/api/teams", json={"name": team_name})
            if response.status_code != 200:
                raise Exception(f"Failed to fetch/create Grafana team '{team_name}'")
            team_id = response.json()["teamId"]

        # Index users by email once instead of rescanning the whole user list
        # for every requested email. ``setdefault`` keeps the first user seen
        # for an email, which is the one a linear first-match scan would pick.
        user_by_email = {}
        for grafana_user in self.get_all_users():
            user_by_email.setdefault(grafana_user.get("email"), grafana_user)

        # ``dict.fromkeys`` removes duplicate ids while keeping the order in
        # which the emails were requested.
        member_ids = dict.fromkeys(
            user_by_email[user_email]["id"]
            for user_email in user_emails
            if user_email in user_by_email
        )

        # The responses are intentionally not checked: a failed add (for
        # instance a user who is already a member) does not abort the run.
        for user_id in member_ids:
            self._api_call(
                "POST", f"/api/teams/{team_id}/members", json={"userId": user_id}
            )

        return team_id