oncall-engine/engine/apps/auth_token/models/service_account_token.py
Matias Bordese 132bdf235b
feat: update service account auth not to require rbac enabled org (#5360)
Related to https://github.com/grafana/oncall-private/issues/2826

RBAC enabled or not (OSS or cloud), it is possible to get service
account permissions, enabling perm check (for service account tokens) in
public API.

Also allow empty value for users in sync (instead of returning a 400
response).
2024-12-12 22:11:59 +00:00

106 lines
4.2 KiB
Python

import binascii
from hmac import compare_digest
from django.db import models
from apps.api.permissions import GrafanaAPIPermissions, LegacyAccessControlRole
from apps.auth_token import constants
from apps.auth_token.crypto import hash_token_string
from apps.auth_token.exceptions import InvalidToken
from apps.auth_token.grafana.grafana_auth_token import (
get_service_account_details,
get_service_account_token_permissions,
)
from apps.auth_token.models import BaseAuthToken
from apps.user_management.models import ServiceAccount, ServiceAccountUser
class ServiceAccountTokenManager(models.Manager):
def get_queryset(self):
return super().get_queryset().select_related("service_account__organization")
class ServiceAccountToken(BaseAuthToken):
GRAFANA_SA_PREFIX = "glsa_"
objects = ServiceAccountTokenManager()
service_account: "ServiceAccount"
service_account = models.ForeignKey(ServiceAccount, on_delete=models.CASCADE, related_name="tokens")
class Meta:
unique_together = ("token_key", "service_account", "digest")
@property
def organization(self):
return self.service_account.organization
@classmethod
def validate_token(cls, organization, token):
# Grafana API request: get permissions and confirm token is valid
permissions = get_service_account_token_permissions(organization, token)
if not permissions:
# NOTE: a token can be disabled/re-enabled (not setting as revoked in oncall DB for now)
raise InvalidToken
# check if we have already seen this token
validated_token = None
service_account = None
prefix_length = len(cls.GRAFANA_SA_PREFIX)
token_key = token[prefix_length : prefix_length + constants.TOKEN_KEY_LENGTH]
try:
hashable_token = binascii.hexlify(token.encode()).decode()
digest = hash_token_string(hashable_token)
except (TypeError, binascii.Error):
raise InvalidToken
for existing_token in cls.objects.filter(service_account__organization=organization, token_key=token_key):
if compare_digest(digest, existing_token.digest):
validated_token = existing_token
service_account = existing_token.service_account
break
if not validated_token:
# if it didn't match an existing token, create a new one
# make request to Grafana API api/user using token
service_account_data = get_service_account_details(organization, token)
if not service_account_data:
# Grafana versions < 11.3 return 403 trying to get user details with service account token
# use some default values
service_account_data = {
"login": "grafana_service_account",
"uid": None, # "service-account:7"
}
grafana_id = 0 # default to zero for old Grafana versions (to keep service account unique)
if service_account_data["uid"] is not None:
# extract service account Grafana ID
try:
grafana_id = int(service_account_data["uid"].split(":")[-1])
except ValueError:
pass
# get or create service account
service_account, _ = ServiceAccount.objects.get_or_create(
organization=organization,
grafana_id=grafana_id,
defaults={
"login": service_account_data["login"],
},
)
# create token
validated_token, _ = cls.objects.get_or_create(
service_account=service_account,
token_key=token_key,
digest=digest,
)
user = ServiceAccountUser(
organization=organization,
service_account=service_account,
username=service_account.username,
public_primary_key=service_account.public_primary_key,
role=LegacyAccessControlRole.NONE,
permissions=GrafanaAPIPermissions.construct_permissions(permissions.keys()),
)
return user, validated_token