# What this PR does

Closes https://github.com/grafana/irm/issues/31 (and supersedes https://github.com/grafana/oncall/pull/4784)

Main changes:

- updates `apps.api.permissions.user_is_authorized` to check the value of `organization.is_grafana_irm_enabled`. If it is enabled, we check for the presence of `grafana-irm-app` prefixed RBAC permissions rather than `grafana-oncall-app` prefixed ones
- cleans up `engine/apps/api/tests/test_permissions.py` (bulk of the changes in the PR)
- converts `apps.user_management.models.User.build_permissions_query` to a `UserQuerySet` method instead
  - means we can now do things like this instead:

    ```python3
    User.objects.filter_by_permission(RBACPermission.Permissions.NOTIFICATIONS_READ, organization)
    ```

## Checklist

- [x] Unit, integration, and e2e (if applicable) tests updated
- [x] Documentation added (or `pr:no public docs` PR label added if not required)
- [x] Added the relevant release notes label (see labels prefixed w/ `release:`). These labels dictate how your PR will show up in the autogenerated release notes.
344 lines
14 KiB
Python
344 lines
14 KiB
Python
import datetime
|
|
|
|
import pytest
|
|
from django.utils import timezone
|
|
|
|
from apps.api import permissions
|
|
from apps.google import constants as google_constants
|
|
from apps.google.models import GoogleOAuth2User
|
|
from apps.user_management.models import User
|
|
|
|
|
|
@pytest.mark.django_db
def test_self_or_has_user_settings_admin_permission(make_organization, make_user_for_organization):
    """
    Exercise `User.self_or_has_user_settings_admin_permission` in both access-control modes.

    The assertions expect `True` when the user passed in is the receiver itself or
    holds user-settings-admin rights in the given organization, and `False` otherwise.
    """
    editor_role = permissions.LegacyAccessControlRole.EDITOR
    no_role = permissions.LegacyAccessControlRole.NONE

    # --- organization without RBAC: access is decided by the legacy role ---
    legacy_org = make_organization(is_rbac_permissions_enabled=False)
    # created without an explicit role; the expectations below treat both as admins
    first_admin = make_user_for_organization(legacy_org)
    other_admin = make_user_for_organization(legacy_org)
    editor_user = make_user_for_organization(legacy_org, role=editor_role)

    foreign_org = make_organization(is_rbac_permissions_enabled=False)
    foreign_admin = make_user_for_organization(foreign_org)

    assert legacy_org.is_rbac_permissions_enabled is False
    assert foreign_org.is_rbac_permissions_enabled is False

    # (receiver, user being checked, expected result)
    legacy_expectations = [
        (first_admin, first_admin, True),
        (first_admin, editor_user, False),
        (first_admin, other_admin, True),
        # a user from a different organization is rejected
        (first_admin, foreign_admin, False),
        (editor_user, editor_user, True),
        (editor_user, first_admin, True),
    ]
    for receiver, checked_user, expected in legacy_expectations:
        assert receiver.self_or_has_user_settings_admin_permission(checked_user, legacy_org) is expected

    # --- organization with RBAC: access is decided by granted permissions ---
    rbac_org = make_organization(is_rbac_permissions_enabled=True)
    privileged_user = make_user_for_organization(
        rbac_org,
        role=no_role,
        permissions=permissions.GrafanaAPIPermissions.construct_permissions(
            [permissions.RBACPermission.Permissions.USER_SETTINGS_ADMIN.value]
        ),
    )
    unprivileged_user = make_user_for_organization(rbac_org, role=no_role, permissions=[])

    assert rbac_org.is_rbac_permissions_enabled is True

    rbac_expectations = [
        # a user checked against themselves always passes
        (privileged_user, privileged_user, True),
        (unprivileged_user, unprivileged_user, True),
        # passes because the checked user holds the admin RBAC permission
        (unprivileged_user, privileged_user, True),
        # fails because the checked user lacks the admin RBAC permission
        (privileged_user, unprivileged_user, False),
    ]
    for receiver, checked_user, expected in rbac_expectations:
        assert receiver.self_or_has_user_settings_admin_permission(checked_user, rbac_org) is expected
