2022-11-29 09:41:56 +01:00
|
|
|
import enum
|
|
|
|
|
import typing
|
|
|
|
|
|
2023-01-20 09:19:41 +01:00
|
|
|
from django.conf import settings
|
2023-06-27 12:23:08 +02:00
|
|
|
from django.contrib.auth.models import AbstractUser
|
2022-11-29 09:41:56 +01:00
|
|
|
from rest_framework import permissions
|
|
|
|
|
from rest_framework.authentication import BasicAuthentication, SessionAuthentication
|
|
|
|
|
from rest_framework.request import Request
|
|
|
|
|
from rest_framework.views import APIView
|
|
|
|
|
from rest_framework.viewsets import ViewSet, ViewSetMixin
|
|
|
|
|
|
2024-10-02 13:39:49 -04:00
|
|
|
from common.constants.plugin_ids import PluginID
|
2022-11-29 09:41:56 +01:00
|
|
|
from common.utils import getattrd
|
|
|
|
|
|
2023-06-27 12:23:08 +02:00
|
|
|
if typing.TYPE_CHECKING:
|
2024-10-10 15:02:21 -04:00
|
|
|
from apps.user_management.models import Organization, User
|
2023-06-27 12:23:08 +02:00
|
|
|
|
2022-11-29 09:41:56 +01:00
|
|
|
RBAC_PERMISSIONS_ATTR = "rbac_permissions"
|
|
|
|
|
RBAC_OBJECT_PERMISSIONS_ATTR = "rbac_object_permissions"
|
|
|
|
|
|
2024-11-19 09:52:23 -03:00
|
|
|
|
2022-11-29 09:41:56 +01:00
|
|
|
ViewSetOrAPIView = typing.Union[ViewSet, APIView]
|
|
|
|
|
|
|
|
|
|
|
2023-06-27 12:23:08 +02:00
|
|
|
class AuthenticatedRequest(Request):
|
|
|
|
|
"""
|
|
|
|
|
Use this for typing, instead of rest_framework.request.Request, when you KNOW that the user is authenticated.
|
|
|
|
|
ex. In the RBACPermission class below, we know that the user is authenticated because this is handled by the
|
|
|
|
|
`authentication_classes` attribute on views.
|
|
|
|
|
|
|
|
|
|
https://github.com/typeddjango/django-stubs#how-can-i-create-a-httprequest-thats-guaranteed-to-have-an-authenticated-user
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
# see comment above, this is safe. without the type-ignore comment, mypy complains
|
|
|
|
|
# expression has type "User", base class "Request" defined the type as "Union[AbstractBaseUser, AnonymousUser]"
|
|
|
|
|
user: "User" # type: ignore[assignment]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class AuthenticatedDjangoAdminRequest(Request):
|
|
|
|
|
"""
|
|
|
|
|
Use this for typing, instead of rest_framework.request.Request, when you KNOW that the user is authenticated via
|
|
|
|
|
Django admin user authentication.
|
|
|
|
|
|
|
|
|
|
https://github.com/typeddjango/django-stubs#how-can-i-create-a-httprequest-thats-guaranteed-to-have-an-authenticated-user
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
user: AbstractUser
|
|
|
|
|
|
|
|
|
|
|
2022-11-29 09:41:56 +01:00
|
|
|
class GrafanaAPIPermission(typing.TypedDict):
|
|
|
|
|
action: str
|
|
|
|
|
|
|
|
|
|
|
2024-10-10 15:02:21 -04:00
|
|
|
class GrafanaAPIPermissions:
|
|
|
|
|
@classmethod
|
|
|
|
|
def construct_permissions(cls, actions: typing.List[str]) -> typing.List[GrafanaAPIPermission]:
|
|
|
|
|
return [GrafanaAPIPermission(action=action) for action in actions]
|
|
|
|
|
|
|
|
|
|
|
2022-11-29 09:41:56 +01:00
|
|
|
class Resources(enum.Enum):
|
|
|
|
|
ALERT_GROUPS = "alert-groups"
|
|
|
|
|
INTEGRATIONS = "integrations"
|
|
|
|
|
ESCALATION_CHAINS = "escalation-chains"
|
|
|
|
|
SCHEDULES = "schedules"
|
|
|
|
|
CHATOPS = "chatops"
|
|
|
|
|
OUTGOING_WEBHOOKS = "outgoing-webhooks"
|
|
|
|
|
MAINTENANCE = "maintenance"
|
|
|
|
|
API_KEYS = "api-keys"
|
|
|
|
|
NOTIFICATIONS = "notifications"
|
|
|
|
|
|
|
|
|
|
NOTIFICATION_SETTINGS = "notification-settings"
|
|
|
|
|
USER_SETTINGS = "user-settings"
|
|
|
|
|
OTHER_SETTINGS = "other-settings"
|
|
|
|
|
|
2024-09-26 12:40:07 -04:00
|
|
|
ADMIN = "admin"
|
2024-10-02 13:39:49 -04:00
|
|
|
LABEL = "label"
|
2024-09-26 12:40:07 -04:00
|
|
|
|
2022-11-29 09:41:56 +01:00
|
|
|
|
|
|
|
|
class Actions(enum.Enum):
|
|
|
|
|
READ = "read"
|
|
|
|
|
WRITE = "write"
|
|
|
|
|
ADMIN = "admin"
|
|
|
|
|
TEST = "test"
|
|
|
|
|
EXPORT = "export"
|
|
|
|
|
UPDATE_SETTINGS = "update-settings"
|
2023-10-03 19:28:26 -04:00
|
|
|
DIRECT_PAGING = "direct-paging"
|
2022-11-29 09:41:56 +01:00
|
|
|
|
2024-10-02 13:39:49 -04:00
|
|
|
CREATE = "create"
|
|
|
|
|
|
2022-11-29 09:41:56 +01:00
|
|
|
|
|
|
|
|
class LegacyAccessControlRole(enum.IntEnum):
|
|
|
|
|
ADMIN = 0
|
|
|
|
|
EDITOR = 1
|
|
|
|
|
VIEWER = 2
|
2023-10-19 14:39:08 -03:00
|
|
|
NONE = 3
|
2022-11-29 09:41:56 +01:00
|
|
|
|
|
|
|
|
@classmethod
|
|
|
|
|
def choices(cls):
|
|
|
|
|
return tuple((option.value, option.name) for option in cls)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class LegacyAccessControlCompatiblePermission:
|
2024-10-02 13:39:49 -04:00
|
|
|
def __init__(
|
|
|
|
|
self,
|
|
|
|
|
resource: Resources,
|
|
|
|
|
action: Actions,
|
|
|
|
|
fallback_role: LegacyAccessControlRole,
|
|
|
|
|
prefix: str = PluginID.ONCALL,
|
|
|
|
|
) -> None:
|
|
|
|
|
self.value = f"{prefix}.{resource.value}:{action.value}"
|
2022-11-29 09:41:56 +01:00
|
|
|
self.fallback_role = fallback_role
|
|
|
|
|
|
2024-10-10 15:02:21 -04:00
|
|
|
def user_has_permission(self, user: "User") -> bool:
|
|
|
|
|
return user_is_authorized(user, [self])
|
|
|
|
|
|
2022-11-29 09:41:56 +01:00
|
|
|
|
2023-06-27 12:23:08 +02:00
|
|
|
LegacyAccessControlCompatiblePermissions = typing.List[LegacyAccessControlCompatiblePermission]
|
|
|
|
|
RBACPermissionsAttribute = typing.Dict[str, LegacyAccessControlCompatiblePermissions]
|
|
|
|
|
RBACObjectPermissionsAttribute = typing.Dict[permissions.BasePermission, typing.List[str]]
|
2023-10-23 13:03:51 +02:00
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_view_action(request: AuthenticatedRequest, view: ViewSetOrAPIView) -> str:
|
|
|
|
|
"""
|
|
|
|
|
For right now this needs to support being used in both a ViewSet as well as APIView, we use both interchangably
|
|
|
|
|
|
|
|
|
|
Note: `request.method` is returned uppercase
|
|
|
|
|
"""
|
|
|
|
|
return view.action if isinstance(view, ViewSetMixin) else (request.method or "").lower()
|
2023-06-27 12:23:08 +02:00
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_most_authorized_role(permissions: LegacyAccessControlCompatiblePermissions) -> LegacyAccessControlRole:
|
2022-11-29 09:41:56 +01:00
|
|
|
if not permissions:
|
2023-10-19 14:39:08 -03:00
|
|
|
return LegacyAccessControlRole.NONE
|
2022-11-29 09:41:56 +01:00
|
|
|
|
2023-10-19 14:39:08 -03:00
|
|
|
# ex. Admin is 0, None is 3, thereby min makes sense here
|
2022-11-29 09:41:56 +01:00
|
|
|
return min({p.fallback_role for p in permissions}, key=lambda r: r.value)
|
|
|
|
|
|
|
|
|
|
|
2024-10-10 15:02:21 -04:00
|
|
|
def convert_oncall_permission_to_irm(permission: LegacyAccessControlCompatiblePermission) -> str:
|
|
|
|
|
return permission.value.replace(PluginID.ONCALL, PluginID.IRM)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_required_permission_values(
|
|
|
|
|
organization: "Organization", required_permissions: LegacyAccessControlCompatiblePermissions
|
|
|
|
|
) -> typing.List[str]:
|
|
|
|
|
"""
|
|
|
|
|
This function returns a list of required permission values, taking into account whether or not the organization
|
|
|
|
|
is using the IRM plugin.
|
|
|
|
|
|
|
|
|
|
If the IRM plugin is being used, we substitue `grafana-oncall-app` with `grafana-irm-app`
|
|
|
|
|
as the RBAC permission prefix.
|
|
|
|
|
"""
|
|
|
|
|
permission_values = []
|
|
|
|
|
|
|
|
|
|
for permission in required_permissions:
|
|
|
|
|
permission_value = permission.value
|
|
|
|
|
if permission_value.startswith(PluginID.ONCALL) and organization.is_grafana_irm_enabled:
|
|
|
|
|
permission_values.append(convert_oncall_permission_to_irm(permission))
|
|
|
|
|
else:
|
|
|
|
|
permission_values.append(permission_value)
|
|
|
|
|
|
|
|
|
|
return permission_values
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def user_has_minimum_required_basic_role(user: "User", required_basic_role: LegacyAccessControlRole) -> bool:
|
|
|
|
|
return user.role <= required_basic_role.value
|
|
|
|
|
|
|
|
|
|
|
2024-10-02 13:39:49 -04:00
|
|
|
def user_is_authorized(user: "User", required_permissions: LegacyAccessControlCompatiblePermissions) -> bool:
|
2023-01-25 11:08:09 +01:00
|
|
|
"""
|
2024-10-02 13:39:49 -04:00
|
|
|
This function checks whether `user` has all necessary permissions specified in `required_permissions`.
|
2023-10-23 13:03:51 +02:00
|
|
|
RBAC permissions are used if RBAC is enabled for the organization, otherwise the fallback basic role is checked.
|
2023-01-25 11:08:09 +01:00
|
|
|
|
2024-10-02 13:39:49 -04:00
|
|
|
`user` - The user to check permissions for
|
|
|
|
|
`required_permissions` - A list of permissions that a user must have to be considered authorized
|
2023-01-25 11:08:09 +01:00
|
|
|
"""
|
2024-10-10 15:02:21 -04:00
|
|
|
organization = user.organization
|
2024-12-12 19:11:59 -03:00
|
|
|
if organization.is_rbac_permissions_enabled or user.is_service_account:
|
2022-11-29 09:41:56 +01:00
|
|
|
user_permissions = [u["action"] for u in user.permissions]
|
2024-10-10 15:02:21 -04:00
|
|
|
required_permission_values = get_required_permission_values(organization, required_permissions)
|
2023-06-27 12:23:08 +02:00
|
|
|
return all(permission in user_permissions for permission in required_permission_values)
|
2024-10-10 15:02:21 -04:00
|
|
|
return user_has_minimum_required_basic_role(user, get_most_authorized_role(required_permissions))
|
2022-11-29 09:41:56 +01:00
|
|
|
|
|
|
|
|
|
|
|
|
|
class RBACPermission(permissions.BasePermission):
|
|
|
|
|
class Permissions:
|
2024-09-26 12:40:07 -04:00
|
|
|
# NOTE: this is a bit of a hack for now. See https://github.com/grafana/support-escalations/issues/12625
|
|
|
|
|
# Basically when it comes to filtering teams that are configured to share their resources with
|
|
|
|
|
# "Team members and admins", we have no way of knowing, when a user is ACTUALLY an Admin when RBAC is involed.
|
|
|
|
|
#
|
|
|
|
|
# Example: Take a user with the basic role of None/Editor/Viewer but with the "OnCall Admin" role assigned.
|
|
|
|
|
# Without this RBAC permission, we have no way of knowing that the user is ACTUALLY an "Admin".
|
|
|
|
|
ADMIN = LegacyAccessControlCompatiblePermission(Resources.ADMIN, Actions.ADMIN, LegacyAccessControlRole.ADMIN)
|
|
|
|
|
|
2022-11-29 09:41:56 +01:00
|
|
|
ALERT_GROUPS_READ = LegacyAccessControlCompatiblePermission(
|
|
|
|
|
Resources.ALERT_GROUPS, Actions.READ, LegacyAccessControlRole.VIEWER
|
|
|
|
|
)
|
|
|
|
|
ALERT_GROUPS_WRITE = LegacyAccessControlCompatiblePermission(
|
|
|
|
|
Resources.ALERT_GROUPS, Actions.WRITE, LegacyAccessControlRole.EDITOR
|
|
|
|
|
)
|
2023-10-03 19:28:26 -04:00
|
|
|
ALERT_GROUPS_DIRECT_PAGING = LegacyAccessControlCompatiblePermission(
|
|
|
|
|
Resources.ALERT_GROUPS, Actions.DIRECT_PAGING, LegacyAccessControlRole.EDITOR
|
|
|
|
|
)
|
2022-11-29 09:41:56 +01:00
|
|
|
|
|
|
|
|
INTEGRATIONS_READ = LegacyAccessControlCompatiblePermission(
|
|
|
|
|
Resources.INTEGRATIONS, Actions.READ, LegacyAccessControlRole.VIEWER
|
|
|
|
|
)
|
|
|
|
|
INTEGRATIONS_TEST = LegacyAccessControlCompatiblePermission(
|
|
|
|
|
Resources.INTEGRATIONS, Actions.TEST, LegacyAccessControlRole.EDITOR
|
|
|
|
|
)
|
|
|
|
|
INTEGRATIONS_WRITE = LegacyAccessControlCompatiblePermission(
|
|
|
|
|
Resources.INTEGRATIONS, Actions.WRITE, LegacyAccessControlRole.ADMIN
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
ESCALATION_CHAINS_READ = LegacyAccessControlCompatiblePermission(
|
|
|
|
|
Resources.ESCALATION_CHAINS, Actions.READ, LegacyAccessControlRole.VIEWER
|
|
|
|
|
)
|
|
|
|
|
ESCALATION_CHAINS_WRITE = LegacyAccessControlCompatiblePermission(
|
|
|
|
|
Resources.ESCALATION_CHAINS, Actions.WRITE, LegacyAccessControlRole.ADMIN
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
SCHEDULES_READ = LegacyAccessControlCompatiblePermission(
|
|
|
|
|
Resources.SCHEDULES, Actions.READ, LegacyAccessControlRole.VIEWER
|
|
|
|
|
)
|
|
|
|
|
SCHEDULES_WRITE = LegacyAccessControlCompatiblePermission(
|
2022-12-09 12:48:19 -03:00
|
|
|
Resources.SCHEDULES, Actions.WRITE, LegacyAccessControlRole.EDITOR
|
2022-11-29 09:41:56 +01:00
|
|
|
)
|
|
|
|
|
SCHEDULES_EXPORT = LegacyAccessControlCompatiblePermission(
|
|
|
|
|
Resources.SCHEDULES, Actions.EXPORT, LegacyAccessControlRole.EDITOR
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
CHATOPS_READ = LegacyAccessControlCompatiblePermission(
|
|
|
|
|
Resources.CHATOPS, Actions.READ, LegacyAccessControlRole.VIEWER
|
|
|
|
|
)
|
|
|
|
|
CHATOPS_WRITE = LegacyAccessControlCompatiblePermission(
|
|
|
|
|
Resources.CHATOPS, Actions.WRITE, LegacyAccessControlRole.EDITOR
|
|
|
|
|
)
|
|
|
|
|
CHATOPS_UPDATE_SETTINGS = LegacyAccessControlCompatiblePermission(
|
|
|
|
|
Resources.CHATOPS, Actions.UPDATE_SETTINGS, LegacyAccessControlRole.ADMIN
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
OUTGOING_WEBHOOKS_READ = LegacyAccessControlCompatiblePermission(
|
|
|
|
|
Resources.OUTGOING_WEBHOOKS, Actions.READ, LegacyAccessControlRole.VIEWER
|
|
|
|
|
)
|
|
|
|
|
OUTGOING_WEBHOOKS_WRITE = LegacyAccessControlCompatiblePermission(
|
|
|
|
|
Resources.OUTGOING_WEBHOOKS, Actions.WRITE, LegacyAccessControlRole.ADMIN
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
MAINTENANCE_READ = LegacyAccessControlCompatiblePermission(
|
|
|
|
|
Resources.MAINTENANCE, Actions.READ, LegacyAccessControlRole.VIEWER
|
|
|
|
|
)
|
|
|
|
|
MAINTENANCE_WRITE = LegacyAccessControlCompatiblePermission(
|
|
|
|
|
Resources.MAINTENANCE, Actions.WRITE, LegacyAccessControlRole.EDITOR
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
API_KEYS_READ = LegacyAccessControlCompatiblePermission(
|
2022-12-06 13:02:53 +01:00
|
|
|
Resources.API_KEYS, Actions.READ, LegacyAccessControlRole.ADMIN
|
2022-11-29 09:41:56 +01:00
|
|
|
)
|
|
|
|
|
API_KEYS_WRITE = LegacyAccessControlCompatiblePermission(
|
2022-12-06 13:02:53 +01:00
|
|
|
Resources.API_KEYS, Actions.WRITE, LegacyAccessControlRole.ADMIN
|
2022-11-29 09:41:56 +01:00
|
|
|
)
|
|
|
|
|
|
|
|
|
|
NOTIFICATIONS_READ = LegacyAccessControlCompatiblePermission(
|
|
|
|
|
Resources.NOTIFICATIONS, Actions.READ, LegacyAccessControlRole.EDITOR
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
NOTIFICATION_SETTINGS_READ = LegacyAccessControlCompatiblePermission(
|
|
|
|
|
Resources.NOTIFICATION_SETTINGS, Actions.READ, LegacyAccessControlRole.VIEWER
|
|
|
|
|
)
|
|
|
|
|
NOTIFICATION_SETTINGS_WRITE = LegacyAccessControlCompatiblePermission(
|
|
|
|
|
Resources.NOTIFICATION_SETTINGS, Actions.WRITE, LegacyAccessControlRole.EDITOR
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
USER_SETTINGS_READ = LegacyAccessControlCompatiblePermission(
|
|
|
|
|
Resources.USER_SETTINGS, Actions.READ, LegacyAccessControlRole.VIEWER
|
|
|
|
|
)
|
|
|
|
|
USER_SETTINGS_WRITE = LegacyAccessControlCompatiblePermission(
|
|
|
|
|
Resources.USER_SETTINGS, Actions.WRITE, LegacyAccessControlRole.EDITOR
|
|
|
|
|
)
|
|
|
|
|
USER_SETTINGS_ADMIN = LegacyAccessControlCompatiblePermission(
|
|
|
|
|
Resources.USER_SETTINGS, Actions.ADMIN, LegacyAccessControlRole.ADMIN
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
OTHER_SETTINGS_READ = LegacyAccessControlCompatiblePermission(
|
|
|
|
|
Resources.OTHER_SETTINGS, Actions.READ, LegacyAccessControlRole.VIEWER
|
|
|
|
|
)
|
|
|
|
|
OTHER_SETTINGS_WRITE = LegacyAccessControlCompatiblePermission(
|
|
|
|
|
Resources.OTHER_SETTINGS, Actions.WRITE, LegacyAccessControlRole.ADMIN
|
|
|
|
|
)
|
|
|
|
|
|
2024-10-02 13:39:49 -04:00
|
|
|
# NOTE: we don't currently add the label delete permission here because we don't currently use this in OnCall
|
|
|
|
|
LABEL_CREATE = LegacyAccessControlCompatiblePermission(
|
|
|
|
|
Resources.LABEL, Actions.CREATE, LegacyAccessControlRole.EDITOR, prefix=PluginID.LABELS
|
|
|
|
|
)
|
|
|
|
|
LABEL_READ = LegacyAccessControlCompatiblePermission(
|
|
|
|
|
Resources.LABEL, Actions.READ, LegacyAccessControlRole.VIEWER, prefix=PluginID.LABELS
|
|
|
|
|
)
|
|
|
|
|
LABEL_WRITE = LegacyAccessControlCompatiblePermission(
|
|
|
|
|
Resources.LABEL, Actions.WRITE, LegacyAccessControlRole.EDITOR, prefix=PluginID.LABELS
|
|
|
|
|
)
|
|
|
|
|
|
2023-06-27 12:23:08 +02:00
|
|
|
# mypy complains about "Liskov substitution principle" here because request is `AuthenticatedRequest` object
|
|
|
|
|
# and not rest_framework.request.Request
|
|
|
|
|
# https://mypy.readthedocs.io/en/stable/common_issues.html#incompatible-overrides
|
|
|
|
|
def has_permission(self, request: AuthenticatedRequest, view: ViewSetOrAPIView) -> bool: # type: ignore[override]
|
2023-01-20 09:19:41 +01:00
|
|
|
# the django-debug-toolbar UI makes OPTIONS calls. Without this statement the debug UI can't gather the
|
|
|
|
|
# necessary info it needs to work properly
|
|
|
|
|
if settings.DEBUG and request.method == "OPTIONS":
|
|
|
|
|
return True
|
|
|
|
|
|
2023-10-23 13:03:51 +02:00
|
|
|
action = get_view_action(request, view)
|
2022-11-29 09:41:56 +01:00
|
|
|
|
2023-06-27 12:23:08 +02:00
|
|
|
rbac_permissions: typing.Optional[RBACPermissionsAttribute] = getattr(view, RBAC_PERMISSIONS_ATTR, None)
|
2022-11-29 09:41:56 +01:00
|
|
|
|
|
|
|
|
# first check that the rbac_permissions dict attribute is defined
|
|
|
|
|
assert (
|
|
|
|
|
rbac_permissions is not None
|
|
|
|
|
), f"Must define a {RBAC_PERMISSIONS_ATTR} dict on the ViewSet that is consuming the RBACPermission class"
|
|
|
|
|
|
2023-06-27 12:23:08 +02:00
|
|
|
action_required_permissions: typing.Optional[typing.List] = rbac_permissions.get(action, None)
|
2022-11-29 09:41:56 +01:00
|
|
|
|
|
|
|
|
# next check that the action in question is defined within the rbac_permissions dict attribute
|
|
|
|
|
assert (
|
|
|
|
|
action_required_permissions is not None
|
|
|
|
|
), f"""Each action must be defined within the {RBAC_PERMISSIONS_ATTR} dict on the ViewSet.
|
|
|
|
|
\nIf an action requires no permissions, its value should explicitly be set to an empty list"""
|
|
|
|
|
|
|
|
|
|
return user_is_authorized(request.user, action_required_permissions)
|
|
|
|
|
|
2023-06-27 12:23:08 +02:00
|
|
|
# mypy complains about "Liskov substitution principle" here because request is `AuthenticatedRequest` object
|
|
|
|
|
# and not rest_framework.request.Request
|
|
|
|
|
# https://mypy.readthedocs.io/en/stable/common_issues.html#incompatible-overrides
|
|
|
|
|
def has_object_permission(self, request: AuthenticatedRequest, view: ViewSetOrAPIView, obj: typing.Any) -> bool: # type: ignore[override]
|
|
|
|
|
rbac_object_permissions: typing.Optional[RBACObjectPermissionsAttribute] = getattr(
|
|
|
|
|
view, RBAC_OBJECT_PERMISSIONS_ATTR, None
|
|
|
|
|
)
|
2022-11-29 09:41:56 +01:00
|
|
|
|
|
|
|
|
if rbac_object_permissions:
|
2023-10-23 13:03:51 +02:00
|
|
|
action = get_view_action(request, view)
|
2022-11-29 09:41:56 +01:00
|
|
|
|
|
|
|
|
for permission_class, actions in rbac_object_permissions.items():
|
|
|
|
|
if action in actions:
|
|
|
|
|
return permission_class.has_object_permission(request, view, obj)
|
2023-07-21 21:35:19 +02:00
|
|
|
|
|
|
|
|
# Note: if an endpoint is not found within the rbac_object_permissions dictionary,
|
|
|
|
|
# that means object permissions are not relevant to this endpoint. Return True (authorized)
|
2022-11-29 09:41:56 +01:00
|
|
|
|
|
|
|
|
# has_object_permission is called after has_permission, so return True if in view there is not
|
|
|
|
|
# RBAC_OBJECT_PERMISSIONS_ATTR attr which mean no additional check involving object required
|
|
|
|
|
return True
|
|
|
|
|
|
|
|
|
|
|
2023-05-02 08:19:34 -04:00
|
|
|
ALL_PERMISSION_NAMES = [perm for perm in dir(RBACPermission.Permissions) if not perm.startswith("_")]
|
2024-10-10 15:02:21 -04:00
|
|
|
ALL_PERMISSION_CLASSES: LegacyAccessControlCompatiblePermissions = [
|
2023-05-02 08:19:34 -04:00
|
|
|
getattr(RBACPermission.Permissions, permission_name) for permission_name in ALL_PERMISSION_NAMES
|
|
|
|
|
]
|
2024-10-10 15:02:21 -04:00
|
|
|
ALL_PERMISSION_CHOICES: typing.List[typing.Tuple[str, str]] = []
|
|
|
|
|
for permission_class, permission_name in zip(ALL_PERMISSION_CLASSES, ALL_PERMISSION_NAMES):
|
|
|
|
|
ALL_PERMISSION_CHOICES += [
|
|
|
|
|
(permission_class.value, permission_name),
|
|
|
|
|
(convert_oncall_permission_to_irm(permission_class), permission_name),
|
|
|
|
|
]
|
|
|
|
|
ALL_PERMISSION_NAME_TO_CLASS_MAP: typing.Dict[str, LegacyAccessControlCompatiblePermission] = {}
|
|
|
|
|
for permission_class in ALL_PERMISSION_CLASSES:
|
|
|
|
|
ALL_PERMISSION_NAME_TO_CLASS_MAP.update(
|
|
|
|
|
{
|
|
|
|
|
permission_class.value: permission_class,
|
|
|
|
|
convert_oncall_permission_to_irm(permission_class): permission_class,
|
|
|
|
|
}
|
|
|
|
|
)
|
2023-05-02 08:19:34 -04:00
|
|
|
|
|
|
|
|
|
2022-11-29 09:41:56 +01:00
|
|
|
class IsOwner(permissions.BasePermission):
|
|
|
|
|
def __init__(self, ownership_field: typing.Optional[str] = None) -> None:
|
|
|
|
|
self.ownership_field = ownership_field
|
|
|
|
|
|
2023-06-27 12:23:08 +02:00
|
|
|
# mypy complains about "Liskov substitution principle" here because request is `AuthenticatedRequest` object
|
|
|
|
|
# and not rest_framework.request.Request
|
|
|
|
|
# https://mypy.readthedocs.io/en/stable/common_issues.html#incompatible-overrides
|
|
|
|
|
def has_object_permission(self, request: AuthenticatedRequest, _view: ViewSetOrAPIView, obj: typing.Any) -> bool: # type: ignore[override]
|
2022-11-29 09:41:56 +01:00
|
|
|
owner = obj if self.ownership_field is None else getattrd(obj, self.ownership_field)
|
|
|
|
|
return owner == request.user
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class HasRBACPermissions(permissions.BasePermission):
|
2023-06-27 12:23:08 +02:00
|
|
|
def __init__(self, required_permissions: LegacyAccessControlCompatiblePermissions) -> None:
|
2022-11-29 09:41:56 +01:00
|
|
|
self.required_permissions = required_permissions
|
|
|
|
|
|
2023-06-27 12:23:08 +02:00
|
|
|
# mypy complains about "Liskov substitution principle" here because request is `AuthenticatedRequest` object
|
|
|
|
|
# and not rest_framework.request.Request
|
|
|
|
|
# https://mypy.readthedocs.io/en/stable/common_issues.html#incompatible-overrides
|
|
|
|
|
def has_object_permission(self, request: AuthenticatedRequest, _view: ViewSetOrAPIView, _obj: typing.Any) -> bool: # type: ignore[override]
|
2022-11-29 09:41:56 +01:00
|
|
|
return user_is_authorized(request.user, self.required_permissions)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class IsOwnerOrHasRBACPermissions(permissions.BasePermission):
|
|
|
|
|
def __init__(
|
|
|
|
|
self,
|
2023-06-27 12:23:08 +02:00
|
|
|
required_permissions: LegacyAccessControlCompatiblePermissions,
|
2022-11-29 09:41:56 +01:00
|
|
|
ownership_field: typing.Optional[str] = None,
|
|
|
|
|
) -> None:
|
|
|
|
|
self.IsOwner = IsOwner(ownership_field)
|
|
|
|
|
self.HasRBACPermissions = HasRBACPermissions(required_permissions)
|
|
|
|
|
|
2023-06-27 12:23:08 +02:00
|
|
|
# mypy complains about "Liskov substitution principle" here because request is `AuthenticatedRequest` object
|
|
|
|
|
# and not rest_framework.request.Request
|
|
|
|
|
# https://mypy.readthedocs.io/en/stable/common_issues.html#incompatible-overrides
|
|
|
|
|
def has_object_permission(self, request: AuthenticatedRequest, view: ViewSetOrAPIView, obj: typing.Any) -> bool: # type: ignore[override]
|
2022-11-29 09:41:56 +01:00
|
|
|
return self.IsOwner.has_object_permission(request, view, obj) or self.HasRBACPermissions.has_object_permission(
|
|
|
|
|
request, view, obj
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class IsStaff(permissions.BasePermission):
|
|
|
|
|
STAFF_AUTH_CLASSES = [BasicAuthentication, SessionAuthentication]
|
|
|
|
|
|
2023-06-27 12:23:08 +02:00
|
|
|
# mypy complains about "Liskov substitution principle" here because request is `AuthenticatedRequest` object
|
|
|
|
|
# and not rest_framework.request.Request
|
|
|
|
|
# https://mypy.readthedocs.io/en/stable/common_issues.html#incompatible-overrides
|
|
|
|
|
def has_permission(self, request: AuthenticatedDjangoAdminRequest, _view: ViewSet) -> bool: # type: ignore[override]
|
2022-11-29 09:41:56 +01:00
|
|
|
user = request.user
|
|
|
|
|
if not any(isinstance(request._authenticator, x) for x in self.STAFF_AUTH_CLASSES):
|
|
|
|
|
return False
|
|
|
|
|
if user and user.is_authenticated:
|
|
|
|
|
return user.is_staff
|
|
|
|
|
return False
|