oncall-engine/engine/apps/auth_token/auth.py
Ildar Iskhakov 1fc3f6d301
Refactor plugin sync (#1200)
# What this PR does

This PR adds a shortcut in the plugin synchronisation process, so that
existing users will be able to log in without waiting for the sync task.
Every request still starts the background synchronisation task, to be
able to propagate organisation changes faster than the periodic task. It
means that we don't necessarily need a "force reload" button in the
interface.
For all other cases (user does not exist, organisation token "not
ok", etc.) the process remains the same - the plugin will show "Initialising
plugin..." until the background task is successfully completed.

Co-authored-by: Joey Orlando <joey.orlando@grafana.com>
2023-01-25 09:12:08 +08:00

229 lines
8.7 KiB
Python

import json
import logging
from typing import Tuple
from django.conf import settings
from django.contrib.auth.models import AnonymousUser
from rest_framework import exceptions
from rest_framework.authentication import BaseAuthentication, get_authorization_header
from rest_framework.request import Request
from apps.api.permissions import RBACPermission, user_is_authorized
from apps.grafana_plugin.helpers.gcom import check_token
from apps.user_management.exceptions import OrganizationDeletedException, OrganizationMovedException
from apps.user_management.models import User
from apps.user_management.models.organization import Organization
from .constants import SCHEDULE_EXPORT_TOKEN_NAME, SLACK_AUTH_TOKEN_NAME
from .exceptions import InvalidToken
from .models import ApiAuthToken, PluginAuthToken, ScheduleExportAuthToken, SlackAuthToken, UserScheduleExportAuthToken
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# Lower this logger's own threshold to DEBUG so the debug record emitted when a
# Grafana user lookup fails is not dropped at the logger level (handlers
# configured elsewhere may still filter it).
logger.setLevel(logging.DEBUG)
class ApiTokenAuthentication(BaseAuthentication):
    """Authenticate a request using an API token sent in the Authorization header."""

    model = ApiAuthToken

    def authenticate(self, request):
        """Return ``(user, auth_token)`` for the token in the Authorization header.

        Raises ``AuthenticationFailed`` when the token is invalid or when its
        user lacks the ``API_KEYS_WRITE`` permission.
        """
        header_value = get_authorization_header(request).decode("utf-8")
        token_user, token = self.authenticate_credentials(header_value)

        required_permissions = [RBACPermission.Permissions.API_KEYS_WRITE]
        if user_is_authorized(token_user, required_permissions):
            return token_user, token

        raise exceptions.AuthenticationFailed(
            "Only users with Admin permissions are allowed to perform this action."
        )

    def authenticate_credentials(self, token):
        """Resolve the raw token string into ``(user, auth_token)``.

        The string is validated through ``self.model.validate_token_string``.
        Tokens whose organization is deleted or moved are rejected with the
        matching organization exception.
        """
        try:
            matched_token = self.model.validate_token_string(token)
        except InvalidToken:
            raise exceptions.AuthenticationFailed("Invalid token.")

        organization = matched_token.organization
        if organization.deleted_at:
            raise OrganizationDeletedException(organization)
        if organization.is_moved:
            raise OrganizationMovedException(organization)

        return matched_token.user, matched_token
class PluginAuthentication(BaseAuthentication):
    """Authenticate requests coming from the Grafana plugin.

    The token is read from the ``Authorization`` header and validated by
    ``check_token`` together with the JSON carried in the
    ``X-Instance-Context`` header. The acting user is then looked up in the
    token's organization using the user id found in the JSON
    ``X-Grafana-Context`` header.
    """

    def authenticate_header(self, request):
        # DRF uses this value for the WWW-Authenticate header of 401 responses.
        return "Bearer"

    def authenticate(self, request: Request) -> Tuple[User, PluginAuthToken]:
        """Return ``(user, auth_token)`` or raise ``AuthenticationFailed``."""
        token_string = get_authorization_header(request).decode()

        if not token_string:
            raise exceptions.AuthenticationFailed("No token provided")

        return self.authenticate_credentials(token_string, request)

    def authenticate_credentials(self, token_string: str, request: Request) -> Tuple[User, PluginAuthToken]:
        """Validate the token against the instance context and resolve the user.

        Raises:
            AuthenticationFailed: the instance context header is missing or is
                not valid JSON, the token is invalid, the token has no
                organization, or the user cannot be resolved.
        """
        context_string = request.headers.get("X-Instance-Context")
        if not context_string:
            raise exceptions.AuthenticationFailed("No instance context provided.")

        try:
            context = json.loads(context_string)
        except ValueError:
            # json.JSONDecodeError is a ValueError; a malformed header is a
            # client error and must not surface as an unhandled exception.
            raise exceptions.AuthenticationFailed("Instance context must be valid JSON.")

        try:
            auth_token = check_token(token_string, context=context)
            if not auth_token.organization:
                raise exceptions.AuthenticationFailed("No organization associated with token.")
        except InvalidToken:
            raise exceptions.AuthenticationFailed("Invalid token.")

        user = self._get_user(request, auth_token.organization)
        return user, auth_token

    @staticmethod
    def _get_user(request: Request, organization: Organization) -> User:
        """Return the organization's user identified by ``X-Grafana-Context``.

        The header must hold a JSON object with the user id under ``UserId``
        (or ``UserID`` as a fallback).

        Raises:
            AuthenticationFailed: the header is missing, is not a JSON object,
                carries no user id, or no such user exists in ``organization``.
        """
        try:
            context = json.loads(request.headers.get("X-Grafana-Context"))
        except (TypeError, ValueError):
            # TypeError: header absent (json.loads(None)); ValueError: malformed JSON.
            raise exceptions.AuthenticationFailed("Grafana context must be valid JSON.")

        if not isinstance(context, dict):
            raise exceptions.AuthenticationFailed("Grafana context must be a JSON object.")

        if "UserId" in context:
            user_id = context["UserId"]
        elif "UserID" in context:
            user_id = context["UserID"]
        else:
            logger.debug(f"Could not get user from grafana request. Context {context}")
            raise exceptions.AuthenticationFailed("Non-existent or anonymous user.")

        try:
            return organization.users.get(user_id=user_id)
        except User.DoesNotExist:
            logger.debug(f"Could not get user from grafana request. Context {context}")
            raise exceptions.AuthenticationFailed("Non-existent or anonymous user.")

    @classmethod
    def is_user_from_request_present_in_organization(cls, request: Request, organization: Organization) -> bool:
        """Return True when the request's Grafana user can be resolved in ``organization``."""
        try:
            cls._get_user(request, organization)
            return True
        except exceptions.AuthenticationFailed:
            return False
class GrafanaIncidentUser(AnonymousUser):
    """Anonymous user variant that reports itself as authenticated.

    ``GrafanaIncidentStaticKeyAuth`` returns an instance of this class once
    the static key has been accepted.
    """

    @property
    def is_authenticated(self):
        """Always ``True``, so permission checks treat this user as authenticated."""
        return True
class GrafanaIncidentStaticKeyAuth(BaseAuthentication):
    """Authenticate requests that present the static Grafana Incident API key.

    The ``Authorization`` header must equal
    ``settings.GRAFANA_INCIDENT_STATIC_API_KEY``. When that setting is
    ``None`` every request is rejected.
    """

    def authenticate_header(self, request):  # noqa
        # DRF uses this value for the WWW-Authenticate header of 401 responses.
        return "Bearer"

    def authenticate(self, request: Request) -> Tuple[GrafanaIncidentUser, None]:
        """Return ``(GrafanaIncidentUser(), None)`` when the static key matches.

        Raises:
            AuthenticationFailed: no token was sent, the key is not configured,
                or the token does not match the configured key.
        """
        # Local import keeps this block self-contained; hmac is standard library.
        import hmac

        token_string = get_authorization_header(request).decode()

        # Reject a missing token first so the caller gets the specific message
        # instead of the generic "Wrong token".
        if not token_string:
            raise exceptions.AuthenticationFailed("No token provided")

        expected_key = settings.GRAFANA_INCIDENT_STATIC_API_KEY
        # Compare in constant time so response timing does not leak how much
        # of the key a guess got right. Bytes are compared because
        # compare_digest only accepts ASCII-only str arguments.
        if not isinstance(expected_key, str) or not hmac.compare_digest(
            token_string.encode("utf-8"), expected_key.encode("utf-8")
        ):
            raise exceptions.AuthenticationFailed("Wrong token")

        return self.authenticate_credentials(token_string, request)

    def authenticate_credentials(self, token_string: str, request: Request) -> Tuple[GrafanaIncidentUser, None]:
        """Build the credentials tuple for an already-verified static key.

        ``token_string`` and ``request`` are accepted for signature
        compatibility and are not inspected here.
        """
        return GrafanaIncidentUser(), None
class SlackTokenAuthentication(BaseAuthentication):
    """Authenticate a request using a Slack auth token passed as a query parameter."""

    model = SlackAuthToken

    def authenticate(self, request) -> Tuple[User, SlackAuthToken]:
        """Return ``(user, auth_token)`` for the token found in the query string.

        Raises ``AuthenticationFailed`` when the parameter is absent or empty,
        or when the token fails validation.
        """
        token_string = request.query_params.get(SLACK_AUTH_TOKEN_NAME)
        if token_string:
            token_user, token = self.authenticate_credentials(token_string)
            return token_user, token
        raise exceptions.AuthenticationFailed("Invalid token.")

    def authenticate_credentials(self, token_string: str) -> Tuple[User, SlackAuthToken]:
        """Validate ``token_string`` via the token model and return ``(user, auth_token)``."""
        try:
            matched_token = self.model.validate_token_string(token_string)
        except InvalidToken:
            raise exceptions.AuthenticationFailed("Invalid token.")

        return matched_token.user, matched_token
class ScheduleExportAuthentication(BaseAuthentication):
    """Authenticate schedule export requests using a per-schedule export token."""

    model = ScheduleExportAuthToken

    def authenticate(self, request) -> Tuple[User, ScheduleExportAuthToken]:
        """Return ``(user, auth_token)`` for the export token in the query string.

        The ``pk`` URL keyword argument is passed on so the token can be
        checked against the requested schedule.
        """
        token_string = request.query_params.get(SCHEDULE_EXPORT_TOKEN_NAME)
        url_kwargs = request.parser_context.get("kwargs", {})
        schedule_pk = url_kwargs.get("pk")

        if token_string:
            return self.authenticate_credentials(token_string, schedule_pk)
        raise exceptions.AuthenticationFailed("Invalid token.")

    def authenticate_credentials(
        self, token_string: str, public_primary_key: str
    ) -> Tuple[User, ScheduleExportAuthToken]:
        """Validate the token and check that it belongs to the given schedule.

        Raises ``AuthenticationFailed`` for an invalid token, a token issued
        for a different schedule, or a deactivated token. Raises the matching
        organization exception when the organization is deleted or moved.
        """
        try:
            export_token = self.model.validate_token_string(token_string)
        except InvalidToken:
            raise exceptions.AuthenticationFailed("Invalid token.")

        organization = export_token.organization
        if organization.deleted_at:
            raise OrganizationDeletedException(organization)
        if organization.is_moved:
            raise OrganizationMovedException(organization)

        token_matches_schedule = export_token.schedule.public_primary_key == public_primary_key
        if not token_matches_schedule:
            raise exceptions.AuthenticationFailed("Invalid schedule export token for schedule")

        if export_token.active:
            return export_token.user, export_token
        raise exceptions.AuthenticationFailed("Export token is deactivated")
class UserScheduleExportAuthentication(BaseAuthentication):
    """Authenticate user schedule export requests using a per-user export token."""

    model = UserScheduleExportAuthToken

    def authenticate(self, request) -> Tuple[User, UserScheduleExportAuthToken]:
        """Return ``(user, auth_token)`` for the export token in the query string.

        The ``pk`` URL keyword argument is passed on so the token can be
        checked against the requested user.
        """
        token_string = request.query_params.get(SCHEDULE_EXPORT_TOKEN_NAME)
        url_kwargs = request.parser_context.get("kwargs", {})
        user_pk = url_kwargs.get("pk")

        if token_string:
            return self.authenticate_credentials(token_string, user_pk)
        raise exceptions.AuthenticationFailed("Invalid token.")

    def authenticate_credentials(
        self, token_string: str, public_primary_key: str
    ) -> Tuple[User, UserScheduleExportAuthToken]:
        """Validate the token and check that it belongs to the given user.

        Raises ``AuthenticationFailed`` for an invalid token, a token issued
        for a different user, or a deactivated token. Raises the matching
        organization exception when the organization is deleted or moved.
        """
        try:
            export_token = self.model.validate_token_string(token_string)
        except InvalidToken:
            raise exceptions.AuthenticationFailed("Invalid token")

        organization = export_token.organization
        if organization.deleted_at:
            raise OrganizationDeletedException(organization)
        if organization.is_moved:
            raise OrganizationMovedException(organization)

        token_matches_user = export_token.user.public_primary_key == public_primary_key
        if not token_matches_user:
            raise exceptions.AuthenticationFailed("Invalid schedule export token for user")

        if export_token.active:
            return export_token.user, export_token
        raise exceptions.AuthenticationFailed("Export token is deactivated")