This commit is contained in:
Joey Orlando 2024-10-10 15:26:24 -04:00 committed by GitHub
commit b06f937188
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
22 changed files with 1048 additions and 515 deletions

View file

@ -36,118 +36,107 @@ refs:
# Escalation Chains and Routes
Often alerts from monitoring systems need to be sent to different escalation chains and messaging channels, based on their severity, or other alert content.
In Grafana OnCall, configuring proper alert routing and escalation ensures that alerts are directed to the right teams and handled promptly.
Alerts often need to be sent to different teams or channels depending on their severity or specific alert details.
Set up routes and escalation chains to customize and automate escalation according to each teams workflows.
## Routes
Routes are used to determine which escalation chain should be used for a specific alert
group. A route's _[Routing Templates]_
are evaluated for each alert and **the first matching route** is used to determine the
escalation chain and chatops channels.
Routes determine which escalation chain should be triggered for a specific alert group based on the details of the alert.
A route uses [Routing Templates](ref:routing-templates) to determine the escalation chain and notification channels.
> **Example:**
>
>
> * trigger escalation chain called `Database Critical` for alerts with `{{ payload.severity == "critical" and payload.service == "database" }}` in the payload
> * create a different route for alerts with the payload `{{ "synthetic-monitoring-dev-" in payload.namespace }}` and select a escalation chain called `Security`.
When an alert is received, its details are evaluated against the route's routing template, and **the first matching route** determines how the alert will be handled.
### Manage routes
**Example:**
1. Open Integration page
1. Click **Add route** button to create a new route
1. Click **Edit** button to edit `Routing Template`. The routing template must evaluate to `True` for it to apply
1. Select channels in **Publish to Chatops** section
> **Note:** If the **Publish to Chatops** section doesn't exist, connect Chatops integrations first.
> For more information, refer to [Notify people].
1. Select **Escalation Chain** from the list
1. If **Escalation Chain** does not exist, click **Add new escalation chain** button to create a new one, it will open in a new tab.
1. Once created, **Reload list**, and select the new escalation chain
1. Click **Arrow Up** and **Arrow Down** on the right to change the order of routes
1. Click **Three dots** and **Delete Route** to delete the route
- Trigger the `Database Critical` escalation chain for alerts with `{{ payload.severity == "critical" and payload.service == "database" }}`
- Use a different route for alerts with the payload `{{ "synthetic-monitoring-dev-" in payload.namespace }}`, selecting the `Security` escalation chain.
### Routing based on labels
### Create and manage routes
> **Note:** Labels are currently available only in cloud.
To create or manage a route:
In addition, there is a `labels` variable available to your routing templates, which contains all of the labels assigned
to the Alert Group, as a `dict`. This allows you to route based on labels (or a mix of labels and/or payload based data):
1. Navigate to the **Integrations** page.
1. Click **Add route** to create a new route, or **Edit** to modify an existing one.
1. In the **Routing Template** section, define conditions that will determine which alerts this route applies to.
The template must evaluate to `True` for the route to be selected.
1. Select the appropriate escalation chain from the **Escalation Chain** dropdown.
If an escalation chain doesnt exist, click **Add new escalation chain**, which will open a new tab for chain creation.
After creating the chain, return to the routes page and click **Reload list** to update the available options.
1. In the **Publish to ChatOps** section, select the relevant communication channels for this route (Slack, Teams, etc.).
Ensure ChatOps integrations are configured before using this feature.
1. Arrange the routes by clicking the up/down arrows to prioritize the routes as needed. The order determines which route is evaluated first.
1. To delete a route, click the three dots on the route and select **Delete Route**.
> **Example:**
>
> * `{{ labels.foo == "bar" or "hello" in labels.keys() or payload.severity == "critical" }}`
### Label-based routing
{{< admonition type="note" >}}
This feature is available exclusively on Grafana Cloud.
{{< /admonition >}}
You can use the labels variable in your routing templates to evaluate based on alert group labels.
This provides additional flexibility in routing alerts based on both labels and payload data.
**Example:**
`{{ labels.foo == "bar" or "hello" in labels.keys() or payload.severity == "critical" }}`
## Escalation Chains
Once an alert group is created and assigned to the route with escalation chain, the
escalation chain will be executed. Until user performs an action, which stops the escalation
chain (e.g. acknowledge, resolve, silence etc), the escalation chain will continue to
execute.
Escalation chains define the series of actions taken when an alert is triggered.
The chain continues until a user intervenes by acknowledging, resolving, or silencing the alert.
Users can create escalation chains to configure different type of escalation workflows.
For example, you can create a chain that will notify on-call users with high priority, and
another chain that will only send a message into a Slack channel.
You can configure different escalation chains for different workflows.
For example, one chain might notify on-call users immediately, while another sends a low-priority message to a Slack channel.
Escalation chains determine Who and When to notify. How to notify is set by the user, based on their own preferences.
### Create and manage escalation chains
1. Navigate to the **Escalation Chains** page.
1. Click **New escalation chain** to create a new chain.
1. Enter a unique name and assign the chain to a team.
1. Click **Add escalation step** to define the steps for this chain (e.g., notifying users, waiting, escalating).
1. To edit an existing chain, click **Edit**. To remove a chain, click **Delete**.
{{< admonition type="note" >}}
- The name must be unique across the organization.
Alert groups inherit the team from the integration, not the escalation chain.
- Linked integrations and routes are shown in the right panel.
Changes to the escalation chain impact all associated integrations and routes.
{{< /admonition >}}
### Types of escalation steps
* `Wait` - wait for a specified amount of time before proceeding to the next step. If you
need a larger time interval, use multiple wait steps in a row.
* `Notify users` - send a notification to a user or a group of users.
* `Notify users from on-call schedule` - send a notification to a user or a group of users
from an on-call schedule.
* `Notify all users from a team` - send a notification to all users in a team.
* `Resolve incident automatically` - resolve the alert group right now with status
`Resolved automatically`.
* `Escalate to all Slack channel members` - send a notification to the users in the slack channel. These users will be notified
via the method configured in their user profile.
* `Notify Slack User Group` - send a notification to each member of a slack user group. These users will be notified
via the method configured in their user profile.
* `Trigger outgoing webhook` - trigger an [outgoing webhook].
* `Notify users one by one (round robin)` - notify users sequentially, cycling through users for **different alert groups**.
Example: if users A, B, and C are in the list, the first alert group notifies A, the second alert group notifies B, and
the third alert group notifies C. Note: users are sorted alphabetically by their username.
To notify multiple users **within the same alert group** until someone acknowledges, instead use `Notify users` policies with
`Wait` policies between them in the escalation chain.
* `Continue escalation if current time is in range` - continue escalation only if current
time is in specified range. It will wait for the specfied time to continue escalation.
Useful when you want to get escalation only during working hours
* `Continue escalation if >X alerts per Y minutes (beta)` - continue escalation only if it
passes some threshold
* `Repeat escalation from beginning (5 times max)` - loop the escalation chain
- `Wait`: Pause for a specified time before moving to the next step. You can add multiple wait steps for longer intervals.
- `Notify users`: Notify individual users or groups.
- `Notify users from on-call schedule`: Send notifications to users from a defined on-call schedule.
- `Notify all team members`: Notify all users in a team.
- `Resolve incident automatically`: Immediately resolve the alert group with the status `Resolved automatically`.
- `Notify Slack channel members`: Notify users in a Slack channel based on their OnCall profile preferences.
- `Notify Slack user group`: Notify all members of a Slack user group.
- `Trigger outgoing webhook`: Activate an [outgoing webhook](ref:outgoing-webhooks).
- `Round robin notifications`: Notify users sequentially, with each user receiving different alert groups.
- `Time-based escalation`: Continue escalation only if the current time falls within a specific range (e.g., during working hours)
- `Threshold-based escalation`: Escalate only if a certain number of alerts occur within a specific time frame.
- `Repeat escalation`: Loop the escalation chain up to five times.
- `Declare incident (non-default routes)`: **Available only in Grafana Cloud**. Declares an incident with a specified severity.
Limited to one incident per route at a time.
Additional alerts are grouped into the active incident, and up to five are listed as incident context.
> **Note:** Both "**Escalate to all Slack channel members**" and "**Notify Slack User Group**" will filter OnCall registered users
matching the users in the Slack channel or Slack User Group with their profiles linked to their Slack accounts (ie. users
should have linked their Slack and OnCall users). In both cases, the filtered users satisfying the criteria above are
notified following their respective notification policies. However, to avoid **spamming** the Slack channel/thread,
users **won't be notified** in the alert group Slack **thread** (this is how the feature is currently implemented)
but instead notify them using their **other defined** options in
their respective policies.
{{< admonition type="note" >}}
The **Notify Slack channel members** and **Notify Slack user group** steps are designed to notify OnCall-registered users via their configured notification rules.
To avoid spamming a Slack channel with alert group notifications, notifications are not sent in the alert group Slack thread.
{{< /admonition >}}
### Notification types
Each escalation step that notifies a user, does so by triggering their personal notification steps. These are configured in the Grafana
OnCall users page (by clicking "View my profile").
It will be executed for each user in the escalation step
User can configure two types of personal notification chains:
When an escalation step notifies a user, it follows their personal notification settings, which are configured in their user profile.
* **Default Notifications**
Each user can have two sets of notification rules:
* **Important Notifications**
- **Default Notifications**: For standard alerts.
- **Important Notifications**: For high-priority alerts.
In the escalation step, user can select which type of notification to use.
For more information, refer to [Notify people].
### Manage Escalation Chains
1. Open **Escalation Chains** page
2. Click **New escalation chain** button to create a new escalation chain
3. Enter a name and assign it to a team
> **Note:** Name must be unique across organization
> **Note:** Alert Groups inherit the team from the Integration, not the Escalation Chain
4. Click **Add escalation step** button to add a new step
5. Click **Delete** to delete the Escalation Chain, and **Edit** to edit the name or the team.
> **Important:** Linked Integrations and Routes are displayed in the right panel. Any change in the Escalation Chain will
affect all linked Integrations and Routes.
Each escalation step allows you to select which set of notification rules to use.
For more information about user notification rules, refer to the [Notifications](ref:notify-people) section.

View file

@ -42,7 +42,7 @@ The above command returns JSON structured in the following way:
| ---------------------------------- | :--------------------------------------: | :------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ |
| `escalation_chain_id` | Yes | Each escalation policy is assigned to a specific escalation chain. |
| `position` | Optional | Escalation policies execute one after another starting from `position=0`. `Position=-1` will put the escalation policy to the end of the list. A new escalation policy created with a position of an existing escalation policy will move the old one (and all following) down in the list. |
| `type` | Yes | One of: `wait`, `notify_persons`, `notify_person_next_each_time`, `notify_on_call_from_schedule`, `notify_user_group`, `trigger_webhook`, `resolve`, `notify_whole_channel`, `notify_if_time_from_to`. |
| `type` | Yes | One of: `wait`, `notify_persons`, `notify_person_next_each_time`, `notify_on_call_from_schedule`, `notify_user_group`, `trigger_webhook`, `resolve`, `notify_whole_channel`, `notify_if_time_from_to`, `declare_incident`. |
| `important` | Optional | Default is `false`. Will assign "important" to personal notification rules if `true`. This can be used to distinguish alerts on which you want to be notified immediately by phone. Applicable for types `notify_persons`, `notify_team_members`, `notify_on_call_from_schedule`, and `notify_user_group`. |
| `duration` | If type = `wait` | The duration, in seconds, when type `wait` is chosen. Valid values are: `60`, `300`, `900`, `1800`, `3600`. |
| `action_to_trigger` | If type = `trigger_webhook` | ID of a webhook. |
@ -52,7 +52,8 @@ The above command returns JSON structured in the following way:
| `notify_on_call _from_schedule` | If type = `notify_on_call_from_schedule` | ID of a Schedule. |
| `notify_if_time_from` | If type = `notify_if_time_from_to` | UTC time represents the beginning of the time period, for example `09:00:00Z`. |
| `notify_if_time_to` | If type = `notify_if_time_from_to` | UTC time represents the end of the time period, for example `18:00:00Z`. |
| `team_to_notify` | If type = `notify_team_members` | ID of a team. |
| `team_to_notify` | If type = `notify_team_members` | ID of a team. |
| `severity` | If type = `declare_incident` | Severity of the incident. |
**HTTP request**

View file

@ -1,7 +1,7 @@
import re
from apps.alerts.incident_appearance.templaters.alert_templater import AlertTemplater
from common.utils import convert_md_to_html, escape_html, url_re, urlize_with_respect_to_a
from common.utils import convert_md_to_html, escape_html, url_re, urlize_with_respect_to_a, validate_url
class AlertWebTemplater(AlertTemplater):
@ -26,7 +26,7 @@ class AlertWebTemplater(AlertTemplater):
message = message.replace(substitution, original_link)
templated_alert.message = urlize_with_respect_to_a(message)
if templated_alert.image_url:
templated_alert.image_url = escape_html(templated_alert.image_url)
templated_alert.image_url = validate_url(templated_alert.image_url)
return templated_alert

View file

@ -422,6 +422,8 @@ class AlertReceiveChannel(IntegrationOptionsMixin, MaintainableObject):
@property
def new_incidents_web_link(self):
from apps.alerts.models import AlertGroup
return UIURLBuilder(self.organization).alert_groups(
f"?integration={self.public_primary_key}&status={AlertGroup.NEW}",
)

View file