|
|
|
|
|
|
@pytest.mark.django_db
def test_is_admin(make_organization, make_user_for_organization):
    """
    Check `User.is_admin` for a role-based organization and an RBAC-enabled one.

    Without RBAC the expectation follows the legacy role; with RBAC it follows the
    presence of the `ADMIN` RBAC permission, the legacy role being `NONE` for both users.
    """
    roles = permissions.LegacyAccessControlRole

    # --- RBAC disabled: the legacy role decides ---
    legacy_org = make_organization(is_rbac_permissions_enabled=False)
    admin_user = make_user_for_organization(legacy_org, role=roles.ADMIN)
    editor_user = make_user_for_organization(legacy_org, role=roles.EDITOR)

    assert legacy_org.is_rbac_permissions_enabled is False

    assert admin_user.is_admin is True
    assert editor_user.is_admin is False

    # --- RBAC enabled: the granted permissions decide ---
    rbac_org = make_organization(is_rbac_permissions_enabled=True)
    granted_user = make_user_for_organization(
        rbac_org,
        role=roles.NONE,
        permissions=permissions.GrafanaAPIPermissions.construct_permissions(
            [permissions.RBACPermission.Permissions.ADMIN.value]
        ),
    )
    bare_user = make_user_for_organization(rbac_org, role=roles.NONE, permissions=[])

    assert rbac_org.is_rbac_permissions_enabled is True

    assert granted_user.is_admin is True
    assert bare_user.is_admin is False
|
|
|
|
|
|
@pytest.mark.django_db
def test_lower_email_filter(make_organization, make_user_for_organization):
    """Check that the `lower` lookup on `User.email` finds a mixed-case address by its lowercase form."""
    org = make_organization()
    mixed_case_user = make_user_for_organization(org, email="TestingUser@test.com")
    # second user whose address differs by more than letter case; must not be matched
    make_user_for_organization(org, email="testing_user@test.com")

    # direct lookup
    found_directly = User.objects.get(email__lower="testinguser@test.com")
    assert found_directly == mixed_case_user

    # the same transform chained with `__in`
    found_via_in = User.objects.filter(email__lower__in=["testinguser@test.com"]).get()
    assert found_via_in == mixed_case_user
|
|
|
|
|
|
@pytest.mark.django_db
def test_is_in_working_hours(make_organization, make_user_for_organization):
    """
    Check the boundaries of `User.is_in_working_hours` for a user in Europe/London.

    No `working_hours` are passed to the fixture. The assertions imply a
    09:00-17:00 local window (inclusive at both ends) on 2023-08-01; presumably
    that is the default schedule -- confirm against the fixture/model.
    London observes BST (UTC+1) on that date, so every UTC instant built below
    is one hour later in local time.
    """
    organization = make_organization()
    user = make_user_for_organization(organization, _timezone="Europe/London")

    # Build instants with the stdlib `datetime.datetime` directly rather than through
    # `django.utils.timezone.datetime`, which is only an incidental import of that module.
    utc = datetime.timezone.utc
    # Names state the London wall-clock time; the constructor arguments are the UTC equivalent.
    london_08_59_59 = datetime.datetime(2023, 8, 1, 7, 59, 59, tzinfo=utc)
    london_09_00_00 = datetime.datetime(2023, 8, 1, 8, 0, 0, tzinfo=utc)
    london_17_00_00 = datetime.datetime(2023, 8, 1, 16, 0, 0, tzinfo=utc)
    london_17_00_01 = datetime.datetime(2023, 8, 1, 16, 0, 1, tzinfo=utc)

    assert user.is_in_working_hours(london_08_59_59) is False
    assert user.is_in_working_hours(london_09_00_00) is True
    assert user.is_in_working_hours(london_17_00_00) is True
    assert user.is_in_working_hours(london_17_00_01) is False
|
|
|
|
|
|
@pytest.mark.django_db
def test_is_in_working_hours_next_day(make_organization, make_user_for_organization):
    """
    Check `User.is_in_working_hours` when the requested timezone moves a UTC instant to the next day.

    The user has custom working hours on Tuesday (17:00-18:00) and Wednesday
    (01:00-02:00). All instants fall on 2023-08-01 in UTC; evaluated in
    Asia/Singapore (UTC+8) the later ones land on Wednesday, so the Wednesday
    schedule must apply to them. Both ends of each window are expected to be inclusive.
    """
    organization = make_organization()
    user = make_user_for_organization(
        organization,
        working_hours={
            "tuesday": [{"start": "17:00:00", "end": "18:00:00"}],
            "wednesday": [{"start": "01:00:00", "end": "02:00:00"}],
        },
    )

    # Use the stdlib `datetime.datetime` directly rather than `django.utils.timezone.datetime`,
    # which is only an incidental import of that module.
    utc = datetime.timezone.utc
    tz = "Asia/Singapore"

    # (UTC instant, expected result); comments give the Singapore wall-clock time
    cases = [
        (datetime.datetime(2023, 8, 1, 8, 59, 59, tzinfo=utc), False),  # 4:59:59pm on Tuesday
        (datetime.datetime(2023, 8, 1, 9, 0, 0, tzinfo=utc), True),  # 5pm on Tuesday
        (datetime.datetime(2023, 8, 1, 10, 0, 0, tzinfo=utc), True),  # 6pm on Tuesday
        (datetime.datetime(2023, 8, 1, 10, 0, 1, tzinfo=utc), False),  # 6:00:01pm on Tuesday
        (datetime.datetime(2023, 8, 1, 16, 59, 0, tzinfo=utc), False),  # 00:59am on Wednesday
        (datetime.datetime(2023, 8, 1, 17, 0, 0, tzinfo=utc), True),  # 1am on Wednesday
        (datetime.datetime(2023, 8, 1, 18, 0, 0, tzinfo=utc), True),  # 2am on Wednesday
        (datetime.datetime(2023, 8, 1, 18, 0, 1, tzinfo=utc), False),  # 2:00:01am on Wednesday
    ]
    for instant, expected in cases:
        assert user.is_in_working_hours(instant, tz=tz) is expected
|
|
|
|
|
|
@pytest.mark.django_db
def test_is_in_working_hours_no_timezone(make_organization, make_user_for_organization):
    """Check that `User.is_in_working_hours` returns `False` for a user whose `_timezone` is `None`."""
    org = make_organization()
    user_without_tz = make_user_for_organization(org, _timezone=None)

    current_moment = timezone.now()
    assert user_without_tz.is_in_working_hours(current_moment) is False
|
|
|
|
|
|
@pytest.mark.django_db
def test_is_in_working_hours_weekend(make_organization, make_user_for_organization):
    """
    Check that a day with an empty list of working periods is never within working hours.

    The user's `working_hours` only contain `"saturday": []`, and 2023-08-05 is a
    Saturday, so the check at noon UTC (evaluated in "UTC") is expected to be `False`.
    """
    organization = make_organization()
    user = make_user_for_organization(organization, working_hours={"saturday": []}, _timezone=None)

    # Use the stdlib `datetime.datetime` directly rather than `django.utils.timezone.datetime`,
    # which is only an incidental import of that module.
    on_saturday = datetime.datetime(2023, 8, 5, 12, 0, 0, tzinfo=datetime.timezone.utc)
    assert user.is_in_working_hours(on_saturday, "UTC") is False
|
|
|
|
|
|
@pytest.mark.django_db
def test_is_telegram_connected(make_organization_and_user, make_telegram_user_connector):
    """Check that `User.is_telegram_connected` flips to `True` once a Telegram connector exists."""
    _org, member = make_organization_and_user()

    # nothing connected yet
    assert member.is_telegram_connected is False

    make_telegram_user_connector(member)

    assert member.is_telegram_connected is True
|
|
|
|
|
|
@pytest.mark.django_db
def test_has_google_oauth2_connected(make_organization_and_user, make_google_oauth2_user_for_user):
    """Check that `User.has_google_oauth2_connected` flips to `True` once a Google OAuth2 user exists."""
    _org, member = make_organization_and_user()

    # no Google account linked yet
    assert member.has_google_oauth2_connected is False

    make_google_oauth2_user_for_user(member)

    assert member.has_google_oauth2_connected is True
|
|
|
|
|
|
@pytest.mark.django_db
def test_google_oauth2_token_is_missing_scopes(make_organization_and_user):
    """
    Check `User.google_oauth2_token_is_missing_scopes` through three states.

    Expected: `False` before any Google account is connected, `True` once a token
    is stored whose scope lacks the required scopes, and `False` again after the
    stored scope also contains every entry of `google_constants.REQUIRED_OAUTH_SCOPES`.
    """
    partial_scope = "foo bar baz"
    partial_response = {
        "access_token": "access",
        "refresh_token": "refresh",
        "sub": "google_user_id",
        "scope": partial_scope,
    }

    _org, member = make_organization_and_user()

    # not connected to Google yet, so nothing can be missing
    assert member.google_oauth2_token_is_missing_scopes is False

    member.save_google_oauth2_settings(partial_response)
    member.refresh_from_db()

    # the granted scope does not cover the required scopes
    assert member.google_oauth2_token_is_missing_scopes is True

    # same response, but with the required scopes appended to the granted scope
    required_scopes = " ".join(google_constants.REQUIRED_OAUTH_SCOPES)
    complete_response = dict(partial_response, scope=f"{partial_scope} {required_scopes}")
    member.save_google_oauth2_settings(complete_response)
    member.refresh_from_db()

    # every required scope is now granted
    assert member.google_oauth2_token_is_missing_scopes is False
