fix: update internal labels endpoints to work with RBAC (#5099)

# What this PR does

Related to:
- https://github.com/grafana/oncall-private/issues/2943

## Checklist

- [x] Unit, integration, and e2e (if applicable) tests updated
- [x] Documentation added (or `pr:no public docs` PR label added if not
required)
- [x] Added the relevant release notes label (see labels prefixed w/
`release:`). These labels dictate how your PR will
    show up in the autogenerated release notes.

---------

Co-authored-by: Vadim Stepanov <vadimkerr@gmail.com>
This commit is contained in:
Joey Orlando 2024-10-02 13:39:49 -04:00 committed by GitHub
parent 784b7e5344
commit f39a755942
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
10 changed files with 95 additions and 240 deletions

View file

@ -30,6 +30,7 @@ from apps.alerts.tasks import (
from apps.metrics_exporter.tasks import update_metrics_for_alert_group
from apps.slack.slack_formatter import SlackFormatter
from apps.user_management.models import User
from common.constants.plugin_ids import PluginID
from common.public_primary_keys import generate_public_primary_key, increase_public_primary_key_length
from common.utils import clean_markup, str_or_backup
@ -556,7 +557,7 @@ class AlertGroup(AlertGroupSlackRenderingMixin, EscalationSnapshotMixin, models.
@property
def declare_incident_link(self) -> str:
"""Generate a link for AlertGroup to declare Grafana Incident by click"""
incident_link = urljoin(self.channel.organization.grafana_url, "a/grafana-incident-app/incidents/declare/")
incident_link = urljoin(self.channel.organization.grafana_url, f"a/{PluginID.INCIDENT}/incidents/declare/")
caption = urllib.parse.quote_plus("OnCall Alert Group")
title = urllib.parse.quote_plus(self.web_title_cache) if self.web_title_cache else DEFAULT_BACKUP_TITLE
title = title[:2000] # set max title length to avoid exceptions with too long declare incident link

View file

@ -9,15 +9,14 @@ from rest_framework.request import Request
from rest_framework.views import APIView
from rest_framework.viewsets import ViewSet, ViewSetMixin
from common.constants.plugin_ids import PluginID
from common.utils import getattrd
if typing.TYPE_CHECKING:
from apps.user_management.models import User
ACTION_PREFIX = "grafana-oncall-app"
RBAC_PERMISSIONS_ATTR = "rbac_permissions"
RBAC_OBJECT_PERMISSIONS_ATTR = "rbac_object_permissions"
BASIC_ROLE_PERMISSIONS_ATTR = "basic_role_permissions"
ViewSetOrAPIView = typing.Union[ViewSet, APIView]
@ -67,6 +66,7 @@ class Resources(enum.Enum):
OTHER_SETTINGS = "other-settings"
ADMIN = "admin"
LABEL = "label"
class Actions(enum.Enum):
@ -78,6 +78,8 @@ class Actions(enum.Enum):
UPDATE_SETTINGS = "update-settings"
DIRECT_PAGING = "direct-paging"
CREATE = "create"
class LegacyAccessControlRole(enum.IntEnum):
ADMIN = 0
@ -91,15 +93,20 @@ class LegacyAccessControlRole(enum.IntEnum):
class LegacyAccessControlCompatiblePermission:
def __init__(self, resource: Resources, action: Actions, fallback_role: LegacyAccessControlRole) -> None:
self.value = f"{ACTION_PREFIX}.{resource.value}:{action.value}"
def __init__(
self,
resource: Resources,
action: Actions,
fallback_role: LegacyAccessControlRole,
prefix: str = PluginID.ONCALL,
) -> None:
self.value = f"{prefix}.{resource.value}:{action.value}"
self.fallback_role = fallback_role
LegacyAccessControlCompatiblePermissions = typing.List[LegacyAccessControlCompatiblePermission]
RBACPermissionsAttribute = typing.Dict[str, LegacyAccessControlCompatiblePermissions]
RBACObjectPermissionsAttribute = typing.Dict[permissions.BasePermission, typing.List[str]]
BasicRolePermissionsAttribute = typing.Dict[str, LegacyAccessControlRole]
def get_view_action(request: AuthenticatedRequest, view: ViewSetOrAPIView) -> str:
@ -119,24 +126,14 @@ def get_most_authorized_role(permissions: LegacyAccessControlCompatiblePermissio
return min({p.fallback_role for p in permissions}, key=lambda r: r.value)
def user_is_authorized(
user: "User",
required_permissions: LegacyAccessControlCompatiblePermissions,
required_basic_role_permission: LegacyAccessControlRole = None,
) -> bool:
def user_is_authorized(user: "User", required_permissions: LegacyAccessControlCompatiblePermissions) -> bool:
"""
This function checks whether `user` has all necessary permissions. If `required_basic_role_permission` is set,
it only checks the basic user role, otherwise it checks whether `user` has all permissions in
`required_permissions`.
This function checks whether `user` has all necessary permissions specified in `required_permissions`.
RBAC permissions are used if RBAC is enabled for the organization, otherwise the fallback basic role is checked.
user - The user to check permissions for
required_permissions - A list of permissions that a user must have to be considered authorized
required_basic_role_permission - Min basic role user must have to be considered authorized (used in cases when
it's needed to check ONLY the basic user role, otherwise `required_permissions` should be used)
`user` - The user to check permissions for
`required_permissions` - A list of permissions that a user must have to be considered authorized
"""
if required_basic_role_permission is not None:
return user.role <= required_basic_role_permission.value
if user.organization.is_rbac_permissions_enabled:
user_permissions = [u["action"] for u in user.permissions]
required_permission_values = [p.value for p in required_permissions]
@ -250,6 +247,17 @@ class RBACPermission(permissions.BasePermission):
Resources.OTHER_SETTINGS, Actions.WRITE, LegacyAccessControlRole.ADMIN
)
# NOTE: we don't currently add the label delete permission here because we don't currently use this in OnCall
LABEL_CREATE = LegacyAccessControlCompatiblePermission(
Resources.LABEL, Actions.CREATE, LegacyAccessControlRole.EDITOR, prefix=PluginID.LABELS
)
LABEL_READ = LegacyAccessControlCompatiblePermission(
Resources.LABEL, Actions.READ, LegacyAccessControlRole.VIEWER, prefix=PluginID.LABELS
)
LABEL_WRITE = LegacyAccessControlCompatiblePermission(
Resources.LABEL, Actions.WRITE, LegacyAccessControlRole.EDITOR, prefix=PluginID.LABELS
)
# mypy complains about "Liskov substitution principle" here because request is `AuthenticatedRequest` object
# and not rest_framework.request.Request
# https://mypy.readthedocs.io/en/stable/common_issues.html#incompatible-overrides
@ -301,40 +309,6 @@ class RBACPermission(permissions.BasePermission):
return True
class BasicRolePermission(permissions.BasePermission):
"""Checks only basic user role permissions, regardless of whether RBAC is enabled for the organization"""
# mypy complains about "Liskov substitution principle" here because request is `AuthenticatedRequest` object
# and not rest_framework.request.Request
# https://mypy.readthedocs.io/en/stable/common_issues.html#incompatible-overrides
def has_permission(self, request: AuthenticatedRequest, view: ViewSetOrAPIView) -> bool: # type: ignore[override]
# the django-debug-toolbar UI makes OPTIONS calls. Without this statement the debug UI can't gather the
# necessary info it needs to work properly
if settings.DEBUG and request.method == "OPTIONS":
return True
action = get_view_action(request, view)
basic_role_permissions: typing.Optional[BasicRolePermissionsAttribute] = getattr(
view, BASIC_ROLE_PERMISSIONS_ATTR, None
)
# first check that the basic_role_permissions dict attribute is defined
assert (
basic_role_permissions is not None
), f"Must define a {BASIC_ROLE_PERMISSIONS_ATTR} dict on the ViewSet that is consuming the role class"
action_required_permissions: LegacyAccessControlRole = basic_role_permissions.get(action, None)
# next check that the action in question is defined within the basic_role_permissions dict attribute
assert (
action_required_permissions is not None
), f"""Each action must be defined within the {BASIC_ROLE_PERMISSIONS_ATTR} dict on the ViewSet"""
return user_is_authorized(
request.user, required_permissions=[], required_basic_role_permission=action_required_permissions
)
ALL_PERMISSION_NAMES = [perm for perm in dir(RBACPermission.Permissions) if not perm.startswith("_")]
ALL_PERMISSION_CLASSES = [
getattr(RBACPermission.Permissions, permission_name) for permission_name in ALL_PERMISSION_NAMES

View file

@ -23,9 +23,8 @@ def test_labels_get_keys(
mocked_get_labels_keys,
make_organization_and_user_with_plugin_token,
make_user_auth_headers,
make_alert_receive_channel,
):
organization, user, token = make_organization_and_user_with_plugin_token()
_, user, token = make_organization_and_user_with_plugin_token()
client = APIClient()
url = reverse("api-internal:get_keys")
response = client.get(url, format="json", **make_user_auth_headers(user, token))
@ -49,7 +48,7 @@ def test_get_update_key_get(
make_organization_and_user_with_plugin_token,
make_user_auth_headers,
):
organization, user, token = make_organization_and_user_with_plugin_token()
_, user, token = make_organization_and_user_with_plugin_token()
client = APIClient()
url = reverse("api-internal:get_update_key", kwargs={"key_id": "keyid123"})
response = client.get(url, format="json", **make_user_auth_headers(user, token))
@ -73,7 +72,7 @@ def test_get_update_key_put(
make_organization_and_user_with_plugin_token,
make_user_auth_headers,
):
organization, user, token = make_organization_and_user_with_plugin_token()
_, user, token = make_organization_and_user_with_plugin_token()
client = APIClient()
url = reverse("api-internal:get_update_key", kwargs={"key_id": "keyid123"})
data = {"name": "team"}
@ -98,7 +97,7 @@ def test_add_value(
make_organization_and_user_with_plugin_token,
make_user_auth_headers,
):
organization, user, token = make_organization_and_user_with_plugin_token()
_, user, token = make_organization_and_user_with_plugin_token()
client = APIClient()
url = reverse("api-internal:add_value", kwargs={"key_id": "keyid123"})
data = {"name": "yolo"}
@ -123,7 +122,7 @@ def test_rename_value(
make_organization_and_user_with_plugin_token,
make_user_auth_headers,
):
organization, user, token = make_organization_and_user_with_plugin_token()
_, user, token = make_organization_and_user_with_plugin_token()
client = APIClient()
url = reverse("api-internal:get_update_value", kwargs={"key_id": "keyid123", "value_id": "valueid123"})
data = {"name": "yolo"}
@ -148,7 +147,7 @@ def test_get_value(
make_organization_and_user_with_plugin_token,
make_user_auth_headers,
):
organization, user, token = make_organization_and_user_with_plugin_token()
_, user, token = make_organization_and_user_with_plugin_token()
client = APIClient()
url = reverse("api-internal:get_update_value", kwargs={"key_id": "keyid123", "value_id": "valueid123"})
response = client.get(url, format="json", **make_user_auth_headers(user, token))
@ -172,7 +171,7 @@ def test_labels_create_label(
make_organization_and_user_with_plugin_token,
make_user_auth_headers,
):
organization, user, token = make_organization_and_user_with_plugin_token()
_, user, token = make_organization_and_user_with_plugin_token()
client = APIClient()
url = reverse("api-internal:create_label")
data = {"key": {"name": "team"}, "values": [{"name": "yolo"}]}
@ -192,7 +191,7 @@ def test_labels_feature_false(
):
settings.FEATURE_LABELS_ENABLED_FOR_ALL = False
organization, user, token = make_organization_and_user_with_plugin_token()
_, user, token = make_organization_and_user_with_plugin_token()
client = APIClient()
url = reverse("api-internal:get_keys")
@ -240,7 +239,7 @@ def test_labels_permissions_get_actions(
role,
expected_status,
):
organization, user, token = make_organization_and_user_with_plugin_token(role)
_, user, token = make_organization_and_user_with_plugin_token(role)
client = APIClient()
with patch("apps.api.views.labels.LabelsViewSet.get_keys", return_value=Response(status=status.HTTP_200_OK)):
url = reverse("api-internal:get_keys")
@ -274,7 +273,7 @@ def test_labels_permissions_create_update_actions(
role,
expected_status,
):
organization, user, token = make_organization_and_user_with_plugin_token(role)
_, user, token = make_organization_and_user_with_plugin_token(role)
client = APIClient()
with patch("apps.api.views.labels.LabelsViewSet.rename_key", return_value=Response(status=status.HTTP_200_OK)):
url = reverse("api-internal:get_update_key", kwargs={"key_id": "keyid123"})

View file

@ -5,10 +5,7 @@ from rest_framework.views import APIView
from rest_framework.viewsets import ViewSetMixin
from apps.api.permissions import (
BASIC_ROLE_PERMISSIONS_ATTR,
RBAC_PERMISSIONS_ATTR,
BasicRolePermission,
BasicRolePermissionsAttribute,
GrafanaAPIPermission,
HasRBACPermissions,
IsOwner,
@ -60,7 +57,6 @@ class MockedViewSet(ViewSetMixin):
action: str,
rbac_permissions: typing.Optional[RBACPermissionsAttribute] = None,
rbac_object_permissions: typing.Optional[RBACObjectPermissionsAttribute] = None,
basic_role_permissions: typing.Optional[BasicRolePermissionsAttribute] = None,
) -> None:
super().__init__()
self.action = action
@ -69,8 +65,6 @@ class MockedViewSet(ViewSetMixin):
self.rbac_permissions = rbac_permissions
if rbac_object_permissions:
self.rbac_object_permissions = rbac_object_permissions
if basic_role_permissions:
self.basic_role_permissions = basic_role_permissions
class MockedAPIView(APIView):
@ -78,7 +72,6 @@ class MockedAPIView(APIView):
self,
rbac_permissions: typing.Optional[RBACPermissionsAttribute] = None,
rbac_object_permissions: typing.Optional[RBACObjectPermissionsAttribute] = None,
basic_role_permissions: typing.Optional[BasicRolePermissionsAttribute] = None,
) -> None:
super().__init__()
@ -86,8 +79,6 @@ class MockedAPIView(APIView):
self.rbac_permissions = rbac_permissions
if rbac_object_permissions:
self.rbac_object_permissions = rbac_object_permissions
if basic_role_permissions:
self.basic_role_permissions = basic_role_permissions
@pytest.mark.parametrize(
@ -460,142 +451,3 @@ class TestIsOwnerOrHasRBACPermissions:
assert PermClass.has_object_permission(request, None, thingy) is True
assert PermClass.has_object_permission(MockedRequest(MockedUser([])), None, thingy) is False
@pytest.mark.parametrize(
"role,required_role,org_has_rbac_enabled,expected_result",
[
(
LegacyAccessControlRole.VIEWER,
LegacyAccessControlRole.VIEWER,
True,
True,
),
(
LegacyAccessControlRole.VIEWER,
LegacyAccessControlRole.VIEWER,
False,
True,
),
(
LegacyAccessControlRole.ADMIN,
LegacyAccessControlRole.VIEWER,
True,
True,
),
(
LegacyAccessControlRole.ADMIN,
LegacyAccessControlRole.VIEWER,
False,
True,
),
(
LegacyAccessControlRole.VIEWER,
LegacyAccessControlRole.ADMIN,
True,
False,
),
(
LegacyAccessControlRole.VIEWER,
LegacyAccessControlRole.ADMIN,
False,
False,
),
],
)
def test_user_is_authorized_basic_role(
role,
required_role,
org_has_rbac_enabled,
expected_result,
) -> None:
user = MockedUser([], org_has_rbac_enabled=org_has_rbac_enabled, basic_role=role)
assert user_is_authorized(user, [], required_role) == expected_result
class TestBasicRolePermission:
def test_has_permission_works_on_a_viewset_view(self) -> None:
required_role = LegacyAccessControlRole.VIEWER
action = "hello"
viewset = MockedViewSet(
action=action,
basic_role_permissions={
action: required_role,
},
)
user_with_permission = MockedUser([], basic_role=required_role)
user_without_permission = MockedUser([], basic_role=LegacyAccessControlRole.NONE)
assert (
BasicRolePermission().has_permission(MockedRequest(user_with_permission), viewset) is True
), "it works on a viewset when the user does have permission"
assert (
BasicRolePermission().has_permission(MockedRequest(user_without_permission), viewset) is False
), "it works on a viewset when the user does have permission"
def test_has_permission_works_on_an_apiview_view(self) -> None:
required_role = LegacyAccessControlRole.VIEWER
method = "hello"
apiview = MockedAPIView(
basic_role_permissions={
method: required_role,
},
)
user_with_permission = MockedUser([], basic_role=required_role)
user_without_permission = MockedUser([], basic_role=LegacyAccessControlRole.NONE)
class Request(MockedRequest):
def __init__(self, user: typing.Optional[MockedUser] = None) -> None:
super().__init__(user, method)
assert (
BasicRolePermission().has_permission(Request(user_with_permission), apiview) is True
), "it works on an APIView when the user has permission"
assert (
BasicRolePermission().has_permission(Request(user_without_permission), apiview) is False
), "it works on an APIView when the user does not have permission"
def test_has_permission_throws_assertion_error_if_developer_forgets_to_specify_basic_role_permissions(self) -> None:
action_slash_method = "hello"
error_msg = f"Must define a {BASIC_ROLE_PERMISSIONS_ATTR} dict on the ViewSet that is consuming the role class"
viewset = MockedViewSet(action_slash_method)
apiview = MockedAPIView()
with pytest.raises(AssertionError, match=error_msg):
BasicRolePermission().has_permission(MockedRequest(), viewset)
with pytest.raises(AssertionError, match=error_msg):
BasicRolePermission().has_permission(MockedRequest(method=action_slash_method), apiview)
def test_has_permission_throws_assertion_error_if_developer_forgets_to_specify_an_action_in_basic_role_permissions(
self,
) -> None:
action_slash_method = "hello"
other_action_role_permissions = {"bonjour": LegacyAccessControlRole.VIEWER}
error_msg = f"""Each action must be defined within the {BASIC_ROLE_PERMISSIONS_ATTR} dict on the ViewSet"""
viewset = MockedViewSet(action_slash_method, basic_role_permissions=other_action_role_permissions)
apiview = MockedAPIView(basic_role_permissions=other_action_role_permissions)
with pytest.raises(AssertionError, match=error_msg):
BasicRolePermission().has_permission(MockedRequest(), viewset)
with pytest.raises(AssertionError, match=error_msg):
BasicRolePermission().has_permission(MockedRequest(method=action_slash_method), apiview)
def test_has_object_permission_returns_true(self) -> None:
action = "hello"
request = MockedRequest(None, action)
apiview = MockedAPIView()
viewset = MockedViewSet(action)
assert BasicRolePermission().has_object_permission(request, apiview, None) is True
assert BasicRolePermission().has_object_permission(request, viewset, None) is True

View file

@ -7,7 +7,7 @@ from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
from rest_framework.viewsets import ViewSet
from apps.api.permissions import BasicRolePermission, LegacyAccessControlRole
from apps.api.permissions import RBACPermission
from apps.api.serializers.labels import (
LabelKeySerializer,
LabelOptionSerializer,
@ -35,16 +35,16 @@ class LabelsViewSet(LabelsFeatureFlagViewSet):
Proxy requests to labels-app to create/update labels
"""
permission_classes = (IsAuthenticated, BasicRolePermission)
permission_classes = (IsAuthenticated, RBACPermission)
authentication_classes = (PluginAuthentication,)
basic_role_permissions = {
"get_keys": LegacyAccessControlRole.VIEWER,
"get_key": LegacyAccessControlRole.VIEWER,
"get_value": LegacyAccessControlRole.VIEWER,
"rename_key": LegacyAccessControlRole.EDITOR,
"create_label": LegacyAccessControlRole.EDITOR,
"add_value": LegacyAccessControlRole.EDITOR,
"rename_value": LegacyAccessControlRole.EDITOR,
rbac_permissions = {
"create_label": [RBACPermission.Permissions.LABEL_CREATE],
"rename_key": [RBACPermission.Permissions.LABEL_WRITE],
"add_value": [RBACPermission.Permissions.LABEL_WRITE],
"rename_value": [RBACPermission.Permissions.LABEL_WRITE],
"get_keys": [RBACPermission.Permissions.LABEL_READ],
"get_key": [RBACPermission.Permissions.LABEL_READ],
"get_value": [RBACPermission.Permissions.LABEL_READ],
}
@extend_schema(responses=LabelKeySerializer(many=True))
@ -160,11 +160,11 @@ class AlertGroupLabelsViewSet(LabelsFeatureFlagViewSet):
Alert group labels are stored in the database, not in the label repo.
"""
permission_classes = (IsAuthenticated, BasicRolePermission)
permission_classes = (IsAuthenticated, RBACPermission)
authentication_classes = (PluginAuthentication,)
basic_role_permissions = {
"get_keys": LegacyAccessControlRole.VIEWER,
"get_key": LegacyAccessControlRole.VIEWER,
rbac_permissions = {
"get_keys": [RBACPermission.Permissions.ALERT_GROUPS_READ],
"get_key": [RBACPermission.Permissions.ALERT_GROUPS_READ],
}
@extend_schema(responses=LabelKeySerializer(many=True))

View file

@ -8,7 +8,8 @@ import requests
from django.conf import settings
from rest_framework import status
from apps.api.permissions import ACTION_PREFIX, GrafanaAPIPermission
from apps.api.permissions import GrafanaAPIPermission
from common.constants.plugin_ids import PluginID
logger = logging.getLogger(__name__)
@ -160,11 +161,9 @@ class APIClient:
class GrafanaAPIClient(APIClient):
GRAFANA_INCIDENT_PLUGIN = "grafana-incident-app"
GRAFANA_INCIDENT_PLUGIN_BACKEND_URL_KEY = "backendUrl"
GRAFANA_LABELS_PLUGIN = "grafana-labels-app"
USER_PERMISSION_ENDPOINT = f"api/access-control/users/permissions/search?actionPrefix={ACTION_PREFIX}"
USER_PERMISSION_ENDPOINT = f"api/access-control/users/permissions/search?actionPrefix={PluginID.ONCALL}"
MIN_GRAFANA_TOKEN_LENGTH = 16
@ -305,10 +304,10 @@ class GrafanaAPIClient(APIClient):
return self.api_get(f"api/plugins/{recipient}/settings")
def get_grafana_incident_plugin_settings(self) -> APIClientResponse["GrafanaAPIClient.Types.PluginSettings"]:
return self.get_grafana_plugin_settings(self.GRAFANA_INCIDENT_PLUGIN)
return self.get_grafana_plugin_settings(PluginID.INCIDENT)
def get_grafana_labels_plugin_settings(self) -> APIClientResponse["GrafanaAPIClient.Types.PluginSettings"]:
return self.get_grafana_plugin_settings(self.GRAFANA_LABELS_PLUGIN)
return self.get_grafana_plugin_settings(PluginID.LABELS)
def get_service_account(self, login: str) -> APIClientResponse["GrafanaAPIClient.Types.ServiceAccountResponse"]:
return self.api_get(f"api/serviceaccounts/search?query={login}")

View file

@ -5,6 +5,8 @@ from urllib.parse import urljoin
import requests
from django.conf import settings
from common.constants.plugin_ids import PluginID
if typing.TYPE_CHECKING:
from apps.labels.types import LabelKey, LabelOption, LabelValue
@ -33,7 +35,7 @@ TIMEOUT = 5
class LabelsAPIClient:
LABELS_API_URL = "/api/plugins/grafana-labels-app/resources/v1/labels/"
LABELS_API_URL = f"/api/plugins/{PluginID.LABELS}/resources/v1/labels/"
def __init__(self, api_url: str, api_token: str) -> None:
self.api_token = api_token

View file

@ -0,0 +1,7 @@
class PluginID:
ONCALL = "grafana-oncall-app"
IRM = "grafana-irm-app"
INCIDENT = "grafana-incident-app"
LABELS = "grafana-labels-app"
ML = "grafana-ml-app"

View file

@ -5,6 +5,8 @@ from urllib.parse import urljoin
import requests
from django.conf import settings
from common.constants.plugin_ids import PluginID
class IncidentDetails(typing.TypedDict):
# https://grafana.com/docs/grafana-cloud/alerting-and-irm/irm/incident/api/reference/#getincidentresponse
@ -79,7 +81,7 @@ DEFAULT_ACTIVITY_KIND = "userNote"
class IncidentAPIClient:
INCIDENT_BASE_PATH = "/api/plugins/grafana-incident-app/resources/"
INCIDENT_BASE_PATH = f"/api/plugins/{PluginID.INCIDENT}/resources/"
def __init__(self, api_url: str, api_token: str) -> None:
self.api_token = api_token

View file

@ -41,7 +41,6 @@ from apps.alerts.tests.factories import (
UserNotificationBundleFactory,
)
from apps.api.permissions import (
ACTION_PREFIX,
GrafanaAPIPermission,
LegacyAccessControlCompatiblePermission,
LegacyAccessControlRole,
@ -112,6 +111,7 @@ from apps.webhooks.tests.test_webhook_presets import (
TestAdvancedWebhookPreset,
TestWebhookPreset,
)
from common.constants.plugin_ids import PluginID
register(OrganizationFactory)
register(UserFactory)
@ -356,11 +356,30 @@ def get_user_permission_role_mapping_from_frontend_plugin_json() -> RoleMapping:
with open("../grafana-plugin/src/plugin.json") as fp:
plugin_json: PluginJSON = json.load(fp)
# NOTE: we need to manually add grafana-labels-app permissions here since these
# are granted to basic roles via the grafana-labels-app itself, and not
# ../grafana-plugin/src/plugin.json
#
# However, we do sync these permissions into our backend. See
# https://github.com/grafana/irm/pull/200 for more details
#
# We don't currently add the label delete permission here because we don't currently
# use this in OnCall
role_mapping: RoleMapping = {
LegacyAccessControlRole.NONE: [],
LegacyAccessControlRole.VIEWER: [],
LegacyAccessControlRole.EDITOR: [],
LegacyAccessControlRole.ADMIN: [],
LegacyAccessControlRole.VIEWER: [
RBACPermission.Permissions.LABEL_READ,
],
LegacyAccessControlRole.EDITOR: [
RBACPermission.Permissions.LABEL_READ,
RBACPermission.Permissions.LABEL_WRITE,
RBACPermission.Permissions.LABEL_CREATE,
],
LegacyAccessControlRole.ADMIN: [
RBACPermission.Permissions.LABEL_READ,
RBACPermission.Permissions.LABEL_WRITE,
RBACPermission.Permissions.LABEL_CREATE,
],
}
all_permission_classes: typing.Dict[str, LegacyAccessControlCompatiblePermission] = {
@ -378,7 +397,7 @@ def get_user_permission_role_mapping_from_frontend_plugin_json() -> RoleMapping:
action = permission["action"]
permission_class = None
if action.startswith(ACTION_PREFIX):
if action.startswith(PluginID.ONCALL):
permission_class = all_permission_classes[action]
if permission_class: