Enable service account token auth for public API (#5254)

Related to https://github.com/grafana/oncall-private/issues/2826

Continuing work started in https://github.com/grafana/oncall/pull/5211,
this adds support for Grafana service accounts tokens for API
authentication (except alert group actions which will still require a
user behind). Next steps would be updating the go client and the
terraform provider to allow service account token auth for OnCall
resources.

Following proposal 1.1 from
[doc](https://docs.google.com/document/d/1I3nFbsUEkiNPphBXT-kWefIeramTY71qqZ1OA06Kmls/edit?usp=sharing).
This commit is contained in:
Matias Bordese 2024-11-19 09:52:23 -03:00 committed by GitHub
parent 0c811e0249
commit 2bcbac8454
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
37 changed files with 816 additions and 74 deletions

View file

@ -0,0 +1,20 @@
# Generated by Django 4.2.15 on 2024-11-12 13:13
from django.db import migrations, models
import django.db.models.deletion
class Migration(migrations.Migration):
dependencies = [
('user_management', '0027_serviceaccount'),
('alerts', '0064_migrate_resolutionnoteslackmessage_slack_channel_id'),
]
operations = [
migrations.AddField(
model_name='alertreceivechannel',
name='service_account',
field=models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='alert_receive_channels', to='user_management.serviceaccount'),
),
]

View file

@ -234,6 +234,13 @@ class AlertReceiveChannel(IntegrationOptionsMixin, MaintainableObject):
author = models.ForeignKey(
"user_management.User", on_delete=models.SET_NULL, related_name="alert_receive_channels", blank=True, null=True
)
service_account = models.ForeignKey(
"user_management.ServiceAccount",
on_delete=models.SET_NULL,
related_name="alert_receive_channels",
blank=True,
null=True,
)
team = models.ForeignKey(
"user_management.Team",
on_delete=models.SET_NULL,
@ -764,15 +771,16 @@ def listen_for_alertreceivechannel_model_save(
from apps.heartbeat.models import IntegrationHeartBeat
if created:
write_resource_insight_log(instance=instance, author=instance.author, event=EntityEvent.CREATED)
author = instance.author or instance.service_account
write_resource_insight_log(instance=instance, author=author, event=EntityEvent.CREATED)
default_filter = ChannelFilter(alert_receive_channel=instance, filtering_term=None, is_default=True)
default_filter.save()
write_resource_insight_log(instance=default_filter, author=instance.author, event=EntityEvent.CREATED)
write_resource_insight_log(instance=default_filter, author=author, event=EntityEvent.CREATED)
TEN_MINUTES = 600 # this is timeout for cloud heartbeats
if instance.is_available_for_integration_heartbeat:
heartbeat = IntegrationHeartBeat.objects.create(alert_receive_channel=instance, timeout_seconds=TEN_MINUTES)
write_resource_insight_log(instance=heartbeat, author=instance.author, event=EntityEvent.CREATED)
write_resource_insight_log(instance=heartbeat, author=author, event=EntityEvent.CREATED)
metrics_add_integrations_to_cache([instance], instance.organization)

View file

@ -18,6 +18,7 @@ if typing.TYPE_CHECKING:
RBAC_PERMISSIONS_ATTR = "rbac_permissions"
RBAC_OBJECT_PERMISSIONS_ATTR = "rbac_object_permissions"
ViewSetOrAPIView = typing.Union[ViewSet, APIView]

View file

@ -9,7 +9,6 @@ from rest_framework import exceptions
from rest_framework.authentication import BaseAuthentication, get_authorization_header
from rest_framework.request import Request
from apps.api.permissions import GrafanaAPIPermissions, LegacyAccessControlRole
from apps.grafana_plugin.helpers.gcom import check_token
from apps.grafana_plugin.sync_data import SyncPermission, SyncUser
from apps.user_management.exceptions import OrganizationDeletedException, OrganizationMovedException
@ -20,13 +19,13 @@ from settings.base import SELF_HOSTED_SETTINGS
from .constants import GOOGLE_OAUTH2_AUTH_TOKEN_NAME, SCHEDULE_EXPORT_TOKEN_NAME, SLACK_AUTH_TOKEN_NAME
from .exceptions import InvalidToken
from .grafana.grafana_auth_token import get_service_account_token_permissions
from .models import (
ApiAuthToken,
GoogleOAuth2Token,
IntegrationBacksyncAuthToken,
PluginAuthToken,
ScheduleExportAuthToken,
ServiceAccountToken,
SlackAuthToken,
UserScheduleExportAuthToken,
)
@ -336,8 +335,8 @@ class UserScheduleExportAuthentication(BaseAuthentication):
return auth_token.user, auth_token
X_GRAFANA_URL = "X-Grafana-URL"
X_GRAFANA_INSTANCE_ID = "X-Grafana-Instance-ID"
GRAFANA_SA_PREFIX = "glsa_"
class GrafanaServiceAccountAuthentication(BaseAuthentication):
@ -345,7 +344,7 @@ class GrafanaServiceAccountAuthentication(BaseAuthentication):
auth = get_authorization_header(request).decode("utf-8")
if not auth:
raise exceptions.AuthenticationFailed("Invalid token.")
if not auth.startswith(GRAFANA_SA_PREFIX):
if not auth.startswith(ServiceAccountToken.GRAFANA_SA_PREFIX):
return None
organization = self.get_organization(request)
@ -359,6 +358,13 @@ class GrafanaServiceAccountAuthentication(BaseAuthentication):
return self.authenticate_credentials(organization, auth)
def get_organization(self, request):
grafana_url = request.headers.get(X_GRAFANA_URL)
if grafana_url:
organization = Organization.objects.filter(grafana_url=grafana_url).first()
if not organization:
raise exceptions.AuthenticationFailed("Invalid Grafana URL.")
return organization
if settings.LICENSE == settings.CLOUD_LICENSE_NAME:
instance_id = request.headers.get(X_GRAFANA_INSTANCE_ID)
if not instance_id:
@ -370,36 +376,13 @@ class GrafanaServiceAccountAuthentication(BaseAuthentication):
return Organization.objects.filter(org_slug=org_slug, stack_slug=instance_slug).first()
def authenticate_credentials(self, organization, token):
permissions = get_service_account_token_permissions(organization, token)
if not permissions:
try:
user, auth_token = ServiceAccountToken.validate_token(organization, token)
except InvalidToken:
raise exceptions.AuthenticationFailed("Invalid token.")
role = LegacyAccessControlRole.NONE
if not organization.is_rbac_permissions_enabled:
role = self.determine_role_from_permissions(permissions)
user = User(
organization_id=organization.pk,
name="Grafana Service Account",
username="grafana_service_account",
role=role,
permissions=GrafanaAPIPermissions.construct_permissions(permissions.keys()),
)
auth_token = ApiAuthToken(organization=organization, user=user, name="Grafana Service Account")
return user, auth_token
# Using default permissions as proxies for roles since we cannot explicitly get role from the service account token
def determine_role_from_permissions(self, permissions):
if "plugins:write" in permissions:
return LegacyAccessControlRole.ADMIN
if "dashboards:write" in permissions:
return LegacyAccessControlRole.EDITOR
if "dashboards:read" in permissions:
return LegacyAccessControlRole.VIEWER
return LegacyAccessControlRole.NONE
class IntegrationBacksyncAuthentication(BaseAuthentication):
model = IntegrationBacksyncAuthToken

View file

@ -46,3 +46,9 @@ def get_service_account_token_permissions(organization: Organization, token: str
grafana_api_client = GrafanaAPIClient(api_url=organization.grafana_url, api_token=token)
permissions, _ = grafana_api_client.get_service_account_token_permissions()
return permissions
def get_service_account_details(organization: Organization, token: str) -> typing.Dict[str, typing.List[str]]:
grafana_api_client = GrafanaAPIClient(api_url=organization.grafana_url, api_token=token)
user_data, _ = grafana_api_client.get_current_user()
return user_data

View file

@ -0,0 +1,29 @@
# Generated by Django 4.2.15 on 2024-11-12 13:13
from django.db import migrations, models
import django.db.models.deletion
class Migration(migrations.Migration):
dependencies = [
('user_management', '0027_serviceaccount'),
('auth_token', '0006_googleoauth2token'),
]
operations = [
migrations.CreateModel(
name='ServiceAccountToken',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('token_key', models.CharField(db_index=True, max_length=8)),
('digest', models.CharField(max_length=128)),
('created_at', models.DateTimeField(auto_now_add=True)),
('revoked_at', models.DateTimeField(null=True)),
('service_account', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='tokens', to='user_management.serviceaccount')),
],
options={
'unique_together': {('token_key', 'service_account', 'digest')},
},
),
]

View file

@ -4,5 +4,6 @@ from .google_oauth2_token import GoogleOAuth2Token # noqa: F401
from .integration_backsync_auth_token import IntegrationBacksyncAuthToken # noqa: F401
from .plugin_auth_token import PluginAuthToken # noqa: F401
from .schedule_export_auth_token import ScheduleExportAuthToken # noqa: F401
from .service_account_token import ServiceAccountToken # noqa: F401
from .slack_auth_token import SlackAuthToken # noqa: F401
from .user_schedule_export_auth_token import UserScheduleExportAuthToken # noqa: F401

View file

@ -0,0 +1,110 @@
import binascii
from hmac import compare_digest
from django.db import models
from apps.api.permissions import GrafanaAPIPermissions, LegacyAccessControlRole
from apps.auth_token import constants
from apps.auth_token.crypto import hash_token_string
from apps.auth_token.exceptions import InvalidToken
from apps.auth_token.grafana.grafana_auth_token import (
get_service_account_details,
get_service_account_token_permissions,
)
from apps.auth_token.models import BaseAuthToken
from apps.user_management.models import ServiceAccount, ServiceAccountUser
class ServiceAccountTokenManager(models.Manager):
def get_queryset(self):
return super().get_queryset().select_related("service_account__organization")
class ServiceAccountToken(BaseAuthToken):
GRAFANA_SA_PREFIX = "glsa_"
objects = ServiceAccountTokenManager()
service_account: "ServiceAccount"
service_account = models.ForeignKey(ServiceAccount, on_delete=models.CASCADE, related_name="tokens")
class Meta:
unique_together = ("token_key", "service_account", "digest")
@property
def organization(self):
return self.service_account.organization
@classmethod
def validate_token(cls, organization, token):
# require RBAC enabled to allow service account auth
if not organization.is_rbac_permissions_enabled:
raise InvalidToken
# Grafana API request: get permissions and confirm token is valid
permissions = get_service_account_token_permissions(organization, token)
if not permissions:
# NOTE: a token can be disabled/re-enabled (not setting as revoked in oncall DB for now)
raise InvalidToken
# check if we have already seen this token
validated_token = None
service_account = None
prefix_length = len(cls.GRAFANA_SA_PREFIX)
token_key = token[prefix_length : prefix_length + constants.TOKEN_KEY_LENGTH]
try:
hashable_token = binascii.hexlify(token.encode()).decode()
digest = hash_token_string(hashable_token)
except (TypeError, binascii.Error):
raise InvalidToken
for existing_token in cls.objects.filter(service_account__organization=organization, token_key=token_key):
if compare_digest(digest, existing_token.digest):
validated_token = existing_token
service_account = existing_token.service_account
break
if not validated_token:
# if it didn't match an existing token, create a new one
# make request to Grafana API api/user using token
service_account_data = get_service_account_details(organization, token)
if not service_account_data:
# Grafana versions < 11.3 return 403 trying to get user details with service account token
# use some default values
service_account_data = {
"login": "grafana_service_account",
"uid": None, # "service-account:7"
}
grafana_id = 0 # default to zero for old Grafana versions (to keep service account unique)
if service_account_data["uid"] is not None:
# extract service account Grafana ID
try:
grafana_id = int(service_account_data["uid"].split(":")[-1])
except ValueError:
pass
# get or create service account
service_account, _ = ServiceAccount.objects.get_or_create(
organization=organization,
grafana_id=grafana_id,
defaults={
"login": service_account_data["login"],
},
)
# create token
validated_token, _ = cls.objects.get_or_create(
service_account=service_account,
token_key=token_key,
digest=digest,
)
user = ServiceAccountUser(
organization=organization,
service_account=service_account,
username=service_account.username,
public_primary_key=service_account.public_primary_key,
role=LegacyAccessControlRole.NONE,
permissions=GrafanaAPIPermissions.construct_permissions(permissions.keys()),
)
return user, validated_token

View file

@ -0,0 +1,18 @@
import json
import httpretty
def setup_service_account_api_mocks(organization, perms=None, user_data=None, perms_status=200, user_status=200):
# requires enabling httpretty
if perms is None:
perms = {}
mock_response = httpretty.Response(status=perms_status, body=json.dumps(perms))
perms_url = f"{organization.grafana_url}/api/access-control/user/permissions"
httpretty.register_uri(httpretty.GET, perms_url, responses=[mock_response])
if user_data is None:
user_data = {"login": "some-login", "uid": "service-account:42"}
mock_response = httpretty.Response(status=user_status, body=json.dumps(user_data))
user_url = f"{organization.grafana_url}/api/user"
httpretty.register_uri(httpretty.GET, user_url, responses=[mock_response])

View file

@ -1,11 +1,16 @@
import typing
from unittest.mock import patch
import httpretty
import pytest
from rest_framework import exceptions
from rest_framework.test import APIRequestFactory
from apps.auth_token.auth import GRAFANA_SA_PREFIX, X_GRAFANA_INSTANCE_ID, GrafanaServiceAccountAuthentication
from apps.api.permissions import LegacyAccessControlRole
from apps.auth_token.auth import X_GRAFANA_INSTANCE_ID, GrafanaServiceAccountAuthentication
from apps.auth_token.models import ServiceAccountToken
from apps.auth_token.tests.helpers import setup_service_account_api_mocks
from apps.user_management.models import ServiceAccountUser
from settings.base import CLOUD_LICENSE_NAME, OPEN_SOURCE_LICENSE_NAME, SELF_HOSTED_SETTINGS
@ -53,7 +58,7 @@ def test_grafana_authentication_cloud_inputs(make_organization, settings):
mock.assert_called_once_with(organization, token)
def check_common_inputs() -> (dict[str, typing.Any], str):
def check_common_inputs() -> tuple[dict[str, typing.Any], str]:
request = APIRequestFactory().get("/")
with pytest.raises(exceptions.AuthenticationFailed):
GrafanaServiceAccountAuthentication().authenticate(request)
@ -65,7 +70,7 @@ def check_common_inputs() -> (dict[str, typing.Any], str):
result = GrafanaServiceAccountAuthentication().authenticate(request)
assert result is None
token = f"{GRAFANA_SA_PREFIX}xyz"
token = f"{ServiceAccountToken.GRAFANA_SA_PREFIX}xyz"
headers = {
"HTTP_AUTHORIZATION": token,
}
@ -74,3 +79,221 @@ def check_common_inputs() -> (dict[str, typing.Any], str):
GrafanaServiceAccountAuthentication().authenticate(request)
return headers, token
@pytest.mark.django_db
@httpretty.activate(verbose=True, allow_net_connect=False)
def test_grafana_authentication_missing_org():
token = f"{ServiceAccountToken.GRAFANA_SA_PREFIX}xyz"
headers = {
"HTTP_AUTHORIZATION": token,
}
request = APIRequestFactory().get("/", **headers)
with pytest.raises(exceptions.AuthenticationFailed) as exc:
GrafanaServiceAccountAuthentication().authenticate(request)
assert exc.value.detail == "Invalid organization."
@pytest.mark.django_db
@httpretty.activate(verbose=True, allow_net_connect=False)
def test_grafana_authentication_invalid_grafana_url():
token = f"{ServiceAccountToken.GRAFANA_SA_PREFIX}xyz"
headers = {
"HTTP_AUTHORIZATION": token,
"HTTP_X_GRAFANA_URL": "http://grafana.test", # no org for this URL
}
request = APIRequestFactory().get("/", **headers)
with pytest.raises(exceptions.AuthenticationFailed) as exc:
GrafanaServiceAccountAuthentication().authenticate(request)
assert exc.value.detail == "Invalid Grafana URL."
@pytest.mark.django_db
@httpretty.activate(verbose=True, allow_net_connect=False)
def test_grafana_authentication_rbac_disabled_fails(make_organization):
organization = make_organization(grafana_url="http://grafana.test")
if organization.is_rbac_permissions_enabled:
return
token = f"{ServiceAccountToken.GRAFANA_SA_PREFIX}xyz"
headers = {
"HTTP_AUTHORIZATION": token,
"HTTP_X_GRAFANA_URL": organization.grafana_url,
}
request = APIRequestFactory().get("/", **headers)
with pytest.raises(exceptions.AuthenticationFailed) as exc:
GrafanaServiceAccountAuthentication().authenticate(request)
assert exc.value.detail == "Invalid token."
@pytest.mark.django_db
@httpretty.activate(verbose=True, allow_net_connect=False)
def test_grafana_authentication_permissions_call_fails(make_organization):
organization = make_organization(grafana_url="http://grafana.test")
if not organization.is_rbac_permissions_enabled:
return
token = f"{ServiceAccountToken.GRAFANA_SA_PREFIX}xyz"
headers = {
"HTTP_AUTHORIZATION": token,
"HTTP_X_GRAFANA_URL": organization.grafana_url,
}
request = APIRequestFactory().get("/", **headers)
# setup Grafana API responses
# permissions endpoint returns a 401
setup_service_account_api_mocks(organization, perms_status=401)
with pytest.raises(exceptions.AuthenticationFailed) as exc:
GrafanaServiceAccountAuthentication().authenticate(request)
assert exc.value.detail == "Invalid token."
last_request = httpretty.last_request()
assert last_request.method == "GET"
expected_url = f"{organization.grafana_url}/api/access-control/user/permissions"
assert last_request.url == expected_url
# the request uses the given token
assert last_request.headers["Authorization"] == f"Bearer {token}"
@pytest.mark.django_db
@httpretty.activate(verbose=True, allow_net_connect=False)
def test_grafana_authentication_existing_token(
make_organization, make_service_account_for_organization, make_token_for_service_account
):
organization = make_organization(grafana_url="http://grafana.test")
if not organization.is_rbac_permissions_enabled:
return
service_account = make_service_account_for_organization(organization)
token_string = "glsa_the-token"
token = make_token_for_service_account(service_account, token_string)
headers = {
"HTTP_AUTHORIZATION": token_string,
"HTTP_X_GRAFANA_URL": organization.grafana_url,
}
request = APIRequestFactory().get("/", **headers)
# setup Grafana API responses
setup_service_account_api_mocks(organization, {"some-perm": "value"})
user, auth_token = GrafanaServiceAccountAuthentication().authenticate(request)
assert isinstance(user, ServiceAccountUser)
assert user.service_account == service_account
assert user.public_primary_key == service_account.public_primary_key
assert user.username == service_account.username
assert user.role == LegacyAccessControlRole.NONE
assert auth_token == token
last_request = httpretty.last_request()
assert last_request.method == "GET"
expected_url = f"{organization.grafana_url}/api/access-control/user/permissions"
assert last_request.url == expected_url
# the request uses the given token
assert last_request.headers["Authorization"] == f"Bearer {token_string}"
@pytest.mark.django_db
@httpretty.activate(verbose=True, allow_net_connect=False)
def test_grafana_authentication_token_created(make_organization):
organization = make_organization(grafana_url="http://grafana.test")
if not organization.is_rbac_permissions_enabled:
return
token_string = "glsa_the-token"
headers = {
"HTTP_AUTHORIZATION": token_string,
"HTTP_X_GRAFANA_URL": organization.grafana_url,
}
request = APIRequestFactory().get("/", **headers)
# setup Grafana API responses
permissions = {"some-perm": "value"}
user_data = {"login": "some-login", "uid": "service-account:42"}
setup_service_account_api_mocks(organization, permissions, user_data)
user, auth_token = GrafanaServiceAccountAuthentication().authenticate(request)
assert isinstance(user, ServiceAccountUser)
service_account = user.service_account
assert service_account.organization == organization
assert user.public_primary_key == service_account.public_primary_key
assert user.username == service_account.username
assert service_account.grafana_id == 42
assert service_account.login == "some-login"
assert user.role == LegacyAccessControlRole.NONE
assert user.permissions == [{"action": p} for p in permissions]
assert auth_token.service_account == user.service_account
perms_request, user_request = httpretty.latest_requests()
for req in (perms_request, user_request):
assert req.method == "GET"
assert req.headers["Authorization"] == f"Bearer {token_string}"
perms_url = f"{organization.grafana_url}/api/access-control/user/permissions"
assert perms_request.url == perms_url
user_url = f"{organization.grafana_url}/api/user"
assert user_request.url == user_url
@pytest.mark.django_db
@httpretty.activate(verbose=True, allow_net_connect=False)
def test_grafana_authentication_token_created_older_grafana(make_organization):
organization = make_organization(grafana_url="http://grafana.test")
if not organization.is_rbac_permissions_enabled:
return
token_string = "glsa_the-token"
headers = {
"HTTP_AUTHORIZATION": token_string,
"HTTP_X_GRAFANA_URL": organization.grafana_url,
}
request = APIRequestFactory().get("/", **headers)
# setup Grafana API responses
permissions = {"some-perm": "value"}
# User API fails for older Grafana versions
setup_service_account_api_mocks(organization, permissions, user_status=400)
user, auth_token = GrafanaServiceAccountAuthentication().authenticate(request)
assert isinstance(user, ServiceAccountUser)
service_account = user.service_account
assert service_account.organization == organization
# use fallback data
assert service_account.grafana_id == 0
assert service_account.login == "grafana_service_account"
assert auth_token.service_account == user.service_account
@pytest.mark.django_db
@httpretty.activate(verbose=True, allow_net_connect=False)
def test_grafana_authentication_token_reuse_service_account(make_organization, make_service_account_for_organization):
organization = make_organization(grafana_url="http://grafana.test")
if not organization.is_rbac_permissions_enabled:
return
service_account = make_service_account_for_organization(organization)
token_string = "glsa_the-token"
headers = {
"HTTP_AUTHORIZATION": token_string,
"HTTP_X_GRAFANA_URL": organization.grafana_url,
}
request = APIRequestFactory().get("/", **headers)
# setup Grafana API responses
permissions = {"some-perm": "value"}
user_data = {
"login": service_account.login,
"uid": f"service-account:{service_account.grafana_id}",
}
setup_service_account_api_mocks(organization, permissions, user_data)
user, auth_token = GrafanaServiceAccountAuthentication().authenticate(request)
assert isinstance(user, ServiceAccountUser)
assert user.service_account == service_account
assert auth_token.service_account == service_account

View file

@ -315,6 +315,9 @@ class GrafanaAPIClient(APIClient):
def get_grafana_irm_plugin_settings(self) -> APIClientResponse["GrafanaAPIClient.Types.PluginSettings"]:
return self.get_grafana_plugin_settings(PluginID.IRM)
def get_current_user(self) -> APIClientResponse[typing.Dict[str, typing.List[str]]]:
return self.api_get("api/user")
def get_service_account(self, login: str) -> APIClientResponse["GrafanaAPIClient.Types.ServiceAccountResponse"]:
return self.api_get(f"api/serviceaccounts/search?query={login}")

View file

@ -7,6 +7,7 @@ from apps.alerts.grafana_alerting_sync_manager.grafana_alerting_sync import Graf
from apps.alerts.models import AlertReceiveChannel
from apps.base.messaging import get_messaging_backends
from apps.integrations.legacy_prefix import has_legacy_prefix, remove_legacy_prefix
from apps.user_management.models import ServiceAccountUser
from common.api_helpers.custom_fields import TeamPrimaryKeyRelatedField
from common.api_helpers.exceptions import BadRequest
from common.api_helpers.mixins import PHONE_CALL, SLACK, SMS, TELEGRAM, WEB, EagerLoadingMixin
@ -123,11 +124,13 @@ class IntegrationSerializer(EagerLoadingMixin, serializers.ModelSerializer, Main
connection_error = GrafanaAlertingSyncManager.check_for_connection_errors(organization)
if connection_error:
raise serializers.ValidationError(connection_error)
user = self.context["request"].user
with transaction.atomic():
try:
instance = AlertReceiveChannel.create(
**validated_data,
author=self.context["request"].user,
author=user if not isinstance(user, ServiceAccountUser) else None,
service_account=user.service_account if isinstance(user, ServiceAccountUser) else None,
organization=organization,
)
except AlertReceiveChannel.DuplicateDirectPagingError:

View file

@ -1,5 +1,6 @@
from unittest.mock import patch
import httpretty
import pytest
from django.urls import reverse
from django.utils import timezone
@ -9,6 +10,8 @@ from rest_framework.test import APIClient
from apps.alerts.constants import ActionSource
from apps.alerts.models import AlertGroup, AlertReceiveChannel
from apps.alerts.tasks import delete_alert_group, wipe
from apps.api import permissions
from apps.auth_token.tests.helpers import setup_service_account_api_mocks
def construct_expected_response_from_alert_groups(alert_groups):
@ -736,3 +739,34 @@ def test_alert_group_unsilence(
assert alert_group.silenced == silenced
assert response.status_code == status_code
assert response_msg == response.json()["detail"]
@pytest.mark.django_db
@httpretty.activate(verbose=True, allow_net_connect=False)
def test_actions_disabled_for_service_accounts(
make_organization,
make_service_account_for_organization,
make_token_for_service_account,
make_escalation_chain,
):
organization = make_organization(grafana_url="http://grafana.test")
service_account = make_service_account_for_organization(organization)
token_string = "glsa_token"
make_token_for_service_account(service_account, token_string)
make_escalation_chain(organization)
perms = {
permissions.RBACPermission.Permissions.ALERT_GROUPS_WRITE.value: ["*"],
}
setup_service_account_api_mocks(organization, perms=perms)
client = APIClient()
disabled_actions = ["acknowledge", "unacknowledge", "resolve", "unresolve", "silence", "unsilence"]
for action in disabled_actions:
url = reverse(f"api-public:alert_groups-{action}", kwargs={"pk": "ABCDEFG"})
response = client.post(
url,
HTTP_AUTHORIZATION=f"{token_string}",
HTTP_X_GRAFANA_URL=organization.grafana_url,
)
assert response.status_code == status.HTTP_403_FORBIDDEN

View file

@ -1,9 +1,12 @@
import httpretty
import pytest
from django.urls import reverse
from rest_framework import status
from rest_framework.test import APIClient
from apps.alerts.models import AlertReceiveChannel
from apps.api import permissions
from apps.auth_token.tests.helpers import setup_service_account_api_mocks
from apps.base.tests.messaging_backend import TestOnlyBackend
TEST_MESSAGING_BACKEND_FIELD = TestOnlyBackend.backend_id.lower()
@ -104,6 +107,47 @@ def test_create_integration(
assert response.status_code == status.HTTP_201_CREATED
@pytest.mark.django_db
@httpretty.activate(verbose=True, allow_net_connect=False)
def test_create_integration_via_service_account(
make_organization,
make_service_account_for_organization,
make_token_for_service_account,
make_escalation_chain,
):
organization = make_organization(grafana_url="http://grafana.test")
service_account = make_service_account_for_organization(organization)
token_string = "glsa_token"
make_token_for_service_account(service_account, token_string)
make_escalation_chain(organization)
perms = {
permissions.RBACPermission.Permissions.INTEGRATIONS_WRITE.value: ["*"],
}
setup_service_account_api_mocks(organization, perms)
client = APIClient()
data_for_create = {
"type": "grafana",
"name": "grafana_created",
"team_id": None,
}
url = reverse("api-public:integrations-list")
response = client.post(
url,
data=data_for_create,
format="json",
HTTP_AUTHORIZATION=f"{token_string}",
HTTP_X_GRAFANA_URL=organization.grafana_url,
)
if not organization.is_rbac_permissions_enabled:
assert response.status_code == status.HTTP_403_FORBIDDEN
else:
assert response.status_code == status.HTTP_201_CREATED
integration = AlertReceiveChannel.objects.get(public_primary_key=response.data["id"])
assert integration.service_account == service_account
@pytest.mark.django_db
def test_integration_name_uniqueness(
make_organization_and_user_with_token,

View file

@ -1,5 +1,7 @@
import json
from unittest.mock import patch
import httpretty
import pytest
from django.urls import reverse
from rest_framework import status
@ -9,6 +11,13 @@ from rest_framework.test import APIClient
from apps.api.permissions import GrafanaAPIPermission, LegacyAccessControlRole, get_most_authorized_role
from apps.public_api.urls import router
VIEWS_REQUIRING_USER_AUTH = (
"EscalationView",
"PersonalNotificationView",
"MakeCallView",
"SendSMSView",
)
@pytest.mark.parametrize(
"rbac_enabled,role,give_perm",
@ -96,3 +105,98 @@ def test_rbac_permissions(
with patch(method_path, return_value=success):
response = client.generic(path=url, method=http_method, HTTP_AUTHORIZATION=token)
assert response.status_code == expected
@pytest.mark.parametrize(
"rbac_enabled,role,give_perm",
[
# rbac disabled: auth is disabled
(False, LegacyAccessControlRole.ADMIN, None),
# rbac enabled: having role None, check the perm is required
(True, LegacyAccessControlRole.NONE, False),
(True, LegacyAccessControlRole.NONE, True),
],
)
@pytest.mark.django_db
@httpretty.activate(verbose=True, allow_net_connect=False)
def test_service_account_auth(
make_organization,
make_service_account_for_organization,
make_token_for_service_account,
rbac_enabled,
role,
give_perm,
):
# APIView default actions
# (name, http method, detail-based)
default_actions = {
"create": ("post", False),
"list": ("get", False),
"retrieve": ("get", True),
"update": ("put", True),
"partial_update": ("patch", True),
"destroy": ("delete", True),
}
organization = make_organization(grafana_url="http://grafana.test")
service_account = make_service_account_for_organization(organization)
token_string = "glsa_token"
make_token_for_service_account(service_account, token_string)
if organization.is_rbac_permissions_enabled != rbac_enabled:
# skip if the organization's rbac_enabled is not the expected by the test
return
client = APIClient()
# check all actions for all public API viewsets
for _, viewset, _basename in router.registry:
if viewset.__name__ == "ActionView":
# old actions (webhooks) are deprecated, no RBAC or service account support
continue
for viewset_method_name, required_perms in viewset.rbac_permissions.items():
# setup Grafana API permissions response
if rbac_enabled:
permissions = {"perm": "value"}
expected = status.HTTP_403_FORBIDDEN
if give_perm:
permissions = {perm.value: "value" for perm in required_perms}
expected = status.HTTP_200_OK
mock_response = httpretty.Response(status=200, body=json.dumps(permissions))
perms_url = f"{organization.grafana_url}/api/access-control/user/permissions"
httpretty.register_uri(httpretty.GET, perms_url, responses=[mock_response])
else:
# service account auth is disabled
expected = status.HTTP_403_FORBIDDEN
# iterate over all viewset actions, making an API request for each,
# using the user's token and confirming the response status code
if viewset_method_name in default_actions:
http_method, detail = default_actions[viewset_method_name]
else:
action_method = getattr(viewset, viewset_method_name)
http_method = list(action_method.mapping.keys())[0]
detail = action_method.detail
method_path = f"{viewset.__module__}.{viewset.__name__}.{viewset_method_name}"
success = Response(status=status.HTTP_200_OK)
kwargs = {"pk": "NONEXISTENT"} if detail else None
if viewset_method_name in default_actions and detail:
url = reverse(f"api-public:{_basename}-detail", kwargs=kwargs)
elif viewset_method_name in default_actions and not detail:
url = reverse(f"api-public:{_basename}-list", kwargs=kwargs)
else:
name = viewset_method_name.replace("_", "-")
url = reverse(f"api-public:{_basename}-{name}", kwargs=kwargs)
with patch(method_path, return_value=success):
headers = {
"HTTP_AUTHORIZATION": token_string,
"HTTP_X_GRAFANA_URL": organization.grafana_url,
}
response = client.generic(path=url, method=http_method, **headers)
assert (
response.status_code == expected
if viewset.__name__ not in VIEWS_REQUIRING_USER_AUTH
# user-specific APIs do not support service account auth
else status.HTTP_403_FORBIDDEN
)

View file

@ -6,8 +6,8 @@ from rest_framework import status
from rest_framework.test import APIClient
from apps.alerts.models import ResolutionNote
from apps.auth_token.auth import GRAFANA_SA_PREFIX, ApiTokenAuthentication, GrafanaServiceAccountAuthentication
from apps.auth_token.models import ApiAuthToken
from apps.auth_token.auth import ApiTokenAuthentication, GrafanaServiceAccountAuthentication
from apps.auth_token.models import ApiAuthToken, ServiceAccountToken
@pytest.mark.django_db
@ -366,7 +366,7 @@ def test_create_resolution_note_grafana_auth(make_organization_and_user, make_al
mock_api_key_auth.assert_called_once()
assert response.status_code == status.HTTP_403_FORBIDDEN
token = f"{GRAFANA_SA_PREFIX}123"
token = f"{ServiceAccountToken.GRAFANA_SA_PREFIX}123"
# GrafanaServiceAccountAuthentication handle invalid token
with patch(
"apps.auth_token.auth.ApiTokenAuthentication.authenticate", wraps=api_token_auth.authenticate

View file

@ -12,12 +12,13 @@ from apps.alerts.models import AlertGroup, AlertReceiveChannel
from apps.alerts.tasks import delete_alert_group, wipe
from apps.api.label_filtering import parse_label_query
from apps.api.permissions import RBACPermission
from apps.auth_token.auth import ApiTokenAuthentication
from apps.auth_token.auth import ApiTokenAuthentication, GrafanaServiceAccountAuthentication
from apps.public_api.constants import VALID_DATE_FOR_DELETE_INCIDENT
from apps.public_api.helpers import is_valid_group_creation_date, team_has_slack_token_for_deleting
from apps.public_api.serializers import AlertGroupSerializer
from apps.public_api.throttlers.user_throttle import UserThrottle
from common.api_helpers.exceptions import BadRequest
from apps.user_management.models import ServiceAccountUser
from common.api_helpers.exceptions import BadRequest, Forbidden
from common.api_helpers.filters import (
NO_TEAM_VALUE,
ByTeamModelFieldFilterMixin,
@ -57,7 +58,7 @@ class AlertGroupView(
mixins.DestroyModelMixin,
GenericViewSet,
):
authentication_classes = (ApiTokenAuthentication,)
authentication_classes = (GrafanaServiceAccountAuthentication, ApiTokenAuthentication)
permission_classes = (IsAuthenticated, RBACPermission)
rbac_permissions = {
@ -170,6 +171,9 @@ class AlertGroupView(
@action(methods=["post"], detail=True)
def acknowledge(self, request, pk):
if isinstance(request.user, ServiceAccountUser):
raise Forbidden(detail="Service accounts are not allowed to acknowledge alert groups")
alert_group = self.get_object()
if alert_group.acknowledged:
@ -189,6 +193,9 @@ class AlertGroupView(
@action(methods=["post"], detail=True)
def unacknowledge(self, request, pk):
if isinstance(request.user, ServiceAccountUser):
raise Forbidden(detail="Service accounts are not allowed to unacknowledge alert groups")
alert_group = self.get_object()
if not alert_group.acknowledged:
@ -208,6 +215,9 @@ class AlertGroupView(
@action(methods=["post"], detail=True)
def resolve(self, request, pk):
if isinstance(request.user, ServiceAccountUser):
raise Forbidden(detail="Service accounts are not allowed to resolve alert groups")
alert_group = self.get_object()
if alert_group.resolved:
@ -225,6 +235,9 @@ class AlertGroupView(
@action(methods=["post"], detail=True)
def unresolve(self, request, pk):
if isinstance(request.user, ServiceAccountUser):
raise Forbidden(detail="Service accounts are not allowed to unresolve alert groups")
alert_group = self.get_object()
if not alert_group.resolved:
@ -241,6 +254,9 @@ class AlertGroupView(
@action(methods=["post"], detail=True)
def silence(self, request, pk=None):
if isinstance(request.user, ServiceAccountUser):
raise Forbidden(detail="Service accounts are not allowed to silence alert groups")
alert_group = self.get_object()
delay = request.data.get("delay")
@ -267,6 +283,9 @@ class AlertGroupView(
@action(methods=["post"], detail=True)
def unsilence(self, request, pk=None):
if isinstance(request.user, ServiceAccountUser):
raise Forbidden(detail="Service accounts are not allowed to unsilence alert groups")
alert_group = self.get_object()
if not alert_group.silenced:

View file

@ -7,7 +7,7 @@ from rest_framework.viewsets import GenericViewSet
from apps.alerts.models import Alert
from apps.api.permissions import RBACPermission
from apps.auth_token.auth import ApiTokenAuthentication
from apps.auth_token.auth import ApiTokenAuthentication, GrafanaServiceAccountAuthentication
from apps.public_api.serializers.alerts import AlertSerializer
from apps.public_api.throttlers.user_throttle import UserThrottle
from common.api_helpers.mixins import RateLimitHeadersMixin
@ -19,7 +19,7 @@ class AlertFilter(filters.FilterSet):
class AlertView(RateLimitHeadersMixin, mixins.ListModelMixin, GenericViewSet):
authentication_classes = (ApiTokenAuthentication,)
authentication_classes = (GrafanaServiceAccountAuthentication, ApiTokenAuthentication)
permission_classes = (IsAuthenticated, RBACPermission)
rbac_permissions = {

View file

@ -5,7 +5,7 @@ from rest_framework.viewsets import ModelViewSet
from apps.alerts.models import EscalationChain
from apps.api.permissions import RBACPermission
from apps.auth_token.auth import ApiTokenAuthentication
from apps.auth_token.auth import ApiTokenAuthentication, GrafanaServiceAccountAuthentication
from apps.public_api.serializers import EscalationChainSerializer
from apps.public_api.throttlers.user_throttle import UserThrottle
from common.api_helpers.filters import ByTeamFilter
@ -15,7 +15,7 @@ from common.insight_log import EntityEvent, write_resource_insight_log
class EscalationChainView(RateLimitHeadersMixin, ModelViewSet):
authentication_classes = (ApiTokenAuthentication,)
authentication_classes = (GrafanaServiceAccountAuthentication, ApiTokenAuthentication)
permission_classes = (IsAuthenticated, RBACPermission)
rbac_permissions = {

View file

@ -5,7 +5,7 @@ from rest_framework.viewsets import ModelViewSet
from apps.alerts.models import EscalationPolicy
from apps.api.permissions import RBACPermission
from apps.auth_token.auth import ApiTokenAuthentication
from apps.auth_token.auth import ApiTokenAuthentication, GrafanaServiceAccountAuthentication
from apps.public_api.serializers import EscalationPolicySerializer, EscalationPolicyUpdateSerializer
from apps.public_api.throttlers.user_throttle import UserThrottle
from common.api_helpers.mixins import RateLimitHeadersMixin, UpdateSerializerMixin
@ -14,7 +14,7 @@ from common.insight_log import EntityEvent, write_resource_insight_log
class EscalationPolicyView(RateLimitHeadersMixin, UpdateSerializerMixin, ModelViewSet):
authentication_classes = (ApiTokenAuthentication,)
authentication_classes = (GrafanaServiceAccountAuthentication, ApiTokenAuthentication)
permission_classes = (IsAuthenticated, RBACPermission)
rbac_permissions = {

View file

@ -5,7 +5,7 @@ from rest_framework.viewsets import ModelViewSet
from apps.alerts.models import AlertReceiveChannel
from apps.api.permissions import RBACPermission
from apps.auth_token.auth import ApiTokenAuthentication
from apps.auth_token.auth import ApiTokenAuthentication, GrafanaServiceAccountAuthentication
from apps.public_api.serializers import IntegrationSerializer, IntegrationUpdateSerializer
from apps.public_api.throttlers.user_throttle import UserThrottle
from common.api_helpers.exceptions import BadRequest
@ -24,7 +24,7 @@ class IntegrationView(
MaintainableObjectMixin,
ModelViewSet,
):
authentication_classes = (ApiTokenAuthentication,)
authentication_classes = (GrafanaServiceAccountAuthentication, ApiTokenAuthentication)
permission_classes = (IsAuthenticated, RBACPermission)
rbac_permissions = {

View file

@ -5,7 +5,7 @@ from rest_framework.permissions import IsAuthenticated
from rest_framework.viewsets import ModelViewSet
from apps.api.permissions import RBACPermission
from apps.auth_token.auth import ApiTokenAuthentication
from apps.auth_token.auth import ApiTokenAuthentication, GrafanaServiceAccountAuthentication
from apps.public_api.serializers import CustomOnCallShiftSerializer, CustomOnCallShiftUpdateSerializer
from apps.public_api.throttlers.user_throttle import UserThrottle
from apps.schedules.models import CustomOnCallShift
@ -16,7 +16,7 @@ from common.insight_log import EntityEvent, write_resource_insight_log
class CustomOnCallShiftView(RateLimitHeadersMixin, UpdateSerializerMixin, ModelViewSet):
authentication_classes = (ApiTokenAuthentication,)
authentication_classes = (GrafanaServiceAccountAuthentication, ApiTokenAuthentication)
permission_classes = (IsAuthenticated, RBACPermission)
rbac_permissions = {

View file

@ -3,7 +3,7 @@ from rest_framework.settings import api_settings
from rest_framework.viewsets import ReadOnlyModelViewSet
from apps.api.permissions import RBACPermission
from apps.auth_token.auth import ApiTokenAuthentication
from apps.auth_token.auth import ApiTokenAuthentication, GrafanaServiceAccountAuthentication
from apps.public_api.serializers import OrganizationSerializer
from apps.public_api.throttlers.user_throttle import UserThrottle
from apps.user_management.models import Organization
@ -15,7 +15,7 @@ class OrganizationView(
RateLimitHeadersMixin,
ReadOnlyModelViewSet,
):
authentication_classes = (ApiTokenAuthentication,)
authentication_classes = (GrafanaServiceAccountAuthentication, ApiTokenAuthentication)
permission_classes = (IsAuthenticated, RBACPermission)
rbac_permissions = {

View file

@ -7,7 +7,7 @@ from rest_framework.viewsets import ModelViewSet
from apps.alerts.models import ChannelFilter
from apps.api.permissions import RBACPermission
from apps.auth_token.auth import ApiTokenAuthentication
from apps.auth_token.auth import ApiTokenAuthentication, GrafanaServiceAccountAuthentication
from apps.public_api.serializers import ChannelFilterSerializer, ChannelFilterUpdateSerializer
from apps.public_api.throttlers.user_throttle import UserThrottle
from common.api_helpers.exceptions import BadRequest
@ -17,7 +17,7 @@ from common.insight_log import EntityEvent, write_resource_insight_log
class ChannelFilterView(RateLimitHeadersMixin, UpdateSerializerMixin, ModelViewSet):
authentication_classes = (ApiTokenAuthentication,)
authentication_classes = (GrafanaServiceAccountAuthentication, ApiTokenAuthentication)
permission_classes = (IsAuthenticated, RBACPermission)
rbac_permissions = {

View file

@ -9,7 +9,11 @@ from rest_framework.views import Response
from rest_framework.viewsets import ModelViewSet
from apps.api.permissions import RBACPermission
from apps.auth_token.auth import ApiTokenAuthentication, ScheduleExportAuthentication
from apps.auth_token.auth import (
ApiTokenAuthentication,
GrafanaServiceAccountAuthentication,
ScheduleExportAuthentication,
)
from apps.public_api.custom_renderers import CalendarRenderer
from apps.public_api.serializers import PolymorphicScheduleSerializer, PolymorphicScheduleUpdateSerializer
from apps.public_api.serializers.schedules_base import FinalShiftQueryParamsSerializer
@ -28,7 +32,7 @@ logger = logging.getLogger(__name__)
class OnCallScheduleChannelView(RateLimitHeadersMixin, UpdateSerializerMixin, ModelViewSet):
authentication_classes = (ApiTokenAuthentication,)
authentication_classes = (GrafanaServiceAccountAuthentication, ApiTokenAuthentication)
permission_classes = (IsAuthenticated, RBACPermission)
rbac_permissions = {

View file

@ -10,7 +10,7 @@ from rest_framework.serializers import BaseSerializer
from apps.api.permissions import AuthenticatedRequest, RBACPermission
from apps.api.views.shift_swap import BaseShiftSwapViewSet
from apps.auth_token.auth import ApiTokenAuthentication
from apps.auth_token.auth import ApiTokenAuthentication, GrafanaServiceAccountAuthentication
from apps.public_api.throttlers.user_throttle import UserThrottle
from apps.schedules.models import ShiftSwapRequest
from apps.user_management.models import User
@ -23,7 +23,7 @@ logger = logging.getLogger(__name__)
class ShiftSwapViewSet(RateLimitHeadersMixin, BaseShiftSwapViewSet):
# set authentication and permission classes
authentication_classes = (ApiTokenAuthentication,)
authentication_classes = (GrafanaServiceAccountAuthentication, ApiTokenAuthentication)
permission_classes = (IsAuthenticated, RBACPermission)
rbac_permissions = {

View file

@ -3,7 +3,7 @@ from rest_framework.permissions import IsAuthenticated
from rest_framework.viewsets import GenericViewSet
from apps.api.permissions import RBACPermission
from apps.auth_token.auth import ApiTokenAuthentication
from apps.auth_token.auth import ApiTokenAuthentication, GrafanaServiceAccountAuthentication
from apps.public_api.serializers.slack_channel import SlackChannelSerializer
from apps.public_api.throttlers.user_throttle import UserThrottle
from apps.slack.models import SlackChannel
@ -12,7 +12,7 @@ from common.api_helpers.paginators import FiftyPageSizePaginator
class SlackChannelView(RateLimitHeadersMixin, mixins.ListModelMixin, GenericViewSet):
authentication_classes = (ApiTokenAuthentication,)
authentication_classes = (GrafanaServiceAccountAuthentication, ApiTokenAuthentication)
permission_classes = (IsAuthenticated, RBACPermission)
rbac_permissions = {

View file

@ -3,7 +3,7 @@ from rest_framework.mixins import ListModelMixin, RetrieveModelMixin
from rest_framework.permissions import IsAuthenticated
from apps.api.permissions import RBACPermission
from apps.auth_token.auth import ApiTokenAuthentication
from apps.auth_token.auth import ApiTokenAuthentication, GrafanaServiceAccountAuthentication
from apps.public_api.serializers.teams import TeamSerializer
from apps.public_api.tf_sync import is_request_from_terraform, sync_teams_on_tf_request
from apps.public_api.throttlers.user_throttle import UserThrottle
@ -14,7 +14,7 @@ from common.api_helpers.paginators import FiftyPageSizePaginator
class TeamView(PublicPrimaryKeyMixin, RetrieveModelMixin, ListModelMixin, viewsets.GenericViewSet):
serializer_class = TeamSerializer
authentication_classes = (ApiTokenAuthentication,)
authentication_classes = (GrafanaServiceAccountAuthentication, ApiTokenAuthentication)
permission_classes = (IsAuthenticated, RBACPermission)
rbac_permissions = {

View file

@ -3,7 +3,7 @@ from rest_framework.permissions import IsAuthenticated
from rest_framework.viewsets import GenericViewSet
from apps.api.permissions import RBACPermission
from apps.auth_token.auth import ApiTokenAuthentication
from apps.auth_token.auth import ApiTokenAuthentication, GrafanaServiceAccountAuthentication
from apps.public_api.serializers.user_groups import UserGroupSerializer
from apps.public_api.throttlers.user_throttle import UserThrottle
from apps.slack.models import SlackUserGroup
@ -12,7 +12,7 @@ from common.api_helpers.paginators import FiftyPageSizePaginator
class UserGroupView(RateLimitHeadersMixin, mixins.ListModelMixin, GenericViewSet):
authentication_classes = (ApiTokenAuthentication,)
authentication_classes = (GrafanaServiceAccountAuthentication, ApiTokenAuthentication)
permission_classes = (IsAuthenticated, RBACPermission)
rbac_permissions = {

View file

@ -6,7 +6,11 @@ from rest_framework.views import Response
from rest_framework.viewsets import ReadOnlyModelViewSet
from apps.api.permissions import LegacyAccessControlRole, RBACPermission
from apps.auth_token.auth import ApiTokenAuthentication, UserScheduleExportAuthentication
from apps.auth_token.auth import (
ApiTokenAuthentication,
GrafanaServiceAccountAuthentication,
UserScheduleExportAuthentication,
)
from apps.public_api.custom_renderers import CalendarRenderer
from apps.public_api.serializers import FastUserSerializer, UserSerializer
from apps.public_api.tf_sync import is_request_from_terraform, sync_users_on_tf_request
@ -35,7 +39,7 @@ class UserFilter(filters.FilterSet):
class UserView(RateLimitHeadersMixin, ShortSerializerMixin, ReadOnlyModelViewSet):
authentication_classes = (ApiTokenAuthentication,)
authentication_classes = (GrafanaServiceAccountAuthentication, ApiTokenAuthentication)
permission_classes = (IsAuthenticated, RBACPermission)
rbac_permissions = {

View file

@ -6,7 +6,7 @@ from rest_framework.response import Response
from rest_framework.viewsets import ModelViewSet
from apps.api.permissions import RBACPermission
from apps.auth_token.auth import ApiTokenAuthentication
from apps.auth_token.auth import ApiTokenAuthentication, GrafanaServiceAccountAuthentication
from apps.public_api.serializers.webhooks import (
WebhookCreateSerializer,
WebhookResponseSerializer,
@ -21,7 +21,7 @@ from common.insight_log import EntityEvent, write_resource_insight_log
class WebhooksView(RateLimitHeadersMixin, UpdateSerializerMixin, ModelViewSet):
authentication_classes = (ApiTokenAuthentication,)
authentication_classes = (GrafanaServiceAccountAuthentication, ApiTokenAuthentication)
permission_classes = (IsAuthenticated, RBACPermission)
rbac_permissions = {

View file

@ -0,0 +1,26 @@
# Generated by Django 4.2.15 on 2024-11-12 13:13
from django.db import migrations, models
import django.db.models.deletion
class Migration(migrations.Migration):
dependencies = [
('user_management', '0026_auto_20241017_1919'),
]
operations = [
migrations.CreateModel(
name='ServiceAccount',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('grafana_id', models.PositiveIntegerField()),
('login', models.CharField(max_length=300)),
('organization', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='service_accounts', to='user_management.organization')),
],
options={
'unique_together': {('grafana_id', 'organization')},
},
),
]

View file

@ -1,4 +1,5 @@
from .user import User # noqa: F401, isort: skip
from .organization import Organization # noqa: F401
from .region import Region # noqa: F401
from .service_account import ServiceAccount, ServiceAccountUser # noqa: F401
from .team import Team # noqa: F401

View file

@ -0,0 +1,55 @@
from dataclasses import dataclass
from typing import List
from django.db import models
from apps.user_management.models import Organization
@dataclass
class ServiceAccountUser:
"""Authenticated service account in public API requests."""
service_account: "ServiceAccount"
organization: "Organization" # required for insight logs interface
username: str # required for insight logs interface
public_primary_key: str # required for insight logs interface
role: str # required for permissions check
permissions: List[str] # required for permissions check
@property
def id(self):
return self.service_account.id
@property
def pk(self):
return self.service_account.id
@property
def organization_id(self):
return self.organization.id
@property
def is_authenticated(self):
return True
class ServiceAccount(models.Model):
organization: "Organization"
grafana_id = models.PositiveIntegerField()
organization = models.ForeignKey(Organization, on_delete=models.CASCADE, related_name="service_accounts")
login = models.CharField(max_length=300)
class Meta:
unique_together = ("grafana_id", "organization")
@property
def username(self):
# required for insight logs interface
return self.login
@property
def public_primary_key(self):
# required for insight logs interface
return f"service-account:{self.grafana_id}"

View file

@ -1,6 +1,6 @@
import factory
from apps.user_management.models import Organization, Region, Team, User
from apps.user_management.models import Organization, Region, ServiceAccount, Team, User
from common.utils import UniqueFaker
@ -41,3 +41,11 @@ class RegionFactory(factory.DjangoModelFactory):
class Meta:
model = Region
class ServiceAccountFactory(factory.DjangoModelFactory):
grafana_id = UniqueFaker("pyint")
login = UniqueFaker("user_name")
class Meta:
model = ServiceAccount

View file

@ -1,3 +1,4 @@
import binascii
import datetime
import json
import os
@ -46,11 +47,14 @@ from apps.api.permissions import (
LegacyAccessControlRole,
RBACPermission,
)
from apps.auth_token import constants as auth_token_constants
from apps.auth_token.crypto import hash_token_string
from apps.auth_token.models import (
ApiAuthToken,
GoogleOAuth2Token,
IntegrationBacksyncAuthToken,
PluginAuthToken,
ServiceAccountToken,
SlackAuthToken,
)
from apps.base.models.user_notification_policy_log_record import (
@ -102,7 +106,13 @@ from apps.telegram.tests.factories import (
TelegramVerificationCodeFactory,
)
from apps.user_management.models.user import User, listen_for_user_model_save
from apps.user_management.tests.factories import OrganizationFactory, RegionFactory, TeamFactory, UserFactory
from apps.user_management.tests.factories import (
OrganizationFactory,
RegionFactory,
ServiceAccountFactory,
TeamFactory,
UserFactory,
)
from apps.webhooks.presets.preset_options import WebhookPresetOptions
from apps.webhooks.tests.factories import CustomWebhookFactory, WebhookResponseFactory
from apps.webhooks.tests.test_webhook_presets import (
@ -252,6 +262,30 @@ def make_user_for_organization(make_user):
return _make_user_for_organization
@pytest.fixture
def make_service_account_for_organization(make_user):
def _make_service_account_for_organization(organization, **kwargs):
return ServiceAccountFactory(organization=organization, **kwargs)
return _make_service_account_for_organization
@pytest.fixture
def make_token_for_service_account():
def _make_token_for_service_account(service_account, token_string):
prefix_length = len(ServiceAccountToken.GRAFANA_SA_PREFIX)
token_key = token_string[prefix_length : prefix_length + auth_token_constants.TOKEN_KEY_LENGTH]
hashable_token = binascii.hexlify(token_string.encode()).decode()
digest = hash_token_string(hashable_token)
return ServiceAccountToken.objects.create(
service_account=service_account,
token_key=token_key,
digest=digest,
)
return _make_token_for_service_account
@pytest.fixture
def make_token_for_organization():
def _make_token_for_organization(organization):

View file

@ -28,9 +28,13 @@ class RequestTimeLoggingMiddleware(MiddlewareMixin):
)
if hasattr(request, "user") and request.user and request.user.id and hasattr(request.user, "organization"):
user_id = request.user.id
if hasattr(request.user, "service_account"):
message += f"service_account_id={user_id} "
else:
message += f"user_id={user_id} "
org_id = request.user.organization.id
org_slug = request.user.organization.org_slug
message += f"user_id={user_id} org_id={org_id} org_slug={org_slug} "
message += f"org_id={org_id} org_slug={org_slug} "
if request.path.startswith("/integrations/v1"):
split_path = request.path.split("/")
integration_type = split_path[3]