oncall-engine/engine/apps/auth_token/models/plugin_auth_token.py
Vadim Stepanov c921674471
Improve plugin authentication (#1995)
# What this PR does
Handle different failing authentication scenarios (e.g. when token is
invalid or instance context is not a valid JSON) so endpoints return
appropriate response code (401 instead of 500).

## Which issue(s) this PR fixes
Related to https://github.com/grafana/oncall-private/issues/1633

## Checklist

- [x] Unit, integration, and e2e (if applicable) tests updated
- [x] Documentation added (or `pr:no public docs` PR label added if not
required)
- [x] `CHANGELOG.md` updated (or `pr:no changelog` PR label added if not
required)
2023-05-23 16:13:25 +00:00

55 lines
2 KiB
Python

import binascii
from hmac import compare_digest
from typing import Tuple
from django.db import models
from apps.auth_token import constants
from apps.auth_token.crypto import (
generate_plugin_token_string,
generate_plugin_token_string_and_salt,
hash_token_string,
)
from apps.auth_token.exceptions import InvalidToken
from apps.auth_token.models import BaseAuthToken
from apps.user_management.models import Organization
class PluginAuthToken(BaseAuthToken):
salt = models.CharField(max_length=constants.AUTH_TOKEN_CHARACTER_LENGTH, null=True)
organization = models.ForeignKey(to=Organization, on_delete=models.CASCADE)
@classmethod
def create_auth_token(cls, organization: Organization) -> Tuple["PluginAuthToken", str]:
old_token = cls.objects.filter(organization=organization)
if old_token.exists():
old_token.delete()
token_string, salt = generate_plugin_token_string_and_salt(organization.stack_id, organization.org_id)
digest = hash_token_string(token_string)
auth_token = cls.objects.create(
token_key=token_string[: constants.TOKEN_KEY_LENGTH],
digest=digest,
salt=salt,
organization=organization,
)
return auth_token, token_string
@classmethod
def validate_token_string(cls, token: str, *args, **kwargs) -> "PluginAuthToken":
context = kwargs["context"]
for auth_token in cls.objects.filter(token_key=token[: constants.TOKEN_KEY_LENGTH]):
try:
stack_id = int(context["stack_id"])
org_id = int(context["org_id"])
salt = binascii.unhexlify(auth_token.salt)
recreated_token = generate_plugin_token_string(salt, stack_id, org_id)
digest = hash_token_string(recreated_token)
except (TypeError, binascii.Error):
raise InvalidToken
if compare_digest(digest, auth_token.digest) and token == recreated_token:
return auth_token
raise InvalidToken