@ -400,7 +400,7 @@ def perform_notification(log_record_pk, use_default_notification_policy_fallback
UserNotificationPolicyLogRecord(
author=user,
type=UserNotificationPolicyLogRecord.TYPE_PERSONAL_NOTIFICATION_FAILED,
notification_policy=notification_policy,
notification_policy=notification_policy if not use_default_notification_policy_fallback else None,
reason="Expected data is missing",
alert_group=alert_group,
notification_step=notification_policy.step if notification_policy else None,
@ -424,7 +424,7 @@ def perform_notification(log_record_pk, use_default_notification_policy_fallback
UserNotificationPolicyLogRecord(
author=user,
type=UserNotificationPolicyLogRecord.TYPE_PERSONAL_NOTIFICATION_FAILED,
notification_policy=notification_policy,
notification_policy=notification_policy if not use_default_notification_policy_fallback else None,
reason="Skipped notification because alert group is resolved",
alert_group=alert_group,
notification_step=notification_policy.step if notification_policy else None,

View file

@ -55,6 +55,37 @@ def test_render_web_alert_links(
)
@pytest.mark.django_db
@pytest.mark.parametrize(
"url,expected",
[
("https://example.com", "https://example.com"),
('https://some-url"<script>hey</script>', None),
("https://example.com?something=foo&else=bar", "https://example.com?something=foo&else=bar"),
],
)
def test_render_web_image_url(
make_organization_and_user_with_slack_identities,
make_alert_receive_channel,
make_alert_group,
make_alert,
url,
expected,
):
organization, _, _, _ = make_organization_and_user_with_slack_identities()
alert_receive_channel = make_alert_receive_channel(
organization,
web_image_url_template="{{ payload.image_url }}",
)
alert_group = make_alert_group(alert_receive_channel)
alert = make_alert(alert_group=alert_group, raw_request_data={"image_url": url})
templater = AlertWebTemplater(alert)
templated_alert = templater.render()
assert templated_alert.image_url == expected
@pytest.mark.django_db
def test_getattr_template(
make_organization_and_user_with_slack_identities,

View file

@ -13,7 +13,7 @@ from common.constants.plugin_ids import PluginID
from common.utils import getattrd
if typing.TYPE_CHECKING:
from apps.user_management.models import User
from apps.user_management.models import Organization, User
RBAC_PERMISSIONS_ATTR = "rbac_permissions"
RBAC_OBJECT_PERMISSIONS_ATTR = "rbac_object_permissions"
@ -50,6 +50,12 @@ class GrafanaAPIPermission(typing.TypedDict):
action: str
class GrafanaAPIPermissions:
@classmethod
def construct_permissions(cls, actions: typing.List[str]) -> typing.List[GrafanaAPIPermission]:
return [GrafanaAPIPermission(action=action) for action in actions]
class Resources(enum.Enum):
ALERT_GROUPS = "alert-groups"
INTEGRATIONS = "integrations"
@ -103,6 +109,9 @@ class LegacyAccessControlCompatiblePermission:
self.value = f"{prefix}.{resource.value}:{action.value}"
self.fallback_role = fallback_role
def user_has_permission(self, user: "User") -> bool:
return user_is_authorized(user, [self])
LegacyAccessControlCompatiblePermissions = typing.List[LegacyAccessControlCompatiblePermission]
RBACPermissionsAttribute = typing.Dict[str, LegacyAccessControlCompatiblePermissions]
@ -126,6 +135,36 @@ def get_most_authorized_role(permissions: LegacyAccessControlCompatiblePermissio
return min({p.fallback_role for p in permissions}, key=lambda r: r.value)
def convert_oncall_permission_to_irm(permission: LegacyAccessControlCompatiblePermission) -> str:
return permission.value.replace(PluginID.ONCALL, PluginID.IRM)
def get_required_permission_values(
organization: "Organization", required_permissions: LegacyAccessControlCompatiblePermissions
) -> typing.List[str]:
"""
This function returns a list of required permission values, taking into account whether or not the organization
is using the IRM plugin.
If the IRM plugin is being used, we substitue `grafana-oncall-app` with `grafana-irm-app`
as the RBAC permission prefix.
"""
permission_values = []
for permission in required_permissions:
permission_value = permission.value
if permission_value.startswith(PluginID.ONCALL) and organization.is_grafana_irm_enabled:
permission_values.append(convert_oncall_permission_to_irm(permission))
else:
permission_values.append(permission_value)
return permission_values
def user_has_minimum_required_basic_role(user: "User", required_basic_role: LegacyAccessControlRole) -> bool:
return user.role <= required_basic_role.value
def user_is_authorized(user: "User", required_permissions: LegacyAccessControlCompatiblePermissions) -> bool:
"""
This function checks whether `user` has all necessary permissions specified in `required_permissions`.
@ -134,11 +173,12 @@ def user_is_authorized(user: "User", required_permissions: LegacyAccessControlCo
`user` - The user to check permissions for
`required_permissions` - A list of permissions that a user must have to be considered authorized
"""
if user.organization.is_rbac_permissions_enabled:
organization = user.organization
if organization.is_rbac_permissions_enabled:
user_permissions = [u["action"] for u in user.permissions]
required_permission_values = [p.value for p in required_permissions]
required_permission_values = get_required_permission_values(organization, required_permissions)
return all(permission in user_permissions for permission in required_permission_values)
return user.role <= get_most_authorized_role(required_permissions).value
return user_has_minimum_required_basic_role(user, get_most_authorized_role(required_permissions))
class RBACPermission(permissions.BasePermission):
@ -310,20 +350,23 @@ class RBACPermission(permissions.BasePermission):
ALL_PERMISSION_NAMES = [perm for perm in dir(RBACPermission.Permissions) if not perm.startswith("_")]
ALL_PERMISSION_CLASSES = [
ALL_PERMISSION_CLASSES: LegacyAccessControlCompatiblePermissions = [
getattr(RBACPermission.Permissions, permission_name) for permission_name in ALL_PERMISSION_NAMES
]
ALL_PERMISSION_CHOICES = [
(permission_class.value, permission_name)
for permission_class, permission_name in zip(ALL_PERMISSION_CLASSES, ALL_PERMISSION_NAMES)
]
def get_permission_from_permission_string(perm: str) -> typing.Optional[LegacyAccessControlCompatiblePermission]:
for permission_class in ALL_PERMISSION_CLASSES:
if permission_class.value == perm:
return permission_class
return None
ALL_PERMISSION_CHOICES: typing.List[typing.Tuple[str, str]] = []
for permission_class, permission_name in zip(ALL_PERMISSION_CLASSES, ALL_PERMISSION_NAMES):
ALL_PERMISSION_CHOICES += [
(permission_class.value, permission_name),
(convert_oncall_permission_to_irm(permission_class), permission_name),
]
ALL_PERMISSION_NAME_TO_CLASS_MAP: typing.Dict[str, LegacyAccessControlCompatiblePermission] = {}
for permission_class in ALL_PERMISSION_CLASSES:
ALL_PERMISSION_NAME_TO_CLASS_MAP.update(
{
permission_class.value: permission_class,
convert_oncall_permission_to_irm(permission_class): permission_class,
}
)
class IsOwner(permissions.BasePermission):

View file

@ -786,7 +786,7 @@ def test_alert_receive_channel_preview_template_require_notification_channel(
"api-internal:alert_receive_channel-preview-template", kwargs={"pk": alert_receive_channel.public_primary_key}
)
data = {
"template_body": "Template",
"template_body": "Template" if template_name != "image_url" else "http://example.com/image.jpg",
"template_name": template_name,
}
@ -795,7 +795,7 @@ def test_alert_receive_channel_preview_template_require_notification_channel(
assert response.status_code == status.HTTP_400_BAD_REQUEST
data = {
"template_body": "Template",
"template_body": "Template" if template_name != "image_url" else "http://example.com/image.jpg",
"template_name": f"{notification_channel}_{template_name}",
}
@ -830,7 +830,7 @@ def test_alert_receive_channel_preview_template_dynamic_payload(
data = {
"template_body": "{{ payload.foo }}",
"template_name": f"{notification_channel}_{template_name}",
"payload": {"foo": "bar"},
"payload": {"foo": "bar" if template_name != "image_url" else "http://example.com/image.jpg"},
}
response = client.post(url, data=data, format="json", **make_user_auth_headers(user, token))
@ -839,7 +839,7 @@ def test_alert_receive_channel_preview_template_dynamic_payload(
if notification_channel == "web" and template_name == "message":
assert response.data["preview"] == "<p>bar</p>"
else:
assert response.data["preview"] == "bar"
assert response.data["preview"] == data["payload"]["foo"]
@pytest.mark.django_db

View file

@ -4,47 +4,18 @@ import pytest
from rest_framework.views import APIView
from rest_framework.viewsets import ViewSetMixin
from apps.api.permissions import (
RBAC_PERMISSIONS_ATTR,
GrafanaAPIPermission,
HasRBACPermissions,
IsOwner,
IsOwnerOrHasRBACPermissions,
LegacyAccessControlCompatiblePermission,
LegacyAccessControlRole,
RBACObjectPermissionsAttribute,
RBACPermission,
RBACPermissionsAttribute,
get_most_authorized_role,
get_view_action,
user_is_authorized,
)
class MockedOrg:
def __init__(self, org_has_rbac_enabled: bool) -> None:
self.is_rbac_permissions_enabled = org_has_rbac_enabled
class MockedUser:
def __init__(
self,
permissions: typing.List[LegacyAccessControlCompatiblePermission],
org_has_rbac_enabled=True,
basic_role: typing.Optional[LegacyAccessControlRole] = None,
) -> None:
self.permissions = [GrafanaAPIPermission(action=perm.value) for perm in permissions]
self.role = basic_role if basic_role is not None else get_most_authorized_role(permissions)
self.organization = MockedOrg(org_has_rbac_enabled)
from apps.api import permissions
from apps.user_management.models import User
from common.constants.plugin_ids import PluginID
class MockedSchedule:
def __init__(self, user: MockedUser) -> None:
def __init__(self, user: User) -> None:
self.user = user
class MockedRequest:
def __init__(self, user: typing.Optional[MockedUser] = None, method: typing.Optional[str] = None) -> None:
def __init__(self, user: typing.Optional[User] = None, method: typing.Optional[str] = None) -> None:
if user:
self.user = user
if method:
@ -55,8 +26,8 @@ class MockedViewSet(ViewSetMixin):
def __init__(
self,
action: str,
rbac_permissions: typing.Optional[RBACPermissionsAttribute] = None,
rbac_object_permissions: typing.Optional[RBACObjectPermissionsAttribute] = None,
rbac_permissions: typing.Optional[permissions.RBACPermissionsAttribute] = None,
rbac_object_permissions: typing.Optional[permissions.RBACObjectPermissionsAttribute] = None,
) -> None:
super().__init__()
self.action = action
@ -70,8 +41,8 @@ class MockedViewSet(ViewSetMixin):
class MockedAPIView(APIView):
def __init__(
self,
rbac_permissions: typing.Optional[RBACPermissionsAttribute] = None,
rbac_object_permissions: typing.Optional[RBACObjectPermissionsAttribute] = None,
rbac_permissions: typing.Optional[permissions.RBACPermissionsAttribute] = None,
rbac_object_permissions: typing.Optional[permissions.RBACObjectPermissionsAttribute] = None,
) -> None:
super().__init__()
@ -81,84 +52,263 @@ class MockedAPIView(APIView):
self.rbac_object_permissions = rbac_object_permissions
class TestLegacyAccessControlCompatiblePermission:
@pytest.mark.parametrize(
"permission_to_test,user_basic_role,is_rbac_permissions_enabled,is_grafana_irm_enabled,expected_result",
[
# rbac enabled - is_grafana_irm_enabled disabled
(
permissions.RBACPermission.Permissions.ALERT_GROUPS_READ,
permissions.LegacyAccessControlRole.VIEWER,
True,
False,
True,
),
(
permissions.RBACPermission.Permissions.ALERT_GROUPS_WRITE,
permissions.LegacyAccessControlRole.VIEWER,
True,
False,
False,
),
# rbac enabled - is_grafana_irm_enabled enabled
(
permissions.RBACPermission.Permissions.ALERT_GROUPS_READ,
permissions.LegacyAccessControlRole.VIEWER,
True,
True,
True,
),
(
permissions.RBACPermission.Permissions.ALERT_GROUPS_WRITE,
permissions.LegacyAccessControlRole.VIEWER,
True,
True,
False,
),
# rbac disabled (and hence is_grafana_irm_enabled is irrelevant)
(
permissions.RBACPermission.Permissions.ALERT_GROUPS_READ,
permissions.LegacyAccessControlRole.VIEWER,
False,
False,
True,
),
(
permissions.RBACPermission.Permissions.ALERT_GROUPS_READ,
permissions.LegacyAccessControlRole.VIEWER,
False,
True,
True,
),
(
permissions.RBACPermission.Permissions.ALERT_GROUPS_WRITE,
permissions.LegacyAccessControlRole.VIEWER,
False,
False,
False,
),
(
permissions.RBACPermission.Permissions.ALERT_GROUPS_WRITE,
permissions.LegacyAccessControlRole.VIEWER,
False,
True,
False,
),
],
)
@pytest.mark.django_db
def test_user_has_permission(
self,
make_organization,
make_user_for_organization,
permission_to_test,
user_basic_role,
is_rbac_permissions_enabled,
is_grafana_irm_enabled,
expected_result,
):
user_permission = permissions.RBACPermission.Permissions.ALERT_GROUPS_READ
org = make_organization(
is_rbac_permissions_enabled=is_rbac_permissions_enabled, is_grafana_irm_enabled=is_grafana_irm_enabled
)
user = make_user_for_organization(
org,
role=user_basic_role,
permissions=permissions.GrafanaAPIPermissions.construct_permissions(
[
permissions.convert_oncall_permission_to_irm(user_permission)
if is_grafana_irm_enabled
else user_permission.value
]
),
)
assert permission_to_test.user_has_permission(user) == expected_result
@pytest.mark.parametrize(
"user_permissions,required_permissions,org_has_rbac_enabled,expected_result",
"user_role,required_basic_role,expected_result",
[
(
[RBACPermission.Permissions.ALERT_GROUPS_READ],
[RBACPermission.Permissions.ALERT_GROUPS_READ],
True,
True,
),
(
[RBACPermission.Permissions.ALERT_GROUPS_READ],
[RBACPermission.Permissions.ALERT_GROUPS_READ],
False,
True,
),
(
[RBACPermission.Permissions.ALERT_GROUPS_READ, RBACPermission.Permissions.ALERT_GROUPS_WRITE],
[RBACPermission.Permissions.ALERT_GROUPS_READ, RBACPermission.Permissions.ALERT_GROUPS_WRITE],
True,
True,
),
(
[RBACPermission.Permissions.ALERT_GROUPS_READ, RBACPermission.Permissions.ALERT_GROUPS_WRITE],
[RBACPermission.Permissions.ALERT_GROUPS_READ, RBACPermission.Permissions.ALERT_GROUPS_WRITE],
False,
True,
),
(
[RBACPermission.Permissions.ALERT_GROUPS_WRITE],
[RBACPermission.Permissions.ALERT_GROUPS_READ],
True,
False,
),
(
[RBACPermission.Permissions.ALERT_GROUPS_WRITE],
[RBACPermission.Permissions.ALERT_GROUPS_READ],
False,
True,
),
(
[RBACPermission.Permissions.ALERT_GROUPS_READ],
[RBACPermission.Permissions.ALERT_GROUPS_READ, RBACPermission.Permissions.ALERT_GROUPS_WRITE],
False,
False,
),
(
[RBACPermission.Permissions.ALERT_GROUPS_READ],
[RBACPermission.Permissions.ALERT_GROUPS_READ, RBACPermission.Permissions.ALERT_GROUPS_WRITE],
True,
False,
),
(permissions.LegacyAccessControlRole.NONE, permissions.LegacyAccessControlRole.NONE, True),
(permissions.LegacyAccessControlRole.NONE, permissions.LegacyAccessControlRole.VIEWER, False),
(permissions.LegacyAccessControlRole.NONE, permissions.LegacyAccessControlRole.EDITOR, False),
(permissions.LegacyAccessControlRole.NONE, permissions.LegacyAccessControlRole.ADMIN, False),
(permissions.LegacyAccessControlRole.VIEWER, permissions.LegacyAccessControlRole.NONE, True),
(permissions.LegacyAccessControlRole.VIEWER, permissions.LegacyAccessControlRole.VIEWER, True),
(permissions.LegacyAccessControlRole.VIEWER, permissions.LegacyAccessControlRole.EDITOR, False),
(permissions.LegacyAccessControlRole.VIEWER, permissions.LegacyAccessControlRole.ADMIN, False),
(permissions.LegacyAccessControlRole.EDITOR, permissions.LegacyAccessControlRole.NONE, True),
(permissions.LegacyAccessControlRole.EDITOR, permissions.LegacyAccessControlRole.VIEWER, True),
(permissions.LegacyAccessControlRole.EDITOR, permissions.LegacyAccessControlRole.EDITOR, True),
(permissions.LegacyAccessControlRole.EDITOR, permissions.LegacyAccessControlRole.ADMIN, False),
(permissions.LegacyAccessControlRole.ADMIN, permissions.LegacyAccessControlRole.NONE, True),
(permissions.LegacyAccessControlRole.ADMIN, permissions.LegacyAccessControlRole.VIEWER, True),
(permissions.LegacyAccessControlRole.ADMIN, permissions.LegacyAccessControlRole.EDITOR, True),
(permissions.LegacyAccessControlRole.ADMIN, permissions.LegacyAccessControlRole.ADMIN, True),
],
)
def test_user_is_authorized(user_permissions, required_permissions, org_has_rbac_enabled, expected_result) -> None:
user = MockedUser(user_permissions, org_has_rbac_enabled=org_has_rbac_enabled)
assert user_is_authorized(user, required_permissions) == expected_result
@pytest.mark.django_db
def test_user_has_minimum_required_basic_role(
make_organization,
make_user_for_organization,
user_role,
required_basic_role,
expected_result,
):
org = make_organization()
user = make_user_for_organization(org, role=user_role, permissions=[])
assert permissions.user_has_minimum_required_basic_role(user, required_basic_role) is expected_result
@pytest.mark.parametrize("is_grafana_irm_enabled", [True, False])
@pytest.mark.parametrize(
"permissions,expected_role",
"user_permissions,required_permissions,is_rbac_permissions_enabled,expected_result",
[
([RBACPermission.Permissions.ALERT_GROUPS_READ], RBACPermission.Permissions.ALERT_GROUPS_READ.fallback_role),
(
[RBACPermission.Permissions.ALERT_GROUPS_READ, RBACPermission.Permissions.ALERT_GROUPS_WRITE],
RBACPermission.Permissions.ALERT_GROUPS_WRITE.fallback_role,
[permissions.RBACPermission.Permissions.ALERT_GROUPS_READ],
[permissions.RBACPermission.Permissions.ALERT_GROUPS_READ],
True,
True,
),
(
[permissions.RBACPermission.Permissions.ALERT_GROUPS_READ],
[permissions.RBACPermission.Permissions.ALERT_GROUPS_READ],
False,
True,
),
(
[
RBACPermission.Permissions.USER_SETTINGS_READ,
RBACPermission.Permissions.USER_SETTINGS_WRITE,
RBACPermission.Permissions.USER_SETTINGS_ADMIN,
permissions.RBACPermission.Permissions.ALERT_GROUPS_READ,
permissions.RBACPermission.Permissions.ALERT_GROUPS_WRITE,
],
RBACPermission.Permissions.USER_SETTINGS_ADMIN.fallback_role,
[
permissions.RBACPermission.Permissions.ALERT_GROUPS_READ,
permissions.RBACPermission.Permissions.ALERT_GROUPS_WRITE,
],
True,
True,
),
(
[
permissions.RBACPermission.Permissions.ALERT_GROUPS_READ,
permissions.RBACPermission.Permissions.ALERT_GROUPS_WRITE,
],
[
permissions.RBACPermission.Permissions.ALERT_GROUPS_READ,
permissions.RBACPermission.Permissions.ALERT_GROUPS_WRITE,
],
False,
True,
),
(
[permissions.RBACPermission.Permissions.ALERT_GROUPS_WRITE],
[permissions.RBACPermission.Permissions.ALERT_GROUPS_READ],
True,
False,
),
(
[permissions.RBACPermission.Permissions.ALERT_GROUPS_WRITE],
[permissions.RBACPermission.Permissions.ALERT_GROUPS_READ],
False,
True,
),
(
[permissions.RBACPermission.Permissions.ALERT_GROUPS_READ],
[
permissions.RBACPermission.Permissions.ALERT_GROUPS_READ,
permissions.RBACPermission.Permissions.ALERT_GROUPS_WRITE,
],
False,
False,
),
(
[permissions.RBACPermission.Permissions.ALERT_GROUPS_READ],
[
permissions.RBACPermission.Permissions.ALERT_GROUPS_READ,
permissions.RBACPermission.Permissions.ALERT_GROUPS_WRITE,
],
True,
False,
),
],
)
def test_get_most_authorized_role(permissions, expected_role) -> None:
assert get_most_authorized_role(permissions) == expected_role
@pytest.mark.django_db
def test_user_is_authorized(
make_organization,
make_user_for_organization,
user_permissions,
required_permissions,
is_rbac_permissions_enabled,
is_grafana_irm_enabled,
expected_result,
) -> None:
basic_role = permissions.get_most_authorized_role(user_permissions)
org = make_organization(
is_rbac_permissions_enabled=is_rbac_permissions_enabled, is_grafana_irm_enabled=is_grafana_irm_enabled
)
user = make_user_for_organization(
org,
role=basic_role,
permissions=permissions.GrafanaAPIPermissions.construct_permissions(
[
permissions.convert_oncall_permission_to_irm(perm) if is_grafana_irm_enabled else perm.value
for perm in user_permissions
]
),
)
assert permissions.user_is_authorized(user, required_permissions) == expected_result
@pytest.mark.parametrize(
"user_permissions,expected_role",
[
(
[permissions.RBACPermission.Permissions.ALERT_GROUPS_READ],
permissions.RBACPermission.Permissions.ALERT_GROUPS_READ.fallback_role,
),
(
[
permissions.RBACPermission.Permissions.ALERT_GROUPS_READ,
permissions.RBACPermission.Permissions.ALERT_GROUPS_WRITE,
],
permissions.RBACPermission.Permissions.ALERT_GROUPS_WRITE.fallback_role,
),
(
[
permissions.RBACPermission.Permissions.USER_SETTINGS_READ,
permissions.RBACPermission.Permissions.USER_SETTINGS_WRITE,
permissions.RBACPermission.Permissions.USER_SETTINGS_ADMIN,
],
permissions.RBACPermission.Permissions.USER_SETTINGS_ADMIN.fallback_role,
),
],
)
def test_get_most_authorized_role(user_permissions, expected_role) -> None:
assert permissions.get_most_authorized_role(user_permissions) == expected_role
def test_get_view_action():
@ -170,13 +320,18 @@ def test_get_view_action():
method = "APIVIEW_ACTION"
request = MockedRequest(method=method)
assert get_view_action(request, viewset) == viewset_action, "it works with a ViewSet"
assert get_view_action(request, apiview) == method.lower(), "it works with an APIView"
assert permissions.get_view_action(request, viewset) == viewset_action, "it works with a ViewSet"
assert permissions.get_view_action(request, apiview) == method.lower(), "it works with an APIView"
class TestRBACPermission:
def test_has_permission_works_on_a_viewset_view(self) -> None:
required_permission = RBACPermission.Permissions.ALERT_GROUPS_READ
@pytest.mark.django_db
def test_has_permission_works_on_a_viewset_view(
self,
make_organization,
make_user_for_organization,
) -> None:
required_permission = permissions.RBACPermission.Permissions.ALERT_GROUPS_READ
action = "hello"
viewset = MockedViewSet(
@ -193,26 +348,42 @@ class TestRBACPermission:
},
)
user_with_permission = MockedUser([required_permission])
user_without_permission = MockedUser([RBACPermission.Permissions.ALERT_GROUPS_WRITE])
org = make_organization(is_rbac_permissions_enabled=True)
user_with_permission = make_user_for_organization(
org,
role=permissions.LegacyAccessControlRole.NONE,
permissions=permissions.GrafanaAPIPermissions.construct_permissions([required_permission.value]),
)
user_without_permission = make_user_for_organization(
org,
role=permissions.LegacyAccessControlRole.NONE,
permissions=permissions.GrafanaAPIPermissions.construct_permissions(
[permissions.RBACPermission.Permissions.ALERT_GROUPS_WRITE.value]
),
)
assert (
RBACPermission().has_permission(MockedRequest(user_with_permission), viewset) is True
permissions.RBACPermission().has_permission(MockedRequest(user_with_permission), viewset) is True
), "it works on a viewset when the user does have permission"
assert (
RBACPermission().has_permission(MockedRequest(user_without_permission), viewset) is False
permissions.RBACPermission().has_permission(MockedRequest(user_without_permission), viewset) is False
), "it works on a viewset when the user does have permission"
assert (
RBACPermission().has_permission(
permissions.RBACPermission().has_permission(
MockedRequest(user_without_permission), viewset_with_no_required_permissions
)
is True
), "it works on a viewset when the viewset action does not require permissions"
def test_has_permission_works_on_an_apiview_view(self) -> None:
required_permission = RBACPermission.Permissions.ALERT_GROUPS_READ
@pytest.mark.django_db
def test_has_permission_works_on_an_apiview_view(
self,
make_organization,
make_user_for_organization,
) -> None:
required_permission = permissions.RBACPermission.Permissions.ALERT_GROUPS_READ
method = "hello"
apiview = MockedAPIView(
@ -226,61 +397,70 @@ class TestRBACPermission:
}
)
user1 = MockedUser([required_permission])
user2 = MockedUser([RBACPermission.Permissions.ALERT_GROUPS_WRITE])
org = make_organization(is_rbac_permissions_enabled=True)
user1 = make_user_for_organization(
org,
role=permissions.LegacyAccessControlRole.NONE,
permissions=permissions.GrafanaAPIPermissions.construct_permissions([required_permission.value]),
)
user2 = make_user_for_organization(
org,
role=permissions.LegacyAccessControlRole.NONE,
permissions=permissions.GrafanaAPIPermissions.construct_permissions(
[permissions.RBACPermission.Permissions.ALERT_GROUPS_WRITE.value]
),
)
class Request(MockedRequest):
def __init__(self, user: typing.Optional[MockedUser] = None) -> None:
def __init__(self, user: typing.Optional[User] = None) -> None:
super().__init__(user, method)
assert (
RBACPermission().has_permission(Request(user1), apiview) is True
permissions.RBACPermission().has_permission(Request(user1), apiview) is True
), "it works on an APIView when the user has permission"
assert (
RBACPermission().has_permission(Request(user2), apiview) is False
permissions.RBACPermission().has_permission(Request(user2), apiview) is False
), "it works on an APIView when the user does not have permission"
assert (
RBACPermission().has_permission(Request(user2), apiview_with_no_permissions) is True
permissions.RBACPermission().has_permission(Request(user2), apiview_with_no_permissions) is True
), "it works on a viewset when the viewset action does not require permissions"
def test_has_permission_throws_assertion_error_if_developer_forgets_to_specify_rbac_permissions(self) -> None:
action_slash_method = "hello"
error_msg = (
f"Must define a {RBAC_PERMISSIONS_ATTR} dict on the ViewSet that is consuming the RBACPermission class"
)
error_msg = f"Must define a {permissions.RBAC_PERMISSIONS_ATTR} dict on the ViewSet that is consuming the RBACPermission class"
viewset = MockedViewSet(action_slash_method)
apiview = MockedAPIView()
with pytest.raises(AssertionError, match=error_msg):
RBACPermission().has_permission(MockedRequest(), viewset)
permissions.RBACPermission().has_permission(MockedRequest(), viewset)
with pytest.raises(AssertionError, match=error_msg):
RBACPermission().has_permission(MockedRequest(method=action_slash_method), apiview)
permissions.RBACPermission().has_permission(MockedRequest(method=action_slash_method), apiview)
def test_has_permission_throws_assertion_error_if_developer_forgets_to_specify_an_action_in_rbac_permissions(
self,
) -> None:
action_slash_method = "hello"
other_action_rbac_permissions = {"bonjour": []}
error_msg = f"""Each action must be defined within the {RBAC_PERMISSIONS_ATTR} dict on the ViewSet.
error_msg = f"""Each action must be defined within the {permissions.RBAC_PERMISSIONS_ATTR} dict on the ViewSet.
\nIf an action requires no permissions, its value should explicitly be set to an empty list"""
viewset = MockedViewSet(action_slash_method, other_action_rbac_permissions)
apiview = MockedAPIView(rbac_permissions=other_action_rbac_permissions)
with pytest.raises(AssertionError, match=error_msg):
RBACPermission().has_permission(MockedRequest(), viewset)
permissions.RBACPermission().has_permission(MockedRequest(), viewset)
with pytest.raises(AssertionError, match=error_msg):
RBACPermission().has_permission(MockedRequest(method=action_slash_method), apiview)
permissions.RBACPermission().has_permission(MockedRequest(method=action_slash_method), apiview)
def test_has_object_permission_returns_true_if_rbac_object_permissions_not_specified(self) -> None:
request = MockedRequest()
assert RBACPermission().has_object_permission(request, MockedAPIView(), None) is True
assert RBACPermission().has_object_permission(request, MockedViewSet("potato"), None) is True
assert permissions.RBACPermission().has_object_permission(request, MockedAPIView(), None) is True
assert permissions.RBACPermission().has_object_permission(request, MockedViewSet("potato"), None) is True
def test_has_object_permission_works_if_no_permission_class_specified_for_action(self) -> None:
action = "hello"
@ -289,8 +469,8 @@ class TestRBACPermission:
apiview = MockedAPIView(rbac_object_permissions={})
viewset = MockedViewSet(action, rbac_object_permissions={})
assert RBACPermission().has_object_permission(request, apiview, None) is True
assert RBACPermission().has_object_permission(request, viewset, None) is True
assert permissions.RBACPermission().has_object_permission(request, apiview, None) is True
assert permissions.RBACPermission().has_object_permission(request, viewset, None) is True
def test_has_object_permission_returns_true_if_action_omitted_from_rbac_object_permissions(self) -> None:
action1 = "hello"
@ -308,8 +488,8 @@ class TestRBACPermission:
apiview = MockedAPIView(rbac_object_permissions=rbac_object_permissions)
viewset = MockedViewSet(action2, rbac_object_permissions=rbac_object_permissions)
assert RBACPermission().has_object_permission(request, apiview, None) is True
assert RBACPermission().has_object_permission(request, viewset, None) is True
assert permissions.RBACPermission().has_object_permission(request, apiview, None) is True
assert permissions.RBACPermission().has_object_permission(request, viewset, None) is True
def test_has_object_permission_works_when_permission_class_specified_for_action(self) -> None:
action = "hello"
@ -324,39 +504,66 @@ class TestRBACPermission:
apiview = MockedAPIView(rbac_object_permissions=rbac_object_permissions)
viewset = MockedViewSet(action, rbac_object_permissions=rbac_object_permissions)
assert RBACPermission().has_object_permission(request, apiview, None) == mocked_permission_class_response
assert RBACPermission().has_object_permission(request, viewset, None) == mocked_permission_class_response
assert (
permissions.RBACPermission().has_object_permission(request, apiview, None)
== mocked_permission_class_response
)
assert (
permissions.RBACPermission().has_object_permission(request, viewset, None)
== mocked_permission_class_response
)
class TestIsOwner:
def test_it_works_when_comparing_user_to_object(self) -> None:
user1 = MockedUser([])
user2 = MockedUser([])
@pytest.mark.django_db
def test_it_works_when_comparing_user_to_object(
self,
make_organization,
make_user_for_organization,
) -> None:
org = make_organization(is_rbac_permissions_enabled=True)
user1 = make_user_for_organization(org, role=permissions.LegacyAccessControlRole.NONE, permissions=[])
user2 = make_user_for_organization(org, role=permissions.LegacyAccessControlRole.NONE, permissions=[])
request = MockedRequest(user1)
IsUser = IsOwner()
IsUser = permissions.IsOwner()
assert IsUser.has_object_permission(request, None, user1) is True
assert IsUser.has_object_permission(request, None, user2) is False
def test_it_works_when_comparing_user_to_ownership_field_object(self) -> None:
user1 = MockedUser([])
user2 = MockedUser([])
@pytest.mark.django_db
def test_it_works_when_comparing_user_to_ownership_field_object(
self,
make_organization,
make_user_for_organization,
) -> None:
org = make_organization(is_rbac_permissions_enabled=True)
user1 = make_user_for_organization(org, role=permissions.LegacyAccessControlRole.NONE, permissions=[])
user2 = make_user_for_organization(org, role=permissions.LegacyAccessControlRole.NONE, permissions=[])
schedule = MockedSchedule(user1)
IsScheduleOwner = IsOwner("user")
IsScheduleOwner = permissions.IsOwner("user")
assert IsScheduleOwner.has_object_permission(MockedRequest(user1), None, schedule) is True
assert IsScheduleOwner.has_object_permission(MockedRequest(user2), None, schedule) is False
def test_it_works_when_comparing_user_to_nested_ownership_field_object(self) -> None:
@pytest.mark.django_db
def test_it_works_when_comparing_user_to_nested_ownership_field_object(
self,
make_organization,
make_user_for_organization,
) -> None:
class Thingy:
def __init__(self, schedule: MockedSchedule) -> None:
self.schedule = schedule
user1 = MockedUser([])
user2 = MockedUser([])
org = make_organization(is_rbac_permissions_enabled=True)
user1 = make_user_for_organization(org, role=permissions.LegacyAccessControlRole.NONE, permissions=[])
user2 = make_user_for_organization(org, role=permissions.LegacyAccessControlRole.NONE, permissions=[])
schedule = MockedSchedule(user1)
thingy = Thingy(schedule)
IsScheduleOwner = IsOwner("schedule.user")
IsScheduleOwner = permissions.IsOwner("schedule.user")
assert IsScheduleOwner.has_object_permission(MockedRequest(user1), None, thingy) is True
assert IsScheduleOwner.has_object_permission(MockedRequest(user2), None, thingy) is False
@ -366,80 +573,140 @@ class TestIsOwner:
"user_permissions,required_permissions,expected_result",
[
(
[RBACPermission.Permissions.ALERT_GROUPS_READ],
[RBACPermission.Permissions.ALERT_GROUPS_READ],
[permissions.RBACPermission.Permissions.ALERT_GROUPS_READ],
[permissions.RBACPermission.Permissions.ALERT_GROUPS_READ],
True,
),
(
[RBACPermission.Permissions.ALERT_GROUPS_READ, RBACPermission.Permissions.ALERT_GROUPS_WRITE],
[RBACPermission.Permissions.ALERT_GROUPS_READ, RBACPermission.Permissions.ALERT_GROUPS_WRITE],
[
permissions.RBACPermission.Permissions.ALERT_GROUPS_READ,
permissions.RBACPermission.Permissions.ALERT_GROUPS_WRITE,
],
[
permissions.RBACPermission.Permissions.ALERT_GROUPS_READ,
permissions.RBACPermission.Permissions.ALERT_GROUPS_WRITE,
],
True,
),
(
[RBACPermission.Permissions.ALERT_GROUPS_WRITE],
[RBACPermission.Permissions.ALERT_GROUPS_READ],
[permissions.RBACPermission.Permissions.ALERT_GROUPS_WRITE],
[permissions.RBACPermission.Permissions.ALERT_GROUPS_READ],
False,
),
(
[RBACPermission.Permissions.ALERT_GROUPS_READ],
[RBACPermission.Permissions.ALERT_GROUPS_READ, RBACPermission.Permissions.ALERT_GROUPS_WRITE],
[permissions.RBACPermission.Permissions.ALERT_GROUPS_READ],
[
permissions.RBACPermission.Permissions.ALERT_GROUPS_READ,
permissions.RBACPermission.Permissions.ALERT_GROUPS_WRITE,
],
False,
),
],
)
def test_HasRBACPermission(user_permissions, required_permissions, expected_result) -> None:
request = MockedRequest(MockedUser(user_permissions))
assert HasRBACPermissions(required_permissions).has_object_permission(request, None, None) == expected_result
@pytest.mark.django_db
def test_HasRBACPermission(
make_organization,
make_user_for_organization,
user_permissions,
required_permissions,
expected_result,
) -> None:
org = make_organization(is_rbac_permissions_enabled=True)
user = make_user_for_organization(
org,
role=permissions.LegacyAccessControlRole.NONE,
permissions=permissions.GrafanaAPIPermissions.construct_permissions([perm.value for perm in user_permissions]),
)
request = MockedRequest(user)
assert (
permissions.HasRBACPermissions(required_permissions).has_object_permission(request, None, None)
== expected_result
)
class TestIsOwnerOrHasRBACPermissions:
required_permission = RBACPermission.Permissions.SCHEDULES_READ
required_permission = permissions.RBACPermission.Permissions.SCHEDULES_READ
required_permissions = [required_permission]
user_permissions = permissions.GrafanaAPIPermissions.construct_permissions(
[perm.value for perm in required_permissions]
)
def test_it_works_when_user_is_owner_and_does_not_have_permissions(self) -> None:
user1 = MockedUser([])
@pytest.mark.django_db
def test_it_works_when_user_is_owner_and_does_not_have_permissions(
self,
make_organization,
make_user_for_organization,
) -> None:
org = make_organization(is_rbac_permissions_enabled=True)
user1 = make_user_for_organization(org, role=permissions.LegacyAccessControlRole.NONE, permissions=[])
schedule = MockedSchedule(user1)
request = MockedRequest(user1)
PermClass = IsOwnerOrHasRBACPermissions(self.required_permissions)
PermClass = permissions.IsOwnerOrHasRBACPermissions(self.required_permissions)
assert PermClass.has_object_permission(request, None, user1) is True
PermClass = IsOwnerOrHasRBACPermissions(self.required_permissions, "user")
PermClass = permissions.IsOwnerOrHasRBACPermissions(self.required_permissions, "user")
assert PermClass.has_object_permission(request, None, schedule) is True
def test_it_works_when_user_is_owner_and_has_permissions(self) -> None:
user1 = MockedUser(self.required_permissions)
@pytest.mark.django_db
def test_it_works_when_user_is_owner_and_has_permissions(
self,
make_organization,
make_user_for_organization,
) -> None:
org = make_organization(is_rbac_permissions_enabled=True)
user1 = make_user_for_organization(
org, role=permissions.LegacyAccessControlRole.NONE, permissions=self.user_permissions
)
schedule = MockedSchedule(user1)
request = MockedRequest(user1)
PermClass = IsOwnerOrHasRBACPermissions(self.required_permissions)
PermClass = permissions.IsOwnerOrHasRBACPermissions(self.required_permissions)
assert PermClass.has_object_permission(request, None, user1) is True
PermClass = IsOwnerOrHasRBACPermissions(self.required_permissions, "user")
PermClass = permissions.IsOwnerOrHasRBACPermissions(self.required_permissions, "user")
assert PermClass.has_object_permission(request, None, schedule) is True
def test_it_works_when_user_is_not_owner_and_does_not_have_permissions(self) -> None:
user1 = MockedUser([])
user2 = MockedUser([])
@pytest.mark.django_db
def test_it_works_when_user_is_not_owner_and_does_not_have_permissions(
self,
make_organization,
make_user_for_organization,
) -> None:
org = make_organization(is_rbac_permissions_enabled=True)
user1 = make_user_for_organization(org, role=permissions.LegacyAccessControlRole.NONE, permissions=[])
user2 = make_user_for_organization(org, role=permissions.LegacyAccessControlRole.NONE, permissions=[])
schedule = MockedSchedule(user1)
request = MockedRequest(user2)
PermClass = IsOwnerOrHasRBACPermissions(self.required_permissions)
PermClass = permissions.IsOwnerOrHasRBACPermissions(self.required_permissions)
assert PermClass.has_object_permission(request, None, user1) is False
PermClass = IsOwnerOrHasRBACPermissions(self.required_permissions, "user")
PermClass = permissions.IsOwnerOrHasRBACPermissions(self.required_permissions, "user")
assert PermClass.has_object_permission(request, None, schedule) is False
def test_it_works_when_user_is_not_owner_and_has_permissions(self) -> None:
user1 = MockedUser([])
user2 = MockedUser(self.required_permissions)
@pytest.mark.django_db
def test_it_works_when_user_is_not_owner_and_has_permissions(
self,
make_organization,
make_user_for_organization,
) -> None:
org = make_organization(is_rbac_permissions_enabled=True)
user1 = make_user_for_organization(org, role=permissions.LegacyAccessControlRole.NONE, permissions=[])
user2 = make_user_for_organization(
org, role=permissions.LegacyAccessControlRole.NONE, permissions=self.user_permissions
)
user3 = make_user_for_organization(org, role=permissions.LegacyAccessControlRole.NONE, permissions=[])
schedule = MockedSchedule(user1)
request = MockedRequest(user2)
request_user3 = MockedRequest(user3)
PermClass = IsOwnerOrHasRBACPermissions(self.required_permissions)
PermClass = permissions.IsOwnerOrHasRBACPermissions(self.required_permissions)
assert PermClass.has_object_permission(request, None, user1) is True
PermClass = IsOwnerOrHasRBACPermissions(self.required_permissions, "user")
PermClass = permissions.IsOwnerOrHasRBACPermissions(self.required_permissions, "user")
assert PermClass.has_object_permission(request, None, schedule) is True
class Thingy:
@ -447,7 +714,96 @@ class TestIsOwnerOrHasRBACPermissions:
self.schedule = schedule
thingy = Thingy(schedule)
PermClass = IsOwnerOrHasRBACPermissions(self.required_permissions, "schedule.user")
PermClass = permissions.IsOwnerOrHasRBACPermissions(self.required_permissions, "schedule.user")
assert PermClass.has_object_permission(request, None, thingy) is True
assert PermClass.has_object_permission(MockedRequest(MockedUser([])), None, thingy) is False
assert PermClass.has_object_permission(request_user3, None, thingy) is False
@pytest.mark.parametrize(
"permission,expected",
[
(
permissions.RBACPermission.Permissions.ALERT_GROUPS_READ,
f"{PluginID.IRM}.alert-groups:read",
),
(
permissions.RBACPermission.Permissions.LABEL_READ,
permissions.RBACPermission.Permissions.LABEL_READ.value,
),
],
)
def test_convert_oncall_permission_to_irm(permission, expected) -> None:
assert permissions.convert_oncall_permission_to_irm(permission) == expected
@pytest.mark.parametrize(
"is_grafana_irm_enabled,required_permissions,expected_permission_values",
[
(
False,
[
permissions.RBACPermission.Permissions.ALERT_GROUPS_READ,
permissions.RBACPermission.Permissions.ALERT_GROUPS_WRITE,
],
[
permissions.RBACPermission.Permissions.ALERT_GROUPS_READ.value,
permissions.RBACPermission.Permissions.ALERT_GROUPS_WRITE.value,
],
),
(
True,
[
permissions.RBACPermission.Permissions.ALERT_GROUPS_READ,
permissions.RBACPermission.Permissions.ALERT_GROUPS_WRITE,
],
[
permissions.RBACPermission.Permissions.ALERT_GROUPS_READ.value.replace(PluginID.ONCALL, PluginID.IRM),
permissions.RBACPermission.Permissions.ALERT_GROUPS_WRITE.value.replace(PluginID.ONCALL, PluginID.IRM),
],
),
(
True,
[
permissions.RBACPermission.Permissions.LABEL_CREATE,
permissions.RBACPermission.Permissions.LABEL_WRITE,
permissions.RBACPermission.Permissions.LABEL_READ,
],
[
permissions.RBACPermission.Permissions.LABEL_CREATE.value,
permissions.RBACPermission.Permissions.LABEL_WRITE.value,
permissions.RBACPermission.Permissions.LABEL_READ.value,
],
),
],
)
@pytest.mark.django_db
def test_get_required_permission_values(
make_organization,
is_grafana_irm_enabled,
required_permissions,
expected_permission_values,
) -> None:
organization = make_organization(is_rbac_permissions_enabled=True, is_grafana_irm_enabled=is_grafana_irm_enabled)
assert permissions.get_required_permission_values(organization, required_permissions) == expected_permission_values
@pytest.mark.parametrize(
"perm,expected_permission",
[
(
permissions.RBACPermission.Permissions.ALERT_GROUPS_READ.value,
permissions.RBACPermission.Permissions.ALERT_GROUPS_READ,
),
(
"non.existent.permission",
None,
),
(
permissions.convert_oncall_permission_to_irm(permissions.RBACPermission.Permissions.ALERT_GROUPS_READ),
permissions.RBACPermission.Permissions.ALERT_GROUPS_READ,
),
],
)
def test_all_permission_name_to_class_map(perm, expected_permission) -> None:
assert permissions.ALL_PERMISSION_NAME_TO_CLASS_MAP.get(perm, None) == expected_permission

View file

@ -11,7 +11,7 @@ from rest_framework import status
from rest_framework.response import Response
from rest_framework.test import APIClient
from apps.api.permissions import GrafanaAPIPermission, LegacyAccessControlRole, RBACPermission
from apps.api import permissions
from apps.api.serializers.user import UserHiddenFieldsSerializer
from apps.api.views.user import UPCOMING_SHIFTS_DEFAULT_DAYS
from apps.base.models import UserNotificationPolicy
@ -231,7 +231,7 @@ def test_list_users(
):
organization = make_organization()
admin = make_user_for_organization(organization, _verified_phone_number="1234567890")
editor = make_user_for_organization(organization, role=LegacyAccessControlRole.EDITOR)
editor = make_user_for_organization(organization, role=permissions.LegacyAccessControlRole.EDITOR)
_, token = make_token_for_organization(organization)
client = APIClient()
@ -316,21 +316,39 @@ def test_list_users(
assert response.json() == expected_payload
@pytest.mark.parametrize("is_grafana_irm_enabled", [False, True])
@pytest.mark.django_db
def test_list_users_filtered_by_granted_permission(
is_grafana_irm_enabled,
make_organization,
make_user_for_organization,
make_token_for_organization,
make_user_auth_headers,
):
perm_to_filter_on = RBACPermission.Permissions.NOTIFICATIONS_READ.value
perms_to_grant = [GrafanaAPIPermission(action=perm_to_filter_on)]
permission = permissions.RBACPermission.Permissions.NOTIFICATIONS_READ
admin_perm_required_to_call_endpoint = permissions.RBACPermission.Permissions.USER_SETTINGS_READ
perm_to_filter_on = (
permissions.convert_oncall_permission_to_irm(permission) if is_grafana_irm_enabled else permission.value
)
organization = make_organization()
admin_user = make_user_for_organization(organization)
perms_to_grant = permissions.GrafanaAPIPermissions.construct_permissions([perm_to_filter_on])
organization = make_organization(is_grafana_irm_enabled=is_grafana_irm_enabled, is_rbac_permissions_enabled=True)
admin_user = make_user_for_organization(
organization,
# NOTE: need to explicitly grant this permission here because otherwise the permissions granted by the
# make_user_for_organization fixture will only grant the oncall flavour of the permission
permissions=permissions.GrafanaAPIPermissions.construct_permissions(
[
permissions.convert_oncall_permission_to_irm(admin_perm_required_to_call_endpoint)
if is_grafana_irm_enabled
else admin_perm_required_to_call_endpoint.value
]
),
)
user1 = make_user_for_organization(organization, permissions=perms_to_grant)
user2 = make_user_for_organization(organization, permissions=perms_to_grant)
user3 = make_user_for_organization(organization, role=LegacyAccessControlRole.VIEWER)
user3 = make_user_for_organization(organization, role=permissions.LegacyAccessControlRole.VIEWER)
_, token = make_token_for_organization(organization)
client = APIClient()
@ -343,7 +361,8 @@ def test_list_users_filtered_by_granted_permission(
assert response.status_code == status.HTTP_200_OK
returned_user_pks = [u["pk"] for u in response.json()["results"]]
assert admin_user.public_primary_key in returned_user_pks
assert len(returned_user_pks) == 2
assert user1.public_primary_key in returned_user_pks
assert user2.public_primary_key in returned_user_pks
assert user3.public_primary_key not in returned_user_pks
@ -481,10 +500,10 @@ def test_notification_chain_verbal(
@pytest.mark.parametrize(
"role,expected_status",
[
(LegacyAccessControlRole.ADMIN, status.HTTP_200_OK),
(LegacyAccessControlRole.EDITOR, status.HTTP_200_OK),
(LegacyAccessControlRole.VIEWER, status.HTTP_403_FORBIDDEN),
(LegacyAccessControlRole.NONE, status.HTTP_403_FORBIDDEN),
(permissions.LegacyAccessControlRole.ADMIN, status.HTTP_200_OK),
(permissions.LegacyAccessControlRole.EDITOR, status.HTTP_200_OK),
(permissions.LegacyAccessControlRole.VIEWER, status.HTTP_403_FORBIDDEN),
(permissions.LegacyAccessControlRole.NONE, status.HTTP_403_FORBIDDEN),
],
)
def test_user_update_self_permissions(
@ -511,10 +530,10 @@ def test_user_update_self_permissions(
@pytest.mark.parametrize(
"role,expected_status",
[
(LegacyAccessControlRole.ADMIN, status.HTTP_200_OK),
(LegacyAccessControlRole.EDITOR, status.HTTP_403_FORBIDDEN),
(LegacyAccessControlRole.VIEWER, status.HTTP_403_FORBIDDEN),
(LegacyAccessControlRole.NONE, status.HTTP_403_FORBIDDEN),
(permissions.LegacyAccessControlRole.ADMIN, status.HTTP_200_OK),
(permissions.LegacyAccessControlRole.EDITOR, status.HTTP_403_FORBIDDEN),
(permissions.LegacyAccessControlRole.VIEWER, status.HTTP_403_FORBIDDEN),
(permissions.LegacyAccessControlRole.NONE, status.HTTP_403_FORBIDDEN),
],
)
def test_user_update_other_permissions(
@ -540,10 +559,10 @@ def test_user_update_other_permissions(
@pytest.mark.parametrize(
"role,expected_status",
[
(LegacyAccessControlRole.ADMIN, status.HTTP_200_OK),
(LegacyAccessControlRole.EDITOR, status.HTTP_200_OK),
(LegacyAccessControlRole.VIEWER, status.HTTP_200_OK),
(LegacyAccessControlRole.NONE, status.HTTP_403_FORBIDDEN),
(permissions.LegacyAccessControlRole.ADMIN, status.HTTP_200_OK),
(permissions.LegacyAccessControlRole.EDITOR, status.HTTP_200_OK),
(permissions.LegacyAccessControlRole.VIEWER, status.HTTP_200_OK),
(permissions.LegacyAccessControlRole.NONE, status.HTTP_403_FORBIDDEN),
],
)
def test_user_list_permissions(
@ -571,10 +590,10 @@ def test_user_list_permissions(
@pytest.mark.parametrize(
"role,expected_status",
[
(LegacyAccessControlRole.ADMIN, status.HTTP_200_OK),
(LegacyAccessControlRole.EDITOR, status.HTTP_200_OK),
(LegacyAccessControlRole.VIEWER, status.HTTP_200_OK),
(LegacyAccessControlRole.NONE, status.HTTP_403_FORBIDDEN),
(permissions.LegacyAccessControlRole.ADMIN, status.HTTP_200_OK),
(permissions.LegacyAccessControlRole.EDITOR, status.HTTP_200_OK),
(permissions.LegacyAccessControlRole.VIEWER, status.HTTP_200_OK),
(permissions.LegacyAccessControlRole.NONE, status.HTTP_403_FORBIDDEN),
],
)
def test_user_detail_self_permissions(
@ -602,10 +621,10 @@ def test_user_detail_self_permissions(
@pytest.mark.parametrize(
"role,expected_status",
[
(LegacyAccessControlRole.ADMIN, status.HTTP_200_OK),
(LegacyAccessControlRole.EDITOR, status.HTTP_200_OK),
(LegacyAccessControlRole.VIEWER, status.HTTP_200_OK),
(LegacyAccessControlRole.NONE, status.HTTP_403_FORBIDDEN),
(permissions.LegacyAccessControlRole.ADMIN, status.HTTP_200_OK),
(permissions.LegacyAccessControlRole.EDITOR, status.HTTP_200_OK),
(permissions.LegacyAccessControlRole.VIEWER, status.HTTP_200_OK),
(permissions.LegacyAccessControlRole.NONE, status.HTTP_403_FORBIDDEN),
],
)
def test_user_detail_other_permissions(
@ -625,7 +644,7 @@ def test_user_detail_other_permissions(
assert response.status_code == expected_status
# hidden information for editor/viewer
available_fields = UserHiddenFieldsSerializer.fields_available_for_all_users + ["hidden_fields"]
if role in (LegacyAccessControlRole.EDITOR, LegacyAccessControlRole.VIEWER):
if role in (permissions.LegacyAccessControlRole.EDITOR, permissions.LegacyAccessControlRole.VIEWER):
user_details = response.json()
for f_name in user_details:
if f_name not in available_fields:
@ -636,10 +655,10 @@ def test_user_detail_other_permissions(
@pytest.mark.parametrize(
"role,expected_status",
[
(LegacyAccessControlRole.ADMIN, status.HTTP_200_OK),
(LegacyAccessControlRole.EDITOR, status.HTTP_200_OK),
(LegacyAccessControlRole.VIEWER, status.HTTP_403_FORBIDDEN),
(LegacyAccessControlRole.NONE, status.HTTP_403_FORBIDDEN),
(permissions.LegacyAccessControlRole.ADMIN, status.HTTP_200_OK),
(permissions.LegacyAccessControlRole.EDITOR, status.HTTP_200_OK),
(permissions.LegacyAccessControlRole.VIEWER, status.HTTP_403_FORBIDDEN),
(permissions.LegacyAccessControlRole.NONE, status.HTTP_403_FORBIDDEN),
],
)
def test_user_get_own_verification_code(
@ -667,10 +686,10 @@ def test_user_get_own_verification_code(
@pytest.mark.parametrize(
"role,expected_status",
[
(LegacyAccessControlRole.ADMIN, status.HTTP_200_OK),
(LegacyAccessControlRole.EDITOR, status.HTTP_403_FORBIDDEN),
(LegacyAccessControlRole.VIEWER, status.HTTP_403_FORBIDDEN),
(LegacyAccessControlRole.NONE, status.HTTP_403_FORBIDDEN),
(permissions.LegacyAccessControlRole.ADMIN, status.HTTP_200_OK),
(permissions.LegacyAccessControlRole.EDITOR, status.HTTP_403_FORBIDDEN),
(permissions.LegacyAccessControlRole.VIEWER, status.HTTP_403_FORBIDDEN),
(permissions.LegacyAccessControlRole.NONE, status.HTTP_403_FORBIDDEN),
],
)
def test_user_get_other_verification_code(
@ -740,10 +759,10 @@ def test_verification_code_provider_exception(
@pytest.mark.parametrize(
"role,expected_status",
[
(LegacyAccessControlRole.ADMIN, status.HTTP_200_OK),
(LegacyAccessControlRole.EDITOR, status.HTTP_200_OK),
(LegacyAccessControlRole.VIEWER, status.HTTP_403_FORBIDDEN),
(LegacyAccessControlRole.NONE, status.HTTP_403_FORBIDDEN),
(permissions.LegacyAccessControlRole.ADMIN, status.HTTP_200_OK),
(permissions.LegacyAccessControlRole.EDITOR, status.HTTP_200_OK),
(permissions.LegacyAccessControlRole.VIEWER, status.HTTP_403_FORBIDDEN),
(permissions.LegacyAccessControlRole.NONE, status.HTTP_403_FORBIDDEN),
],
)
def test_user_verify_own_phone(
@ -771,10 +790,10 @@ def test_user_verify_own_phone(
@pytest.mark.parametrize(
"role,expected_status",
[
(LegacyAccessControlRole.ADMIN, status.HTTP_400_BAD_REQUEST),
(LegacyAccessControlRole.EDITOR, status.HTTP_400_BAD_REQUEST),
(LegacyAccessControlRole.VIEWER, status.HTTP_403_FORBIDDEN),
(LegacyAccessControlRole.NONE, status.HTTP_403_FORBIDDEN),
(permissions.LegacyAccessControlRole.ADMIN, status.HTTP_400_BAD_REQUEST),
(permissions.LegacyAccessControlRole.EDITOR, status.HTTP_400_BAD_REQUEST),
(permissions.LegacyAccessControlRole.VIEWER, status.HTTP_403_FORBIDDEN),
(permissions.LegacyAccessControlRole.NONE, status.HTTP_403_FORBIDDEN),
],
)
def test_user_get_own_telegram_verification_code_with_telegram_connected(
@ -802,10 +821,10 @@ Tests below are outdated
@pytest.mark.parametrize(
"role,expected_status",
[
(LegacyAccessControlRole.ADMIN, status.HTTP_200_OK),
(LegacyAccessControlRole.EDITOR, status.HTTP_403_FORBIDDEN),
(LegacyAccessControlRole.VIEWER, status.HTTP_403_FORBIDDEN),
(LegacyAccessControlRole.NONE, status.HTTP_403_FORBIDDEN),
(permissions.LegacyAccessControlRole.ADMIN, status.HTTP_200_OK),
(permissions.LegacyAccessControlRole.EDITOR, status.HTTP_403_FORBIDDEN),
(permissions.LegacyAccessControlRole.VIEWER, status.HTTP_403_FORBIDDEN),
(permissions.LegacyAccessControlRole.NONE, status.HTTP_403_FORBIDDEN),
],
)
def test_user_verify_another_phone(
@ -816,7 +835,7 @@ def test_user_verify_another_phone(
expected_status,
):
organization, tester, token = make_organization_and_user_with_plugin_token(role)
other_user = make_user_for_organization(organization, role=LegacyAccessControlRole.EDITOR)
other_user = make_user_for_organization(organization, role=permissions.LegacyAccessControlRole.EDITOR)
client = APIClient()
url = reverse("api-internal:user-verify-number", kwargs={"pk": other_user.public_primary_key})
@ -831,10 +850,10 @@ def test_user_verify_another_phone(
@pytest.mark.parametrize(
"role,expected_status",
[
(LegacyAccessControlRole.ADMIN, status.HTTP_200_OK),
(LegacyAccessControlRole.EDITOR, status.HTTP_200_OK),
(LegacyAccessControlRole.VIEWER, status.HTTP_403_FORBIDDEN),
(LegacyAccessControlRole.NONE, status.HTTP_403_FORBIDDEN),
(permissions.LegacyAccessControlRole.ADMIN, status.HTTP_200_OK),
(permissions.LegacyAccessControlRole.EDITOR, status.HTTP_200_OK),
(permissions.LegacyAccessControlRole.VIEWER, status.HTTP_403_FORBIDDEN),
(permissions.LegacyAccessControlRole.NONE, status.HTTP_403_FORBIDDEN),
],
)
def test_user_get_own_telegram_verification_code(
@ -856,10 +875,10 @@ def test_user_get_own_telegram_verification_code(
@pytest.mark.parametrize(
"role,expected_status",
[
(LegacyAccessControlRole.ADMIN, status.HTTP_200_OK),
(LegacyAccessControlRole.EDITOR, status.HTTP_403_FORBIDDEN),
(LegacyAccessControlRole.VIEWER, status.HTTP_403_FORBIDDEN),
(LegacyAccessControlRole.NONE, status.HTTP_403_FORBIDDEN),
(permissions.LegacyAccessControlRole.ADMIN, status.HTTP_200_OK),
(permissions.LegacyAccessControlRole.EDITOR, status.HTTP_403_FORBIDDEN),
(permissions.LegacyAccessControlRole.VIEWER, status.HTTP_403_FORBIDDEN),
(permissions.LegacyAccessControlRole.NONE, status.HTTP_403_FORBIDDEN),
],
)
def test_user_get_another_telegram_verification_code(
@ -870,7 +889,7 @@ def test_user_get_another_telegram_verification_code(
expected_status,
):
organization, tester, token = make_organization_and_user_with_plugin_token(role)
other_user = make_user_for_organization(organization, role=LegacyAccessControlRole.EDITOR)
other_user = make_user_for_organization(organization, role=permissions.LegacyAccessControlRole.EDITOR)
client = APIClient()
url = reverse("api-internal:user-get-telegram-verification-code", kwargs={"pk": other_user.public_primary_key})
@ -886,7 +905,7 @@ def test_admin_can_update_user(
make_user_auth_headers,
):
organization, first_user, token = make_organization_and_user_with_plugin_token()
second_user = make_user_for_organization(organization, role=LegacyAccessControlRole.ADMIN)
second_user = make_user_for_organization(organization, role=permissions.LegacyAccessControlRole.ADMIN)
client = APIClient()
data = {
@ -903,7 +922,7 @@ def test_admin_can_update_user(
@pytest.mark.django_db
def test_admin_can_update_himself(make_organization_and_user_with_plugin_token, make_user_auth_headers):
_, user, token = make_organization_and_user_with_plugin_token(role=LegacyAccessControlRole.ADMIN)
_, user, token = make_organization_and_user_with_plugin_token(role=permissions.LegacyAccessControlRole.ADMIN)
client = APIClient()
data = {
@ -921,7 +940,7 @@ def test_admin_can_update_himself(make_organization_and_user_with_plugin_token,
@pytest.mark.django_db
def test_admin_can_list_users(make_organization_and_user_with_plugin_token, make_user_auth_headers):
_, user, token = make_organization_and_user_with_plugin_token(role=LegacyAccessControlRole.ADMIN)
_, user, token = make_organization_and_user_with_plugin_token(role=permissions.LegacyAccessControlRole.ADMIN)
client = APIClient()
@ -938,7 +957,7 @@ def test_admin_can_detail_users(
make_user_auth_headers,
):
organization, first_user, token = make_organization_and_user_with_plugin_token()
second_user = make_user_for_organization(organization, role=LegacyAccessControlRole.ADMIN)
second_user = make_user_for_organization(organization, role=permissions.LegacyAccessControlRole.ADMIN)
client = APIClient()
@ -955,7 +974,7 @@ def test_admin_can_get_own_verification_code(
make_organization_and_user_with_plugin_token,
make_user_auth_headers,
):
_, user, token = make_organization_and_user_with_plugin_token(role=LegacyAccessControlRole.ADMIN)
_, user, token = make_organization_and_user_with_plugin_token(role=permissions.LegacyAccessControlRole.ADMIN)
client = APIClient()
url = reverse("api-internal:user-get-verification-code", kwargs={"pk": user.public_primary_key})
@ -973,7 +992,7 @@ def test_admin_can_get_another_user_verification_code(
make_user_auth_headers,
):
organization, first_user, token = make_organization_and_user_with_plugin_token()
second_user = make_user_for_organization(organization, role=LegacyAccessControlRole.ADMIN)
second_user = make_user_for_organization(organization, role=permissions.LegacyAccessControlRole.ADMIN)
client = APIClient()
url = reverse("api-internal:user-get-verification-code", kwargs={"pk": first_user.public_primary_key})
@ -988,7 +1007,7 @@ def test_admin_can_verify_own_phone(
make_organization_and_user_with_plugin_token,
make_user_auth_headers,
):
_, user, token = make_organization_and_user_with_plugin_token(role=LegacyAccessControlRole.ADMIN)
_, user, token = make_organization_and_user_with_plugin_token(role=permissions.LegacyAccessControlRole.ADMIN)
client = APIClient()
url = reverse("api-internal:user-verify-number", kwargs={"pk": user.public_primary_key})
@ -1005,7 +1024,7 @@ def test_admin_can_verify_another_user_phone(
make_user_auth_headers,
):
organization, first_user, token = make_organization_and_user_with_plugin_token()
second_user = make_user_for_organization(organization, role=LegacyAccessControlRole.ADMIN)
second_user = make_user_for_organization(organization, role=permissions.LegacyAccessControlRole.ADMIN)
client = APIClient()
url = reverse("api-internal:user-verify-number", kwargs={"pk": first_user.public_primary_key})
@ -1018,7 +1037,7 @@ def test_admin_can_verify_another_user_phone(
def test_admin_can_get_own_telegram_verification_code(
make_organization_and_user_with_plugin_token, make_user_auth_headers
):
_, user, token = make_organization_and_user_with_plugin_token(role=LegacyAccessControlRole.ADMIN)
_, user, token = make_organization_and_user_with_plugin_token(role=permissions.LegacyAccessControlRole.ADMIN)
client = APIClient()
url = reverse("api-internal:user-get-telegram-verification-code", kwargs={"pk": user.public_primary_key})
@ -1034,7 +1053,7 @@ def test_admin_can_get_another_user_telegram_verification_code(
make_user_auth_headers,
):
organization, first_user, token = make_organization_and_user_with_plugin_token()
second_user = make_user_for_organization(organization, role=LegacyAccessControlRole.ADMIN)
second_user = make_user_for_organization(organization, role=permissions.LegacyAccessControlRole.ADMIN)
client = APIClient()
url = reverse("api-internal:user-get-telegram-verification-code", kwargs={"pk": first_user.public_primary_key})
@ -1050,7 +1069,7 @@ def test_admin_can_get_another_user_backend_verification_code(
make_user_auth_headers,
):
organization, first_user, token = make_organization_and_user_with_plugin_token()
second_user = make_user_for_organization(organization, role=LegacyAccessControlRole.ADMIN)
second_user = make_user_for_organization(organization, role=permissions.LegacyAccessControlRole.ADMIN)
client = APIClient()
url = (
@ -1069,7 +1088,7 @@ def test_admin_can_unlink_another_user_backend_account(
make_user_auth_headers,
):
organization, first_user, token = make_organization_and_user_with_plugin_token()
second_user = make_user_for_organization(organization, role=LegacyAccessControlRole.ADMIN)
second_user = make_user_for_organization(organization, role=permissions.LegacyAccessControlRole.ADMIN)
client = APIClient()
url = (
@ -1092,7 +1111,7 @@ def test_admin_can_unlink_another_user_slack_account(
_, token = make_token_for_organization(organization)
user, _ = make_user_with_slack_user_identity(
slack_team_identity, organization, slack_id="user_2", role=LegacyAccessControlRole.ADMIN
slack_team_identity, organization, slack_id="user_2", role=permissions.LegacyAccessControlRole.ADMIN
)
other_user = make_user_for_organization(organization)
@ -1114,8 +1133,10 @@ def test_user_cant_update_user(
make_user_for_organization,
make_user_auth_headers,
):
organization, first_user, token = make_organization_and_user_with_plugin_token(role=LegacyAccessControlRole.EDITOR)
second_user = make_user_for_organization(organization, role=LegacyAccessControlRole.EDITOR)
organization, first_user, token = make_organization_and_user_with_plugin_token(
role=permissions.LegacyAccessControlRole.EDITOR
)
second_user = make_user_for_organization(organization, role=permissions.LegacyAccessControlRole.EDITOR)
client = APIClient()
data = {
@ -1132,7 +1153,7 @@ def test_user_cant_update_user(
@pytest.mark.django_db
def test_user_can_update_themself(make_organization_and_user_with_plugin_token, make_user_auth_headers):
_, user, token = make_organization_and_user_with_plugin_token(role=LegacyAccessControlRole.EDITOR)
_, user, token = make_organization_and_user_with_plugin_token(role=permissions.LegacyAccessControlRole.EDITOR)
client = APIClient()
data = {
@ -1150,7 +1171,7 @@ def test_user_can_update_themself(make_organization_and_user_with_plugin_token,
@pytest.mark.django_db
def test_user_can_list_users(make_organization_and_user_with_plugin_token, make_user_auth_headers):
_, user, token = make_organization_and_user_with_plugin_token(role=LegacyAccessControlRole.EDITOR)
_, user, token = make_organization_and_user_with_plugin_token(role=permissions.LegacyAccessControlRole.EDITOR)
client = APIClient()
@ -1164,8 +1185,10 @@ def test_user_can_list_users(make_organization_and_user_with_plugin_token, make_
def test_user_can_detail_users(
make_organization_and_user_with_plugin_token, make_user_for_organization, make_user_auth_headers
):
organization, first_user, token = make_organization_and_user_with_plugin_token(role=LegacyAccessControlRole.EDITOR)
second_user = make_user_for_organization(organization, role=LegacyAccessControlRole.EDITOR)
organization, first_user, token = make_organization_and_user_with_plugin_token(
role=permissions.LegacyAccessControlRole.EDITOR
)
second_user = make_user_for_organization(organization, role=permissions.LegacyAccessControlRole.EDITOR)
client = APIClient()
url = reverse("api-internal:user-detail", kwargs={"pk": first_user.public_primary_key})
@ -1185,7 +1208,7 @@ def test_user_can_detail_users(
def test_user_can_get_own_verification_code(
mock_verification_start, make_organization_and_user_with_plugin_token, make_user_auth_headers
):
_, user, token = make_organization_and_user_with_plugin_token(role=LegacyAccessControlRole.EDITOR)
_, user, token = make_organization_and_user_with_plugin_token(role=permissions.LegacyAccessControlRole.EDITOR)
client = APIClient()
url = reverse("api-internal:user-get-verification-code", kwargs={"pk": user.public_primary_key})
@ -1202,8 +1225,10 @@ def test_user_cant_get_another_user_verification_code(
make_user_for_organization,
make_user_auth_headers,
):
organization, first_user, token = make_organization_and_user_with_plugin_token(role=LegacyAccessControlRole.EDITOR)
second_user = make_user_for_organization(organization, role=LegacyAccessControlRole.EDITOR)
organization, first_user, token = make_organization_and_user_with_plugin_token(
role=permissions.LegacyAccessControlRole.EDITOR
)
second_user = make_user_for_organization(organization, role=permissions.LegacyAccessControlRole.EDITOR)
client = APIClient()
url = reverse("api-internal:user-get-verification-code", kwargs={"pk": first_user.public_primary_key})
@ -1217,7 +1242,7 @@ def test_user_cant_get_another_user_verification_code(
def test_user_can_verify_own_phone(
mocked_verification_check, make_organization_and_user_with_plugin_token, make_user_auth_headers
):
_, user, token = make_organization_and_user_with_plugin_token(role=LegacyAccessControlRole.EDITOR)
_, user, token = make_organization_and_user_with_plugin_token(role=permissions.LegacyAccessControlRole.EDITOR)
client = APIClient()
url = reverse("api-internal:user-verify-number", kwargs={"pk": user.public_primary_key})
@ -1234,8 +1259,10 @@ def test_user_cant_verify_another_user_phone(
make_user_for_organization,
make_user_auth_headers,
):
organization, first_user, token = make_organization_and_user_with_plugin_token(role=LegacyAccessControlRole.EDITOR)
second_user = make_user_for_organization(organization, role=LegacyAccessControlRole.EDITOR)
organization, first_user, token = make_organization_and_user_with_plugin_token(
role=permissions.LegacyAccessControlRole.EDITOR
)
second_user = make_user_for_organization(organization, role=permissions.LegacyAccessControlRole.EDITOR)
client = APIClient()
url = reverse("api-internal:user-verify-number", kwargs={"pk": first_user.public_primary_key})
@ -1248,7 +1275,7 @@ def test_user_cant_verify_another_user_phone(
def test_user_can_get_own_telegram_verification_code(
make_organization_and_user_with_plugin_token, make_user_auth_headers
):
_, user, token = make_organization_and_user_with_plugin_token(role=LegacyAccessControlRole.EDITOR)
_, user, token = make_organization_and_user_with_plugin_token(role=permissions.LegacyAccessControlRole.EDITOR)
client = APIClient()
url = reverse("api-internal:user-get-telegram-verification-code", kwargs={"pk": user.public_primary_key})
@ -1263,8 +1290,10 @@ def test_user_cant_get_another_user_telegram_verification_code(
make_user_for_organization,
make_user_auth_headers,
):
organization, first_user, token = make_organization_and_user_with_plugin_token(role=LegacyAccessControlRole.EDITOR)
second_user = make_user_for_organization(organization, role=LegacyAccessControlRole.EDITOR)
organization, first_user, token = make_organization_and_user_with_plugin_token(
role=permissions.LegacyAccessControlRole.EDITOR
)
second_user = make_user_for_organization(organization, role=permissions.LegacyAccessControlRole.EDITOR)
client = APIClient()
url = reverse("api-internal:user-get-telegram-verification-code", kwargs={"pk": first_user.public_primary_key})
@ -1277,7 +1306,7 @@ def test_user_cant_get_another_user_telegram_verification_code(
def test_user_can_get_own_backend_verification_code(
make_organization_and_user_with_plugin_token, make_user_auth_headers
):
_, user, token = make_organization_and_user_with_plugin_token(role=LegacyAccessControlRole.EDITOR)
_, user, token = make_organization_and_user_with_plugin_token(role=permissions.LegacyAccessControlRole.EDITOR)
client = APIClient()
url = (
@ -1302,8 +1331,10 @@ def test_user_cant_get_another_user_backend_verification_code(
make_user_for_organization,
make_user_auth_headers,
):
organization, first_user, token = make_organization_and_user_with_plugin_token(role=LegacyAccessControlRole.EDITOR)
second_user = make_user_for_organization(organization, role=LegacyAccessControlRole.EDITOR)
organization, first_user, token = make_organization_and_user_with_plugin_token(
role=permissions.LegacyAccessControlRole.EDITOR
)
second_user = make_user_for_organization(organization, role=permissions.LegacyAccessControlRole.EDITOR)
client = APIClient()
url = (
@ -1324,7 +1355,7 @@ def test_user_can_unlink_own_slack_account(
):
organization, slack_team_identity = make_organization_with_slack_team_identity()
user, _ = make_user_with_slack_user_identity(
slack_team_identity, organization, slack_id="user_2", role=LegacyAccessControlRole.EDITOR
slack_team_identity, organization, slack_id="user_2", role=permissions.LegacyAccessControlRole.EDITOR
)
_, token = make_token_for_organization(organization)
@ -1339,7 +1370,7 @@ def test_user_can_unlink_own_slack_account(
@pytest.mark.django_db
def test_user_can_unlink_backend_own_account(make_organization_and_user_with_plugin_token, make_user_auth_headers):
_, user, token = make_organization_and_user_with_plugin_token(role=LegacyAccessControlRole.EDITOR)
_, user, token = make_organization_and_user_with_plugin_token(role=permissions.LegacyAccessControlRole.EDITOR)
client = APIClient()
url = reverse("api-internal:user-unlink-backend", kwargs={"pk": user.public_primary_key}) + "?backend=TESTONLY"
@ -1351,7 +1382,7 @@ def test_user_can_unlink_backend_own_account(make_organization_and_user_with_plu
@pytest.mark.django_db
def test_user_unlink_backend_invalid_backend_id(make_organization_and_user_with_plugin_token, make_user_auth_headers):
_, user, token = make_organization_and_user_with_plugin_token(role=LegacyAccessControlRole.EDITOR)
_, user, token = make_organization_and_user_with_plugin_token(role=permissions.LegacyAccessControlRole.EDITOR)
client = APIClient()
url = reverse("api-internal:user-unlink-backend", kwargs={"pk": user.public_primary_key}) + "?backend=INVALID"
@ -1365,7 +1396,7 @@ def test_user_unlink_backend_invalid_backend_id(make_organization_and_user_with_
def test_user_unlink_backend_backend_account_not_found(
make_organization_and_user_with_plugin_token, make_user_auth_headers
):
_, user, token = make_organization_and_user_with_plugin_token(role=LegacyAccessControlRole.EDITOR)
_, user, token = make_organization_and_user_with_plugin_token(role=permissions.LegacyAccessControlRole.EDITOR)
client = APIClient()
url = reverse("api-internal:user-unlink-backend", kwargs={"pk": user.public_primary_key}) + "?backend=TESTONLY"
@ -1385,10 +1416,10 @@ def test_user_cant_unlink_slack_another_user(
organization, slack_team_identity = make_organization_with_slack_team_identity()
first_user, _ = make_user_with_slack_user_identity(
slack_team_identity, organization, slack_id="user_1", role=LegacyAccessControlRole.EDITOR
slack_team_identity, organization, slack_id="user_1", role=permissions.LegacyAccessControlRole.EDITOR
)
second_user, _ = make_user_with_slack_user_identity(
slack_team_identity, organization, slack_id="user_2", role=LegacyAccessControlRole.EDITOR
slack_team_identity, organization, slack_id="user_2", role=permissions.LegacyAccessControlRole.EDITOR
)
_, token = make_token_for_organization(organization)
@ -1405,8 +1436,10 @@ def test_user_cant_unlink_slack_another_user(
def test_user_cant_unlink_backend_another_user(
make_organization_and_user_with_plugin_token, make_user_for_organization, make_user_auth_headers
):
organization, first_user, token = make_organization_and_user_with_plugin_token(role=LegacyAccessControlRole.EDITOR)
second_user = make_user_for_organization(organization, role=LegacyAccessControlRole.EDITOR)
organization, first_user, token = make_organization_and_user_with_plugin_token(
role=permissions.LegacyAccessControlRole.EDITOR
)
second_user = make_user_for_organization(organization, role=permissions.LegacyAccessControlRole.EDITOR)
client = APIClient()
url = (
@ -1424,12 +1457,14 @@ def test_user_cant_unlink_backend_another_user(
def test_viewer_cant_update_user(
make_organization_and_user_with_plugin_token, make_user_for_organization, make_user_auth_headers
):
organization, first_user, token = make_organization_and_user_with_plugin_token(role=LegacyAccessControlRole.VIEWER)
second_user = make_user_for_organization(organization, role=LegacyAccessControlRole.VIEWER)
organization, first_user, token = make_organization_and_user_with_plugin_token(
role=permissions.LegacyAccessControlRole.VIEWER
)
second_user = make_user_for_organization(organization, role=permissions.LegacyAccessControlRole.VIEWER)
data = {
"email": "test@amixr.io",
"role": LegacyAccessControlRole.EDITOR,
"role": permissions.LegacyAccessControlRole.EDITOR,
"username": "updated_test_username",
"unverified_phone_number": "+1234567890",
"slack_login": "",
@ -1444,11 +1479,11 @@ def test_viewer_cant_update_user(
@pytest.mark.django_db
def test_viewer_cant_update_himself(make_organization_and_user_with_plugin_token, make_user_auth_headers):
_, user, token = make_organization_and_user_with_plugin_token(role=LegacyAccessControlRole.VIEWER)
_, user, token = make_organization_and_user_with_plugin_token(role=permissions.LegacyAccessControlRole.VIEWER)
data = {
"email": "test@amixr.io",
"role": LegacyAccessControlRole.VIEWER,
"role": permissions.LegacyAccessControlRole.VIEWER,
"username": "updated_test_username",
"unverified_phone_number": "+1234567890",
"slack_login": "",
@ -1463,7 +1498,7 @@ def test_viewer_cant_update_himself(make_organization_and_user_with_plugin_token
@pytest.mark.django_db
def test_viewer_can_list_users(make_organization_and_user_with_plugin_token, make_user_auth_headers):
_, user, token = make_organization_and_user_with_plugin_token(role=LegacyAccessControlRole.VIEWER)
_, user, token = make_organization_and_user_with_plugin_token(role=permissions.LegacyAccessControlRole.VIEWER)
client = APIClient()
url = reverse("api-internal:user-list")
@ -1477,7 +1512,7 @@ def test_viewer_can_list_users(make_organization_and_user_with_plugin_token, mak
def test_viewer_cant_get_own_verification_code(
mock_verification_start, make_organization_and_user_with_plugin_token, make_user_auth_headers
):
_, user, token = make_organization_and_user_with_plugin_token(role=LegacyAccessControlRole.VIEWER)
_, user, token = make_organization_and_user_with_plugin_token(role=permissions.LegacyAccessControlRole.VIEWER)
client = APIClient()
url = reverse("api-internal:user-get-verification-code", kwargs={"pk": user.public_primary_key})
@ -1494,8 +1529,10 @@ def test_viewer_cant_get_another_user_verification_code(
make_user_for_organization,
make_user_auth_headers,
):
organization, first_user, token = make_organization_and_user_with_plugin_token(role=LegacyAccessControlRole.EDITOR)
second_user = make_user_for_organization(organization, role=LegacyAccessControlRole.VIEWER)
organization, first_user, token = make_organization_and_user_with_plugin_token(
role=permissions.LegacyAccessControlRole.EDITOR
)
second_user = make_user_for_organization(organization, role=permissions.LegacyAccessControlRole.VIEWER)
client = APIClient()
url = reverse("api-internal:user-get-verification-code", kwargs={"pk": first_user.public_primary_key})
@ -1509,7 +1546,7 @@ def test_viewer_cant_get_another_user_verification_code(
def test_viewer_cant_verify_own_phone(
mocked_verification_check, make_organization_and_user_with_plugin_token, make_user_auth_headers
):
_, user, token = make_organization_and_user_with_plugin_token(role=LegacyAccessControlRole.VIEWER)
_, user, token = make_organization_and_user_with_plugin_token(role=permissions.LegacyAccessControlRole.VIEWER)
client = APIClient()
url = reverse("api-internal:user-verify-number", kwargs={"pk": user.public_primary_key})
@ -1526,8 +1563,10 @@ def test_viewer_cant_verify_another_user_phone(
make_user_for_organization,
make_user_auth_headers,
):
organization, first_user, token = make_organization_and_user_with_plugin_token(role=LegacyAccessControlRole.EDITOR)
second_user = make_user_for_organization(organization, role=LegacyAccessControlRole.VIEWER)
organization, first_user, token = make_organization_and_user_with_plugin_token(
role=permissions.LegacyAccessControlRole.EDITOR
)
second_user = make_user_for_organization(organization, role=permissions.LegacyAccessControlRole.VIEWER)
client = APIClient()
url = reverse("api-internal:user-verify-number", kwargs={"pk": first_user.public_primary_key})
@ -1540,7 +1579,7 @@ def test_viewer_cant_verify_another_user_phone(
def test_viewer_cant_get_own_telegram_verification_code(
make_organization_and_user_with_plugin_token, make_user_auth_headers
):
_, user, token = make_organization_and_user_with_plugin_token(role=LegacyAccessControlRole.VIEWER)
_, user, token = make_organization_and_user_with_plugin_token(role=permissions.LegacyAccessControlRole.VIEWER)
client = APIClient()
url = reverse("api-internal:user-get-telegram-verification-code", kwargs={"pk": user.public_primary_key})
@ -1553,8 +1592,10 @@ def test_viewer_cant_get_own_telegram_verification_code(
def test_viewer_cant_get_another_user_telegram_verification_code(
make_organization_and_user_with_plugin_token, make_user_for_organization, make_user_auth_headers
):
organization, first_user, token = make_organization_and_user_with_plugin_token(role=LegacyAccessControlRole.EDITOR)
second_user = make_user_for_organization(organization, role=LegacyAccessControlRole.VIEWER)
organization, first_user, token = make_organization_and_user_with_plugin_token(
role=permissions.LegacyAccessControlRole.EDITOR
)
second_user = make_user_for_organization(organization, role=permissions.LegacyAccessControlRole.VIEWER)
client = APIClient()
url = reverse("api-internal:user-get-telegram-verification-code", kwargs={"pk": first_user.public_primary_key})
@ -1567,12 +1608,12 @@ def test_viewer_cant_get_another_user_telegram_verification_code(
@pytest.mark.parametrize(
"role,expected_status,initial_unverified_number,initial_verified_number",
[
(LegacyAccessControlRole.ADMIN, status.HTTP_200_OK, "+1234567890", None),
(LegacyAccessControlRole.EDITOR, status.HTTP_200_OK, "+1234567890", None),
(LegacyAccessControlRole.VIEWER, status.HTTP_403_FORBIDDEN, "+1234567890", None),
(LegacyAccessControlRole.ADMIN, status.HTTP_200_OK, None, "+1234567890"),
(LegacyAccessControlRole.EDITOR, status.HTTP_200_OK, None, "+1234567890"),
(LegacyAccessControlRole.VIEWER, status.HTTP_403_FORBIDDEN, None, "+1234567890"),
(permissions.LegacyAccessControlRole.ADMIN, status.HTTP_200_OK, "+1234567890", None),
(permissions.LegacyAccessControlRole.EDITOR, status.HTTP_200_OK, "+1234567890", None),
(permissions.LegacyAccessControlRole.VIEWER, status.HTTP_403_FORBIDDEN, "+1234567890", None),
(permissions.LegacyAccessControlRole.ADMIN, status.HTTP_200_OK, None, "+1234567890"),
(permissions.LegacyAccessControlRole.EDITOR, status.HTTP_200_OK, None, "+1234567890"),
(permissions.LegacyAccessControlRole.VIEWER, status.HTTP_403_FORBIDDEN, None, "+1234567890"),
],
)
def test_forget_own_number(
@ -1613,12 +1654,12 @@ def test_forget_own_number(
@pytest.mark.parametrize(
"role,expected_status,initial_unverified_number,initial_verified_number",
[
(LegacyAccessControlRole.ADMIN, status.HTTP_200_OK, "+1234567890", None),
(LegacyAccessControlRole.EDITOR, status.HTTP_403_FORBIDDEN, "+1234567890", None),
(LegacyAccessControlRole.VIEWER, status.HTTP_403_FORBIDDEN, "+1234567890", None),
(LegacyAccessControlRole.ADMIN, status.HTTP_200_OK, None, "+1234567890"),
(LegacyAccessControlRole.EDITOR, status.HTTP_403_FORBIDDEN, None, "+1234567890"),
(LegacyAccessControlRole.VIEWER, status.HTTP_403_FORBIDDEN, None, "+1234567890"),
(permissions.LegacyAccessControlRole.ADMIN, status.HTTP_200_OK, "+1234567890", None),
(permissions.LegacyAccessControlRole.EDITOR, status.HTTP_403_FORBIDDEN, "+1234567890", None),
(permissions.LegacyAccessControlRole.VIEWER, status.HTTP_403_FORBIDDEN, "+1234567890", None),
(permissions.LegacyAccessControlRole.ADMIN, status.HTTP_200_OK, None, "+1234567890"),
(permissions.LegacyAccessControlRole.EDITOR, status.HTTP_403_FORBIDDEN, None, "+1234567890"),
(permissions.LegacyAccessControlRole.VIEWER, status.HTTP_403_FORBIDDEN, None, "+1234567890"),
],
)
def test_forget_other_number(
@ -1662,8 +1703,10 @@ def test_forget_other_number(
def test_viewer_cant_get_another_user_backend_verification_code(
make_organization_and_user_with_plugin_token, make_user_for_organization, make_user_auth_headers
):
organization, first_user, token = make_organization_and_user_with_plugin_token(role=LegacyAccessControlRole.EDITOR)
second_user = make_user_for_organization(organization, role=LegacyAccessControlRole.VIEWER)
organization, first_user, token = make_organization_and_user_with_plugin_token(
role=permissions.LegacyAccessControlRole.EDITOR
)
second_user = make_user_for_organization(organization, role=permissions.LegacyAccessControlRole.VIEWER)
client = APIClient()
url = (
@ -1679,8 +1722,10 @@ def test_viewer_cant_get_another_user_backend_verification_code(
def test_viewer_cant_unlink_backend_another_user(
make_organization_and_user_with_plugin_token, make_user_for_organization, make_user_auth_headers
):
organization, first_user, token = make_organization_and_user_with_plugin_token(role=LegacyAccessControlRole.EDITOR)
second_user = make_user_for_organization(organization, role=LegacyAccessControlRole.VIEWER)
organization, first_user, token = make_organization_and_user_with_plugin_token(
role=permissions.LegacyAccessControlRole.EDITOR
)
second_user = make_user_for_organization(organization, role=permissions.LegacyAccessControlRole.VIEWER)
client = APIClient()
url = (
@ -1693,7 +1738,7 @@ def test_viewer_cant_unlink_backend_another_user(
@pytest.mark.django_db
def test_change_timezone(make_organization_and_user_with_plugin_token, make_user_auth_headers):
_, user, token = make_organization_and_user_with_plugin_token(role=LegacyAccessControlRole.EDITOR)
_, user, token = make_organization_and_user_with_plugin_token(role=permissions.LegacyAccessControlRole.EDITOR)
client = APIClient()
url = reverse("api-internal:user-detail", kwargs={"pk": user.public_primary_key})
@ -1709,7 +1754,7 @@ def test_change_timezone(make_organization_and_user_with_plugin_token, make_user
@pytest.mark.django_db
@pytest.mark.parametrize("timezone", ["", 1, "NotATimezone"])
def test_invalid_timezone(make_organization_and_user_with_plugin_token, make_user_auth_headers, timezone):
_, user, token = make_organization_and_user_with_plugin_token(role=LegacyAccessControlRole.EDITOR)
_, user, token = make_organization_and_user_with_plugin_token(role=permissions.LegacyAccessControlRole.EDITOR)
client = APIClient()
url = reverse("api-internal:user-detail", kwargs={"pk": user.public_primary_key})
@ -1722,7 +1767,7 @@ def test_invalid_timezone(make_organization_and_user_with_plugin_token, make_use
@pytest.mark.django_db
def test_change_working_hours(make_organization_and_user_with_plugin_token, make_user_auth_headers):
_, user, token = make_organization_and_user_with_plugin_token(role=LegacyAccessControlRole.EDITOR)
_, user, token = make_organization_and_user_with_plugin_token(role=permissions.LegacyAccessControlRole.EDITOR)
client = APIClient()
url = reverse("api-internal:user-detail", kwargs={"pk": user.public_primary_key})
@ -1758,7 +1803,7 @@ def test_change_working_hours(make_organization_and_user_with_plugin_token, make
def test_invalid_working_hours(
make_organization_and_user_with_plugin_token, make_user_auth_headers, working_hours_extra
):
_, user, token = make_organization_and_user_with_plugin_token(role=LegacyAccessControlRole.EDITOR)
_, user, token = make_organization_and_user_with_plugin_token(role=permissions.LegacyAccessControlRole.EDITOR)
client = APIClient()
url = reverse("api-internal:user-detail", kwargs={"pk": user.public_primary_key})

View file

@ -21,10 +21,10 @@ from rest_framework.views import APIView
from apps.api.permissions import (
ALL_PERMISSION_CHOICES,
ALL_PERMISSION_NAME_TO_CLASS_MAP,
IsOwnerOrHasRBACPermissions,
LegacyAccessControlRole,
RBACPermission,
get_permission_from_permission_string,
user_is_authorized,
)
from apps.api.serializers.team import TeamSerializer
@ -185,14 +185,12 @@ class UserFilter(ByTeamModelFieldFilterMixin, filters.FilterSet):
fields = ["email", "roles", "permission"]
def filter_by_permission(self, queryset, name, value):
rbac_permission = get_permission_from_permission_string(value)
rbac_permission = ALL_PERMISSION_NAME_TO_CLASS_MAP.get(value, None)
if not rbac_permission:
# TODO: maybe raise a 400 here?
return queryset
return queryset.filter(
**User.build_permissions_query(rbac_permission, self.request.user.organization),
)
return queryset.filter_by_permission(rbac_permission, self.request.user.organization)
class UserView(

View file

@ -23,7 +23,7 @@ def test_alert_group_details(
slack_title_template=None,
web_title_template="title: {{ payload.field2 }}",
web_message_template="Something {{ payload.field1 }} + {{ payload.field3 }}",
web_image_url_template="http://{{ payload.field1 }}",
web_image_url_template="http://{{ payload.field1 }}.com",
)
alert_group = make_alert_group(alert_receive_channel)
alert_payload = {"field1": "foo", "field2": "bar", "field3": "baz"}
@ -48,7 +48,7 @@ def test_alert_group_details(
"render_for_web": {
"title": "title: bar",
"message": "<p>Something foo + baz</p>",
"image_url": "http://foo",
"image_url": "http://foo.com",
"source_link": None,
},
}
@ -85,7 +85,7 @@ def test_alert_group_details(
"render_for_web": {
"title": "title: bar",
"message": "<p>Something foo + baz</p>",
"image_url": "http://foo",
"image_url": "http://foo.com",
"source_link": None,
},
}

View file

@ -9,7 +9,7 @@ from rest_framework import exceptions
from rest_framework.authentication import BaseAuthentication, get_authorization_header
from rest_framework.request import Request
from apps.api.permissions import GrafanaAPIPermission, LegacyAccessControlRole, RBACPermission, user_is_authorized
from apps.api.permissions import GrafanaAPIPermissions, LegacyAccessControlRole, RBACPermission, user_is_authorized
from apps.grafana_plugin.helpers.gcom import check_token
from apps.grafana_plugin.sync_data import SyncPermission, SyncUser
from apps.user_management.exceptions import OrganizationDeletedException, OrganizationMovedException
@ -385,7 +385,7 @@ class GrafanaServiceAccountAuthentication(BaseAuthentication):
name="Grafana Service Account",
username="grafana_service_account",
role=role,
permissions=[GrafanaAPIPermission(action=key) for key, _ in permissions.items()],
permissions=GrafanaAPIPermissions.construct_permissions(permissions.keys()),
)
auth_token = ApiAuthToken(organization=organization, user=user, name="Grafana Service Account")

View file

@ -8,7 +8,7 @@ import requests
from django.conf import settings
from rest_framework import status
from apps.api.permissions import GrafanaAPIPermission
from apps.api.permissions import GrafanaAPIPermission, GrafanaAPIPermissions
from common.constants.plugin_ids import PluginID
logger = logging.getLogger(__name__)
@ -238,7 +238,7 @@ class GrafanaAPIClient(APIClient):
all_users_permissions: UserPermissionsDict = {}
for user_id, user_permissions in data.items():
all_users_permissions[user_id] = [GrafanaAPIPermission(action=key) for key, _ in user_permissions.items()]
all_users_permissions[user_id] = GrafanaAPIPermissions.construct_permissions(user_permissions.keys())
return all_users_permissions

View file

@ -37,10 +37,7 @@ class CloudUsersView(CloudUsersPagination, APIView):
def get(self, request):
organization = request.user.organization
queryset = User.objects.filter(
organization=organization,
**User.build_permissions_query(RBACPermission.Permissions.NOTIFICATIONS_READ, organization),
)
queryset = User.objects.filter_by_permission(RBACPermission.Permissions.NOTIFICATIONS_READ, organization)
if request.user.current_team is not None:
queryset = queryset.filter(teams=request.user.current_team).distinct()

View file

@ -72,9 +72,12 @@ def users_in_ical(
organization: "Organization",
) -> typing.List["User"]:
"""
This method returns a sequence of `User` objects, filtered by users whose username, or case-insensitive e-mail,
This method returns a list of `User` objects, filtered by users whose username, or case-insensitive e-mail,
is present in `usernames_from_ical`.
Additionally, it filters the users by the organization they belong to and checks if they have the required
permission to receive notifications.
Parameters
----------
usernames_from_ical : typing.List[str]
@ -86,18 +89,17 @@ def users_in_ical(
emails_from_ical = [username.lower() for username in usernames_from_ical]
users_found_in_ical = organization.users.filter(
(Q(username__in=usernames_from_ical) | Q(email__lower__in=emails_from_ical))
).distinct()
# NOTE: doing a select_related for organization here, since we will be accessing u.organization for each user
# in the required_permission.user_has_permission calls below
users_found_in_ical = (
organization.users.filter((Q(username__in=usernames_from_ical) | Q(email__lower__in=emails_from_ical)))
.distinct()
.select_related("organization")
)
if organization.is_rbac_permissions_enabled:
# it is more efficient to check permissions on the subset of users filtered above
# than performing a regex query for the required permission
users_found_in_ical = [u for u in users_found_in_ical if {"action": required_permission.value} in u.permissions]
else:
users_found_in_ical = users_found_in_ical.filter(role__lte=required_permission.fallback_role.value)
return list(users_found_in_ical)
# it is more efficient to check permissions on the subset of users filtered above
# than performing a regex query for the required permission
return [u for u in users_found_in_ical if required_permission.user_has_permission(u)]
@timed_lru_cache(timeout=100)

View file

@ -9,7 +9,7 @@ import pytz
from django.core.cache import cache
from django.utils import timezone
from apps.api.permissions import LegacyAccessControlRole, RBACPermission
from apps.api.permissions import GrafanaAPIPermissions, LegacyAccessControlRole, RBACPermission
from apps.schedules.ical_utils import (
get_cached_oncall_users_for_multiple_schedules,
get_icalendar_tz_or_utc,
@ -138,7 +138,7 @@ def test_users_in_ical_rbac(make_organization_and_user, make_user_for_organizati
# viewer doesn't yet have the required permission, they shouldn't be included
assert len(users_in_ical(usernames, organization)) == 0
viewer.permissions = [{"action": permission.value}] if permission else []
viewer.permissions = GrafanaAPIPermissions.construct_permissions([permission.value]) if permission else []
viewer.save()
assert users_in_ical(usernames, organization) == ([viewer] if included else [])

View file

@ -14,18 +14,16 @@ from apps.slack.errors import (
SlackAPIInvalidAuthError,
SlackAPITokenError,
)
from apps.user_management.models.user import User
from apps.user_management.models import Organization, User
if typing.TYPE_CHECKING:
from django.db.models.manager import RelatedManager
from apps.user_management.models import Organization
logger = logging.getLogger(__name__)
class SlackTeamIdentity(models.Model):
organizations: "RelatedManager['Organization']"
organizations: "RelatedManager[Organization]"
id = models.AutoField(primary_key=True)
slack_id = models.CharField(max_length=100)
@ -141,13 +139,12 @@ class SlackTeamIdentity(models.Model):
def needs_reinstall(self):
return settings.UNIFIED_SLACK_APP_ENABLED and not self._unified_slack_app_installed
def get_users_from_slack_conversation_for_organization(self, channel_id, organization):
def get_users_from_slack_conversation_for_organization(self, channel_id: str, organization: Organization):
sc = SlackClient(self)
members = self.get_conversation_members(sc, channel_id)
return organization.users.filter(
slack_user_identity__slack_id__in=members,
**User.build_permissions_query(RBACPermission.Permissions.CHATOPS_WRITE, organization),
return User.objects.filter_by_permission(
RBACPermission.Permissions.CHATOPS_WRITE,
organization,
slack_user_identity__slack_id__in=self.get_conversation_members(sc, channel_id),
)
def get_conversation_members(self, slack_client: SlackClient, channel_id: str):

View file

@ -19,7 +19,7 @@ from apps.slack.errors import (
SlackAPIUsergroupPaidTeamOnlyError,
)
from apps.slack.models import SlackTeamIdentity, SlackUserIdentity
from apps.user_management.models import User
from apps.user_management.models import Organization, User
from common.public_primary_keys import generate_public_primary_key, increase_public_primary_key_length
if typing.TYPE_CHECKING:
@ -140,10 +140,11 @@ class SlackUserGroup(models.Model):
self.save(update_fields=("members",))
logger.info(f"Saved cached memberlist for slack user group {self.slack_id}, members {slack_ids}")
def get_users_from_members_for_organization(self, organization):
return organization.users.filter(
def get_users_from_members_for_organization(self, organization: Organization):
return User.objects.filter_by_permission(
RBACPermission.Permissions.CHATOPS_WRITE,
organization,
slack_user_identity__slack_id__in=self.members,
**User.build_permissions_query(RBACPermission.Permissions.CHATOPS_WRITE, organization),
)
@classmethod

View file

@ -15,9 +15,11 @@ from django.dispatch import receiver
from emoji import demojize
from apps.api.permissions import (
GrafanaAPIPermissions,
LegacyAccessControlCompatiblePermission,
LegacyAccessControlRole,
RBACPermission,
convert_oncall_permission_to_irm,
user_is_authorized,
)
from apps.google import utils as google_utils
@ -39,18 +41,6 @@ if typing.TYPE_CHECKING:
logger = logging.getLogger(__name__)
class PermissionsQuery(typing.TypedDict):
permissions__contains: typing.Dict
class PermissionsRegexQuery(typing.TypedDict):
permissions__regex: str
class RoleInQuery(typing.TypedDict):
role__in: typing.List[int]
def generate_public_primary_key_for_user():
prefix = "U"
new_public_primary_key = generate_public_primary_key(prefix)
@ -86,6 +76,44 @@ class UserQuerySet(models.QuerySet):
def filter_with_deleted(self, *args, **kwargs):
return super().filter(*args, **kwargs)
def filter_by_permission(
self, permission: LegacyAccessControlCompatiblePermission, organization: "Organization", *args, **kwargs
):
"""
This method builds a filter query that is compatible with RBAC as well as legacy "basic" role based
authorization. If a permission is provided we simply do a regex search where the permission column
contains the permission value (need to use regex because the JSON contains method is not supported by sqlite).
Additionally, if `organization.is_grafana_irm_enabled` is True, we convert the permission to the IRM version
when filtering.
Lastly, if RBAC is not supported for the org, we make the assumption that we are looking for any users with AT
LEAST the fallback role. Ex: if the fallback role were editor than we would get editors and admins.
"""
if organization.is_rbac_permissions_enabled:
permission_value = (
convert_oncall_permission_to_irm(permission)
if organization.is_grafana_irm_enabled
else permission.value
)
# https://stackoverflow.com/a/50251879
if settings.DATABASE_TYPE == settings.DATABASE_TYPES.SQLITE3:
# contains is not supported on sqlite
# https://docs.djangoproject.com/en/4.2/topics/db/queries/#contains
query = Q(permissions__regex=re.escape(permission_value))
else:
query = Q(permissions__contains=GrafanaAPIPermissions.construct_permissions([permission_value]))
else:
query = Q(role__lte=permission.fallback_role.value)
return self.filter(
query,
*args,
**kwargs,
organization=organization,
)
def delete(self):
# is_active = None is used to be able to have multiple deleted users with the same user_id
return super().update(is_active=None)
@ -341,28 +369,6 @@ class User(models.Model):
def insight_logs_metadata(self):
return {}
@staticmethod
def build_permissions_query(
permission: LegacyAccessControlCompatiblePermission, organization
) -> typing.Union[PermissionsQuery, PermissionsRegexQuery, RoleInQuery]:
"""
This method returns a django query filter that is compatible with RBAC
as well as legacy "basic" role based authorization. If a permission is provided we simply do
a regex search where the permission column contains the permission value (need to use regex because
the JSON contains method is not supported by sqlite)
If RBAC is not supported for the org, we make the assumption that we are looking for any users with AT LEAST
the fallback role. Ex: if the fallback role were editor than we would get editors and admins.
"""
if organization.is_rbac_permissions_enabled:
# https://stackoverflow.com/a/50251879
if settings.DATABASE_TYPE == settings.DATABASE_TYPES.SQLITE3:
# https://docs.djangoproject.com/en/4.2/topics/db/queries/#contains
return PermissionsRegexQuery(permissions__regex=re.escape(permission.value))
required_permission = {"action": permission.value}
return PermissionsQuery(permissions__contains=[required_permission])
return RoleInQuery(role__lte=permission.fallback_role.value)
def get_default_fallback_notification_policy(self) -> "UserNotificationPolicy":
from apps.base.models import UserNotificationPolicy

View file

@ -3,7 +3,7 @@ import datetime
import pytest
from django.utils import timezone
from apps.api.permissions import GrafanaAPIPermission, LegacyAccessControlRole, RBACPermission
from apps.api import permissions
from apps.google import constants as google_constants
from apps.google.models import GoogleOAuth2User
from apps.user_management.models import User
@ -15,7 +15,7 @@ def test_self_or_has_user_settings_admin_permission(make_organization, make_user
organization = make_organization(is_rbac_permissions_enabled=False)
admin = make_user_for_organization(organization)
second_admin = make_user_for_organization(organization)
editor = make_user_for_organization(organization, role=LegacyAccessControlRole.EDITOR)
editor = make_user_for_organization(organization, role=permissions.LegacyAccessControlRole.EDITOR)
another_organization = make_organization(is_rbac_permissions_enabled=False)
admin_from_another_organization = make_user_for_organization(another_organization)
@ -36,12 +36,14 @@ def test_self_or_has_user_settings_admin_permission(make_organization, make_user
organization_with_rbac = make_organization(is_rbac_permissions_enabled=True)
user_with_perms = make_user_for_organization(
organization_with_rbac,
role=LegacyAccessControlRole.NONE,
permissions=[GrafanaAPIPermission(action=RBACPermission.Permissions.USER_SETTINGS_ADMIN.value)],
role=permissions.LegacyAccessControlRole.NONE,
permissions=permissions.GrafanaAPIPermissions.construct_permissions(
[permissions.RBACPermission.Permissions.USER_SETTINGS_ADMIN.value]
),
)
user_without_perms = make_user_for_organization(
organization_with_rbac,
role=LegacyAccessControlRole.NONE,
role=permissions.LegacyAccessControlRole.NONE,
permissions=[],
)
@ -69,8 +71,8 @@ def test_self_or_has_user_settings_admin_permission(make_organization, make_user
def test_is_admin(make_organization, make_user_for_organization):
# RBAC not enabled for org
organization = make_organization(is_rbac_permissions_enabled=False)
admin = make_user_for_organization(organization, role=LegacyAccessControlRole.ADMIN)
editor = make_user_for_organization(organization, role=LegacyAccessControlRole.EDITOR)
admin = make_user_for_organization(organization, role=permissions.LegacyAccessControlRole.ADMIN)
editor = make_user_for_organization(organization, role=permissions.LegacyAccessControlRole.EDITOR)
assert organization.is_rbac_permissions_enabled is False
@ -81,12 +83,14 @@ def test_is_admin(make_organization, make_user_for_organization):
organization_with_rbac = make_organization(is_rbac_permissions_enabled=True)
user_with_perms = make_user_for_organization(
organization_with_rbac,
role=LegacyAccessControlRole.NONE,
permissions=[GrafanaAPIPermission(action=RBACPermission.Permissions.ADMIN.value)],
role=permissions.LegacyAccessControlRole.NONE,
permissions=permissions.GrafanaAPIPermissions.construct_permissions(
[permissions.RBACPermission.Permissions.ADMIN.value]
),
)
user_without_perms = make_user_for_organization(
organization_with_rbac,
role=LegacyAccessControlRole.NONE,
role=permissions.LegacyAccessControlRole.NONE,
permissions=[],
)
@ -193,7 +197,7 @@ def test_has_google_oauth2_connected(make_organization_and_user, make_google_oau
@pytest.mark.django_db
def test_google_oauth2_token_is_missing_scopes(make_organization_and_user, make_google_oauth2_user_for_user):
def test_google_oauth2_token_is_missing_scopes(make_organization_and_user):
initial_granted_scope = "foo bar baz"
initial_oauth_response = {
"access_token": "access",
@ -288,3 +292,53 @@ def test_reset_google_oauth2_settings(make_organization_and_user):
assert GoogleOAuth2User.objects.filter(user=user).exists() is False
assert user.google_calendar_settings is None
@pytest.mark.django_db
def test_filter_by_permission(make_organization, make_user_for_organization):
"""
Note that there are some conditions in `UserQuerySet.filter_by_permission` that're
specific to which database engine is being used. These cases are tested on CI where
we run the test against sqlite, mysql, and postgresql
"""
permission_to_test = permissions.RBACPermission.Permissions.ALERT_GROUPS_READ
user_permissions = permissions.GrafanaAPIPermissions.construct_permissions([permission_to_test.value])
irm_permissions = permissions.GrafanaAPIPermissions.construct_permissions(
[permissions.convert_oncall_permission_to_irm(permission_to_test)]
)
org1_rbac = make_organization(is_rbac_permissions_enabled=True)
user1 = make_user_for_organization(org1_rbac, permissions=user_permissions)
user2 = make_user_for_organization(org1_rbac, permissions=user_permissions)
_ = make_user_for_organization(org1_rbac, permissions=[])
org2_rbac_irm = make_organization(is_rbac_permissions_enabled=True, is_grafana_irm_enabled=True)
user4 = make_user_for_organization(org2_rbac_irm, permissions=irm_permissions)
user5 = make_user_for_organization(org2_rbac_irm, permissions=irm_permissions)
_ = make_user_for_organization(org2_rbac_irm, permissions=[])
org3_no_rbac = make_organization(is_rbac_permissions_enabled=False)
user7 = make_user_for_organization(org3_no_rbac, role=permission_to_test.fallback_role)
user8 = make_user_for_organization(org3_no_rbac, role=permission_to_test.fallback_role)
_ = make_user_for_organization(org3_no_rbac, role=permissions.LegacyAccessControlRole.NONE)
# rbac permissions enabled
users = User.objects.filter_by_permission(permission_to_test, org1_rbac)
assert len(users) == 2
assert user1 in users
assert user2 in users
# rbac permissions + IRM enabled
users = User.objects.filter_by_permission(permission_to_test, org2_rbac_irm)
assert len(users) == 2
assert user4 in users
assert user5 in users
# rbac permissions disabled
users = User.objects.filter_by_permission(permission_to_test, org3_no_rbac)
assert len(users) == 2
assert user7 in users
assert user8 in users

View file

@ -14,6 +14,8 @@ from bs4 import BeautifulSoup
from celery.utils.log import get_task_logger
from celery.utils.time import get_exponential_backoff_interval
from django.core.cache import cache
from django.core.exceptions import ValidationError
from django.core.validators import URLValidator
from django.utils.html import urlize
logger = get_task_logger(__name__)
@ -198,6 +200,15 @@ def clean_html(text):
return text
def validate_url(url: str):
validate_url = URLValidator()
try:
validate_url(url)
except ValidationError:
return None
return url
def convert_slack_md_to_html(text):
text = re.sub(r"\*", "**", text)
return convert_md_to_html(text)