refactor the is_rbac_permissions_enabled check to be more robust (#1099)

# What this PR does
Checks the `is_rbac_permissions_enabled` flag differently based on
whether we are dealing with an open-source, or cloud installation:
- for open-source installations, simply continue making a `HEAD` request
to the list RBAC permissions Grafana API endpoint.
- for cloud installations, use the `config` object returned from `GET
/instances/{instance_id}?config=true` and check whether
`instance_info["config"]["feature_toggles"]["accessControlOnCall"] ==
"true"`

## Which issue(s) this PR fixes
Resolves the issue in hosted grafana where when a stack is inactive, the
hosted grafana gateway, returns 200 to the `HEAD` request (which
erroneously sets the `is_rbac_permissions_enabled` flag to `true`)

## Checklist

- [x] Tests updated (N/A)
- [ ] Documentation added
- [x] `CHANGELOG.md` updated
This commit is contained in:
Joey Orlando 2023-01-11 12:48:30 +01:00 committed by GitHub
parent 231c0f45a3
commit babacf4da8
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 117 additions and 13 deletions

View file

@ -5,6 +5,12 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## v1.1.16 (TBD)
### Fixed
- Minor bug fix in how the value of `Organization.is_rbac_permissions_enabled` is determined
## v1.1.15 (2023-01-10)
### Changed

View file

@ -30,6 +30,14 @@ class GrafanaUserWithPermissions(GrafanaUser):
permissions: List[GrafanaAPIPermission]
class GCOMInstanceInfoConfigFeatureToggles(TypedDict):
accessControlOnCall: str
class GCOMInstanceInfoConfig(TypedDict):
feature_toggles: GCOMInstanceInfoConfigFeatureToggles
class GCOMInstanceInfo(TypedDict):
id: int
orgId: int
@ -38,6 +46,7 @@ class GCOMInstanceInfo(TypedDict):
orgName: str
url: str
status: str
config: Optional[GCOMInstanceInfoConfig]
class APIClient:
@ -190,10 +199,26 @@ class GcomAPIClient(APIClient):
def __init__(self, api_token: str):
super().__init__(settings.GRAFANA_COM_API_URL, api_token)
def get_instance_info(self, stack_id: str) -> Optional[GCOMInstanceInfo]:
data, _ = self.api_get(f"instances/{stack_id}")
def get_instance_info(self, stack_id: str, include_config_query_param: bool = False) -> Optional[GCOMInstanceInfo]:
"""
NOTE: in order to use ?config=true, an "Admin" GCOM token must be used to make the API call
"""
url = f"instances/{stack_id}"
if include_config_query_param:
url += "?config=true"
data, _ = self.api_get(url)
return data
def is_rbac_enabled_for_stack(self, stack_id: str) -> bool:
"""
NOTE: must use an "Admin" GCOM token when calling this method
"""
instance_info = self.get_instance_info(stack_id, True)
if not instance_info:
return False
return instance_info.get("config", {}).get("feature_toggles", {}).get("accessControlOnCall", "false") == "true"
def get_instances(self, query: str):
return self.api_get(query)

View file

@ -0,0 +1,29 @@
from unittest.mock import patch
import pytest
from apps.grafana_plugin.helpers.client import GcomAPIClient
class TestIsRbacEnabledForStack:
@pytest.mark.parametrize(
"gcom_api_response,expected",
[
(None, False),
({}, False),
({"config": {}}, False),
({"config": {"feature_toggles": {}}}, False),
({"config": {"feature_toggles": {"accessControlOnCall": "false"}}}, False),
({"config": {"feature_toggles": {"accessControlOnCall": "true"}}}, True),
],
)
@patch("apps.grafana_plugin.helpers.client.GcomAPIClient.api_get")
def test_it_returns_based_on_feature_toggle_value(
self, mocked_gcom_api_client_api_get, gcom_api_response, expected
):
stack_id = 5
mocked_gcom_api_client_api_get.return_value = (gcom_api_response, {"status_code": 200})
api_client = GcomAPIClient("someFakeApiToken")
assert api_client.is_rbac_enabled_for_stack(stack_id) == expected
assert mocked_gcom_api_client_api_get.called_once_with(f"instances/{stack_id}?config=true")

View file

@ -15,7 +15,7 @@ class TestGetUsersPermissions:
permissions = api_client.get_users_permissions(False)
assert len(permissions.keys()) == 0
@patch("apps.grafana_plugin.views.self_hosted_install.GrafanaAPIClient.api_get")
@patch("apps.grafana_plugin.helpers.client.GrafanaAPIClient.api_get")
def test_api_call_returns_none(self, mocked_grafana_api_client_api_get):
mocked_grafana_api_client_api_get.return_value = (None, "dfkjfdkj")
@ -24,7 +24,7 @@ class TestGetUsersPermissions:
permissions = api_client.get_users_permissions(True)
assert len(permissions.keys()) == 0
@patch("apps.grafana_plugin.views.self_hosted_install.GrafanaAPIClient.api_get")
@patch("apps.grafana_plugin.helpers.client.GrafanaAPIClient.api_get")
def test_it_properly_transforms_the_data(self, mocked_grafana_api_client_api_get):
mocked_grafana_api_client_api_get.return_value = (
{"1": {"grafana-oncall-app.alert-groups:read": [""], "grafana-oncall-app.alert-groups:write": [""]}},
@ -50,7 +50,7 @@ class TestIsRbacEnabledForOrganization:
(status.HTTP_404_NOT_FOUND, False),
],
)
@patch("apps.grafana_plugin.views.self_hosted_install.GrafanaAPIClient.api_head")
@patch("apps.grafana_plugin.helpers.client.GrafanaAPIClient.api_head")
def test_it_returns_based_on_status_code_of_head_call(
self, mocked_grafana_api_client_api_head, grafana_api_status_code, expected
):