|
|
|
|
|
|
@pytest.mark.django_db
def test_save_google_oauth2_settings(make_organization_and_user):
    """
    Check that `User.save_google_oauth2_settings` stores an OAuth response and overwrites it on a second call.

    The first save is also expected to initialise `google_calendar_settings` with
    an empty `oncall_schedules_to_consider_for_shift_swaps` list.
    """

    def assert_stored(member, response):
        # compare the persisted GoogleOAuth2User fields with the response they came from
        stored = member.google_oauth2_user
        assert stored.google_user_id == response["sub"]
        assert stored.access_token == response["access_token"]
        assert stored.refresh_token == response["refresh_token"]
        assert stored.oauth_scope == response["scope"]

    first_response = {
        "access_token": "access",
        "refresh_token": "refresh",
        "sub": "google_user_id",
        "scope": "scope",
    }
    second_response = {
        "access_token": "access2",
        "refresh_token": "refresh2",
        "sub": "google_user_id2",
        "scope": "scope2",
    }

    _org, member = make_organization_and_user()

    # starting point: nothing stored for this user
    assert GoogleOAuth2User.objects.filter(user=member).exists() is False
    assert member.google_calendar_settings is None

    member.save_google_oauth2_settings(first_response)
    member.refresh_from_db()

    assert_stored(member, first_response)
    assert member.google_calendar_settings["oncall_schedules_to_consider_for_shift_swaps"] == []

    # saving again must replace the previously stored values
    member.save_google_oauth2_settings(second_response)
    member.refresh_from_db()

    assert_stored(member, second_response)
|
|
|
|
|
|
@pytest.mark.django_db
def test_reset_google_oauth2_settings(make_organization_and_user):
    """Check that `User.reset_google_oauth2_settings` removes the stored OAuth2 user and calendar settings."""
    _org, member = make_organization_and_user()

    oauth_response = {
        "access_token": "access",
        "refresh_token": "refresh",
        "sub": "google_user_id",
        "scope": "scope",
    }
    member.save_google_oauth2_settings(oauth_response)
    member.refresh_from_db()

    # precondition: both pieces of Google state exist before the reset
    assert member.google_oauth2_user is not None
    assert member.google_calendar_settings is not None

    member.reset_google_oauth2_settings()
    member.refresh_from_db()

    # both pieces of Google state are gone afterwards
    assert GoogleOAuth2User.objects.filter(user=member).exists() is False
    assert member.google_calendar_settings is None
|
|
|
|
|
|
@pytest.mark.django_db
def test_filter_by_permission(make_organization, make_user_for_organization):
    """
    Check `UserQuerySet.filter_by_permission` for RBAC, RBAC + IRM and non-RBAC organizations.

    Note that some conditions in `UserQuerySet.filter_by_permission` are specific to
    the database engine in use. Those cases are covered on CI, where the test runs
    against sqlite, mysql, and postgresql.
    """
    permission = permissions.RBACPermission.Permissions.ALERT_GROUPS_READ
    oncall_grants = permissions.GrafanaAPIPermissions.construct_permissions([permission.value])
    irm_grants = permissions.GrafanaAPIPermissions.construct_permissions(
        [permissions.convert_oncall_permission_to_irm(permission)]
    )

    # RBAC enabled: two users hold the permission, one holds nothing
    rbac_org = make_organization(is_rbac_permissions_enabled=True)
    rbac_member_a = make_user_for_organization(rbac_org, permissions=oncall_grants)
    rbac_member_b = make_user_for_organization(rbac_org, permissions=oncall_grants)
    make_user_for_organization(rbac_org, permissions=[])

    # RBAC + IRM enabled: two users hold the IRM form of the permission, one holds nothing
    irm_org = make_organization(is_rbac_permissions_enabled=True, is_grafana_irm_enabled=True)
    irm_member_a = make_user_for_organization(irm_org, permissions=irm_grants)
    irm_member_b = make_user_for_organization(irm_org, permissions=irm_grants)
    make_user_for_organization(irm_org, permissions=[])

    # RBAC disabled: two users have the permission's fallback role, one has the NONE role
    legacy_org = make_organization(is_rbac_permissions_enabled=False)
    legacy_member_a = make_user_for_organization(legacy_org, role=permission.fallback_role)
    legacy_member_b = make_user_for_organization(legacy_org, role=permission.fallback_role)
    make_user_for_organization(legacy_org, role=permissions.LegacyAccessControlRole.NONE)

    # (organization queried, users that must be returned)
    expectations = [
        (rbac_org, (rbac_member_a, rbac_member_b)),
        (irm_org, (irm_member_a, irm_member_b)),
        (legacy_org, (legacy_member_a, legacy_member_b)),
    ]
    for org, expected_members in expectations:
        matched = User.objects.filter_by_permission(permission, org)

        # exactly the two entitled users, so the third user of the org is excluded
        assert len(matched) == 2
        for member in expected_members:
            assert member in matched
|