oncall-engine/engine/apps/auth_token/models/service_account_token.py

107 lines
4.2 KiB
Python
Raw Permalink Normal View History

import binascii
from hmac import compare_digest
from django.db import models
from apps.api.permissions import GrafanaAPIPermissions, LegacyAccessControlRole
from apps.auth_token import constants
from apps.auth_token.crypto import hash_token_string
from apps.auth_token.exceptions import InvalidToken
from apps.auth_token.grafana.grafana_auth_token import (
get_service_account_details,
get_service_account_token_permissions,
)
from apps.auth_token.models import BaseAuthToken
from apps.user_management.models import ServiceAccount, ServiceAccountUser
class ServiceAccountTokenManager(models.Manager):
def get_queryset(self):
return super().get_queryset().select_related("service_account__organization")
class ServiceAccountToken(BaseAuthToken):
GRAFANA_SA_PREFIX = "glsa_"
objects = ServiceAccountTokenManager()
service_account: "ServiceAccount"
service_account = models.ForeignKey(ServiceAccount, on_delete=models.CASCADE, related_name="tokens")
class Meta:
unique_together = ("token_key", "service_account", "digest")
@property
def organization(self):
return self.service_account.organization
@classmethod
def validate_token(cls, organization, token):
# Grafana API request: get permissions and confirm token is valid
permissions = get_service_account_token_permissions(organization, token)
if not permissions:
# NOTE: a token can be disabled/re-enabled (not setting as revoked in oncall DB for now)
raise InvalidToken
# check if we have already seen this token
validated_token = None
service_account = None
prefix_length = len(cls.GRAFANA_SA_PREFIX)
token_key = token[prefix_length : prefix_length + constants.TOKEN_KEY_LENGTH]
try:
hashable_token = binascii.hexlify(token.encode()).decode()
digest = hash_token_string(hashable_token)
except (TypeError, binascii.Error):
raise InvalidToken
for existing_token in cls.objects.filter(service_account__organization=organization, token_key=token_key):
if compare_digest(digest, existing_token.digest):
validated_token = existing_token
service_account = existing_token.service_account
break
if not validated_token:
# if it didn't match an existing token, create a new one
# make request to Grafana API api/user using token
service_account_data = get_service_account_details(organization, token)
if not service_account_data:
# Grafana versions < 11.3 return 403 trying to get user details with service account token
# use some default values
service_account_data = {
"login": "grafana_service_account",
"uid": None, # "service-account:7"
}
grafana_id = 0 # default to zero for old Grafana versions (to keep service account unique)
if service_account_data["uid"] is not None:
# extract service account Grafana ID
try:
grafana_id = int(service_account_data["uid"].split(":")[-1])
except ValueError:
pass
# get or create service account
service_account, _ = ServiceAccount.objects.get_or_create(
organization=organization,
grafana_id=grafana_id,
defaults={
"login": service_account_data["login"],
},
)
# create token
validated_token, _ = cls.objects.get_or_create(
service_account=service_account,
token_key=token_key,
digest=digest,
)
user = ServiceAccountUser(
organization=organization,
service_account=service_account,
username=service_account.username,
public_primary_key=service_account.public_primary_key,
role=LegacyAccessControlRole.NONE,
permissions=GrafanaAPIPermissions.construct_permissions(permissions.keys()),
)
return user, validated_token