View file

@ -12,18 +12,29 @@ logger.setLevel(logging.DEBUG)
def sync_organization(organization):
client = GrafanaAPIClient(api_url=organization.grafana_url, api_token=organization.api_token)
grafana_api_client = GrafanaAPIClient(api_url=organization.grafana_url, api_token=organization.api_token)
# NOTE: checking whether or not RBAC is enabled depends on whether we are dealing with an open-source or cloud
# stack. For Cloud we should make a call to the GCOM API, using an admin API token, and get the list of
# feature_toggles enabled for the stack. For open-source, simply make a HEAD request to the grafana instance's API
# and consider RBAC enabled if the list RBAC permissions endpoint returns 200. We cannot simply rely on the HEAD
# call in cloud because if an instance is not active, the grafana gateway will still return 200 for the
# HEAD request.
if settings.LICENSE == settings.CLOUD_LICENSE_NAME:
gcom_client = GcomAPIClient(settings.GRAFANA_COM_ADMIN_API_TOKEN)
rbac_is_enabled = gcom_client.is_rbac_enabled_for_stack(organization.stack_id)
else:
rbac_is_enabled = grafana_api_client.is_rbac_enabled_for_organization()
rbac_is_enabled = client.is_rbac_enabled_for_organization()
organization.is_rbac_permissions_enabled = rbac_is_enabled
_sync_instance_info(organization)
api_users = client.get_users(rbac_is_enabled)
api_users = grafana_api_client.get_users(rbac_is_enabled)
if api_users:
organization.api_token_status = Organization.API_TOKEN_STATUS_OK
sync_users_and_teams(client, api_users, organization)
sync_users_and_teams(grafana_api_client, api_users, organization)
else:
organization.api_token_status = Organization.API_TOKEN_STATUS_FAILED

View file

@ -1,11 +1,12 @@
from unittest.mock import patch
import pytest
from django.conf import settings
from django.test import override_settings
from apps.grafana_plugin.helpers.client import GcomAPIClient, GrafanaAPIClient
from apps.user_management.models import Team, User
from apps.user_management.sync import cleanup_organization, sync_organization
from conftest import IS_RBAC_ENABLED
@pytest.mark.django_db
@ -133,7 +134,7 @@ def test_sync_organization(make_organization, make_team, make_user_for_organizat
},
)
with patch.object(GrafanaAPIClient, "is_rbac_enabled_for_organization", return_value=IS_RBAC_ENABLED):
with patch.object(GrafanaAPIClient, "is_rbac_enabled_for_organization", return_value=False):
with patch.object(GrafanaAPIClient, "get_users", return_value=api_users_response):
with patch.object(GrafanaAPIClient, "get_teams", return_value=(api_teams_response, None)):
with patch.object(GrafanaAPIClient, "get_team_members", return_value=(api_members_response, None)):
@ -153,8 +154,40 @@ def test_sync_organization(make_organization, make_team, make_user_for_organizat
assert team.users.count() == 1
assert team.users.get() == user
# check that the rbac flag is properly set on the org
assert organization.is_rbac_permissions_enabled == IS_RBAC_ENABLED
@pytest.mark.parametrize("grafana_api_response", [False, True])
@override_settings(LICENSE=settings.OPEN_SOURCE_LICENSE_NAME)
@pytest.mark.django_db
def test_sync_organization_is_rbac_permissions_enabled_open_source(make_organization, grafana_api_response):
organization = make_organization()
with patch.object(GrafanaAPIClient, "is_rbac_enabled_for_organization", return_value=grafana_api_response):
with patch.object(GrafanaAPIClient, "get_users", return_value=[]):
sync_organization(organization)
organization.refresh_from_db()
assert organization.is_rbac_permissions_enabled == grafana_api_response
@pytest.mark.parametrize("gcom_api_response", [False, True])
@patch("apps.user_management.sync.GcomAPIClient")
@override_settings(LICENSE=settings.CLOUD_LICENSE_NAME)
@override_settings(GRAFANA_COM_ADMIN_API_TOKEN="mockedToken")
@pytest.mark.django_db
def test_sync_organization_is_rbac_permissions_enabled_cloud(mocked_gcom_client, make_organization, gcom_api_response):
stack_id = 5
organization = make_organization(stack_id=stack_id)
mocked_gcom_client.return_value.is_rbac_enabled_for_stack.return_value = gcom_api_response
with patch.object(GrafanaAPIClient, "get_users", return_value=[]):
sync_organization(organization)
organization.refresh_from_db()
assert mocked_gcom_client.return_value.called_once_with("mockedToken")
assert mocked_gcom_client.return_value.is_rbac_enabled_for_stack.called_once_with(stack_id)
assert organization.is_rbac_permissions_enabled == gcom_api_response
@pytest.mark.django_db