Add permission checks for Slack paging and shift swaps actions (#3861)

Fixes https://github.com/grafana/oncall/issues/3109
This commit is contained in:
Matias Bordese 2024-02-09 09:30:05 -03:00 committed by GitHub
parent 040c14453a
commit 160d501bbe
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 89 additions and 19 deletions

View file

@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
- Improved zvonok verification call @sreway ([#3768](https://github.com/grafana/oncall/pull/3768))
- Add permission checks for Slack paging and shift swaps actions ([#3861](https://github.com/grafana/oncall/pull/3861))
### Changed

View file

@ -9,6 +9,7 @@ from rest_framework.response import Response
from apps.alerts.models import AlertReceiveChannel
from apps.alerts.paging import DirectPagingUserTeamValidationError, UserNotifications, direct_paging, user_is_oncall
from apps.api.permissions import RBACPermission
from apps.schedules.ical_utils import get_cached_oncall_users_for_multiple_schedules
from apps.slack.constants import DIVIDER, PRIVATE_METADATA_MAX_LENGTH
from apps.slack.errors import SlackAPIChannelNotFoundError
@ -113,6 +114,7 @@ def get_current_items(
class StartDirectPaging(scenario_step.ScenarioStep):
"""Handle slash command invocation and show initial dialog."""
REQUIRED_PERMISSIONS = [RBACPermission.Permissions.ALERT_GROUPS_DIRECT_PAGING]
command_name = [settings.SLACK_DIRECT_PAGING_SLASH_COMMAND]
def process_scenario(
@ -121,6 +123,10 @@ class StartDirectPaging(scenario_step.ScenarioStep):
slack_team_identity: "SlackTeamIdentity",
payload: EventPayload,
) -> None:
if not self.is_authorized():
self.open_unauthorized_warning(payload)
return
input_id_prefix = _generate_input_id_prefix()
try:
@ -145,12 +151,18 @@ class StartDirectPaging(scenario_step.ScenarioStep):
class FinishDirectPaging(scenario_step.ScenarioStep):
"""Handle page command dialog submit."""
REQUIRED_PERMISSIONS = [RBACPermission.Permissions.ALERT_GROUPS_DIRECT_PAGING]
def process_scenario(
self,
slack_user_identity: "SlackUserIdentity",
slack_team_identity: "SlackTeamIdentity",
payload: EventPayload,
) -> None:
if not self.is_authorized():
self.open_unauthorized_warning(payload)
return
message = _get_message_from_payload(payload)
private_metadata = json.loads(payload["view"]["private_metadata"])
channel_id = private_metadata["channel_id"]

View file

@ -2,18 +2,21 @@ import importlib
import logging
import typing
from apps.api.permissions import LegacyAccessControlCompatiblePermissions, user_is_authorized
from apps.slack.alert_group_slack_service import AlertGroupSlackService
from apps.slack.client import SlackClient
from apps.slack.types import EventPayload
if typing.TYPE_CHECKING:
from apps.slack.models import SlackTeamIdentity, SlackUserIdentity
from apps.slack.types import EventPayload
from apps.user_management.models import Organization, User
logger = logging.getLogger(__name__)
class ScenarioStep(object):
REQUIRED_PERMISSIONS: LegacyAccessControlCompatiblePermissions = []
def __init__(
self,
slack_team_identity: "SlackTeamIdentity",
@ -27,6 +30,19 @@ class ScenarioStep(object):
self.alert_group_slack_service = AlertGroupSlackService(slack_team_identity, self._slack_client)
def is_authorized(self) -> bool:
"""
Check that user has required permissions to perform an action.
"""
return self.user is not None and user_is_authorized(self.user, self.REQUIRED_PERMISSIONS)
def open_unauthorized_warning(self, payload: EventPayload) -> None:
self.open_warning_window(
payload,
warning_text="You do not have permission to perform this action. Ask an admin to upgrade your permissions.",
title="Permission denied",
)
def process_scenario(
self,
slack_user_identity: "SlackUserIdentity",

View file

@ -5,6 +5,7 @@ import typing
import humanize
from django.utils import timezone
from apps.api.permissions import RBACPermission
from apps.slack.models import SlackMessage
from apps.slack.scenarios import scenario_step
from apps.slack.types import Block, BlockActionType, EventPayload, PayloadType, ScenarioRoute
@ -21,6 +22,8 @@ SHIFT_SWAP_PK_ACTION_KEY = "shift_swap_request_pk"
class BaseShiftSwapRequestStep(scenario_step.ScenarioStep):
REQUIRED_PERMISSIONS = [RBACPermission.Permissions.SCHEDULES_WRITE]
def _generate_blocks(self, shift_swap_request: "ShiftSwapRequest") -> Block.AnyBlocks:
pk = shift_swap_request.pk
@ -198,6 +201,10 @@ class AcceptShiftSwapRequestStep(BaseShiftSwapRequestStep):
from apps.schedules import exceptions
from apps.schedules.models import ShiftSwapRequest
if not self.is_authorized():
self.open_unauthorized_warning(payload)
return
shift_swap_request_pk = json.loads(payload["actions"][0]["value"])[SHIFT_SWAP_PK_ACTION_KEY]
try:

View file

@ -2,10 +2,9 @@ import json
import logging
from apps.alerts.models import AlertGroup
from apps.api.permissions import LegacyAccessControlCompatiblePermissions, user_is_authorized
from apps.api.permissions import user_is_authorized
from apps.slack.models import SlackMessage, SlackTeamIdentity
from apps.slack.types import EventPayload
from apps.user_management.models import User
logger = logging.getLogger(__name__)
@ -15,10 +14,6 @@ class AlertGroupActionsMixin:
Mixin for alert group actions (ack, resolve, etc.). Intended to be used as a mixin along with ScenarioStep.
"""
user: User | None
REQUIRED_PERMISSIONS: LegacyAccessControlCompatiblePermissions = []
def get_alert_group(self, slack_team_identity: SlackTeamIdentity, payload: EventPayload) -> AlertGroup:
"""
Get AlertGroup instance on Slack message button click or select menu change.
@ -39,7 +34,7 @@ class AlertGroupActionsMixin:
def is_authorized(self, alert_group: AlertGroup) -> bool:
"""
Check that user has required permissions to perform an action.
Customize ScenarioStep.is_authorized method to check for alert group permissions.
"""
return (
@ -48,13 +43,6 @@ class AlertGroupActionsMixin:
and user_is_authorized(self.user, self.REQUIRED_PERMISSIONS)
)
def open_unauthorized_warning(self, payload: EventPayload) -> None:
self.open_warning_window(
payload,
warning_text="You do not have permission to perform this action. Ask an admin to upgrade your permissions.",
title="Permission denied",
)
def _repair_alert_group(
self, slack_team_identity: SlackTeamIdentity, alert_group: AlertGroup, payload: EventPayload
) -> None:

View file

@ -5,6 +5,7 @@ import pytest
from django.utils import timezone
from apps.alerts.models import AlertReceiveChannel
from apps.api.permissions import LegacyAccessControlRole
from apps.schedules.models import CustomOnCallShift, OnCallScheduleWeb
from apps.slack.scenarios.paging import (
DIRECT_PAGING_MESSAGE_INPUT_ID,
@ -64,10 +65,10 @@ def make_slack_payload(organization, team=None, user=None, current_users=None, a
def test_initial_state(
make_organization_and_user_with_slack_identities,
):
_, _, slack_team_identity, slack_user_identity = make_organization_and_user_with_slack_identities()
_, user, slack_team_identity, slack_user_identity = make_organization_and_user_with_slack_identities()
payload = {"channel_id": "123", "trigger_id": "111"}
step = StartDirectPaging(slack_team_identity)
step = StartDirectPaging(slack_team_identity, user=user)
with patch.object(step._slack_client, "views_open") as mock_slack_api_call:
step.process_scenario(slack_user_identity, slack_team_identity, payload)
@ -75,6 +76,19 @@ def test_initial_state(
assert metadata[DataKey.USERS] == {}
@pytest.mark.parametrize("role", (LegacyAccessControlRole.VIEWER, LegacyAccessControlRole.NONE))
@pytest.mark.django_db
def test_initial_unauthorized(make_organization_and_user_with_slack_identities, role):
_, user, slack_team_identity, slack_user_identity = make_organization_and_user_with_slack_identities(role=role)
payload = {"channel_id": "123", "trigger_id": "111"}
step = StartDirectPaging(slack_team_identity, user=user)
with patch.object(step, "open_unauthorized_warning") as mock_open_unauthorized_warning:
step.process_scenario(slack_user_identity, slack_team_identity, payload)
mock_open_unauthorized_warning.assert_called_once()
@pytest.mark.django_db
def test_add_user_no_warning(make_organization_and_user_with_slack_identities, make_schedule, make_on_call_shift):
organization, user, slack_team_identity, slack_user_identity = make_organization_and_user_with_slack_identities()
@ -214,10 +228,10 @@ def test_remove_user(make_organization_and_user_with_slack_identities):
@pytest.mark.django_db
def test_trigger_paging_no_team_or_user_selected(make_organization_and_user_with_slack_identities):
organization, _, slack_team_identity, slack_user_identity = make_organization_and_user_with_slack_identities()
organization, user, slack_team_identity, slack_user_identity = make_organization_and_user_with_slack_identities()
payload = make_slack_payload(organization=organization)
step = FinishDirectPaging(slack_team_identity)
step = FinishDirectPaging(slack_team_identity, user=user)
with patch.object(step._slack_client, "api_call"):
response = step.process_scenario(slack_user_identity, slack_team_identity, payload)
@ -231,6 +245,21 @@ def test_trigger_paging_no_team_or_user_selected(make_organization_and_user_with
)
@pytest.mark.parametrize("role", (LegacyAccessControlRole.VIEWER, LegacyAccessControlRole.NONE))
@pytest.mark.django_db
def test_trigger_paging_unauthorized(make_organization_and_user_with_slack_identities, role):
organization, user, slack_team_identity, slack_user_identity = make_organization_and_user_with_slack_identities(
role=role
)
payload = make_slack_payload(organization=organization)
step = FinishDirectPaging(slack_team_identity, user=user)
with patch.object(step, "open_unauthorized_warning") as mock_open_unauthorized_warning:
step.process_scenario(slack_user_identity, slack_team_identity, payload)
mock_open_unauthorized_warning.assert_called_once()
@pytest.mark.django_db
def test_trigger_paging_additional_responders(make_organization_and_user_with_slack_identities, make_team):
organization, user, slack_team_identity, slack_user_identity = make_organization_and_user_with_slack_identities()

View file

@ -5,6 +5,7 @@ from unittest.mock import patch
import pytest
import pytz
from apps.api.permissions import LegacyAccessControlRole
from apps.schedules import exceptions
from apps.slack.scenarios import shift_swap_requests as scenarios
@ -252,6 +253,22 @@ class TestAcceptShiftSwapRequestStep:
mock_update_message.assert_called_once_with(ssr)
@pytest.mark.parametrize("role", (LegacyAccessControlRole.VIEWER, LegacyAccessControlRole.NONE))
@pytest.mark.django_db
def test_process_scenario_unauthorized(self, setup, payload, make_user_for_organization, role) -> None:
ssr, _, benefactor, slack_user_identity = setup()
event_payload = payload(ssr.pk)
organization = ssr.organization
slack_team_identity = organization.slack_team_identity
benefactor = make_user_for_organization(organization, role=role)
step = scenarios.AcceptShiftSwapRequestStep(slack_team_identity, organization, benefactor)
with patch.object(step, "open_unauthorized_warning") as mock_open_unauthorized_warning:
step.process_scenario(slack_user_identity, slack_team_identity, event_payload)
mock_open_unauthorized_warning.assert_called_once()
@patch("apps.schedules.models.shift_swap_request.ShiftSwapRequest.take")
@pytest.mark.django_db
def test_process_scenario_ssr_does_not_exist(self, mock_take, setup, payload) -> None: