oncall-engine/engine/apps/api/permissions.py
Matias Bordese 132bdf235b
feat: update service account auth not to require rbac enabled org (#5360)
Related to https://github.com/grafana/oncall-private/issues/2826

RBAC enabled or not (OSS or cloud), it is possible to get service
account permissions, enabling perm check (for service account tokens) in
public API.

Also allow empty value for users in sync (instead of returning a 400
response).
2024-12-12 22:11:59 +00:00

426 lines
19 KiB
Python

import enum
import typing
from django.conf import settings
from django.contrib.auth.models import AbstractUser
from rest_framework import permissions
from rest_framework.authentication import BasicAuthentication, SessionAuthentication
from rest_framework.request import Request
from rest_framework.views import APIView
from rest_framework.viewsets import ViewSet, ViewSetMixin
from common.constants.plugin_ids import PluginID
from common.utils import getattrd
if typing.TYPE_CHECKING:
from apps.user_management.models import Organization, User
RBAC_PERMISSIONS_ATTR = "rbac_permissions"
RBAC_OBJECT_PERMISSIONS_ATTR = "rbac_object_permissions"
ViewSetOrAPIView = typing.Union[ViewSet, APIView]
class AuthenticatedRequest(Request):
"""
Use this for typing, instead of rest_framework.request.Request, when you KNOW that the user is authenticated.
ex. In the RBACPermission class below, we know that the user is authenticated because this is handled by the
`authentication_classes` attribute on views.
https://github.com/typeddjango/django-stubs#how-can-i-create-a-httprequest-thats-guaranteed-to-have-an-authenticated-user
"""
# see comment above, this is safe. without the type-ignore comment, mypy complains
# expression has type "User", base class "Request" defined the type as "Union[AbstractBaseUser, AnonymousUser]"
user: "User" # type: ignore[assignment]
class AuthenticatedDjangoAdminRequest(Request):
"""
Use this for typing, instead of rest_framework.request.Request, when you KNOW that the user is authenticated via
Django admin user authentication.
https://github.com/typeddjango/django-stubs#how-can-i-create-a-httprequest-thats-guaranteed-to-have-an-authenticated-user
"""
user: AbstractUser
class GrafanaAPIPermission(typing.TypedDict):
action: str
class GrafanaAPIPermissions:
@classmethod
def construct_permissions(cls, actions: typing.List[str]) -> typing.List[GrafanaAPIPermission]:
return [GrafanaAPIPermission(action=action) for action in actions]
class Resources(enum.Enum):
ALERT_GROUPS = "alert-groups"
INTEGRATIONS = "integrations"
ESCALATION_CHAINS = "escalation-chains"
SCHEDULES = "schedules"
CHATOPS = "chatops"
OUTGOING_WEBHOOKS = "outgoing-webhooks"
MAINTENANCE = "maintenance"
API_KEYS = "api-keys"
NOTIFICATIONS = "notifications"
NOTIFICATION_SETTINGS = "notification-settings"
USER_SETTINGS = "user-settings"
OTHER_SETTINGS = "other-settings"
ADMIN = "admin"
LABEL = "label"
class Actions(enum.Enum):
READ = "read"
WRITE = "write"
ADMIN = "admin"
TEST = "test"
EXPORT = "export"
UPDATE_SETTINGS = "update-settings"
DIRECT_PAGING = "direct-paging"
CREATE = "create"
class LegacyAccessControlRole(enum.IntEnum):
ADMIN = 0
EDITOR = 1
VIEWER = 2
NONE = 3
@classmethod
def choices(cls):
return tuple((option.value, option.name) for option in cls)
class LegacyAccessControlCompatiblePermission:
def __init__(
self,
resource: Resources,
action: Actions,
fallback_role: LegacyAccessControlRole,
prefix: str = PluginID.ONCALL,
) -> None:
self.value = f"{prefix}.{resource.value}:{action.value}"
self.fallback_role = fallback_role
def user_has_permission(self, user: "User") -> bool:
return user_is_authorized(user, [self])
LegacyAccessControlCompatiblePermissions = typing.List[LegacyAccessControlCompatiblePermission]
RBACPermissionsAttribute = typing.Dict[str, LegacyAccessControlCompatiblePermissions]
RBACObjectPermissionsAttribute = typing.Dict[permissions.BasePermission, typing.List[str]]
def get_view_action(request: AuthenticatedRequest, view: ViewSetOrAPIView) -> str:
"""
For right now this needs to support being used in both a ViewSet as well as APIView, we use both interchangably
Note: `request.method` is returned uppercase
"""
return view.action if isinstance(view, ViewSetMixin) else (request.method or "").lower()
def get_most_authorized_role(permissions: LegacyAccessControlCompatiblePermissions) -> LegacyAccessControlRole:
if not permissions:
return LegacyAccessControlRole.NONE
# ex. Admin is 0, None is 3, thereby min makes sense here
return min({p.fallback_role for p in permissions}, key=lambda r: r.value)
def convert_oncall_permission_to_irm(permission: LegacyAccessControlCompatiblePermission) -> str:
return permission.value.replace(PluginID.ONCALL, PluginID.IRM)
def get_required_permission_values(
organization: "Organization", required_permissions: LegacyAccessControlCompatiblePermissions
) -> typing.List[str]:
"""
This function returns a list of required permission values, taking into account whether or not the organization
is using the IRM plugin.
If the IRM plugin is being used, we substitue `grafana-oncall-app` with `grafana-irm-app`
as the RBAC permission prefix.
"""
permission_values = []
for permission in required_permissions:
permission_value = permission.value
if permission_value.startswith(PluginID.ONCALL) and organization.is_grafana_irm_enabled:
permission_values.append(convert_oncall_permission_to_irm(permission))
else:
permission_values.append(permission_value)
return permission_values
def user_has_minimum_required_basic_role(user: "User", required_basic_role: LegacyAccessControlRole) -> bool:
return user.role <= required_basic_role.value
def user_is_authorized(user: "User", required_permissions: LegacyAccessControlCompatiblePermissions) -> bool:
"""
This function checks whether `user` has all necessary permissions specified in `required_permissions`.
RBAC permissions are used if RBAC is enabled for the organization, otherwise the fallback basic role is checked.
`user` - The user to check permissions for
`required_permissions` - A list of permissions that a user must have to be considered authorized
"""
organization = user.organization
if organization.is_rbac_permissions_enabled or user.is_service_account:
user_permissions = [u["action"] for u in user.permissions]
required_permission_values = get_required_permission_values(organization, required_permissions)
return all(permission in user_permissions for permission in required_permission_values)
return user_has_minimum_required_basic_role(user, get_most_authorized_role(required_permissions))
class RBACPermission(permissions.BasePermission):
class Permissions:
# NOTE: this is a bit of a hack for now. See https://github.com/grafana/support-escalations/issues/12625
# Basically when it comes to filtering teams that are configured to share their resources with
# "Team members and admins", we have no way of knowing, when a user is ACTUALLY an Admin when RBAC is involed.
#
# Example: Take a user with the basic role of None/Editor/Viewer but with the "OnCall Admin" role assigned.
# Without this RBAC permission, we have no way of knowing that the user is ACTUALLY an "Admin".
ADMIN = LegacyAccessControlCompatiblePermission(Resources.ADMIN, Actions.ADMIN, LegacyAccessControlRole.ADMIN)
ALERT_GROUPS_READ = LegacyAccessControlCompatiblePermission(
Resources.ALERT_GROUPS, Actions.READ, LegacyAccessControlRole.VIEWER
)
ALERT_GROUPS_WRITE = LegacyAccessControlCompatiblePermission(
Resources.ALERT_GROUPS, Actions.WRITE, LegacyAccessControlRole.EDITOR
)
ALERT_GROUPS_DIRECT_PAGING = LegacyAccessControlCompatiblePermission(
Resources.ALERT_GROUPS, Actions.DIRECT_PAGING, LegacyAccessControlRole.EDITOR
)
INTEGRATIONS_READ = LegacyAccessControlCompatiblePermission(
Resources.INTEGRATIONS, Actions.READ, LegacyAccessControlRole.VIEWER
)
INTEGRATIONS_TEST = LegacyAccessControlCompatiblePermission(
Resources.INTEGRATIONS, Actions.TEST, LegacyAccessControlRole.EDITOR
)
INTEGRATIONS_WRITE = LegacyAccessControlCompatiblePermission(
Resources.INTEGRATIONS, Actions.WRITE, LegacyAccessControlRole.ADMIN
)
ESCALATION_CHAINS_READ = LegacyAccessControlCompatiblePermission(
Resources.ESCALATION_CHAINS, Actions.READ, LegacyAccessControlRole.VIEWER
)
ESCALATION_CHAINS_WRITE = LegacyAccessControlCompatiblePermission(
Resources.ESCALATION_CHAINS, Actions.WRITE, LegacyAccessControlRole.ADMIN
)
SCHEDULES_READ = LegacyAccessControlCompatiblePermission(
Resources.SCHEDULES, Actions.READ, LegacyAccessControlRole.VIEWER
)
SCHEDULES_WRITE = LegacyAccessControlCompatiblePermission(
Resources.SCHEDULES, Actions.WRITE, LegacyAccessControlRole.EDITOR
)
SCHEDULES_EXPORT = LegacyAccessControlCompatiblePermission(
Resources.SCHEDULES, Actions.EXPORT, LegacyAccessControlRole.EDITOR
)
CHATOPS_READ = LegacyAccessControlCompatiblePermission(
Resources.CHATOPS, Actions.READ, LegacyAccessControlRole.VIEWER
)
CHATOPS_WRITE = LegacyAccessControlCompatiblePermission(
Resources.CHATOPS, Actions.WRITE, LegacyAccessControlRole.EDITOR
)
CHATOPS_UPDATE_SETTINGS = LegacyAccessControlCompatiblePermission(
Resources.CHATOPS, Actions.UPDATE_SETTINGS, LegacyAccessControlRole.ADMIN
)
OUTGOING_WEBHOOKS_READ = LegacyAccessControlCompatiblePermission(
Resources.OUTGOING_WEBHOOKS, Actions.READ, LegacyAccessControlRole.VIEWER
)
OUTGOING_WEBHOOKS_WRITE = LegacyAccessControlCompatiblePermission(
Resources.OUTGOING_WEBHOOKS, Actions.WRITE, LegacyAccessControlRole.ADMIN
)
MAINTENANCE_READ = LegacyAccessControlCompatiblePermission(
Resources.MAINTENANCE, Actions.READ, LegacyAccessControlRole.VIEWER
)
MAINTENANCE_WRITE = LegacyAccessControlCompatiblePermission(
Resources.MAINTENANCE, Actions.WRITE, LegacyAccessControlRole.EDITOR
)
API_KEYS_READ = LegacyAccessControlCompatiblePermission(
Resources.API_KEYS, Actions.READ, LegacyAccessControlRole.ADMIN
)
API_KEYS_WRITE = LegacyAccessControlCompatiblePermission(
Resources.API_KEYS, Actions.WRITE, LegacyAccessControlRole.ADMIN
)
NOTIFICATIONS_READ = LegacyAccessControlCompatiblePermission(
Resources.NOTIFICATIONS, Actions.READ, LegacyAccessControlRole.EDITOR
)
NOTIFICATION_SETTINGS_READ = LegacyAccessControlCompatiblePermission(
Resources.NOTIFICATION_SETTINGS, Actions.READ, LegacyAccessControlRole.VIEWER
)
NOTIFICATION_SETTINGS_WRITE = LegacyAccessControlCompatiblePermission(
Resources.NOTIFICATION_SETTINGS, Actions.WRITE, LegacyAccessControlRole.EDITOR
)
USER_SETTINGS_READ = LegacyAccessControlCompatiblePermission(
Resources.USER_SETTINGS, Actions.READ, LegacyAccessControlRole.VIEWER
)
USER_SETTINGS_WRITE = LegacyAccessControlCompatiblePermission(
Resources.USER_SETTINGS, Actions.WRITE, LegacyAccessControlRole.EDITOR
)
USER_SETTINGS_ADMIN = LegacyAccessControlCompatiblePermission(
Resources.USER_SETTINGS, Actions.ADMIN, LegacyAccessControlRole.ADMIN
)
OTHER_SETTINGS_READ = LegacyAccessControlCompatiblePermission(
Resources.OTHER_SETTINGS, Actions.READ, LegacyAccessControlRole.VIEWER
)
OTHER_SETTINGS_WRITE = LegacyAccessControlCompatiblePermission(
Resources.OTHER_SETTINGS, Actions.WRITE, LegacyAccessControlRole.ADMIN
)
# NOTE: we don't currently add the label delete permission here because we don't currently use this in OnCall
LABEL_CREATE = LegacyAccessControlCompatiblePermission(
Resources.LABEL, Actions.CREATE, LegacyAccessControlRole.EDITOR, prefix=PluginID.LABELS
)
LABEL_READ = LegacyAccessControlCompatiblePermission(
Resources.LABEL, Actions.READ, LegacyAccessControlRole.VIEWER, prefix=PluginID.LABELS
)
LABEL_WRITE = LegacyAccessControlCompatiblePermission(
Resources.LABEL, Actions.WRITE, LegacyAccessControlRole.EDITOR, prefix=PluginID.LABELS
)
# mypy complains about "Liskov substitution principle" here because request is `AuthenticatedRequest` object
# and not rest_framework.request.Request
# https://mypy.readthedocs.io/en/stable/common_issues.html#incompatible-overrides
def has_permission(self, request: AuthenticatedRequest, view: ViewSetOrAPIView) -> bool: # type: ignore[override]
# the django-debug-toolbar UI makes OPTIONS calls. Without this statement the debug UI can't gather the
# necessary info it needs to work properly
if settings.DEBUG and request.method == "OPTIONS":
return True
action = get_view_action(request, view)
rbac_permissions: typing.Optional[RBACPermissionsAttribute] = getattr(view, RBAC_PERMISSIONS_ATTR, None)
# first check that the rbac_permissions dict attribute is defined
assert (
rbac_permissions is not None
), f"Must define a {RBAC_PERMISSIONS_ATTR} dict on the ViewSet that is consuming the RBACPermission class"
action_required_permissions: typing.Optional[typing.List] = rbac_permissions.get(action, None)
# next check that the action in question is defined within the rbac_permissions dict attribute
assert (
action_required_permissions is not None
), f"""Each action must be defined within the {RBAC_PERMISSIONS_ATTR} dict on the ViewSet.
\nIf an action requires no permissions, its value should explicitly be set to an empty list"""
return user_is_authorized(request.user, action_required_permissions)
# mypy complains about "Liskov substitution principle" here because request is `AuthenticatedRequest` object
# and not rest_framework.request.Request
# https://mypy.readthedocs.io/en/stable/common_issues.html#incompatible-overrides
def has_object_permission(self, request: AuthenticatedRequest, view: ViewSetOrAPIView, obj: typing.Any) -> bool: # type: ignore[override]
rbac_object_permissions: typing.Optional[RBACObjectPermissionsAttribute] = getattr(
view, RBAC_OBJECT_PERMISSIONS_ATTR, None
)
if rbac_object_permissions:
action = get_view_action(request, view)
for permission_class, actions in rbac_object_permissions.items():
if action in actions:
return permission_class.has_object_permission(request, view, obj)
# Note: if an endpoint is not found within the rbac_object_permissions dictionary,
# that means object permissions are not relevant to this endpoint. Return True (authorized)
# has_object_permission is called after has_permission, so return True if in view there is not
# RBAC_OBJECT_PERMISSIONS_ATTR attr which mean no additional check involving object required
return True
ALL_PERMISSION_NAMES = [perm for perm in dir(RBACPermission.Permissions) if not perm.startswith("_")]
ALL_PERMISSION_CLASSES: LegacyAccessControlCompatiblePermissions = [
getattr(RBACPermission.Permissions, permission_name) for permission_name in ALL_PERMISSION_NAMES
]
ALL_PERMISSION_CHOICES: typing.List[typing.Tuple[str, str]] = []
for permission_class, permission_name in zip(ALL_PERMISSION_CLASSES, ALL_PERMISSION_NAMES):
ALL_PERMISSION_CHOICES += [
(permission_class.value, permission_name),
(convert_oncall_permission_to_irm(permission_class), permission_name),
]
ALL_PERMISSION_NAME_TO_CLASS_MAP: typing.Dict[str, LegacyAccessControlCompatiblePermission] = {}
for permission_class in ALL_PERMISSION_CLASSES:
ALL_PERMISSION_NAME_TO_CLASS_MAP.update(
{
permission_class.value: permission_class,
convert_oncall_permission_to_irm(permission_class): permission_class,
}
)
class IsOwner(permissions.BasePermission):
def __init__(self, ownership_field: typing.Optional[str] = None) -> None:
self.ownership_field = ownership_field
# mypy complains about "Liskov substitution principle" here because request is `AuthenticatedRequest` object
# and not rest_framework.request.Request
# https://mypy.readthedocs.io/en/stable/common_issues.html#incompatible-overrides
def has_object_permission(self, request: AuthenticatedRequest, _view: ViewSetOrAPIView, obj: typing.Any) -> bool: # type: ignore[override]
owner = obj if self.ownership_field is None else getattrd(obj, self.ownership_field)
return owner == request.user
class HasRBACPermissions(permissions.BasePermission):
def __init__(self, required_permissions: LegacyAccessControlCompatiblePermissions) -> None:
self.required_permissions = required_permissions
# mypy complains about "Liskov substitution principle" here because request is `AuthenticatedRequest` object
# and not rest_framework.request.Request
# https://mypy.readthedocs.io/en/stable/common_issues.html#incompatible-overrides
def has_object_permission(self, request: AuthenticatedRequest, _view: ViewSetOrAPIView, _obj: typing.Any) -> bool: # type: ignore[override]
return user_is_authorized(request.user, self.required_permissions)
class IsOwnerOrHasRBACPermissions(permissions.BasePermission):
def __init__(
self,
required_permissions: LegacyAccessControlCompatiblePermissions,
ownership_field: typing.Optional[str] = None,
) -> None:
self.IsOwner = IsOwner(ownership_field)
self.HasRBACPermissions = HasRBACPermissions(required_permissions)
# mypy complains about "Liskov substitution principle" here because request is `AuthenticatedRequest` object
# and not rest_framework.request.Request
# https://mypy.readthedocs.io/en/stable/common_issues.html#incompatible-overrides
def has_object_permission(self, request: AuthenticatedRequest, view: ViewSetOrAPIView, obj: typing.Any) -> bool: # type: ignore[override]
return self.IsOwner.has_object_permission(request, view, obj) or self.HasRBACPermissions.has_object_permission(
request, view, obj
)
class IsStaff(permissions.BasePermission):
STAFF_AUTH_CLASSES = [BasicAuthentication, SessionAuthentication]
# mypy complains about "Liskov substitution principle" here because request is `AuthenticatedRequest` object
# and not rest_framework.request.Request
# https://mypy.readthedocs.io/en/stable/common_issues.html#incompatible-overrides
def has_permission(self, request: AuthenticatedDjangoAdminRequest, _view: ViewSet) -> bool: # type: ignore[override]
user = request.user
if not any(isinstance(request._authenticator, x) for x in self.STAFF_AUTH_CLASSES):
return False
if user and user.is_authenticated:
return user.is_staff
return False