107 lines
4.2 KiB
Python
107 lines
4.2 KiB
Python
|
|
import binascii
|
||
|
|
from hmac import compare_digest
|
||
|
|
|
||
|
|
from django.db import models
|
||
|
|
|
||
|
|
from apps.api.permissions import GrafanaAPIPermissions, LegacyAccessControlRole
|
||
|
|
from apps.auth_token import constants
|
||
|
|
from apps.auth_token.crypto import hash_token_string
|
||
|
|
from apps.auth_token.exceptions import InvalidToken
|
||
|
|
from apps.auth_token.grafana.grafana_auth_token import (
|
||
|
|
get_service_account_details,
|
||
|
|
get_service_account_token_permissions,
|
||
|
|
)
|
||
|
|
from apps.auth_token.models import BaseAuthToken
|
||
|
|
from apps.user_management.models import ServiceAccount, ServiceAccountUser
|
||
|
|
|
||
|
|
|
||
|
|
class ServiceAccountTokenManager(models.Manager):
|
||
|
|
def get_queryset(self):
|
||
|
|
return super().get_queryset().select_related("service_account__organization")
|
||
|
|
|
||
|
|
|
||
|
|
class ServiceAccountToken(BaseAuthToken):
|
||
|
|
GRAFANA_SA_PREFIX = "glsa_"
|
||
|
|
|
||
|
|
objects = ServiceAccountTokenManager()
|
||
|
|
|
||
|
|
service_account: "ServiceAccount"
|
||
|
|
service_account = models.ForeignKey(ServiceAccount, on_delete=models.CASCADE, related_name="tokens")
|
||
|
|
|
||
|
|
class Meta:
|
||
|
|
unique_together = ("token_key", "service_account", "digest")
|
||
|
|
|
||
|
|
@property
|
||
|
|
def organization(self):
|
||
|
|
return self.service_account.organization
|
||
|
|
|
||
|
|
@classmethod
|
||
|
|
def validate_token(cls, organization, token):
|
||
|
|
# Grafana API request: get permissions and confirm token is valid
|
||
|
|
permissions = get_service_account_token_permissions(organization, token)
|
||
|
|
if not permissions:
|
||
|
|
# NOTE: a token can be disabled/re-enabled (not setting as revoked in oncall DB for now)
|
||
|
|
raise InvalidToken
|
||
|
|
|
||
|
|
# check if we have already seen this token
|
||
|
|
validated_token = None
|
||
|
|
service_account = None
|
||
|
|
prefix_length = len(cls.GRAFANA_SA_PREFIX)
|
||
|
|
token_key = token[prefix_length : prefix_length + constants.TOKEN_KEY_LENGTH]
|
||
|
|
try:
|
||
|
|
hashable_token = binascii.hexlify(token.encode()).decode()
|
||
|
|
digest = hash_token_string(hashable_token)
|
||
|
|
except (TypeError, binascii.Error):
|
||
|
|
raise InvalidToken
|
||
|
|
for existing_token in cls.objects.filter(service_account__organization=organization, token_key=token_key):
|
||
|
|
if compare_digest(digest, existing_token.digest):
|
||
|
|
validated_token = existing_token
|
||
|
|
service_account = existing_token.service_account
|
||
|
|
break
|
||
|
|
|
||
|
|
if not validated_token:
|
||
|
|
# if it didn't match an existing token, create a new one
|
||
|
|
# make request to Grafana API api/user using token
|
||
|
|
service_account_data = get_service_account_details(organization, token)
|
||
|
|
if not service_account_data:
|
||
|
|
# Grafana versions < 11.3 return 403 trying to get user details with service account token
|
||
|
|
# use some default values
|
||
|
|
service_account_data = {
|
||
|
|
"login": "grafana_service_account",
|
||
|
|
"uid": None, # "service-account:7"
|
||
|
|
}
|
||
|
|
|
||
|
|
grafana_id = 0 # default to zero for old Grafana versions (to keep service account unique)
|
||
|
|
if service_account_data["uid"] is not None:
|
||
|
|
# extract service account Grafana ID
|
||
|
|
try:
|
||
|
|
grafana_id = int(service_account_data["uid"].split(":")[-1])
|
||
|
|
except ValueError:
|
||
|
|
pass
|
||
|
|
|
||
|
|
# get or create service account
|
||
|
|
service_account, _ = ServiceAccount.objects.get_or_create(
|
||
|
|
organization=organization,
|
||
|
|
grafana_id=grafana_id,
|
||
|
|
defaults={
|
||
|
|
"login": service_account_data["login"],
|
||
|
|
},
|
||
|
|
)
|
||
|
|
# create token
|
||
|
|
validated_token, _ = cls.objects.get_or_create(
|
||
|
|
service_account=service_account,
|
||
|
|
token_key=token_key,
|
||
|
|
digest=digest,
|
||
|
|
)
|
||
|
|
|
||
|
|
user = ServiceAccountUser(
|
||
|
|
organization=organization,
|
||
|
|
service_account=service_account,
|
||
|
|
username=service_account.username,
|
||
|
|
public_primary_key=service_account.public_primary_key,
|
||
|
|
role=LegacyAccessControlRole.NONE,
|
||
|
|
permissions=GrafanaAPIPermissions.construct_permissions(permissions.keys()),
|
||
|
|
)
|
||
|
|
|
||
|
|
return user, validated_token
|