fix: address RBAC Admin issue (#5087)
# What this PR does **NOTE**: should be merged/released after https://github.com/grafana/irm/pull/183 has been rolled out to most stacks (as that frontend update is what will grant that new RBAC "action" to users whom already have the "OnCall Admin" RBAC role assigned) tldr; from the comment in the `RBACPermission.Permission.ADMIN` comment in `engine/apps/api/permissions.py`: > NOTE: this is a bit of a hack for now. See https://github.com/grafana/support-escalations/issues/12625 > Basically when it comes to filtering teams that are configured to share their resources with > "Team members and admins", we have no way of knowing, when a user is ACTUALLY an Admin when RBAC is involed. > > Example: Take a user with the basic role of None/Editor/Viewer but with the "OnCall Admin" role assigned. > Without this RBAC permission, we have no way of knowing that the user is ACTUALLY an "Admin". ## Which issue(s) this PR closes Closes https://github.com/grafana/support-escalations/issues/12625 ## Checklist - [x] Unit, integration, and e2e (if applicable) tests updated - [x] Documentation added (or `pr:no public docs` PR label added if not required) - [x] Added the relevant release notes label (see labels prefixed w/ `release:`). These labels dictate how your PR will show up in the autogenerated release notes.
This commit is contained in:
parent
0ef6bb7eda
commit
b260a8e82b
6 changed files with 113 additions and 33 deletions
|
|
@ -66,6 +66,8 @@ class Resources(enum.Enum):
|
||||||
USER_SETTINGS = "user-settings"
|
USER_SETTINGS = "user-settings"
|
||||||
OTHER_SETTINGS = "other-settings"
|
OTHER_SETTINGS = "other-settings"
|
||||||
|
|
||||||
|
ADMIN = "admin"
|
||||||
|
|
||||||
|
|
||||||
class Actions(enum.Enum):
|
class Actions(enum.Enum):
|
||||||
READ = "read"
|
READ = "read"
|
||||||
|
|
@ -144,6 +146,14 @@ def user_is_authorized(
|
||||||
|
|
||||||
class RBACPermission(permissions.BasePermission):
|
class RBACPermission(permissions.BasePermission):
|
||||||
class Permissions:
|
class Permissions:
|
||||||
|
# NOTE: this is a bit of a hack for now. See https://github.com/grafana/support-escalations/issues/12625
|
||||||
|
# Basically when it comes to filtering teams that are configured to share their resources with
|
||||||
|
# "Team members and admins", we have no way of knowing, when a user is ACTUALLY an Admin when RBAC is involed.
|
||||||
|
#
|
||||||
|
# Example: Take a user with the basic role of None/Editor/Viewer but with the "OnCall Admin" role assigned.
|
||||||
|
# Without this RBAC permission, we have no way of knowing that the user is ACTUALLY an "Admin".
|
||||||
|
ADMIN = LegacyAccessControlCompatiblePermission(Resources.ADMIN, Actions.ADMIN, LegacyAccessControlRole.ADMIN)
|
||||||
|
|
||||||
ALERT_GROUPS_READ = LegacyAccessControlCompatiblePermission(
|
ALERT_GROUPS_READ = LegacyAccessControlCompatiblePermission(
|
||||||
Resources.ALERT_GROUPS, Actions.READ, LegacyAccessControlRole.VIEWER
|
Resources.ALERT_GROUPS, Actions.READ, LegacyAccessControlRole.VIEWER
|
||||||
)
|
)
|
||||||
|
|
|
||||||
|
|
@ -83,14 +83,12 @@ class UserNotificationPolicySerializer(UserNotificationPolicyBaseSerializer):
|
||||||
|
|
||||||
def create(self, validated_data):
|
def create(self, validated_data):
|
||||||
user = validated_data.get("user") or self.context["request"].user
|
user = validated_data.get("user") or self.context["request"].user
|
||||||
organization = self.context["request"].auth.organization
|
|
||||||
|
|
||||||
self_or_admin = user.self_or_admin(user_to_check=self.context["request"].user, organization=organization)
|
if not user.self_or_has_user_settings_admin_permission(
|
||||||
if not self_or_admin:
|
user_to_check=self.context["request"].user, organization=self.context["request"].auth.organization
|
||||||
|
):
|
||||||
raise Forbidden()
|
raise Forbidden()
|
||||||
|
return UserNotificationPolicy.objects.create(**validated_data)
|
||||||
instance = UserNotificationPolicy.objects.create(**validated_data)
|
|
||||||
return instance
|
|
||||||
|
|
||||||
|
|
||||||
class UserNotificationPolicyUpdateSerializer(UserNotificationPolicyBaseSerializer):
|
class UserNotificationPolicyUpdateSerializer(UserNotificationPolicyBaseSerializer):
|
||||||
|
|
@ -104,10 +102,9 @@ class UserNotificationPolicyUpdateSerializer(UserNotificationPolicyBaseSerialize
|
||||||
read_only_fields = UserNotificationPolicyBaseSerializer.Meta.read_only_fields + ["user", "important"]
|
read_only_fields = UserNotificationPolicyBaseSerializer.Meta.read_only_fields + ["user", "important"]
|
||||||
|
|
||||||
def update(self, instance, validated_data):
|
def update(self, instance, validated_data):
|
||||||
self_or_admin = instance.user.self_or_admin(
|
if not instance.user.self_or_has_user_settings_admin_permission(
|
||||||
user_to_check=self.context["request"].user, organization=self.context["request"].user.organization
|
user_to_check=self.context["request"].user, organization=self.context["request"].user.organization
|
||||||
)
|
):
|
||||||
if not self_or_admin:
|
|
||||||
raise Forbidden()
|
raise Forbidden()
|
||||||
if validated_data.get("step") == UserNotificationPolicy.Step.WAIT and not validated_data.get("wait_delay"):
|
if validated_data.get("step") == UserNotificationPolicy.Step.WAIT and not validated_data.get("wait_delay"):
|
||||||
validated_data["wait_delay"] = UserNotificationPolicy.FIVE_MINUTES
|
validated_data["wait_delay"] = UserNotificationPolicy.FIVE_MINUTES
|
||||||
|
|
|
||||||
|
|
@ -172,11 +172,19 @@ class User(models.Model):
|
||||||
return f"{self.pk}: {self.username}"
|
return f"{self.pk}: {self.username}"
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def available_teams(self):
|
def is_admin(self) -> bool:
|
||||||
if self.role == LegacyAccessControlRole.ADMIN:
|
return user_is_authorized(self, [RBACPermission.Permissions.ADMIN])
|
||||||
|
|
||||||
|
@property
|
||||||
|
def available_teams(self) -> "RelatedManager['Team']":
|
||||||
|
if self.is_admin:
|
||||||
return self.organization.teams.all()
|
return self.organization.teams.all()
|
||||||
return self.organization.teams.filter(Q(is_sharing_resources_to_all=True) | Q(users=self)).distinct()
|
return self.organization.teams.filter(Q(is_sharing_resources_to_all=True) | Q(users=self)).distinct()
|
||||||
|
|
||||||
|
@property
|
||||||
|
def is_notification_allowed(self) -> bool:
|
||||||
|
return user_is_authorized(self, [RBACPermission.Permissions.NOTIFICATIONS_READ])
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def is_authenticated(self):
|
def is_authenticated(self):
|
||||||
return True
|
return True
|
||||||
|
|
@ -223,15 +231,9 @@ class User(models.Model):
|
||||||
def is_telegram_connected(self):
|
def is_telegram_connected(self):
|
||||||
return hasattr(self, "telegram_connection")
|
return hasattr(self, "telegram_connection")
|
||||||
|
|
||||||
def self_or_admin(self, user_to_check, organization) -> bool:
|
def self_or_has_user_settings_admin_permission(self, user_to_check: "User", organization: "Organization") -> bool:
|
||||||
has_admin_permission = user_is_authorized(user_to_check, [RBACPermission.Permissions.USER_SETTINGS_ADMIN])
|
has_permission = user_is_authorized(user_to_check, [RBACPermission.Permissions.USER_SETTINGS_ADMIN])
|
||||||
return user_to_check.pk == self.pk or (
|
return user_to_check.pk == self.pk or (has_permission and organization.pk == user_to_check.organization_id)
|
||||||
has_admin_permission and organization.pk == user_to_check.organization_id
|
|
||||||
)
|
|
||||||
|
|
||||||
@property
|
|
||||||
def is_notification_allowed(self):
|
|
||||||
return user_is_authorized(self, [RBACPermission.Permissions.NOTIFICATIONS_READ])
|
|
||||||
|
|
||||||
def get_username_with_slack_verbal(self, mention=False) -> str:
|
def get_username_with_slack_verbal(self, mention=False) -> str:
|
||||||
slack_verbal = None
|
slack_verbal = None
|
||||||
|
|
|
||||||
|
|
@ -3,26 +3,97 @@ import datetime
|
||||||
import pytest
|
import pytest
|
||||||
from django.utils import timezone
|
from django.utils import timezone
|
||||||
|
|
||||||
from apps.api.permissions import LegacyAccessControlRole
|
from apps.api.permissions import GrafanaAPIPermission, LegacyAccessControlRole, RBACPermission
|
||||||
from apps.google import constants as google_constants
|
from apps.google import constants as google_constants
|
||||||
from apps.google.models import GoogleOAuth2User
|
from apps.google.models import GoogleOAuth2User
|
||||||
from apps.user_management.models import User
|
from apps.user_management.models import User
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.django_db
|
@pytest.mark.django_db
|
||||||
def test_self_or_admin(make_organization, make_user_for_organization):
|
def test_self_or_has_user_settings_admin_permission(make_organization, make_user_for_organization):
|
||||||
organization = make_organization()
|
# RBAC not enabled for org
|
||||||
|
organization = make_organization(is_rbac_permissions_enabled=False)
|
||||||
admin = make_user_for_organization(organization)
|
admin = make_user_for_organization(organization)
|
||||||
second_admin = make_user_for_organization(organization)
|
second_admin = make_user_for_organization(organization)
|
||||||
editor = make_user_for_organization(organization, role=LegacyAccessControlRole.EDITOR)
|
editor = make_user_for_organization(organization, role=LegacyAccessControlRole.EDITOR)
|
||||||
|
|
||||||
another_organization = make_organization()
|
another_organization = make_organization(is_rbac_permissions_enabled=False)
|
||||||
admin_from_another_organization = make_user_for_organization(another_organization)
|
admin_from_another_organization = make_user_for_organization(another_organization)
|
||||||
|
|
||||||
assert admin.self_or_admin(admin, organization) is True
|
assert organization.is_rbac_permissions_enabled is False
|
||||||
assert admin.self_or_admin(editor, organization) is False
|
assert another_organization.is_rbac_permissions_enabled is False
|
||||||
assert admin.self_or_admin(second_admin, organization) is True
|
|
||||||
assert admin.self_or_admin(admin_from_another_organization, organization) is False
|
assert admin.self_or_has_user_settings_admin_permission(admin, organization) is True
|
||||||
|
assert admin.self_or_has_user_settings_admin_permission(editor, organization) is False
|
||||||
|
assert admin.self_or_has_user_settings_admin_permission(second_admin, organization) is True
|
||||||
|
|
||||||
|
assert admin.self_or_has_user_settings_admin_permission(admin_from_another_organization, organization) is False
|
||||||
|
|
||||||
|
assert editor.self_or_has_user_settings_admin_permission(editor, organization) is True
|
||||||
|
assert editor.self_or_has_user_settings_admin_permission(admin, organization) is True
|
||||||
|
|
||||||
|
# RBAC enabled org
|
||||||
|
organization_with_rbac = make_organization(is_rbac_permissions_enabled=True)
|
||||||
|
user_with_perms = make_user_for_organization(
|
||||||
|
organization_with_rbac,
|
||||||
|
role=LegacyAccessControlRole.NONE,
|
||||||
|
permissions=[GrafanaAPIPermission(action=RBACPermission.Permissions.USER_SETTINGS_ADMIN.value)],
|
||||||
|
)
|
||||||
|
user_without_perms = make_user_for_organization(
|
||||||
|
organization_with_rbac,
|
||||||
|
role=LegacyAccessControlRole.NONE,
|
||||||
|
permissions=[],
|
||||||
|
)
|
||||||
|
|
||||||
|
assert organization_with_rbac.is_rbac_permissions_enabled is True
|
||||||
|
|
||||||
|
# true because self
|
||||||
|
assert user_with_perms.self_or_has_user_settings_admin_permission(user_with_perms, organization_with_rbac) is True
|
||||||
|
assert (
|
||||||
|
user_without_perms.self_or_has_user_settings_admin_permission(user_without_perms, organization_with_rbac)
|
||||||
|
is True
|
||||||
|
)
|
||||||
|
|
||||||
|
# true because user_with_perms has proper admin RBAC permission
|
||||||
|
assert (
|
||||||
|
user_without_perms.self_or_has_user_settings_admin_permission(user_with_perms, organization_with_rbac) is True
|
||||||
|
)
|
||||||
|
|
||||||
|
# false because user_without_perms does not have proper admin RBAC permission
|
||||||
|
assert (
|
||||||
|
user_with_perms.self_or_has_user_settings_admin_permission(user_without_perms, organization_with_rbac) is False
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.django_db
|
||||||
|
def test_is_admin(make_organization, make_user_for_organization):
|
||||||
|
# RBAC not enabled for org
|
||||||
|
organization = make_organization(is_rbac_permissions_enabled=False)
|
||||||
|
admin = make_user_for_organization(organization, role=LegacyAccessControlRole.ADMIN)
|
||||||
|
editor = make_user_for_organization(organization, role=LegacyAccessControlRole.EDITOR)
|
||||||
|
|
||||||
|
assert organization.is_rbac_permissions_enabled is False
|
||||||
|
|
||||||
|
assert admin.is_admin is True
|
||||||
|
assert editor.is_admin is False
|
||||||
|
|
||||||
|
# RBAC enabled org
|
||||||
|
organization_with_rbac = make_organization(is_rbac_permissions_enabled=True)
|
||||||
|
user_with_perms = make_user_for_organization(
|
||||||
|
organization_with_rbac,
|
||||||
|
role=LegacyAccessControlRole.NONE,
|
||||||
|
permissions=[GrafanaAPIPermission(action=RBACPermission.Permissions.ADMIN.value)],
|
||||||
|
)
|
||||||
|
user_without_perms = make_user_for_organization(
|
||||||
|
organization_with_rbac,
|
||||||
|
role=LegacyAccessControlRole.NONE,
|
||||||
|
permissions=[],
|
||||||
|
)
|
||||||
|
|
||||||
|
assert organization_with_rbac.is_rbac_permissions_enabled is True
|
||||||
|
|
||||||
|
assert user_with_perms.is_admin is True
|
||||||
|
assert user_without_perms.is_admin is False
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.django_db
|
@pytest.mark.django_db
|
||||||
|
|
|
||||||
|
|
@ -21,7 +21,6 @@ from apps.alerts.incident_appearance.templaters import (
|
||||||
AlertWebTemplater,
|
AlertWebTemplater,
|
||||||
TemplateLoader,
|
TemplateLoader,
|
||||||
)
|
)
|
||||||
from apps.api.permissions import LegacyAccessControlRole
|
|
||||||
from apps.base.messaging import get_messaging_backends
|
from apps.base.messaging import get_messaging_backends
|
||||||
from common.api_helpers.exceptions import BadRequest
|
from common.api_helpers.exceptions import BadRequest
|
||||||
from common.jinja_templater import apply_jinja_template
|
from common.jinja_templater import apply_jinja_template
|
||||||
|
|
@ -183,7 +182,7 @@ class TeamFilteringMixin:
|
||||||
NOTE: use .distinct() after filtering by available teams as it may return duplicate instances.
|
NOTE: use .distinct() after filtering by available teams as it may return duplicate instances.
|
||||||
"""
|
"""
|
||||||
available_teams_lookup_args = []
|
available_teams_lookup_args = []
|
||||||
if not self.request.user.role == LegacyAccessControlRole.ADMIN:
|
if not self.request.user.is_admin:
|
||||||
available_teams_lookup_args = [
|
available_teams_lookup_args = [
|
||||||
Q(**{f"{self.TEAM_LOOKUP}__users": self.request.user})
|
Q(**{f"{self.TEAM_LOOKUP}__users": self.request.user})
|
||||||
| Q(**{f"{self.TEAM_LOOKUP}__is_sharing_resources_to_all": True})
|
| Q(**{f"{self.TEAM_LOOKUP}__is_sharing_resources_to_all": True})
|
||||||
|
|
|
||||||
|
|
@ -174,9 +174,10 @@
|
||||||
{ "action": "grafana-oncall-app.user-settings:admin" },
|
{ "action": "grafana-oncall-app.user-settings:admin" },
|
||||||
|
|
||||||
{ "action": "grafana-oncall-app.other-settings:read" },
|
{ "action": "grafana-oncall-app.other-settings:read" },
|
||||||
{ "action": "grafana-oncall-app.other-settings:write" }
|
{ "action": "grafana-oncall-app.other-settings:write" },
|
||||||
],
|
|
||||||
"hidden": true
|
{ "action": "grafana-oncall-app.admin:admin" }
|
||||||
|
]
|
||||||
},
|
},
|
||||||
"grants": ["Grafana Admin", "Admin"]
|
"grants": ["Grafana Admin", "Admin"]
|
||||||
},
|
},
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue