Rework organization sync and grafana plugin engine backend (#4756)

Related to
https://github.com/grafana/oncall-private/issues/2806#issuecomment-2246286918.

Prepare engine for the backend plugin enablement/migration:

- Refactor sync code
- Improve plugin user authentication to set up user on-the-fly (when
missing)
- Implement v2 endpoints for install, sync and status (to be used via
the backend plugin)

(most of the changes come from
https://github.com/grafana/oncall/pull/4657; backport all engine changes
that keep backwards compatibility)
This commit is contained in:
Matias Bordese 2024-07-31 13:12:56 -03:00 committed by GitHub
parent 551cebddb9
commit 35f23cdcc6
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
23 changed files with 953 additions and 255 deletions

View file

@ -11,9 +11,11 @@ from rest_framework.request import Request
from apps.api.permissions import GrafanaAPIPermission, LegacyAccessControlRole, RBACPermission, user_is_authorized
from apps.grafana_plugin.helpers.gcom import check_token
from apps.grafana_plugin.sync_data import SyncUser
from apps.user_management.exceptions import OrganizationDeletedException, OrganizationMovedException
from apps.user_management.models import User
from apps.user_management.models.organization import Organization
from apps.user_management.sync import get_or_create_user
from settings.base import SELF_HOSTED_SETTINGS
from .constants import GOOGLE_OAUTH2_AUTH_TOKEN_NAME, SCHEDULE_EXPORT_TOKEN_NAME, SLACK_AUTH_TOKEN_NAME
@ -149,19 +151,34 @@ class PluginAuthentication(BasePluginAuthentication):
except (ValueError, TypeError):
raise exceptions.AuthenticationFailed("Grafana context must be JSON dict.")
if "UserId" not in context and "UserID" not in context:
raise exceptions.AuthenticationFailed("Invalid Grafana context.")
try:
user_id = context["UserId"]
except KeyError:
user_id = context["UserID"]
try:
return organization.users.get(user_id=user_id)
user_id = context.get("UserId", context.get("UserID"))
if user_id is not None:
return organization.users.get(user_id=user_id)
elif "Login" in context:
return organization.users.get(username=context["Login"])
else:
raise exceptions.AuthenticationFailed("Grafana context must specify a User or UserID.")
except User.DoesNotExist:
logger.debug(f"Could not get user from grafana request. Context {context}")
raise exceptions.AuthenticationFailed("Non-existent or anonymous user.")
try:
user_data = dict(json.loads(request.headers.get("X-Oncall-User-Context")))
except (ValueError, TypeError):
raise exceptions.AuthenticationFailed("User context must be JSON dict.")
if user_data:
user_sync_data = SyncUser(
id=user_data["id"],
name=user_data["name"],
login=user_data["login"],
email=user_data["email"],
role=user_data["role"],
avatar_url=user_data["avatar_url"],
permissions=user_data["permissions"] or [],
teams=user_data.get("teams", None),
)
return get_or_create_user(organization, user_sync_data)
else:
logger.debug("Could not get user from grafana request.")
raise exceptions.AuthenticationFailed("Non-existent or anonymous user.")
class PluginAuthenticationSchema(OpenApiAuthenticationExtension):

View file

@ -1,3 +1,5 @@
import json
import pytest
from django.utils import timezone
from rest_framework.exceptions import AuthenticationFailed
@ -5,6 +7,8 @@ from rest_framework.test import APIRequestFactory
from apps.auth_token.auth import PluginAuthentication
INSTANCE_CONTEXT = '{"stack_id": 42, "org_id": 24, "grafana_token": "abc"}'
@pytest.mark.django_db
def test_plugin_authentication_self_hosted_success(make_organization, make_user, make_token_for_organization):
@ -14,7 +18,7 @@ def test_plugin_authentication_self_hosted_success(make_organization, make_user,
headers = {
"HTTP_AUTHORIZATION": token_string,
"HTTP_X-Instance-Context": '{"stack_id": 42, "org_id": 24}',
"HTTP_X-Instance-Context": INSTANCE_CONTEXT,
"HTTP_X-Grafana-Context": '{"UserId": 12}',
}
request = APIRequestFactory().get("/", **headers)
@ -26,13 +30,13 @@ def test_plugin_authentication_self_hosted_success(make_organization, make_user,
def test_plugin_authentication_gcom_success(make_organization, make_user, make_token_for_organization):
# Setting gcom_token_org_last_time_synced to now, so it doesn't try to sync with gcom
organization = make_organization(
stack_id=42, org_id=24, gcom_token="123", gcom_token_org_last_time_synced=timezone.now()
stack_id=42, org_id=24, gcom_token="123", api_token="abc", gcom_token_org_last_time_synced=timezone.now()
)
user = make_user(organization=organization, user_id=12)
headers = {
"HTTP_AUTHORIZATION": "gcom:123",
"HTTP_X-Instance-Context": '{"stack_id": 42, "org_id": 24}',
"HTTP_X-Instance-Context": INSTANCE_CONTEXT,
"HTTP_X-Grafana-Context": '{"UserId": 12}',
}
request = APIRequestFactory().get("/", **headers)
@ -50,7 +54,7 @@ def test_plugin_authentication_fail_grafana_context(
organization = make_organization(stack_id=42, org_id=24)
token, token_string = make_token_for_organization(organization)
headers = {"HTTP_AUTHORIZATION": token_string, "HTTP_X-Instance-Context": '{"stack_id": 42, "org_id": 24}'}
headers = {"HTTP_AUTHORIZATION": token_string, "HTTP_X-Instance-Context": INSTANCE_CONTEXT}
if grafana_context is not None:
headers["HTTP_X-Grafana-Context"] = grafana_context
@ -61,7 +65,9 @@ def test_plugin_authentication_fail_grafana_context(
@pytest.mark.django_db
@pytest.mark.parametrize("authorization", [None, "", "123", "gcom:123"])
@pytest.mark.parametrize("instance_context", [None, "", "non-json", '"string"', "{}", '{"stack_id": 1, "org_id": 1}'])
@pytest.mark.parametrize(
"instance_context", [None, "", "non-json", '"string"', "{}", '{"stack_id": 1, "org_id": 1, "grafana_token": "abc"}']
)
def test_plugin_authentication_fail(authorization, instance_context):
headers = {}
@ -75,3 +81,73 @@ def test_plugin_authentication_fail(authorization, instance_context):
with pytest.raises(AuthenticationFailed):
PluginAuthentication().authenticate(request)
@pytest.mark.django_db
def test_plugin_authentication_gcom_setup_new_user(make_organization):
    """A user missing from the org is created on the fly from the X-Oncall-User-Context header."""
    # Setting gcom_token_org_last_time_synced to now, so it doesn't try to sync with gcom
    organization = make_organization(
        stack_id=42, org_id=24, gcom_token="123", api_token="abc", gcom_token_org_last_time_synced=timezone.now()
    )
    assert organization.users.count() == 0

    # logged in user data available through header
    user_data = {
        "id": 12,
        "name": "Test User",
        "login": "test_user",
        "email": "test@test.com",
        "role": "Admin",
        "avatar_url": "http://test.com/avatar.png",
        "permissions": None,
        "teams": None,
    }
    headers = {
        "HTTP_AUTHORIZATION": "gcom:123",
        "HTTP_X-Instance-Context": INSTANCE_CONTEXT,
        "HTTP_X-Grafana-Context": '{"UserId": 12}',
        "HTTP_X-Oncall-User-Context": json.dumps(user_data),
    }

    request = APIRequestFactory().get("/", **headers)
    ret_user, ret_token = PluginAuthentication().authenticate(request)
    # the user did not exist, so authentication must have created it
    assert ret_user.user_id == 12
    assert ret_token.organization == organization
    assert organization.users.count() == 1
@pytest.mark.django_db
def test_plugin_authentication_self_hosted_setup_new_user(make_organization, make_token_for_organization):
    """Self-hosted auth creates the missing user from the X-Oncall-User-Context header."""
    organization = make_organization(stack_id=42, org_id=24)
    token, token_string = make_token_for_organization(organization)
    assert organization.users.count() == 0

    # data about the logged-in user is supplied via the user-context header
    user_context_payload = {
        "id": 12,
        "name": "Test User",
        "login": "test_user",
        "email": "test@test.com",
        "role": "Admin",
        "avatar_url": "http://test.com/avatar.png",
        "permissions": None,
        "teams": None,
    }
    request_headers = {
        "HTTP_AUTHORIZATION": token_string,
        "HTTP_X-Instance-Context": INSTANCE_CONTEXT,
        "HTTP_X-Grafana-Context": '{"UserId": 12}',
        "HTTP_X-Oncall-User-Context": json.dumps(user_context_payload),
    }

    authenticated_user, auth_token = PluginAuthentication().authenticate(
        APIRequestFactory().get("/", **request_headers)
    )
    assert authenticated_user.user_id == 12
    assert auth_token.organization == organization
    assert organization.users.count() == 1

View file

@ -327,6 +327,9 @@ class GrafanaAPIClient(APIClient):
def get_service_account_token_permissions(self) -> APIClientResponse[typing.Dict[str, typing.List[str]]]:
    """Fetch the permissions of the current token from Grafana's access-control API.

    Returns the APIClient response pair; the payload maps permission strings to lists
    of strings (presumably action -> scopes — confirm against the Grafana API).
    """
    return self.api_get("api/access-control/user/permissions")
def sync(self) -> APIClientResponse:
    """Ask the Grafana OnCall app plugin (via its resources endpoint) to start a sync run."""
    return self.api_post("api/plugins/grafana-oncall-app/resources/plugin/sync")
class GcomAPIClient(APIClient):
ACTIVE_INSTANCE_QUERY = "instances?status=active"

View file

@ -27,10 +27,12 @@ def check_gcom_permission(token_string: str, context) -> GcomToken:
stack_id = context["stack_id"]
org_id = context["org_id"]
grafana_token = context["grafana_token"]
organization = Organization.objects.filter(stack_id=stack_id, org_id=org_id).first()
if (
organization
and organization.gcom_token == token_string
and organization.api_token == grafana_token
and organization.gcom_token_org_last_time_synced
and timezone.now() - organization.gcom_token_org_last_time_synced < GCOM_TOKEN_CHECK_PERIOD
):
@ -61,6 +63,7 @@ def check_gcom_permission(token_string: str, context) -> GcomToken:
region_slug=instance_info["regionSlug"],
cluster_slug=instance_info["clusterSlug"],
gcom_token=token_string,
api_token=grafana_token,
defaults={"gcom_token_org_last_time_synced": timezone.now()},
)
else:
@ -71,6 +74,7 @@ def check_gcom_permission(token_string: str, context) -> GcomToken:
organization.grafana_url = instance_info["url"]
organization.cluster_slug = instance_info["clusterSlug"]
organization.gcom_token = token_string
organization.api_token = grafana_token
organization.gcom_token_org_last_time_synced = timezone.now()
organization.save(
update_fields=[

View file

@ -0,0 +1,105 @@
from dataclasses import asdict
from typing import Dict, List
from rest_framework import serializers
from apps.grafana_plugin.sync_data import SyncData, SyncPermission, SyncSettings, SyncTeam, SyncUser
class SyncPermissionSerializer(serializers.Serializer):
    """Serializes a single RBAC permission entry to/from the SyncPermission dataclass."""

    action = serializers.CharField()

    def create(self, validated_data):
        # Deserialize into the SyncPermission dataclass rather than a plain dict.
        return SyncPermission(**validated_data)

    def to_representation(self, instance):
        # Dataclass instances serialize via their field dict.
        return asdict(instance)
class SyncUserSerializer(serializers.Serializer):
    """Serializes a Grafana user record to/from the SyncUser dataclass."""

    id = serializers.IntegerField()
    name = serializers.CharField(allow_blank=True)
    login = serializers.CharField()
    email = serializers.CharField()
    role = serializers.CharField()
    avatar_url = serializers.CharField()
    # Both may be null/empty when the payload carries no RBAC or team info.
    permissions = SyncPermissionSerializer(many=True, allow_empty=True, allow_null=True)
    teams = serializers.ListField(child=serializers.IntegerField(), allow_empty=True, allow_null=True)

    def create(self, validated_data):
        return SyncUser(**validated_data)

    def to_representation(self, instance):
        return asdict(instance)
class SyncTeamSerializer(serializers.Serializer):
    """Serializes a Grafana team record to/from the SyncTeam dataclass."""

    team_id = serializers.IntegerField()
    name = serializers.CharField()
    email = serializers.EmailField(allow_blank=True)
    avatar_url = serializers.CharField()

    def create(self, validated_data):
        return SyncTeam(**validated_data)

    def to_representation(self, instance):
        return asdict(instance)
class TeamMemberMappingField(serializers.Field):
    """Field for the team-membership mapping (team_id -> list of member user ids).

    JSON object keys are always strings, so keys are converted between int
    (internal representation) and str (wire representation).
    """

    def to_representation(self, value: Dict[int, List[int]]):
        # JSON object keys must be strings.
        return {str(k): v for k, v in value.items()}

    def to_internal_value(self, data):
        if not isinstance(data, dict):
            raise serializers.ValidationError("Expected a dictionary")
        try:
            # int() raises ValueError for non-numeric strings, but TypeError for
            # non-string/non-numeric keys (e.g. None); both must surface as a
            # validation error rather than an unhandled 500.
            return {int(k): v for k, v in data.items()}
        except (TypeError, ValueError):
            raise serializers.ValidationError("All keys must be convertible to integers")
class SyncOnCallSettingsSerializer(serializers.Serializer):
    """Serializes per-instance settings carried in a sync payload (SyncSettings dataclass)."""

    stack_id = serializers.IntegerField()
    org_id = serializers.IntegerField()
    license = serializers.CharField()
    oncall_api_url = serializers.CharField()
    oncall_token = serializers.CharField(allow_blank=True)
    grafana_url = serializers.CharField()
    grafana_token = serializers.CharField()
    rbac_enabled = serializers.BooleanField()
    incident_enabled = serializers.BooleanField()
    incident_backend_url = serializers.CharField(allow_blank=True)
    labels_enabled = serializers.BooleanField()

    def create(self, validated_data):
        return SyncSettings(**validated_data)

    def to_representation(self, instance):
        return asdict(instance)
class SyncDataSerializer(serializers.Serializer):
    """Serializes the complete sync payload: users, teams, team memberships and settings."""

    users = serializers.ListField(child=SyncUserSerializer())
    teams = serializers.ListField(child=SyncTeamSerializer(), allow_null=True, allow_empty=True)
    team_members = TeamMemberMappingField()
    settings = SyncOnCallSettingsSerializer()

    def create(self, validated_data):
        return SyncData(**validated_data)

    def to_representation(self, instance):
        return asdict(instance)

    def to_internal_value(self, data):
        # Let the parent serializer validate, then lift nested dicts into dataclasses
        # so downstream code works with typed objects instead of raw mappings.
        internal = super().to_internal_value(data)
        if internal.get("users"):
            internal["users"] = [SyncUser(**entry) for entry in internal["users"]]
        if internal.get("teams"):
            internal["teams"] = [SyncTeam(**entry) for entry in internal["teams"]]
        if internal.get("settings"):
            internal["settings"] = SyncSettings(**internal["settings"])
        return internal

View file

@ -0,0 +1,50 @@
from dataclasses import dataclass
from typing import Dict, List, Optional
@dataclass
class SyncPermission:
    """A single RBAC permission entry synced from Grafana."""

    action: str
@dataclass
class SyncUser:
    """User record received in a sync payload."""

    id: int
    name: str
    login: str
    email: str
    role: str
    avatar_url: str
    permissions: List[SyncPermission]
    # None when team membership info is not included in the payload
    teams: Optional[List[int]]
@dataclass
class SyncTeam:
    """Team record received in a sync payload."""

    team_id: int
    name: str
    email: str
    avatar_url: str
@dataclass
class SyncSettings:
    """Per-instance settings carried in a sync payload."""

    stack_id: int
    org_id: int
    license: str
    oncall_api_url: str
    oncall_token: str
    grafana_url: str
    grafana_token: str
    rbac_enabled: bool
    incident_enabled: bool
    incident_backend_url: str
    labels_enabled: bool
@dataclass
class SyncData:
    """Full sync payload: users, teams, team memberships and instance settings."""

    users: List[SyncUser]
    teams: List[SyncTeam]
    # team_id -> list of member user ids
    team_members: Dict[int, List[int]]
    settings: SyncSettings

View file

@ -3,3 +3,4 @@ from .sync import ( # noqa: F401
sync_organization_async,
sync_team_members_for_organization_async,
)
from .sync_v2 import sync_organizations_v2 # noqa: F401

View file

@ -0,0 +1,42 @@
import logging
import math
from time import sleep
from celery.utils.log import get_task_logger
from django.utils import timezone
from apps.grafana_plugin.helpers.client import GrafanaAPIClient
from apps.grafana_plugin.helpers.gcom import get_active_instance_ids
from apps.user_management.models import Organization
from common.custom_celery_tasks import shared_dedicated_queue_retry_task
logger = get_task_logger(__name__)
logger.setLevel(logging.DEBUG)
SYNC_PERIOD = timezone.timedelta(minutes=4)
@shared_dedicated_queue_retry_task(autoretry_for=(Exception,), retry_backoff=True, max_retries=0)
def sync_organizations_v2():
    """Request a sync from every (active) organization's Grafana OnCall plugin backend.

    Calls the plugin's /plugin/sync resource for each organization, throttled so the
    whole run is spread across roughly SYNC_PERIOD.
    """
    organization_qs = Organization.objects.all()
    active_instance_ids, is_cloud_configured = get_active_instance_ids()
    if is_cloud_configured:
        if not active_instance_ids:
            logger.warning("Did not find any active instances!")
            return
        else:
            logger.debug(f"Found {len(active_instance_ids)} active instances")
            organization_qs = organization_qs.filter(stack_id__in=active_instance_ids)

    # Use count() instead of len() so we don't materialize every row just to size the batches;
    # guard with max(..., 1) so the modulo below can never divide by zero.
    total = organization_qs.count()
    orgs_per_second = max(math.ceil(total / SYNC_PERIOD.seconds), 1)
    for idx, org in enumerate(organization_qs):
        client = GrafanaAPIClient(api_url=org.grafana_url, api_token=org.api_token)
        _, status = client.sync()
        if status["status_code"] != 200:
            logger.error(
                f"Failed to request sync stack_slug={org.stack_slug} status_code={status['status_code']} url={status['url']} message={status['message']}"
            )
        # Sleep after every full batch; the original `idx % orgs_per_second == 0` also
        # slept right after the very first organization (idx 0), stalling needlessly.
        if (idx + 1) % orgs_per_second == 0:
            logger.info(f"Sleep 1s after {idx + 1} organizations processed")
            sleep(1)

View file

@ -127,6 +127,7 @@ class TestIsRbacEnabledForOrganization:
def test_it_returns_based_on_status_code_of_head_call(
self, mocked_grafana_api_client_api_head, api_response_connected, api_status_code, expected
):
mocked_grafana_api_client_api_head.return_value = (None, {"connected": api_response_connected})
mocked_grafana_api_client_api_head.return_value = (
None,
{"connected": api_response_connected, "status_code": api_status_code},

View file

@ -0,0 +1,43 @@
from unittest.mock import patch
import pytest
from django.urls import reverse
from rest_framework import status
from rest_framework.test import APIClient
from apps.api.permissions import LegacyAccessControlRole
@pytest.mark.django_db
def test_auth_success(make_organization_and_user_with_plugin_token, make_user_auth_headers):
    """A user with a valid plugin token can POST to the v2 sync endpoint."""
    organization, user, token = make_organization_and_user_with_plugin_token()
    auth_headers = make_user_auth_headers(user, token)

    with patch("apps.grafana_plugin.views.sync_v2.SyncV2View.do_sync", return_value=organization) as mock_sync:
        response = APIClient().post(reverse("grafana-plugin:sync-v2"), format="json", **auth_headers)

    assert response.status_code == status.HTTP_200_OK
    assert mock_sync.called
@pytest.mark.django_db
def test_invalid_auth(make_organization_and_user_with_plugin_token, make_user_auth_headers):
    """A bad token yields 401 and an EDITOR role yields 403; do_sync must never run."""
    organization, user, token = make_organization_and_user_with_plugin_token(role=LegacyAccessControlRole.EDITOR)
    api_client = APIClient()

    # wrong token -> authentication fails
    bad_headers = make_user_auth_headers(user, "invalid-token")
    with patch("apps.grafana_plugin.views.sync_v2.SyncV2View.do_sync", return_value=organization) as mock_sync:
        response = api_client.post(reverse("grafana-plugin:sync-v2"), format="json", **bad_headers)
    assert response.status_code == status.HTTP_401_UNAUTHORIZED
    assert not mock_sync.called

    # valid token but insufficient role -> permission denied
    good_headers = make_user_auth_headers(user, token)
    with patch("apps.grafana_plugin.views.sync_v2.SyncV2View.do_sync", return_value=organization) as mock_sync:
        response = api_client.post(reverse("grafana-plugin:sync-v2"), format="json", **good_headers)
    assert response.status_code == status.HTTP_403_FORBIDDEN
    assert not mock_sync.called

View file

@ -1,12 +1,25 @@
from django.urls import re_path
from apps.grafana_plugin.views import InstallView, SelfHostedInstallView, StatusView, SyncOrganizationView
from apps.grafana_plugin.views import (
InstallV2View,
InstallView,
RecaptchaView,
SelfHostedInstallView,
StatusV2View,
StatusView,
SyncOrganizationView,
SyncV2View,
)
app_name = "grafana-plugin"

urlpatterns = [
    # v2 endpoints, used by the backend plugin
    re_path(r"v2/sync/?", SyncV2View().as_view(), name="sync-v2"),
    re_path(r"v2/status/?", StatusV2View().as_view(), name="status-v2"),
    re_path(r"v2/install/?", InstallV2View().as_view(), name="install-v2"),
    # legacy endpoints
    re_path(r"self-hosted/install/?", SelfHostedInstallView().as_view(), name="self-hosted-install"),
    re_path(r"status/?", StatusView().as_view(), name="status"),
    re_path(r"install/?", InstallView().as_view(), name="install"),
    re_path(r"sync_organization/?", SyncOrganizationView().as_view(), name="sync-organization"),
    re_path(r"recaptcha/?", RecaptchaView().as_view(), name="recaptcha"),
    # NOTE(review): patterns are unanchored (no ^/$); resolution relies on list order
    # (v2 routes first) — confirm this is intentional.
]

View file

@ -1,4 +1,8 @@
from .install import InstallView # noqa: F401
from .install_v2 import InstallV2View # noqa: F401
from .recaptcha import RecaptchaView # noqa: F401
from .self_hosted_install import SelfHostedInstallView # noqa: F401
from .status import StatusView # noqa: F401
from .status_v2 import StatusV2View # noqa: F401
from .sync_organization import SyncOrganizationView # noqa: F401
from .sync_v2 import SyncV2View # noqa: F401

View file

@ -0,0 +1,30 @@
import logging
from django.conf import settings
from rest_framework import status
from rest_framework.request import Request
from rest_framework.response import Response
from apps.grafana_plugin.views.sync_v2 import SyncException, SyncV2View
from common.api_helpers.errors import SELF_HOSTED_ONLY_FEATURE_ERROR
logger = logging.getLogger(__name__)
class InstallV2View(SyncV2View):
    """Open-source-only install endpoint: runs a sync, then re-provisions the plugin token."""

    # Authentication/permissions are intentionally disabled here (overrides SyncV2View).
    authentication_classes = ()
    permission_classes = ()

    def post(self, request: Request) -> Response:
        # Only available on the open-source license.
        if settings.LICENSE != settings.OPEN_SOURCE_LICENSE_NAME:
            return Response(data=SELF_HOSTED_ONLY_FEATURE_ERROR, status=status.HTTP_403_FORBIDDEN)

        try:
            organization = self.do_sync(request)
        except SyncException as e:
            return Response(data=e.error_data, status=status.HTTP_400_BAD_REQUEST)

        # Revoke any previous plugin token and hand back freshly provisioned credentials.
        organization.revoke_plugin()
        return Response(data=organization.provision_plugin(), status=status.HTTP_200_OK)

View file

@ -0,0 +1,17 @@
from django.conf import settings
from rest_framework.request import Request
from rest_framework.response import Response
from rest_framework.views import APIView
from apps.auth_token.auth import PluginAuthentication
class RecaptchaView(APIView):
    """Expose the reCAPTCHA v3 site key to authenticated plugin users."""

    authentication_classes = (PluginAuthentication,)

    def get(self, request: Request) -> Response:
        payload = {"recaptcha_site_key": settings.RECAPTCHA_V3_SITE_KEY}
        return Response(data=payload)

View file

@ -0,0 +1,52 @@
import logging
from django.conf import settings
from rest_framework.request import Request
from rest_framework.response import Response
from rest_framework.views import APIView
from apps.auth_token.auth import BasePluginAuthentication
from apps.grafana_plugin.helpers import GrafanaAPIClient
from apps.mobile_app.auth import MobileAppAuthTokenAuthentication
from common.api_helpers.mixins import GrafanaHeadersMixin
from common.api_helpers.utils import create_engine_url
logger = logging.getLogger(__name__)
class StatusV2View(GrafanaHeadersMixin, APIView):
    """Report engine status: maintenance mode, Grafana connectivity, license/version, API url."""

    authentication_classes = (
        MobileAppAuthTokenAuthentication,
        BasePluginAuthentication,
    )

    def get(self, request: Request) -> Response:
        maintenance_message = settings.CURRENTLY_UNDERGOING_MAINTENANCE_MESSAGE
        # Check if the plugin is currently undergoing maintenance, and return response without querying db
        if maintenance_message:
            return Response(data={"currently_undergoing_maintenance_message": maintenance_message})

        organization = request.auth.organization
        # If /status is called frequently this can be skipped with a cache
        grafana_api_client = GrafanaAPIClient(api_url=organization.grafana_url, api_token=organization.api_token)
        _, token_call_status = grafana_api_client.check_token()

        return Response(
            data={
                "connection_to_grafana": {
                    "url": token_call_status["url"],
                    "connected": token_call_status["connected"],
                    "status_code": token_call_status["status_code"],
                    "message": token_call_status["message"],
                },
                "license": settings.LICENSE,
                "version": settings.VERSION,
                "currently_undergoing_maintenance_message": maintenance_message,
                "api_url": create_engine_url(""),
            }
        )

View file

@ -0,0 +1,59 @@
import logging
from django.conf import settings
from rest_framework import status
from rest_framework.permissions import IsAuthenticated
from rest_framework.request import Request
from rest_framework.response import Response
from rest_framework.views import APIView
from apps.api.permissions import RBACPermission
from apps.auth_token.auth import PluginAuthentication
from apps.grafana_plugin.serializers.sync_data import SyncDataSerializer
from apps.user_management.models import Organization
from apps.user_management.sync import apply_sync_data, get_or_create_organization
from common.api_helpers.errors import INVALID_SELF_HOSTED_ID
logger = logging.getLogger(__name__)
class SyncException(Exception):
    """Raised when a v2 sync request cannot be applied; carries serializable error details."""

    def __init__(self, error_data):
        # Forward the payload to Exception so str(e) / tracebacks show the error
        # instead of an empty message (the original skipped super().__init__).
        super().__init__(error_data)
        # error_data is returned to the caller as the 400 response body.
        self.error_data = error_data
class SyncV2View(APIView):
    """Receive a full sync payload from the plugin backend and apply it to the organization."""

    authentication_classes = (PluginAuthentication,)
    permission_classes = [IsAuthenticated, RBACPermission]

    rbac_permissions = {
        "post": [RBACPermission.Permissions.USER_SETTINGS_ADMIN],
    }

    def do_sync(self, request: Request) -> Organization:
        """Validate the payload, verify it targets this instance, and apply it.

        Raises SyncException (with serializable error details) on any failure.
        Returns the synced Organization.
        """
        serializer = SyncDataSerializer(data=request.data)
        if not serializer.is_valid():
            raise SyncException(serializer.errors)

        sync_data = serializer.save()

        if settings.LICENSE == settings.OPEN_SOURCE_LICENSE_NAME:
            stack_id = settings.SELF_HOSTED_SETTINGS["STACK_ID"]
            org_id = settings.SELF_HOSTED_SETTINGS["ORG_ID"]
        else:
            # Bug fix: compare against the organization's org_id integer, not the
            # Organization instance itself — a model instance never equals the int
            # in the payload, so the identity check below could never pass.
            org_id = request.auth.organization.org_id
            stack_id = request.auth.organization.stack_id

        if sync_data.settings.org_id != org_id or sync_data.settings.stack_id != stack_id:
            raise SyncException(INVALID_SELF_HOSTED_ID)

        organization = get_or_create_organization(sync_data.settings.org_id, sync_data.settings.stack_id, sync_data)
        apply_sync_data(organization, sync_data)
        return organization

    def post(self, request: Request) -> Response:
        try:
            self.do_sync(request)
        except SyncException as e:
            return Response(data=e.error_data, status=status.HTTP_400_BAD_REQUEST)
        return Response(status=status.HTTP_200_OK)

View file

@ -4,18 +4,14 @@ from django.conf import settings
from django.core.validators import MinLengthValidator
from django.db import models
from apps.alerts.models import AlertReceiveChannel
from apps.metrics_exporter.helpers import metrics_bulk_update_team_label_cache
from apps.metrics_exporter.metrics_cache_manager import MetricsCacheManager
from common.public_primary_keys import generate_public_primary_key, increase_public_primary_key_length
if typing.TYPE_CHECKING:
from django.db.models.manager import RelatedManager
from apps.alerts.models import AlertGroupLogRecord
from apps.grafana_plugin.helpers.client import GrafanaAPIClient
from apps.schedules.models import CustomOnCallShift
from apps.user_management.models import Organization, User
from apps.user_management.models import User
def generate_public_primary_key_for_team() -> str:
@ -33,66 +29,7 @@ def generate_public_primary_key_for_team() -> str:
class TeamManager(models.Manager["Team"]):
@staticmethod
def sync_for_organization(
organization: "Organization", api_teams: typing.List["GrafanaAPIClient.Types.GrafanaTeam"]
) -> None:
grafana_teams = {team["id"]: team for team in api_teams}
existing_team_ids: typing.Set[int] = set(organization.teams.all().values_list("team_id", flat=True))
# create missing teams
teams_to_create = tuple(
Team(
organization_id=organization.pk,
team_id=team["id"],
name=team["name"],
email=team["email"],
avatar_url=team["avatarUrl"],
)
for team in grafana_teams.values()
if team["id"] not in existing_team_ids
)
# create entries, ignore failed insertions if team_id already exists in the organization
organization.teams.bulk_create(teams_to_create, batch_size=5000, ignore_conflicts=True)
# create missing direct paging integrations
AlertReceiveChannel.objects.create_missing_direct_paging_integrations(organization)
# delete excess teams and their direct paging integrations
team_ids_to_delete = existing_team_ids - grafana_teams.keys()
organization.alert_receive_channels.filter(
team__team_id__in=team_ids_to_delete, integration=AlertReceiveChannel.INTEGRATION_DIRECT_PAGING
).delete()
organization.teams.filter(team_id__in=team_ids_to_delete).delete()
# collect teams diffs to update metrics cache
metrics_teams_to_update: MetricsCacheManager.TeamsDiffMap = {}
for team_id in team_ids_to_delete:
metrics_teams_to_update = MetricsCacheManager.update_team_diff(
metrics_teams_to_update, team_id, deleted=True
)
# update existing teams if any fields have changed
teams_to_update = []
for team in organization.teams.filter(team_id__in=existing_team_ids):
grafana_team = grafana_teams[team.team_id]
if (
team.name != grafana_team["name"]
or team.email != grafana_team["email"]
or team.avatar_url != grafana_team["avatarUrl"]
):
if team.name != grafana_team["name"]:
# collect teams diffs to update metrics cache
metrics_teams_to_update = MetricsCacheManager.update_team_diff(
metrics_teams_to_update, team.id, new_name=grafana_team["name"]
)
team.name = grafana_team["name"]
team.email = grafana_team["email"]
team.avatar_url = grafana_team["avatarUrl"]
teams_to_update.append(team)
organization.teams.bulk_update(teams_to_update, ["name", "email", "avatar_url"], batch_size=5000)
metrics_bulk_update_team_label_cache(metrics_teams_to_update, organization.id)
pass
class Team(models.Model):

View file

@ -1,5 +1,4 @@
import datetime
import json
import logging
import re
import typing
@ -76,69 +75,7 @@ def default_working_hours():
class UserManager(models.Manager["User"]):
@staticmethod
def sync_for_team(team, api_members: list[dict]):
user_ids = tuple(member["userId"] for member in api_members)
users = team.organization.users.filter(user_id__in=user_ids)
team.users.set(users)
@staticmethod
def sync_for_organization(organization, api_users: list[dict]):
grafana_users = {user["userId"]: user for user in api_users}
existing_user_ids = set(organization.users.all().values_list("user_id", flat=True))
# create missing users
users_to_create = tuple(
User(
organization_id=organization.pk,
user_id=user["userId"],
email=user["email"],
name=user["name"],
username=user["login"],
role=getattr(LegacyAccessControlRole, user["role"].upper(), LegacyAccessControlRole.NONE),
avatar_url=user["avatarUrl"],
permissions=user["permissions"],
)
for user in grafana_users.values()
if user["userId"] not in existing_user_ids
)
organization.users.bulk_create(users_to_create, batch_size=5000)
# delete excess users
user_ids_to_delete = existing_user_ids - grafana_users.keys()
organization.users.filter(user_id__in=user_ids_to_delete).delete()
# update existing users if any fields have changed
users_to_update = []
for user in organization.users.filter(user_id__in=existing_user_ids):
grafana_user = grafana_users[user.user_id]
g_user_role = getattr(LegacyAccessControlRole, grafana_user["role"].upper(), LegacyAccessControlRole.NONE)
if (
user.email != grafana_user["email"]
or user.name != grafana_user["name"]
or user.username != grafana_user["login"]
or user.role != g_user_role
or user.avatar_url != grafana_user["avatarUrl"]
# instead of looping through the array of permission objects, simply take the hash
# of the string representation of the data structures and compare.
# Need to first convert the lists of objects to strings because lists/dicts are not hashable
# (because lists and dicts are not hashable.. as they are mutable)
# https://stackoverflow.com/a/22003440
or hash(json.dumps(user.permissions)) != hash(json.dumps(grafana_user["permissions"]))
):
user.email = grafana_user["email"]
user.name = grafana_user["name"]
user.username = grafana_user["login"]
user.role = g_user_role
user.avatar_url = grafana_user["avatarUrl"]
user.permissions = grafana_user["permissions"]
users_to_update.append(user)
organization.users.bulk_update(
users_to_update, ["email", "name", "username", "role", "avatar_url", "permissions"], batch_size=5000
)
pass
class UserQuerySet(models.QuerySet):

View file

@ -1,14 +1,21 @@
import logging
import typing
import uuid
from celery.utils.log import get_task_logger
from django.conf import settings
from django.utils import timezone
from apps.grafana_plugin.helpers.client import GcomAPIClient, GrafanaAPIClient
from apps.alerts.models import AlertReceiveChannel
from apps.api.permissions import LegacyAccessControlRole
from apps.auth_token.exceptions import InvalidToken
from apps.grafana_plugin.helpers.client import GcomAPIClient, GCOMInstanceInfo, GrafanaAPIClient
from apps.grafana_plugin.sync_data import SyncData, SyncPermission, SyncSettings, SyncTeam, SyncUser
from apps.metrics_exporter.helpers import metrics_bulk_update_team_label_cache
from apps.metrics_exporter.metrics_cache_manager import MetricsCacheManager
from apps.user_management.models import Organization, Team, User
from apps.user_management.signals import org_sync_signal
from common.utils import task_lock
from settings.base import CLOUD_LICENSE_NAME, OPEN_SOURCE_LICENSE_NAME
logger = get_task_logger(__name__)
logger.setLevel(logging.DEBUG)
@ -30,47 +37,50 @@ def _sync_organization(organization: Organization) -> None:
grafana_api_client = GrafanaAPIClient(api_url=organization.grafana_url, api_token=organization.api_token)
gcom_client = GcomAPIClient(settings.GRAFANA_COM_ADMIN_API_TOKEN)
rbac_is_enabled = organization.is_rbac_permissions_enabled
# Update organization's RBAC status if it's an open-source instance, or it's an active cloud instance.
# Don't update non-active cloud instances (e.g. paused) as they can return 200 OK but not have RBAC enabled.
if settings.LICENSE == settings.OPEN_SOURCE_LICENSE_NAME or gcom_client.is_stack_active(organization.stack_id):
rbac_enabled, server_error = grafana_api_client.is_rbac_enabled_for_organization()
rbac_enabled_update, server_error = grafana_api_client.is_rbac_enabled_for_organization()
if not server_error: # Only update RBAC status if Grafana didn't return a server error
organization.is_rbac_permissions_enabled = rbac_enabled
logger.info(f"RBAC status org={organization.pk} rbac_enabled={organization.is_rbac_permissions_enabled}")
rbac_is_enabled = rbac_enabled_update
_sync_instance_info(organization)
# get incident plugin settings
grafana_incident_settings, _ = grafana_api_client.get_grafana_incident_plugin_settings()
is_grafana_incident_enabled = False
grafana_incident_backend_url = None
if grafana_incident_settings is not None:
is_grafana_incident_enabled = grafana_incident_settings["enabled"]
grafana_incident_backend_url = (grafana_incident_settings.get("jsonData") or {}).get(
GrafanaAPIClient.GRAFANA_INCIDENT_PLUGIN_BACKEND_URL_KEY
)
_, check_token_call_status = grafana_api_client.check_token()
if check_token_call_status["connected"]:
organization.api_token_status = Organization.API_TOKEN_STATUS_OK
sync_users_and_teams(grafana_api_client, organization)
organization.last_time_synced = timezone.now()
# get labels plugin settings
is_grafana_labels_enabled = False
grafana_labels_plugin_settings, _ = grafana_api_client.get_grafana_labels_plugin_settings()
if grafana_labels_plugin_settings is not None:
is_grafana_labels_enabled = grafana_labels_plugin_settings["enabled"]
_sync_grafana_incident_plugin(organization, grafana_api_client)
_sync_grafana_labels_plugin(organization, grafana_api_client)
else:
organization.api_token_status = Organization.API_TOKEN_STATUS_FAILED
logger.warning(f"Sync not successful org={organization.pk} token_status=FAILED")
oncall_api_url = settings.BASE_URL
if settings.LICENSE == CLOUD_LICENSE_NAME:
oncall_api_url = settings.GRAFANA_CLOUD_ONCALL_API_URL
organization.save(
update_fields=[
"cluster_slug",
"stack_slug",
"org_slug",
"org_title",
"region_slug",
"grafana_url",
"last_time_synced",
"api_token_status",
"gcom_token_org_last_time_synced",
"is_rbac_permissions_enabled",
"is_grafana_incident_enabled",
"is_grafana_labels_enabled",
"grafana_incident_backend_url",
]
sync_settings = SyncSettings(
stack_id=organization.stack_id,
org_id=organization.org_id,
license=settings.LICENSE,
oncall_api_url=oncall_api_url,
oncall_token=organization.gcom_token,
grafana_url=organization.grafana_url,
grafana_token=organization.api_token,
rbac_enabled=rbac_is_enabled,
incident_enabled=is_grafana_incident_enabled,
incident_backend_url=grafana_incident_backend_url,
labels_enabled=is_grafana_labels_enabled,
)
org_sync_signal.send(sender=None, organization=organization)
_sync_organization_data(organization, sync_settings)
if organization.api_token_status == Organization.API_TOKEN_STATUS_OK:
sync_users_and_teams(grafana_api_client, organization)
def _sync_instance_info(organization: Organization) -> None:
@ -90,32 +100,6 @@ def _sync_instance_info(organization: Organization) -> None:
organization.gcom_token_org_last_time_synced = timezone.now()
def _sync_grafana_labels_plugin(organization: Organization, grafana_api_client) -> None:
    """Mirror the grafana-labels-app plugin's enabled state onto the organization.

    Intended to be called only from _sync_organization: the organization is
    mutated in place and persisted later by the caller, never saved here.
    A ``None`` settings payload (plugin missing or request failed) leaves the
    current flag untouched.
    """
    settings_payload, _ = grafana_api_client.get_grafana_labels_plugin_settings()
    if settings_payload is None:
        return
    organization.is_grafana_labels_enabled = settings_payload["enabled"]
def _sync_grafana_incident_plugin(organization: Organization, grafana_api_client) -> None:
    """Mirror the Grafana Incident plugin's enabled state and backend URL onto the org.

    Intended to be called only from _sync_organization: the organization is
    mutated in place and persisted later by the caller, never saved here.
    When plugin settings cannot be fetched, both fields are reset to their
    disabled defaults.
    """
    settings_payload, _ = grafana_api_client.get_grafana_incident_plugin_settings()
    # start from the disabled defaults; only override them on a usable payload
    organization.is_grafana_incident_enabled = False
    organization.grafana_incident_backend_url = None
    if settings_payload is None:
        return
    organization.is_grafana_incident_enabled = settings_payload["enabled"]
    json_data = settings_payload.get("jsonData") or {}
    organization.grafana_incident_backend_url = json_data.get(
        GrafanaAPIClient.GRAFANA_INCIDENT_PLUGIN_BACKEND_URL_KEY
    )
def sync_users_and_teams(client: GrafanaAPIClient, organization: Organization) -> None:
    """Fetch users and teams from the Grafana API and sync both into the organization."""
    sync_users(client, organization)
    sync_teams(client, organization)
@ -127,7 +111,21 @@ def sync_users(client: GrafanaAPIClient, organization: Organization, **kwargs) -
# check if api_users are shaped correctly. e.g. for paused instance, the response is not a list.
if not api_users or not isinstance(api_users, (tuple, list)):
return
User.objects.sync_for_organization(organization=organization, api_users=api_users)
sync_users = [
SyncUser(
id=user["userId"],
name=user["name"],
login=user["login"],
email=user["email"],
role=user["role"],
avatar_url=user["avatarUrl"],
teams=None,
permissions=[SyncPermission(action=permission["permission"]) for permission in user["permissions"]],
)
for user in api_users
]
_sync_users_data(organization, sync_users, delete_extra=True)
def sync_teams(client: GrafanaAPIClient, organization: Organization, **kwargs) -> None:
@ -135,23 +133,26 @@ def sync_teams(client: GrafanaAPIClient, organization: Organization, **kwargs) -
if not api_teams_result:
return
api_teams = api_teams_result["teams"]
Team.objects.sync_for_organization(organization=organization, api_teams=api_teams)
sync_teams = [
SyncTeam(
team_id=team["id"],
name=team["name"],
email=team["email"],
avatar_url=team["avatarUrl"],
)
for team in api_teams
]
_sync_teams_data(organization, sync_teams)
def sync_team_members(client: GrafanaAPIClient, organization: Organization) -> None:
team_members = {}
for team in organization.teams.all():
members, _ = client.get_team_members(team.team_id)
if not members:
continue
User.objects.sync_for_team(team=team, api_members=members)
def sync_users_for_teams(client: GrafanaAPIClient, organization: Organization, **kwargs) -> None:
api_teams_result, _ = client.get_teams(**kwargs)
if not api_teams_result:
return
api_teams = api_teams_result["teams"]
Team.objects.sync_for_organization(organization=organization, api_teams=api_teams)
team_members[team.team_id] = [member["userId"] for member in members]
_sync_teams_members_data(organization, team_members)
def delete_organization_if_needed(organization: Organization) -> bool:
@ -198,3 +199,233 @@ def cleanup_organization(organization_pk: int) -> None:
except Organization.DoesNotExist:
logger.info(f"Organization {organization_pk} was not found")
def _create_cloud_organization(
    org_id: int, stack_id: int, sync_data: SyncData, instance_info: GCOMInstanceInfo
) -> Organization:
    """Create a cloud-licensed Organization from GCOM instance info.

    Falls back to fetching the instance info from GCOM (using the sync payload's
    OnCall token) when it is not supplied.

    Raises:
        InvalidToken: if the instance cannot be resolved or does not belong to
            the requested org.
    """
    client = GcomAPIClient(sync_data.settings.oncall_token)
    if not instance_info:
        instance_info = client.get_instance_info(stack_id)
    # Compare as strings: GCOM returns a number while org_id may arrive as an
    # int or as a str (e.g. parsed from a request header); a direct `!= org_id`
    # comparison against an int would always mismatch the str() left side.
    if not instance_info or str(instance_info["orgId"]) != str(org_id):
        raise InvalidToken
    return Organization.objects.create(
        stack_id=str(instance_info["id"]),
        stack_slug=instance_info["slug"],
        grafana_url=instance_info["url"],
        org_id=str(instance_info["orgId"]),
        org_slug=instance_info["orgSlug"],
        org_title=instance_info["orgName"],
        region_slug=instance_info["regionSlug"],
        cluster_slug=instance_info["clusterSlug"],
        api_token=sync_data.settings.grafana_token,
        gcom_token=sync_data.settings.oncall_token,
        is_rbac_permissions_enabled=sync_data.settings.rbac_enabled,
        # BUG FIX: ``defaults=`` is a get_or_create()/update_or_create()-only
        # keyword; passing it to create() raises TypeError (unexpected kwarg
        # for the model constructor). Set the field directly instead.
        gcom_token_org_last_time_synced=timezone.now(),
    )
def _create_oss_organization(sync_data: SyncData) -> Organization:
    """Create the self-hosted Organization from static SELF_HOSTED_SETTINGS plus sync data."""
    self_hosted = settings.SELF_HOSTED_SETTINGS
    return Organization.objects.create(
        stack_id=self_hosted["STACK_ID"],
        stack_slug=self_hosted["STACK_SLUG"],
        org_id=self_hosted["ORG_ID"],
        org_slug=self_hosted["ORG_SLUG"],
        org_title=self_hosted["ORG_TITLE"],
        region_slug=self_hosted["REGION_SLUG"],
        cluster_slug=self_hosted["CLUSTER_SLUG"],
        # connection details come from the plugin-provided sync payload
        grafana_url=sync_data.settings.grafana_url,
        api_token=sync_data.settings.grafana_token,
        is_rbac_permissions_enabled=sync_data.settings.rbac_enabled,
    )
def _create_organization(
    org_id: int, stack_id: int, sync_data: SyncData, instance_info: GCOMInstanceInfo
) -> typing.Optional[Organization]:
    """Create an Organization appropriate for the running license, or None for unknown licenses."""
    license_name = settings.LICENSE
    if license_name == CLOUD_LICENSE_NAME:
        return _create_cloud_organization(org_id, stack_id, sync_data, instance_info)
    if license_name == OPEN_SOURCE_LICENSE_NAME:
        return _create_oss_organization(sync_data)
    return None
def get_or_create_organization(
    org_id: int, stack_id: int, sync_data: SyncData = None, instance_info: GCOMInstanceInfo = None
) -> Organization:
    """Look up the Organization by (org_id, stack_id), creating it on demand when missing."""
    existing = Organization.objects.filter(org_id=org_id, stack_id=stack_id).first()
    if existing is not None:
        return existing
    # not found: create one according to the current license (may return None
    # for unrecognized licenses, matching the previous behavior)
    return _create_organization(org_id, stack_id, sync_data, instance_info)
def get_or_create_user(organization: Organization, sync_user: SyncUser) -> User:
    """Upsert a single user from sync data and return the matching User row.

    Team membership is only ever added here, never removed: this path is meant
    for user creation / first login, and the periodic sync is what keeps team
    membership accurate over time.
    """
    _sync_users_data(organization, [sync_user], delete_extra=False)
    user = organization.users.get(user_id=sync_user.id)
    for team_id in sync_user.teams or []:
        matching_team = organization.teams.filter(team_id=team_id).first()
        if matching_team is not None:
            user.teams.add(matching_team)
    return user
def _sync_organization_data(organization: Organization, sync_settings: SyncSettings):
    """Apply synced settings to the organization and persist them.

    Copies plugin flags and Grafana connection info from ``sync_settings``,
    refreshes GCOM instance metadata, verifies the Grafana API token, and saves
    every touched field in a single UPDATE.
    """
    organization.is_rbac_permissions_enabled = sync_settings.rbac_enabled
    logger.info(f"RBAC status org={organization.pk} rbac_enabled={organization.is_rbac_permissions_enabled}")
    organization.is_grafana_labels_enabled = sync_settings.labels_enabled
    organization.is_grafana_incident_enabled = sync_settings.incident_enabled
    organization.grafana_incident_backend_url = sync_settings.incident_backend_url
    organization.grafana_url = sync_settings.grafana_url
    organization.api_token = sync_settings.grafana_token
    # refresh GCOM instance metadata (stack/org/cluster slugs, grafana_url, ...)
    _sync_instance_info(organization)
    grafana_api_client = GrafanaAPIClient(api_url=organization.grafana_url, api_token=organization.api_token)
    _, check_token_call_status = grafana_api_client.check_token()
    if check_token_call_status["connected"]:
        organization.api_token_status = Organization.API_TOKEN_STATUS_OK
        # BUG FIX: bump last_time_synced only on a successful token check.
        # Previously it was also assigned unconditionally before the check (and
        # always saved via update_fields), so a failed sync still looked like a
        # fresh successful one — contradicting the FAILED status/log below.
        organization.last_time_synced = timezone.now()
    else:
        organization.api_token_status = Organization.API_TOKEN_STATUS_FAILED
        logger.warning(f"Sync not successful org={organization.pk} token_status=FAILED")
    organization.save(
        update_fields=[
            "api_token",
            "api_token_status",
            "cluster_slug",
            "stack_slug",
            "org_slug",
            "org_title",
            "region_slug",
            "grafana_url",
            "last_time_synced",
            "gcom_token_org_last_time_synced",
            "is_rbac_permissions_enabled",
            "is_grafana_incident_enabled",
            "is_grafana_labels_enabled",
            "grafana_incident_backend_url",
        ]
    )
def _sync_users_data(organization: Organization, sync_users: list[SyncUser], delete_extra=False):
    """Bulk-upsert the given users into the organization.

    When ``delete_extra`` is True, organization users absent from ``sync_users``
    are deleted after the upsert (full-sync semantics); with False, existing
    users are left alone (used by the single-user on-the-fly creation path).

    NOTE(review): the ``sync_users`` parameter shadows the module-level
    ``sync_users`` function — consider renaming in a follow-up.
    """
    # Unsaved User rows, built lazily; bulk_create consumes the generator.
    users_to_sync = (
        User(
            organization_id=organization.pk,
            user_id=user.id,
            email=user.email,
            name=user.name,
            username=user.login,
            # unknown/unsupported role strings fall back to NONE
            role=getattr(LegacyAccessControlRole, user.role.upper(), LegacyAccessControlRole.NONE),
            avatar_url=user.avatar_url,
            permissions=user.permissions or [],
        )
        for user in sync_users
    )
    # Snapshot ids BEFORE the upsert so newly created rows can be told apart afterwards.
    existing_user_ids = set(organization.users.all().values_list("user_id", flat=True))
    kwargs = {}
    if settings.DATABASE_TYPE in ("sqlite3", "postgresql"):
        # unique_fields is required for sqlite and postgresql setups
        kwargs["unique_fields"] = ("organization_id", "user_id", "is_active")
    # Upsert: on conflict, update only the listed profile fields.
    organization.users.bulk_create(
        users_to_sync,
        update_conflicts=True,
        update_fields=("email", "name", "username", "role", "avatar_url", "permissions"),
        batch_size=5000,
        **kwargs,
    )
    # Retrieve primary keys for the newly created users
    #
    # If the models primary key is an AutoField, the primary key attribute can only be retrieved
    # on certain databases (currently PostgreSQL, MariaDB 10.5+, and SQLite 3.35+).
    # On other databases, it will not be set.
    # https://docs.djangoproject.com/en/4.1/ref/models/querysets/#django.db.models.query.QuerySet.bulk_create
    created_users = organization.users.exclude(user_id__in=existing_user_ids)
    if delete_extra:
        # delete removed users: present in the DB but missing from the sync payload
        existing_user_ids |= set(u.user_id for u in created_users)
        user_ids_to_delete = existing_user_ids - {user.id for user in sync_users}
        organization.users.filter(user_id__in=user_ids_to_delete).delete()
def _sync_teams_data(organization: Organization, sync_teams: list[SyncTeam]):
    """Bulk-upsert teams, maintain direct paging integrations and metrics label caches.

    Teams missing from ``sync_teams`` are deleted along with their direct paging
    integrations; renames and deletions are propagated to the metrics cache.

    NOTE(review): the ``sync_teams`` parameter shadows the module-level
    ``sync_teams`` function — consider renaming in a follow-up.
    """
    if sync_teams is None:
        sync_teams = []
    # keep existing team names mapping to check for possible metrics cache updates
    existing_team_names = {team.team_id: team.name for team in organization.teams.all()}
    teams_to_sync = tuple(
        Team(
            organization_id=organization.pk,
            team_id=team.team_id,
            name=team.name,
            email=team.email,
            avatar_url=team.avatar_url,
        )
        for team in sync_teams
    )
    # create entries, update if team_id already exists in the organization
    kwargs = {}
    if settings.DATABASE_TYPE in ("sqlite3", "postgresql"):
        # unique_fields is required for sqlite and postgresql setups
        kwargs["unique_fields"] = ("organization_id", "team_id")
    organization.teams.bulk_create(
        teams_to_sync,
        batch_size=5000,
        update_conflicts=True,
        update_fields=("name", "email", "avatar_url"),
        **kwargs,
    )
    # create missing direct paging integrations
    AlertReceiveChannel.objects.create_missing_direct_paging_integrations(organization)
    # delete removed teams and their direct paging integrations (integrations first)
    existing_team_ids = set(organization.teams.all().values_list("team_id", flat=True))
    team_ids_to_delete = existing_team_ids - set(t.team_id for t in sync_teams)
    organization.alert_receive_channels.filter(
        team__team_id__in=team_ids_to_delete, integration=AlertReceiveChannel.INTEGRATION_DIRECT_PAGING
    ).delete()
    organization.teams.filter(team_id__in=team_ids_to_delete).delete()
    # collect teams diffs to update metrics cache: deletions first, then renames
    metrics_teams_to_update: MetricsCacheManager.TeamsDiffMap = {}
    for team_id in team_ids_to_delete:
        metrics_teams_to_update = MetricsCacheManager.update_team_diff(metrics_teams_to_update, team_id, deleted=True)
    for team in sync_teams:
        previous_name = existing_team_names.get(team.team_id)
        if previous_name and previous_name != team.name:
            metrics_teams_to_update = MetricsCacheManager.update_team_diff(
                metrics_teams_to_update, team.team_id, new_name=team.name
            )
    metrics_bulk_update_team_label_cache(metrics_teams_to_update, organization.id)
def _sync_teams_members_data(organization: Organization, team_members: dict[int, list[int]]):
    """Replace each team's member set with the users listed in ``team_members``.

    Keys are Grafana team ids, values are lists of Grafana user ids.

    ROBUSTNESS FIX: a team id that no longer exists in the organization (e.g.
    removed by the teams sync that runs just before this) previously raised
    Team.DoesNotExist via ``teams.get(...)`` and aborted the whole sync; such
    entries are now logged and skipped instead.
    """
    for team_id, members_ids in team_members.items():
        team = organization.teams.filter(team_id=team_id).first()
        if team is None:
            logger.warning(f"Sync team members: team not found org={organization.pk} team_id={team_id}")
            continue
        team.users.set(organization.users.filter(user_id__in=members_ids))
def apply_sync_data(organization: Organization, sync_data: SyncData):
    """Apply a full sync payload to the organization.

    Order matters: org settings first, then users, then teams (plus their direct
    paging integrations), and finally team membership, which needs both the
    users and teams to already be in place.
    """
    # update org + settings
    _sync_organization_data(organization, sync_data.settings)
    # update or create users (delete_extra=True removes users absent from the payload)
    _sync_users_data(organization, sync_data.users, delete_extra=True)
    # update or create teams + direct paging integrations
    _sync_teams_data(organization, sync_data.teams)
    # update team members
    _sync_teams_members_data(organization, sync_data.team_members)

View file

@ -9,13 +9,15 @@ from django.test import override_settings
from apps.alerts.models import AlertReceiveChannel
from apps.api.permissions import LegacyAccessControlRole
from apps.grafana_plugin.helpers.client import GrafanaAPIClient
from apps.user_management.models import Team, User
from apps.grafana_plugin.sync_data import SyncUser
from apps.user_management.models import User
from apps.user_management.sync import (
_sync_grafana_incident_plugin,
_sync_grafana_labels_plugin,
cleanup_organization,
get_or_create_user,
sync_organization,
sync_team_members,
sync_teams,
sync_users,
)
MOCK_GRAFANA_INCIDENT_BACKEND_URL = "https://grafana-incident.test"
@ -96,8 +98,9 @@ def test_sync_users_for_organization(make_organization, make_user_for_organizati
}
for user_id in (2, 3)
)
User.objects.sync_for_organization(organization, api_users=api_users)
with patched_grafana_api_client(organization) as mock_grafana_api_client:
mock_grafana_api_client.get_users.return_value = api_users
sync_users(mock_grafana_api_client, organization)
assert organization.users.count() == 2
@ -137,7 +140,9 @@ def test_sync_users_for_organization_role_none(make_organization, make_user_for_
for user_id in (2, 3)
)
User.objects.sync_for_organization(organization, api_users=api_users)
with patched_grafana_api_client(organization) as mock_grafana_api_client:
mock_grafana_api_client.get_users.return_value = api_users
sync_users(mock_grafana_api_client, organization)
assert organization.users.count() == 2
@ -170,7 +175,9 @@ def test_sync_teams_for_organization(make_organization, make_team, make_alert_re
for team_id in (2, 3, 4)
)
Team.objects.sync_for_organization(organization, api_teams=api_teams)
with patched_grafana_api_client(organization) as mock_grafana_api_client:
mock_grafana_api_client.get_teams.return_value = ({"teams": api_teams}, None)
sync_teams(mock_grafana_api_client, organization)
assert organization.teams.count() == 3
@ -224,15 +231,16 @@ def test_sync_users_for_team(make_organization, make_user_for_organization, make
},
)
User.objects.sync_for_team(team, api_members=api_members)
with patched_grafana_api_client(organization) as mock_grafana_api_client:
mock_grafana_api_client.get_team_members.return_value = (api_members, None)
sync_team_members(mock_grafana_api_client, organization)
assert team.users.count() == 1
assert team.users.get() == users[0]
@pytest.mark.django_db
@patch("apps.user_management.sync.org_sync_signal")
def test_sync_organization(mocked_org_sync_signal, make_organization):
def test_sync_organization(make_organization):
organization = make_organization()
with patched_grafana_api_client(organization):
@ -259,8 +267,6 @@ def test_sync_organization(mocked_org_sync_signal, make_organization):
# check that is_grafana_labels_enabled flag is set
assert organization.is_grafana_labels_enabled is True
mocked_org_sync_signal.send.assert_called_once_with(sender=None, organization=organization)
@pytest.mark.parametrize(
"is_rbac_enabled_for_organization,expected",
@ -333,15 +339,26 @@ def test_duplicate_user_ids(make_organization, make_user_for_organization):
organization = make_organization()
user = make_user_for_organization(organization, user_id=1)
api_users = []
User.objects.sync_for_organization(organization, api_users=api_users)
api_users = [
{
"userId": 2,
"email": "other@test.test",
"name": "Other",
"login": "other",
"role": "admin",
"avatarUrl": "other.test/test",
"permissions": [],
}
]
with patched_grafana_api_client(organization) as mock_grafana_api_client:
mock_grafana_api_client.get_users.return_value = api_users
sync_users(mock_grafana_api_client, organization)
user.refresh_from_db()
assert user.is_active is None
assert organization.users.count() == 0
assert User.objects.filter_with_deleted().count() == 1
assert organization.users.count() == 1
assert User.objects.filter_with_deleted(organization=organization).count() == 2
api_users = [
{
@ -355,11 +372,13 @@ def test_duplicate_user_ids(make_organization, make_user_for_organization):
}
]
User.objects.sync_for_organization(organization, api_users=api_users)
with patched_grafana_api_client(organization) as mock_grafana_api_client:
mock_grafana_api_client.get_users.return_value = api_users
sync_users(mock_grafana_api_client, organization)
assert organization.users.count() == 1
assert organization.users.get().email == "newtest@test.test"
assert User.objects.filter_with_deleted().count() == 2
assert User.objects.filter_with_deleted(organization=organization).count() == 3
@pytest.mark.django_db
@ -410,7 +429,9 @@ def test_sync_organization_lock(
mock_task_lock.assert_called_once_with(lock_cache_key, "random")
if task_lock_acquired:
mock_grafana_api_client.assert_called_once()
# 2 calls: get client to fetch organization data,
# and then another one to check token in the refactored sync function
assert mock_grafana_api_client.call_count == 2
else:
# task lock could not be acquired
mock_grafana_api_client.assert_not_called()
@ -433,18 +454,14 @@ class TestSyncGrafanaLabelsPluginParams:
TestSyncGrafanaLabelsPluginParams(({"enabled": False}, None), False),
],
)
@pytest.mark.django_db
def test_sync_grafana_labels_plugin(make_organization, test_params: TestSyncGrafanaLabelsPluginParams):
organization = make_organization()
organization.is_grafana_labels_enabled = False # by default in tests it's true, so setting to false
with patch.object(
GrafanaAPIClient,
"get_grafana_labels_plugin_settings",
return_value=test_params.response,
):
grafana_api_client = GrafanaAPIClient(api_url=organization.grafana_url, api_token=organization.api_token)
_sync_grafana_labels_plugin(organization, grafana_api_client)
with patched_grafana_api_client(organization) as mock_grafana_api_client:
mock_grafana_api_client.return_value.get_grafana_labels_plugin_settings.return_value = test_params.response
sync_organization(organization)
organization.refresh_from_db()
assert organization.is_grafana_labels_enabled is test_params.expected_result
@ -472,15 +489,52 @@ class TestSyncGrafanaIncidentParams:
TestSyncGrafanaIncidentParams(({"enabled": False}, None), False, None), # plugin is disabled for some reason
],
)
@pytest.mark.django_db
def test_sync_grafana_incident_plugin(make_organization, test_params: TestSyncGrafanaIncidentParams):
organization = make_organization()
with patch.object(
GrafanaAPIClient,
"get_grafana_incident_plugin_settings",
return_value=test_params.response,
):
grafana_api_client = GrafanaAPIClient(api_url=organization.grafana_url, api_token=organization.api_token)
_sync_grafana_incident_plugin(organization, grafana_api_client)
with patched_grafana_api_client(organization) as mock_grafana_api_client:
mock_grafana_api_client.return_value.get_grafana_incident_plugin_settings.return_value = test_params.response
sync_organization(organization)
organization.refresh_from_db()
assert organization.is_grafana_incident_enabled is test_params.expected_flag
assert organization.grafana_incident_backend_url is test_params.expected_url
assert organization.grafana_incident_backend_url == test_params.expected_url
@pytest.mark.django_db
def test_get_or_create_user(make_organization, make_team, make_user_for_organization):
organization = make_organization()
team = make_team(organization)
# add an existing_user
existing_user = make_user_for_organization(organization)
team.users.add(existing_user)
assert organization.users.count() == 1
assert team.users.count() == 1
sync_user = SyncUser(
id=42,
email="test@test.com",
name="Test",
login="test",
avatar_url="https://test.com/test",
role="admin",
permissions=[],
teams=None,
)
# create user
user = get_or_create_user(organization, sync_user)
assert user.user_id == sync_user.id
assert user.name == sync_user.name
assert user.email == sync_user.email
assert user.avatar_full_url == sync_user.avatar_url
assert organization.users.count() == 2
assert team.users.count() == 1
# update user
sync_user.teams = [team.team_id]
user = get_or_create_user(organization, sync_user)
assert organization.users.count() == 2
assert team.users.count() == 2
assert team.users.filter(pk=user.pk).exists()

View file

@ -0,0 +1,20 @@
from dataclasses import dataclass
from typing import Dict, List, Optional
@dataclass
class OnCallError:
    """Structured error payload returned by the Grafana plugin endpoints."""

    # machine-readable error code
    code: int
    # human-readable description
    message: str
    # optional per-field validation messages
    fields: Optional[Dict[str, List[str]]] = None


SELF_HOSTED_ONLY_FEATURE_ERROR = OnCallError(
    code=1001, message="This feature is not available in Cloud versions of OnCall"
)
# NOTE(review): shares code 1001 with SELF_HOSTED_ONLY_FEATURE_ERROR — confirm
# whether that is intentional or this should get a distinct code.
INVALID_SELF_HOSTED_ID = OnCallError(code=1001, message="Invalid stack or org id for self-hosted organization")
CLOUD_ONLY_FEATURE_ERROR = OnCallError(code=1002, message="This feature is not available in OSS versions of OnCall")
INSTALL_ERROR = OnCallError(code=1003, message="Install failed check /plugin/status for details")

View file

@ -23,6 +23,7 @@ from .views import HealthCheckView, MaintenanceModeStatusView, ReadinessCheckVie
paths_to_work_even_when_maintenance_mode_is_active: list[URLPattern | URLResolver] = [
path("", HealthCheckView.as_view()),
path("health/", HealthCheckView.as_view()),
path("api/internal/v1/health/", HealthCheckView.as_view()),
path("ready/", ReadinessCheckView.as_view()),
path("startupprobe/", StartupProbeView.as_view()),
path("api/internal/v1/maintenance-mode-status", MaintenanceModeStatusView.as_view()),

View file

@ -153,6 +153,7 @@ CELERY_TASK_ROUTES = {
"apps.grafana_plugin.tasks.sync.start_sync_regions": {"queue": "long"},
"apps.metrics_exporter.tasks.calculate_and_cache_metrics": {"queue": "long"},
"apps.metrics_exporter.tasks.calculate_and_cache_user_was_notified_metric": {"queue": "long"},
"apps.grafana_plugin.tasks.sync_v2.sync_organizations_v2": {"queue": "long"},
# SLACK
"apps.integrations.tasks.notify_about_integration_ratelimit_in_slack": {"queue": "slack"},
"apps.slack.helpers.alert_group_representative.on_alert_group_action_triggered_async": {"queue": "slack"},