Add more validation when updating api_token (#4918)
# What this PR does Skip updating stored api_token for Grafana if it does not look like one. Note: Exact format is not checked (prefix) since there are some differences between versions for what API tokens might look like and this should tolerate those differences. ## Which issue(s) this PR closes Related to [issue link here] <!-- *Note*: If you want the issue to be auto-closed once the PR is merged, change "Related to" to "Closes" in the line above. If you have more than one GitHub issue that this PR closes, be sure to preface each issue link with a [closing keyword](https://docs.github.com/en/get-started/writing-on-github/working-with-advanced-formatting/using-keywords-in-issues-and-pull-requests#linking-a-pull-request-to-an-issue). This ensures that the issue(s) are auto-closed once the PR has been merged. --> ## Checklist - [x] Unit, integration, and e2e (if applicable) tests updated - [x] Documentation added (or `pr:no public docs` PR label added if not required) - [x] Added the relevant release notes label (see labels prefixed w/ `release:`). These labels dictate how your PR will show up in the autogenerated release notes.
This commit is contained in:
parent
fefa9d1106
commit
a5770309d9
2 changed files with 41 additions and 6 deletions
|
|
@ -12,6 +12,7 @@ from apps.user_management.models import Organization
|
|||
logger = logging.getLogger(__name__)
|
||||
logger.setLevel(logging.DEBUG)
|
||||
GCOM_TOKEN_CHECK_PERIOD = timezone.timedelta(minutes=60)
|
||||
MIN_GRAFANA_TOKEN_LENGTH = 16
|
||||
|
||||
|
||||
class GcomToken:
|
||||
|
|
@ -19,6 +20,14 @@ class GcomToken:
|
|||
self.organization = organization
|
||||
|
||||
|
||||
def _validate_grafana_token_format(grafana_token: str) -> bool:
|
||||
if not grafana_token or not isinstance(grafana_token, str):
|
||||
return False
|
||||
if len(grafana_token) < MIN_GRAFANA_TOKEN_LENGTH:
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def check_gcom_permission(token_string: str, context) -> GcomToken:
|
||||
"""
|
||||
Verify that request from plugin is valid. Check it and synchronize the organization details
|
||||
|
|
@ -45,6 +54,8 @@ def check_gcom_permission(token_string: str, context) -> GcomToken:
|
|||
if not instance_info or str(instance_info["orgId"]) != org_id:
|
||||
raise InvalidToken
|
||||
|
||||
grafana_token_format_is_valid = _validate_grafana_token_format(grafana_token)
|
||||
|
||||
if not organization:
|
||||
from apps.base.models import DynamicSetting
|
||||
|
||||
|
|
@ -52,6 +63,11 @@ def check_gcom_permission(token_string: str, context) -> GcomToken:
|
|||
name="allow_plugin_organization_signup", defaults={"boolean_value": True}
|
||||
)[0].boolean_value
|
||||
if allow_signup:
|
||||
if not grafana_token_format_is_valid:
|
||||
logger.debug(
|
||||
f"grafana token sent when creating stack_id={stack_id} was invalid format. api_token will still be written to DB"
|
||||
)
|
||||
|
||||
# Get org from db or create a new one
|
||||
organization, _ = Organization.objects.get_or_create(
|
||||
stack_id=instance_info["id"],
|
||||
|
|
@ -74,8 +90,13 @@ def check_gcom_permission(token_string: str, context) -> GcomToken:
|
|||
organization.grafana_url = instance_info["url"]
|
||||
organization.cluster_slug = instance_info["clusterSlug"]
|
||||
organization.gcom_token = token_string
|
||||
organization.api_token = grafana_token
|
||||
organization.gcom_token_org_last_time_synced = timezone.now()
|
||||
if not grafana_token_format_is_valid:
|
||||
logger.debug(
|
||||
f"grafana token sent when updating stack_id={stack_id} was invalid, api_token in DB will be unchanged"
|
||||
)
|
||||
else:
|
||||
organization.api_token = grafana_token
|
||||
organization.save(
|
||||
update_fields=[
|
||||
"stack_slug",
|
||||
|
|
|
|||
|
|
@ -5,10 +5,22 @@ import pytest
|
|||
from apps.grafana_plugin.helpers.gcom import check_gcom_permission
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"api_token, api_token_updated",
|
||||
[
|
||||
("glsa_abcdefghijklmnopqrztuvwxyz", True),
|
||||
("abcdefghijklmnopqrztuvwxyz", True),
|
||||
("abc", False),
|
||||
("", False),
|
||||
("<no_value>", False),
|
||||
(None, False),
|
||||
(24, False),
|
||||
],
|
||||
)
|
||||
@pytest.mark.django_db
|
||||
def test_check_gcom_permission_updates_fields(make_organization):
|
||||
def test_check_gcom_permission_updates_fields(make_organization, api_token, api_token_updated):
|
||||
gcom_token = "gcom:test_token"
|
||||
fixed_token = "fixed_token"
|
||||
broken_token = "broken_token"
|
||||
instance_info = {
|
||||
"id": 324534,
|
||||
"slug": "testinstance",
|
||||
|
|
@ -22,10 +34,11 @@ def test_check_gcom_permission_updates_fields(make_organization):
|
|||
context = {
|
||||
"stack_id": str(instance_info["id"]),
|
||||
"org_id": str(instance_info["orgId"]),
|
||||
"grafana_token": fixed_token,
|
||||
"grafana_token": api_token,
|
||||
}
|
||||
|
||||
org = make_organization(stack_id=instance_info["id"], org_id=instance_info["orgId"], api_token="broken_token")
|
||||
org = make_organization(stack_id=instance_info["id"], org_id=instance_info["orgId"], api_token=broken_token)
|
||||
last_time_gcom_synced = org.gcom_token_org_last_time_synced
|
||||
|
||||
with patch(
|
||||
"apps.grafana_plugin.helpers.GcomAPIClient.get_instance_info",
|
||||
|
|
@ -43,5 +56,6 @@ def test_check_gcom_permission_updates_fields(make_organization):
|
|||
assert org.org_title == instance_info["orgName"]
|
||||
assert org.region_slug == instance_info["regionSlug"]
|
||||
assert org.cluster_slug == instance_info["clusterSlug"]
|
||||
assert org.api_token == fixed_token
|
||||
assert org.api_token == api_token if api_token_updated else broken_token
|
||||
assert org.gcom_token == gcom_token
|
||||
assert org.gcom_token_org_last_time_synced != last_time_gcom_synced
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue