diff --git a/engine/apps/auth_token/auth.py b/engine/apps/auth_token/auth.py index 755947c3..41d7f80a 100644 --- a/engine/apps/auth_token/auth.py +++ b/engine/apps/auth_token/auth.py @@ -11,9 +11,11 @@ from rest_framework.request import Request from apps.api.permissions import GrafanaAPIPermission, LegacyAccessControlRole, RBACPermission, user_is_authorized from apps.grafana_plugin.helpers.gcom import check_token +from apps.grafana_plugin.sync_data import SyncUser from apps.user_management.exceptions import OrganizationDeletedException, OrganizationMovedException from apps.user_management.models import User from apps.user_management.models.organization import Organization +from apps.user_management.sync import get_or_create_user from settings.base import SELF_HOSTED_SETTINGS from .constants import GOOGLE_OAUTH2_AUTH_TOKEN_NAME, SCHEDULE_EXPORT_TOKEN_NAME, SLACK_AUTH_TOKEN_NAME @@ -149,19 +151,34 @@ class PluginAuthentication(BasePluginAuthentication): except (ValueError, TypeError): raise exceptions.AuthenticationFailed("Grafana context must be JSON dict.") - if "UserId" not in context and "UserID" not in context: - raise exceptions.AuthenticationFailed("Invalid Grafana context.") - try: - user_id = context["UserId"] - except KeyError: - user_id = context["UserID"] - - try: - return organization.users.get(user_id=user_id) + user_id = context.get("UserId", context.get("UserID")) + if user_id is not None: + return organization.users.get(user_id=user_id) + elif "Login" in context: + return organization.users.get(username=context["Login"]) + else: + raise exceptions.AuthenticationFailed("Grafana context must specify a User or UserID.") except User.DoesNotExist: - logger.debug(f"Could not get user from grafana request. 
Context {context}") - raise exceptions.AuthenticationFailed("Non-existent or anonymous user.") + try: + user_data = dict(json.loads(request.headers.get("X-Oncall-User-Context"))) + except (ValueError, TypeError): + raise exceptions.AuthenticationFailed("User context must be JSON dict.") + if user_data: + user_sync_data = SyncUser( + id=user_data["id"], + name=user_data["name"], + login=user_data["login"], + email=user_data["email"], + role=user_data["role"], + avatar_url=user_data["avatar_url"], + permissions=user_data["permissions"] or [], + teams=user_data.get("teams", None), + ) + return get_or_create_user(organization, user_sync_data) + else: + logger.debug("Could not get user from grafana request.") + raise exceptions.AuthenticationFailed("Non-existent or anonymous user.") class PluginAuthenticationSchema(OpenApiAuthenticationExtension): diff --git a/engine/apps/auth_token/tests/test_plugin_auth.py b/engine/apps/auth_token/tests/test_plugin_auth.py index 664ff9d3..56fe5697 100644 --- a/engine/apps/auth_token/tests/test_plugin_auth.py +++ b/engine/apps/auth_token/tests/test_plugin_auth.py @@ -1,3 +1,5 @@ +import json + import pytest from django.utils import timezone from rest_framework.exceptions import AuthenticationFailed @@ -5,6 +7,8 @@ from rest_framework.test import APIRequestFactory from apps.auth_token.auth import PluginAuthentication +INSTANCE_CONTEXT = '{"stack_id": 42, "org_id": 24, "grafana_token": "abc"}' + @pytest.mark.django_db def test_plugin_authentication_self_hosted_success(make_organization, make_user, make_token_for_organization): @@ -14,7 +18,7 @@ def test_plugin_authentication_self_hosted_success(make_organization, make_user, headers = { "HTTP_AUTHORIZATION": token_string, - "HTTP_X-Instance-Context": '{"stack_id": 42, "org_id": 24}', + "HTTP_X-Instance-Context": INSTANCE_CONTEXT, "HTTP_X-Grafana-Context": '{"UserId": 12}', } request = APIRequestFactory().get("/", **headers) @@ -26,13 +30,13 @@ def 
test_plugin_authentication_self_hosted_success(make_organization, make_user, def test_plugin_authentication_gcom_success(make_organization, make_user, make_token_for_organization): # Setting gcom_token_org_last_time_synced to now, so it doesn't try to sync with gcom organization = make_organization( - stack_id=42, org_id=24, gcom_token="123", gcom_token_org_last_time_synced=timezone.now() + stack_id=42, org_id=24, gcom_token="123", api_token="abc", gcom_token_org_last_time_synced=timezone.now() ) user = make_user(organization=organization, user_id=12) headers = { "HTTP_AUTHORIZATION": "gcom:123", - "HTTP_X-Instance-Context": '{"stack_id": 42, "org_id": 24}', + "HTTP_X-Instance-Context": INSTANCE_CONTEXT, "HTTP_X-Grafana-Context": '{"UserId": 12}', } request = APIRequestFactory().get("/", **headers) @@ -50,7 +54,7 @@ def test_plugin_authentication_fail_grafana_context( organization = make_organization(stack_id=42, org_id=24) token, token_string = make_token_for_organization(organization) - headers = {"HTTP_AUTHORIZATION": token_string, "HTTP_X-Instance-Context": '{"stack_id": 42, "org_id": 24}'} + headers = {"HTTP_AUTHORIZATION": token_string, "HTTP_X-Instance-Context": INSTANCE_CONTEXT} if grafana_context is not None: headers["HTTP_X-Grafana-Context"] = grafana_context @@ -61,7 +65,9 @@ def test_plugin_authentication_fail_grafana_context( @pytest.mark.django_db @pytest.mark.parametrize("authorization", [None, "", "123", "gcom:123"]) -@pytest.mark.parametrize("instance_context", [None, "", "non-json", '"string"', "{}", '{"stack_id": 1, "org_id": 1}']) +@pytest.mark.parametrize( + "instance_context", [None, "", "non-json", '"string"', "{}", '{"stack_id": 1, "org_id": 1, "grafana_token": "abc"}'] +) def test_plugin_authentication_fail(authorization, instance_context): headers = {} @@ -75,3 +81,73 @@ def test_plugin_authentication_fail(authorization, instance_context): with pytest.raises(AuthenticationFailed): PluginAuthentication().authenticate(request) + + 
+@pytest.mark.django_db +def test_plugin_authentication_gcom_setup_new_user(make_organization): + # Setting gcom_token_org_last_time_synced to now, so it doesn't try to sync with gcom + organization = make_organization( + stack_id=42, org_id=24, gcom_token="123", api_token="abc", gcom_token_org_last_time_synced=timezone.now() + ) + assert organization.users.count() == 0 + # user = make_user(organization=organization, user_id=12) + + # logged in user data available through header + user_data = { + "id": 12, + "name": "Test User", + "login": "test_user", + "email": "test@test.com", + "role": "Admin", + "avatar_url": "http://test.com/avatar.png", + "permissions": None, + "teams": None, + } + + headers = { + "HTTP_AUTHORIZATION": "gcom:123", + "HTTP_X-Instance-Context": INSTANCE_CONTEXT, + "HTTP_X-Grafana-Context": '{"UserId": 12}', + "HTTP_X-Oncall-User-Context": json.dumps(user_data), + } + request = APIRequestFactory().get("/", **headers) + + ret_user, ret_token = PluginAuthentication().authenticate(request) + + assert ret_user.user_id == 12 + assert ret_token.organization == organization + assert organization.users.count() == 1 + + +@pytest.mark.django_db +def test_plugin_authentication_self_hosted_setup_new_user(make_organization, make_token_for_organization): + # Setting gcom_token_org_last_time_synced to now, so it doesn't try to sync with gcom + organization = make_organization(stack_id=42, org_id=24) + token, token_string = make_token_for_organization(organization) + assert organization.users.count() == 0 + + # logged in user data available through header + user_data = { + "id": 12, + "name": "Test User", + "login": "test_user", + "email": "test@test.com", + "role": "Admin", + "avatar_url": "http://test.com/avatar.png", + "permissions": None, + "teams": None, + } + + headers = { + "HTTP_AUTHORIZATION": token_string, + "HTTP_X-Instance-Context": INSTANCE_CONTEXT, + "HTTP_X-Grafana-Context": '{"UserId": 12}', + "HTTP_X-Oncall-User-Context": 
json.dumps(user_data), + } + request = APIRequestFactory().get("/", **headers) + + ret_user, ret_token = PluginAuthentication().authenticate(request) + + assert ret_user.user_id == 12 + assert ret_token.organization == organization + assert organization.users.count() == 1 diff --git a/engine/apps/grafana_plugin/helpers/client.py b/engine/apps/grafana_plugin/helpers/client.py index 8eec042e..73efeb26 100644 --- a/engine/apps/grafana_plugin/helpers/client.py +++ b/engine/apps/grafana_plugin/helpers/client.py @@ -327,6 +327,9 @@ class GrafanaAPIClient(APIClient): def get_service_account_token_permissions(self) -> APIClientResponse[typing.Dict[str, typing.List[str]]]: return self.api_get("api/access-control/user/permissions") + def sync(self) -> APIClientResponse: + return self.api_post("api/plugins/grafana-oncall-app/resources/plugin/sync") + class GcomAPIClient(APIClient): ACTIVE_INSTANCE_QUERY = "instances?status=active" diff --git a/engine/apps/grafana_plugin/helpers/gcom.py b/engine/apps/grafana_plugin/helpers/gcom.py index 91838b44..c2bae3d1 100644 --- a/engine/apps/grafana_plugin/helpers/gcom.py +++ b/engine/apps/grafana_plugin/helpers/gcom.py @@ -27,10 +27,12 @@ def check_gcom_permission(token_string: str, context) -> GcomToken: stack_id = context["stack_id"] org_id = context["org_id"] + grafana_token = context["grafana_token"] organization = Organization.objects.filter(stack_id=stack_id, org_id=org_id).first() if ( organization and organization.gcom_token == token_string + and organization.api_token == grafana_token and organization.gcom_token_org_last_time_synced and timezone.now() - organization.gcom_token_org_last_time_synced < GCOM_TOKEN_CHECK_PERIOD ): @@ -61,6 +63,7 @@ def check_gcom_permission(token_string: str, context) -> GcomToken: region_slug=instance_info["regionSlug"], cluster_slug=instance_info["clusterSlug"], gcom_token=token_string, + api_token=grafana_token, defaults={"gcom_token_org_last_time_synced": timezone.now()}, ) else: @@ -71,6 +74,7 
@@ def check_gcom_permission(token_string: str, context) -> GcomToken: organization.grafana_url = instance_info["url"] organization.cluster_slug = instance_info["clusterSlug"] organization.gcom_token = token_string + organization.api_token = grafana_token organization.gcom_token_org_last_time_synced = timezone.now() organization.save( update_fields=[ diff --git a/engine/apps/grafana_plugin/serializers/sync_data.py b/engine/apps/grafana_plugin/serializers/sync_data.py new file mode 100644 index 00000000..321fad2f --- /dev/null +++ b/engine/apps/grafana_plugin/serializers/sync_data.py @@ -0,0 +1,105 @@ +from dataclasses import asdict +from typing import Dict, List + +from rest_framework import serializers + +from apps.grafana_plugin.sync_data import SyncData, SyncPermission, SyncSettings, SyncTeam, SyncUser + + +class SyncPermissionSerializer(serializers.Serializer): + action = serializers.CharField() + + def create(self, validated_data): + return SyncPermission(**validated_data) + + def to_representation(self, instance): + return asdict(instance) + + +class SyncUserSerializer(serializers.Serializer): + id = serializers.IntegerField() + name = serializers.CharField(allow_blank=True) + login = serializers.CharField() + email = serializers.CharField() + role = serializers.CharField() + avatar_url = serializers.CharField() + permissions = SyncPermissionSerializer(many=True, allow_empty=True, allow_null=True) + teams = serializers.ListField(child=serializers.IntegerField(), allow_empty=True, allow_null=True) + + def create(self, validated_data): + return SyncUser(**validated_data) + + def to_representation(self, instance): + return asdict(instance) + + +class SyncTeamSerializer(serializers.Serializer): + team_id = serializers.IntegerField() + name = serializers.CharField() + email = serializers.EmailField(allow_blank=True) + avatar_url = serializers.CharField() + + def create(self, validated_data): + return SyncTeam(**validated_data) + + def to_representation(self, 
instance): + return asdict(instance) + + +class TeamMemberMappingField(serializers.Field): + def to_representation(self, value: Dict[int, List[int]]): + return {str(k): v for k, v in value.items()} + + def to_internal_value(self, data): + if not isinstance(data, dict): + raise serializers.ValidationError("Expected a dictionary") + try: + return {int(k): v for k, v in data.items()} + except ValueError: + raise serializers.ValidationError("All keys must be convertible to integers") + + +class SyncOnCallSettingsSerializer(serializers.Serializer): + stack_id = serializers.IntegerField() + org_id = serializers.IntegerField() + license = serializers.CharField() + oncall_api_url = serializers.CharField() + oncall_token = serializers.CharField(allow_blank=True) + grafana_url = serializers.CharField() + grafana_token = serializers.CharField() + rbac_enabled = serializers.BooleanField() + incident_enabled = serializers.BooleanField() + incident_backend_url = serializers.CharField(allow_blank=True) + labels_enabled = serializers.BooleanField() + + def create(self, validated_data): + return SyncSettings(**validated_data) + + def to_representation(self, instance): + return asdict(instance) + + +class SyncDataSerializer(serializers.Serializer): + users = serializers.ListField(child=SyncUserSerializer()) + teams = serializers.ListField(child=SyncTeamSerializer(), allow_null=True, allow_empty=True) + team_members = TeamMemberMappingField() + settings = SyncOnCallSettingsSerializer() + + def create(self, validated_data): + return SyncData(**validated_data) + + def to_representation(self, instance): + return asdict(instance) + + def to_internal_value(self, data): + data = super().to_internal_value(data) + users = data.get("users") + if users: + data["users"] = [SyncUser(**user) for user in users] + teams = data.get("teams") + if teams: + data["teams"] = [SyncTeam(**team) for team in teams] + settings = data.get("settings") + if settings: + data["settings"] = SyncSettings(**settings) 
+ return data diff --git a/engine/apps/grafana_plugin/sync_data.py b/engine/apps/grafana_plugin/sync_data.py new file mode 100644 index 00000000..b4a86857 --- /dev/null +++ b/engine/apps/grafana_plugin/sync_data.py @@ -0,0 +1,50 @@ +from dataclasses import dataclass +from typing import Dict, List, Optional + + +@dataclass +class SyncPermission: + action: str + + +@dataclass +class SyncUser: + id: int + name: str + login: str + email: str + role: str + avatar_url: str + permissions: List[SyncPermission] + teams: Optional[List[int]] + + +@dataclass +class SyncTeam: + team_id: int + name: str + email: str + avatar_url: str + + +@dataclass +class SyncSettings: + stack_id: int + org_id: int + license: str + oncall_api_url: str + oncall_token: str + grafana_url: str + grafana_token: str + rbac_enabled: bool + incident_enabled: bool + incident_backend_url: str + labels_enabled: bool + + +@dataclass +class SyncData: + users: List[SyncUser] + teams: List[SyncTeam] + team_members: Dict[int, List[int]] + settings: SyncSettings diff --git a/engine/apps/grafana_plugin/tasks/__init__.py b/engine/apps/grafana_plugin/tasks/__init__.py index 049a25bb..0a42197b 100644 --- a/engine/apps/grafana_plugin/tasks/__init__.py +++ b/engine/apps/grafana_plugin/tasks/__init__.py @@ -3,3 +3,4 @@ from .sync import ( # noqa: F401 sync_organization_async, sync_team_members_for_organization_async, ) +from .sync_v2 import sync_organizations_v2 # noqa: F401 diff --git a/engine/apps/grafana_plugin/tasks/sync_v2.py b/engine/apps/grafana_plugin/tasks/sync_v2.py new file mode 100644 index 00000000..d610c75c --- /dev/null +++ b/engine/apps/grafana_plugin/tasks/sync_v2.py @@ -0,0 +1,42 @@ +import logging +import math +from time import sleep + +from celery.utils.log import get_task_logger +from django.utils import timezone + +from apps.grafana_plugin.helpers.client import GrafanaAPIClient +from apps.grafana_plugin.helpers.gcom import get_active_instance_ids +from apps.user_management.models import 
Organization +from common.custom_celery_tasks import shared_dedicated_queue_retry_task + +logger = get_task_logger(__name__) +logger.setLevel(logging.DEBUG) + + +SYNC_PERIOD = timezone.timedelta(minutes=4) + + +@shared_dedicated_queue_retry_task(autoretry_for=(Exception,), retry_backoff=True, max_retries=0) +def sync_organizations_v2(): + organization_qs = Organization.objects.all() + active_instance_ids, is_cloud_configured = get_active_instance_ids() + if is_cloud_configured: + if not active_instance_ids: + logger.warning("Did not find any active instances!") + return + else: + logger.debug(f"Found {len(active_instance_ids)} active instances") + organization_qs = organization_qs.filter(stack_id__in=active_instance_ids) + + orgs_per_second = math.ceil(len(organization_qs) / SYNC_PERIOD.seconds) + for idx, org in enumerate(organization_qs): + client = GrafanaAPIClient(api_url=org.grafana_url, api_token=org.api_token) + _, status = client.sync() + if status["status_code"] != 200: + logger.error( + f"Failed to request sync stack_slug={org.stack_slug} status_code={status['status_code']} url={status['url']} message={status['message']}" + ) + if idx % orgs_per_second == 0: + logger.info(f"Sleep 1s after {idx + 1} organizations processed") + sleep(1) diff --git a/engine/apps/grafana_plugin/tests/test_grafana_api_client.py b/engine/apps/grafana_plugin/tests/test_grafana_api_client.py index 4e72df10..27186246 100644 --- a/engine/apps/grafana_plugin/tests/test_grafana_api_client.py +++ b/engine/apps/grafana_plugin/tests/test_grafana_api_client.py @@ -127,6 +127,7 @@ class TestIsRbacEnabledForOrganization: def test_it_returns_based_on_status_code_of_head_call( self, mocked_grafana_api_client_api_head, api_response_connected, api_status_code, expected ): + mocked_grafana_api_client_api_head.return_value = (None, {"connected": api_response_connected}) mocked_grafana_api_client_api_head.return_value = ( None, {"connected": api_response_connected, "status_code": 
api_status_code}, diff --git a/engine/apps/grafana_plugin/tests/test_sync_v2.py b/engine/apps/grafana_plugin/tests/test_sync_v2.py new file mode 100644 index 00000000..bf7a9a06 --- /dev/null +++ b/engine/apps/grafana_plugin/tests/test_sync_v2.py @@ -0,0 +1,43 @@ +from unittest.mock import patch + +import pytest +from django.urls import reverse +from rest_framework import status +from rest_framework.test import APIClient + +from apps.api.permissions import LegacyAccessControlRole + + +@pytest.mark.django_db +def test_auth_success(make_organization_and_user_with_plugin_token, make_user_auth_headers): + organization, user, token = make_organization_and_user_with_plugin_token() + client = APIClient() + + auth_headers = make_user_auth_headers(user, token) + + with patch("apps.grafana_plugin.views.sync_v2.SyncV2View.do_sync", return_value=organization) as mock_sync: + response = client.post(reverse("grafana-plugin:sync-v2"), format="json", **auth_headers) + + assert response.status_code == status.HTTP_200_OK + assert mock_sync.called + + +@pytest.mark.django_db +def test_invalid_auth(make_organization_and_user_with_plugin_token, make_user_auth_headers): + organization, user, token = make_organization_and_user_with_plugin_token(role=LegacyAccessControlRole.EDITOR) + client = APIClient() + + auth_headers = make_user_auth_headers(user, "invalid-token") + + with patch("apps.grafana_plugin.views.sync_v2.SyncV2View.do_sync", return_value=organization) as mock_sync: + response = client.post(reverse("grafana-plugin:sync-v2"), format="json", **auth_headers) + + assert response.status_code == status.HTTP_401_UNAUTHORIZED + assert not mock_sync.called + + auth_headers = make_user_auth_headers(user, token) + with patch("apps.grafana_plugin.views.sync_v2.SyncV2View.do_sync", return_value=organization) as mock_sync: + response = client.post(reverse("grafana-plugin:sync-v2"), format="json", **auth_headers) + + assert response.status_code == status.HTTP_403_FORBIDDEN + assert not 
mock_sync.called diff --git a/engine/apps/grafana_plugin/urls.py b/engine/apps/grafana_plugin/urls.py index 0b3e76c8..55ad0a0f 100644 --- a/engine/apps/grafana_plugin/urls.py +++ b/engine/apps/grafana_plugin/urls.py @@ -1,12 +1,25 @@ from django.urls import re_path -from apps.grafana_plugin.views import InstallView, SelfHostedInstallView, StatusView, SyncOrganizationView +from apps.grafana_plugin.views import ( + InstallV2View, + InstallView, + RecaptchaView, + SelfHostedInstallView, + StatusV2View, + StatusView, + SyncOrganizationView, + SyncV2View, +) app_name = "grafana-plugin" urlpatterns = [ + re_path(r"v2/sync/?", SyncV2View().as_view(), name="sync-v2"), + re_path(r"v2/status/?", StatusV2View().as_view(), name="status-v2"), + re_path(r"v2/install/?", InstallV2View().as_view(), name="install-v2"), re_path(r"self-hosted/install/?", SelfHostedInstallView().as_view(), name="self-hosted-install"), re_path(r"status/?", StatusView().as_view(), name="status"), re_path(r"install/?", InstallView().as_view(), name="install"), re_path(r"sync_organization/?", SyncOrganizationView().as_view(), name="sync-organization"), + re_path(r"recaptcha/?", RecaptchaView().as_view(), name="recaptcha"), ] diff --git a/engine/apps/grafana_plugin/views/__init__.py b/engine/apps/grafana_plugin/views/__init__.py index 23e24373..0abedced 100644 --- a/engine/apps/grafana_plugin/views/__init__.py +++ b/engine/apps/grafana_plugin/views/__init__.py @@ -1,4 +1,8 @@ from .install import InstallView # noqa: F401 +from .install_v2 import InstallV2View # noqa: F401 +from .recaptcha import RecaptchaView # noqa: F401 from .self_hosted_install import SelfHostedInstallView # noqa: F401 from .status import StatusView # noqa: F401 +from .status_v2 import StatusV2View # noqa: F401 from .sync_organization import SyncOrganizationView # noqa: F401 +from .sync_v2 import SyncV2View # noqa: F401 diff --git a/engine/apps/grafana_plugin/views/install_v2.py b/engine/apps/grafana_plugin/views/install_v2.py new file 
mode 100644 index 00000000..4b35c772 --- /dev/null +++ b/engine/apps/grafana_plugin/views/install_v2.py @@ -0,0 +1,30 @@ +import logging + +from django.conf import settings +from rest_framework import status +from rest_framework.request import Request +from rest_framework.response import Response + +from apps.grafana_plugin.views.sync_v2 import SyncException, SyncV2View +from common.api_helpers.errors import SELF_HOSTED_ONLY_FEATURE_ERROR + +logger = logging.getLogger(__name__) + + +class InstallV2View(SyncV2View): + authentication_classes = () + permission_classes = () + + def post(self, request: Request) -> Response: + if settings.LICENSE != settings.OPEN_SOURCE_LICENSE_NAME: + return Response(data=SELF_HOSTED_ONLY_FEATURE_ERROR, status=status.HTTP_403_FORBIDDEN) + + try: + organization = self.do_sync(request) + except SyncException as e: + return Response(data=e.error_data, status=status.HTTP_400_BAD_REQUEST) + + organization.revoke_plugin() + provisioned_data = organization.provision_plugin() + + return Response(data=provisioned_data, status=status.HTTP_200_OK) diff --git a/engine/apps/grafana_plugin/views/recaptcha.py b/engine/apps/grafana_plugin/views/recaptcha.py new file mode 100644 index 00000000..0f742b79 --- /dev/null +++ b/engine/apps/grafana_plugin/views/recaptcha.py @@ -0,0 +1,17 @@ +from django.conf import settings +from rest_framework.request import Request +from rest_framework.response import Response +from rest_framework.views import APIView + +from apps.auth_token.auth import PluginAuthentication + + +class RecaptchaView(APIView): + authentication_classes = (PluginAuthentication,) + + def get(self, request: Request) -> Response: + return Response( + data={ + "recaptcha_site_key": settings.RECAPTCHA_V3_SITE_KEY, + } + ) diff --git a/engine/apps/grafana_plugin/views/status_v2.py b/engine/apps/grafana_plugin/views/status_v2.py new file mode 100644 index 00000000..ce5a33af --- /dev/null +++ b/engine/apps/grafana_plugin/views/status_v2.py @@ -0,0 
+1,52 @@ +import logging + +from django.conf import settings +from rest_framework.request import Request +from rest_framework.response import Response +from rest_framework.views import APIView + +from apps.auth_token.auth import BasePluginAuthentication +from apps.grafana_plugin.helpers import GrafanaAPIClient +from apps.mobile_app.auth import MobileAppAuthTokenAuthentication +from common.api_helpers.mixins import GrafanaHeadersMixin +from common.api_helpers.utils import create_engine_url + +logger = logging.getLogger(__name__) + + +class StatusV2View(GrafanaHeadersMixin, APIView): + authentication_classes = ( + MobileAppAuthTokenAuthentication, + BasePluginAuthentication, + ) + + def get(self, request: Request) -> Response: + # Check if the plugin is currently undergoing maintenance, and return response without querying db + if settings.CURRENTLY_UNDERGOING_MAINTENANCE_MESSAGE: + return Response( + data={ + "currently_undergoing_maintenance_message": settings.CURRENTLY_UNDERGOING_MAINTENANCE_MESSAGE, + } + ) + + organization = request.auth.organization + api_url = create_engine_url("") + + # If /status is called frequently this can be skipped with a cache + grafana_api_client = GrafanaAPIClient(api_url=organization.grafana_url, api_token=organization.api_token) + _, call_status = grafana_api_client.check_token() + + return Response( + data={ + "connection_to_grafana": { + "url": call_status["url"], + "connected": call_status["connected"], + "status_code": call_status["status_code"], + "message": call_status["message"], + }, + "license": settings.LICENSE, + "version": settings.VERSION, + "currently_undergoing_maintenance_message": settings.CURRENTLY_UNDERGOING_MAINTENANCE_MESSAGE, + "api_url": api_url, + } + ) diff --git a/engine/apps/grafana_plugin/views/sync_v2.py b/engine/apps/grafana_plugin/views/sync_v2.py new file mode 100644 index 00000000..b762c7b7 --- /dev/null +++ b/engine/apps/grafana_plugin/views/sync_v2.py @@ -0,0 +1,59 @@ +import logging + +from 
django.conf import settings +from rest_framework import status +from rest_framework.permissions import IsAuthenticated +from rest_framework.request import Request +from rest_framework.response import Response +from rest_framework.views import APIView + +from apps.api.permissions import RBACPermission +from apps.auth_token.auth import PluginAuthentication +from apps.grafana_plugin.serializers.sync_data import SyncDataSerializer +from apps.user_management.models import Organization +from apps.user_management.sync import apply_sync_data, get_or_create_organization +from common.api_helpers.errors import INVALID_SELF_HOSTED_ID + +logger = logging.getLogger(__name__) + + +class SyncException(Exception): + def __init__(self, error_data): + self.error_data = error_data + + +class SyncV2View(APIView): + authentication_classes = (PluginAuthentication,) + permission_classes = [IsAuthenticated, RBACPermission] + rbac_permissions = { + "post": [RBACPermission.Permissions.USER_SETTINGS_ADMIN], + } + + def do_sync(self, request: Request) -> Organization: + serializer = SyncDataSerializer(data=request.data) + if not serializer.is_valid(): + raise SyncException(serializer.errors) + + sync_data = serializer.save() + + if settings.LICENSE == settings.OPEN_SOURCE_LICENSE_NAME: + stack_id = settings.SELF_HOSTED_SETTINGS["STACK_ID"] + org_id = settings.SELF_HOSTED_SETTINGS["ORG_ID"] + else: + org_id = request.auth.organization.org_id + stack_id = request.auth.organization.stack_id + + if sync_data.settings.org_id != org_id or sync_data.settings.stack_id != stack_id: + raise SyncException(INVALID_SELF_HOSTED_ID) + + organization = get_or_create_organization(sync_data.settings.org_id, sync_data.settings.stack_id, sync_data) + apply_sync_data(organization, sync_data) + return organization + + def post(self, request: Request) -> Response: + try: + self.do_sync(request) + except SyncException as e: + return Response(data=e.error_data, status=status.HTTP_400_BAD_REQUEST) + + return 
Response(status=status.HTTP_200_OK) diff --git a/engine/apps/user_management/models/team.py b/engine/apps/user_management/models/team.py index 50d8dd69..32b8af53 100644 --- a/engine/apps/user_management/models/team.py +++ b/engine/apps/user_management/models/team.py @@ -4,18 +4,14 @@ from django.conf import settings from django.core.validators import MinLengthValidator from django.db import models -from apps.alerts.models import AlertReceiveChannel -from apps.metrics_exporter.helpers import metrics_bulk_update_team_label_cache -from apps.metrics_exporter.metrics_cache_manager import MetricsCacheManager from common.public_primary_keys import generate_public_primary_key, increase_public_primary_key_length if typing.TYPE_CHECKING: from django.db.models.manager import RelatedManager from apps.alerts.models import AlertGroupLogRecord - from apps.grafana_plugin.helpers.client import GrafanaAPIClient from apps.schedules.models import CustomOnCallShift - from apps.user_management.models import Organization, User + from apps.user_management.models import User def generate_public_primary_key_for_team() -> str: @@ -33,66 +29,7 @@ def generate_public_primary_key_for_team() -> str: class TeamManager(models.Manager["Team"]): - @staticmethod - def sync_for_organization( - organization: "Organization", api_teams: typing.List["GrafanaAPIClient.Types.GrafanaTeam"] - ) -> None: - grafana_teams = {team["id"]: team for team in api_teams} - existing_team_ids: typing.Set[int] = set(organization.teams.all().values_list("team_id", flat=True)) - - # create missing teams - teams_to_create = tuple( - Team( - organization_id=organization.pk, - team_id=team["id"], - name=team["name"], - email=team["email"], - avatar_url=team["avatarUrl"], - ) - for team in grafana_teams.values() - if team["id"] not in existing_team_ids - ) - # create entries, ignore failed insertions if team_id already exists in the organization - organization.teams.bulk_create(teams_to_create, batch_size=5000, 
ignore_conflicts=True) - - # create missing direct paging integrations - AlertReceiveChannel.objects.create_missing_direct_paging_integrations(organization) - - # delete excess teams and their direct paging integrations - team_ids_to_delete = existing_team_ids - grafana_teams.keys() - organization.alert_receive_channels.filter( - team__team_id__in=team_ids_to_delete, integration=AlertReceiveChannel.INTEGRATION_DIRECT_PAGING - ).delete() - organization.teams.filter(team_id__in=team_ids_to_delete).delete() - - # collect teams diffs to update metrics cache - metrics_teams_to_update: MetricsCacheManager.TeamsDiffMap = {} - for team_id in team_ids_to_delete: - metrics_teams_to_update = MetricsCacheManager.update_team_diff( - metrics_teams_to_update, team_id, deleted=True - ) - - # update existing teams if any fields have changed - teams_to_update = [] - for team in organization.teams.filter(team_id__in=existing_team_ids): - grafana_team = grafana_teams[team.team_id] - if ( - team.name != grafana_team["name"] - or team.email != grafana_team["email"] - or team.avatar_url != grafana_team["avatarUrl"] - ): - if team.name != grafana_team["name"]: - # collect teams diffs to update metrics cache - metrics_teams_to_update = MetricsCacheManager.update_team_diff( - metrics_teams_to_update, team.id, new_name=grafana_team["name"] - ) - team.name = grafana_team["name"] - team.email = grafana_team["email"] - team.avatar_url = grafana_team["avatarUrl"] - teams_to_update.append(team) - organization.teams.bulk_update(teams_to_update, ["name", "email", "avatar_url"], batch_size=5000) - - metrics_bulk_update_team_label_cache(metrics_teams_to_update, organization.id) + pass class Team(models.Model): diff --git a/engine/apps/user_management/models/user.py b/engine/apps/user_management/models/user.py index c6de3813..1e459207 100644 --- a/engine/apps/user_management/models/user.py +++ b/engine/apps/user_management/models/user.py @@ -1,5 +1,4 @@ import datetime -import json import logging 
import re import typing @@ -76,69 +75,7 @@ def default_working_hours(): class UserManager(models.Manager["User"]): - @staticmethod - def sync_for_team(team, api_members: list[dict]): - user_ids = tuple(member["userId"] for member in api_members) - users = team.organization.users.filter(user_id__in=user_ids) - team.users.set(users) - - @staticmethod - def sync_for_organization(organization, api_users: list[dict]): - grafana_users = {user["userId"]: user for user in api_users} - existing_user_ids = set(organization.users.all().values_list("user_id", flat=True)) - - # create missing users - users_to_create = tuple( - User( - organization_id=organization.pk, - user_id=user["userId"], - email=user["email"], - name=user["name"], - username=user["login"], - role=getattr(LegacyAccessControlRole, user["role"].upper(), LegacyAccessControlRole.NONE), - avatar_url=user["avatarUrl"], - permissions=user["permissions"], - ) - for user in grafana_users.values() - if user["userId"] not in existing_user_ids - ) - - organization.users.bulk_create(users_to_create, batch_size=5000) - - # delete excess users - user_ids_to_delete = existing_user_ids - grafana_users.keys() - organization.users.filter(user_id__in=user_ids_to_delete).delete() - - # update existing users if any fields have changed - users_to_update = [] - for user in organization.users.filter(user_id__in=existing_user_ids): - grafana_user = grafana_users[user.user_id] - g_user_role = getattr(LegacyAccessControlRole, grafana_user["role"].upper(), LegacyAccessControlRole.NONE) - - if ( - user.email != grafana_user["email"] - or user.name != grafana_user["name"] - or user.username != grafana_user["login"] - or user.role != g_user_role - or user.avatar_url != grafana_user["avatarUrl"] - # instead of looping through the array of permission objects, simply take the hash - # of the string representation of the data structures and compare. 
- # Need to first convert the lists of objects to strings because lists/dicts are not hashable - # (because lists and dicts are not hashable.. as they are mutable) - # https://stackoverflow.com/a/22003440 - or hash(json.dumps(user.permissions)) != hash(json.dumps(grafana_user["permissions"])) - ): - user.email = grafana_user["email"] - user.name = grafana_user["name"] - user.username = grafana_user["login"] - user.role = g_user_role - user.avatar_url = grafana_user["avatarUrl"] - user.permissions = grafana_user["permissions"] - users_to_update.append(user) - - organization.users.bulk_update( - users_to_update, ["email", "name", "username", "role", "avatar_url", "permissions"], batch_size=5000 - ) + pass class UserQuerySet(models.QuerySet): diff --git a/engine/apps/user_management/sync.py b/engine/apps/user_management/sync.py index 42b83835..68bf8ffc 100644 --- a/engine/apps/user_management/sync.py +++ b/engine/apps/user_management/sync.py @@ -1,14 +1,21 @@ import logging +import typing import uuid from celery.utils.log import get_task_logger from django.conf import settings from django.utils import timezone -from apps.grafana_plugin.helpers.client import GcomAPIClient, GrafanaAPIClient +from apps.alerts.models import AlertReceiveChannel +from apps.api.permissions import LegacyAccessControlRole +from apps.auth_token.exceptions import InvalidToken +from apps.grafana_plugin.helpers.client import GcomAPIClient, GCOMInstanceInfo, GrafanaAPIClient +from apps.grafana_plugin.sync_data import SyncData, SyncPermission, SyncSettings, SyncTeam, SyncUser +from apps.metrics_exporter.helpers import metrics_bulk_update_team_label_cache +from apps.metrics_exporter.metrics_cache_manager import MetricsCacheManager from apps.user_management.models import Organization, Team, User -from apps.user_management.signals import org_sync_signal from common.utils import task_lock +from settings.base import CLOUD_LICENSE_NAME, OPEN_SOURCE_LICENSE_NAME logger = get_task_logger(__name__) 
logger.setLevel(logging.DEBUG) @@ -30,47 +37,50 @@ def _sync_organization(organization: Organization) -> None: grafana_api_client = GrafanaAPIClient(api_url=organization.grafana_url, api_token=organization.api_token) gcom_client = GcomAPIClient(settings.GRAFANA_COM_ADMIN_API_TOKEN) + rbac_is_enabled = organization.is_rbac_permissions_enabled # Update organization's RBAC status if it's an open-source instance, or it's an active cloud instance. # Don't update non-active cloud instances (e.g. paused) as they can return 200 OK but not have RBAC enabled. if settings.LICENSE == settings.OPEN_SOURCE_LICENSE_NAME or gcom_client.is_stack_active(organization.stack_id): - rbac_enabled, server_error = grafana_api_client.is_rbac_enabled_for_organization() + rbac_enabled_update, server_error = grafana_api_client.is_rbac_enabled_for_organization() if not server_error: # Only update RBAC status if Grafana didn't return a server error - organization.is_rbac_permissions_enabled = rbac_enabled - logger.info(f"RBAC status org={organization.pk} rbac_enabled={organization.is_rbac_permissions_enabled}") + rbac_is_enabled = rbac_enabled_update - _sync_instance_info(organization) + # get incident plugin settings + grafana_incident_settings, _ = grafana_api_client.get_grafana_incident_plugin_settings() + is_grafana_incident_enabled = False + grafana_incident_backend_url = None + if grafana_incident_settings is not None: + is_grafana_incident_enabled = grafana_incident_settings["enabled"] + grafana_incident_backend_url = (grafana_incident_settings.get("jsonData") or {}).get( + GrafanaAPIClient.GRAFANA_INCIDENT_PLUGIN_BACKEND_URL_KEY + ) - _, check_token_call_status = grafana_api_client.check_token() - if check_token_call_status["connected"]: - organization.api_token_status = Organization.API_TOKEN_STATUS_OK - sync_users_and_teams(grafana_api_client, organization) - organization.last_time_synced = timezone.now() + # get labels plugin settings + is_grafana_labels_enabled = False + 
grafana_labels_plugin_settings, _ = grafana_api_client.get_grafana_labels_plugin_settings() + if grafana_labels_plugin_settings is not None: + is_grafana_labels_enabled = grafana_labels_plugin_settings["enabled"] - _sync_grafana_incident_plugin(organization, grafana_api_client) - _sync_grafana_labels_plugin(organization, grafana_api_client) - else: - organization.api_token_status = Organization.API_TOKEN_STATUS_FAILED - logger.warning(f"Sync not successful org={organization.pk} token_status=FAILED") + oncall_api_url = settings.BASE_URL + if settings.LICENSE == CLOUD_LICENSE_NAME: + oncall_api_url = settings.GRAFANA_CLOUD_ONCALL_API_URL - organization.save( - update_fields=[ - "cluster_slug", - "stack_slug", - "org_slug", - "org_title", - "region_slug", - "grafana_url", - "last_time_synced", - "api_token_status", - "gcom_token_org_last_time_synced", - "is_rbac_permissions_enabled", - "is_grafana_incident_enabled", - "is_grafana_labels_enabled", - "grafana_incident_backend_url", - ] + sync_settings = SyncSettings( + stack_id=organization.stack_id, + org_id=organization.org_id, + license=settings.LICENSE, + oncall_api_url=oncall_api_url, + oncall_token=organization.gcom_token, + grafana_url=organization.grafana_url, + grafana_token=organization.api_token, + rbac_enabled=rbac_is_enabled, + incident_enabled=is_grafana_incident_enabled, + incident_backend_url=grafana_incident_backend_url, + labels_enabled=is_grafana_labels_enabled, ) - - org_sync_signal.send(sender=None, organization=organization) + _sync_organization_data(organization, sync_settings) + if organization.api_token_status == Organization.API_TOKEN_STATUS_OK: + sync_users_and_teams(grafana_api_client, organization) def _sync_instance_info(organization: Organization) -> None: @@ -90,32 +100,6 @@ def _sync_instance_info(organization: Organization) -> None: organization.gcom_token_org_last_time_synced = timezone.now() -def _sync_grafana_labels_plugin(organization: Organization, grafana_api_client) -> None: - 
""" - _sync_grafana_labels_plugin checks if grafana-labels-app plugin is enabled and sets a flag in the organization. - It intended to use only inside _sync_organization. It mutates, but not saves org, it's saved in _sync_organization. - """ - grafana_labels_plugin_settings, _ = grafana_api_client.get_grafana_labels_plugin_settings() - if grafana_labels_plugin_settings is not None: - organization.is_grafana_labels_enabled = grafana_labels_plugin_settings["enabled"] - - -def _sync_grafana_incident_plugin(organization: Organization, grafana_api_client) -> None: - """ - _sync_grafana_incident_plugin check if incident plugin is enabled and sets a flag and its url in the organization. - It intended to use only inside _sync_organization. It mutates, but not saves org, it's saved in _sync_organization. - """ - grafana_incident_settings, _ = grafana_api_client.get_grafana_incident_plugin_settings() - organization.is_grafana_incident_enabled = False - organization.grafana_incident_backend_url = None - - if grafana_incident_settings is not None: - organization.is_grafana_incident_enabled = grafana_incident_settings["enabled"] - organization.grafana_incident_backend_url = (grafana_incident_settings.get("jsonData") or {}).get( - GrafanaAPIClient.GRAFANA_INCIDENT_PLUGIN_BACKEND_URL_KEY - ) - - def sync_users_and_teams(client: GrafanaAPIClient, organization: Organization) -> None: sync_users(client, organization) sync_teams(client, organization) @@ -127,7 +111,21 @@ def sync_users(client: GrafanaAPIClient, organization: Organization, **kwargs) - # check if api_users are shaped correctly. e.g. for paused instance, the response is not a list. 
if not api_users or not isinstance(api_users, (tuple, list)): return - User.objects.sync_for_organization(organization=organization, api_users=api_users) + + sync_users = [ + SyncUser( + id=user["userId"], + name=user["name"], + login=user["login"], + email=user["email"], + role=user["role"], + avatar_url=user["avatarUrl"], + teams=None, + permissions=[SyncPermission(action=permission["permission"]) for permission in user["permissions"]], + ) + for user in api_users + ] + _sync_users_data(organization, sync_users, delete_extra=True) def sync_teams(client: GrafanaAPIClient, organization: Organization, **kwargs) -> None: @@ -135,23 +133,26 @@ def sync_teams(client: GrafanaAPIClient, organization: Organization, **kwargs) - if not api_teams_result: return api_teams = api_teams_result["teams"] - Team.objects.sync_for_organization(organization=organization, api_teams=api_teams) + sync_teams = [ + SyncTeam( + team_id=team["id"], + name=team["name"], + email=team["email"], + avatar_url=team["avatarUrl"], + ) + for team in api_teams + ] + _sync_teams_data(organization, sync_teams) def sync_team_members(client: GrafanaAPIClient, organization: Organization) -> None: + team_members = {} for team in organization.teams.all(): members, _ = client.get_team_members(team.team_id) if not members: continue - User.objects.sync_for_team(team=team, api_members=members) - - -def sync_users_for_teams(client: GrafanaAPIClient, organization: Organization, **kwargs) -> None: - api_teams_result, _ = client.get_teams(**kwargs) - if not api_teams_result: - return - api_teams = api_teams_result["teams"] - Team.objects.sync_for_organization(organization=organization, api_teams=api_teams) + team_members[team.team_id] = [member["userId"] for member in members] + _sync_teams_members_data(organization, team_members) def delete_organization_if_needed(organization: Organization) -> bool: @@ -198,3 +199,233 @@ def cleanup_organization(organization_pk: int) -> None: except Organization.DoesNotExist: 
logger.info(f"Organization {organization_pk} was not found") + + +def _create_cloud_organization( + org_id: int, stack_id: int, sync_data: SyncData, instance_info: GCOMInstanceInfo +) -> Organization: + client = GcomAPIClient(sync_data.settings.oncall_token) + if not instance_info: + instance_info = client.get_instance_info(stack_id) + if not instance_info or str(instance_info["orgId"]) != org_id: + raise InvalidToken + + return Organization.objects.create( + stack_id=str(instance_info["id"]), + stack_slug=instance_info["slug"], + grafana_url=instance_info["url"], + org_id=str(instance_info["orgId"]), + org_slug=instance_info["orgSlug"], + org_title=instance_info["orgName"], + region_slug=instance_info["regionSlug"], + cluster_slug=instance_info["clusterSlug"], + api_token=sync_data.settings.grafana_token, + gcom_token=sync_data.settings.oncall_token, + is_rbac_permissions_enabled=sync_data.settings.rbac_enabled, + defaults={"gcom_token_org_last_time_synced": timezone.now()}, + ) + + +def _create_oss_organization(sync_data: SyncData) -> Organization: + return Organization.objects.create( + stack_id=settings.SELF_HOSTED_SETTINGS["STACK_ID"], + stack_slug=settings.SELF_HOSTED_SETTINGS["STACK_SLUG"], + org_id=settings.SELF_HOSTED_SETTINGS["ORG_ID"], + org_slug=settings.SELF_HOSTED_SETTINGS["ORG_SLUG"], + org_title=settings.SELF_HOSTED_SETTINGS["ORG_TITLE"], + region_slug=settings.SELF_HOSTED_SETTINGS["REGION_SLUG"], + cluster_slug=settings.SELF_HOSTED_SETTINGS["CLUSTER_SLUG"], + grafana_url=sync_data.settings.grafana_url, + api_token=sync_data.settings.grafana_token, + is_rbac_permissions_enabled=sync_data.settings.rbac_enabled, + ) + + +def _create_organization( + org_id: int, stack_id: int, sync_data: SyncData, instance_info: GCOMInstanceInfo +) -> typing.Optional[Organization]: + if settings.LICENSE == CLOUD_LICENSE_NAME: + return _create_cloud_organization(org_id, stack_id, sync_data, instance_info) + elif settings.LICENSE == OPEN_SOURCE_LICENSE_NAME: + return 
_create_oss_organization(sync_data) + return None + + +def get_or_create_organization( + org_id: int, stack_id: int, sync_data: SyncData = None, instance_info: GCOMInstanceInfo = None +) -> Organization: + organization = Organization.objects.filter(org_id=org_id, stack_id=stack_id).first() + if not organization: + organization = _create_organization(org_id, stack_id, sync_data, instance_info) + return organization + + +def get_or_create_user(organization: Organization, sync_user: SyncUser) -> User: + _sync_users_data(organization, [sync_user], delete_extra=False) + user = organization.users.get(user_id=sync_user.id) + + # update team membership if needed + # (not removing user from teams, assuming this is called on user creation/first login only; + # periodic sync will keep teams updated) + membership = sync_user.teams or [] + for team_id in membership: + team = organization.teams.filter(team_id=team_id).first() + if team: + user.teams.add(team) + + return user + + +def _sync_organization_data(organization: Organization, sync_settings: SyncSettings): + organization.is_rbac_permissions_enabled = sync_settings.rbac_enabled + logger.info(f"RBAC status org={organization.pk} rbac_enabled={organization.is_rbac_permissions_enabled}") + + organization.is_grafana_labels_enabled = sync_settings.labels_enabled + organization.is_grafana_incident_enabled = sync_settings.incident_enabled + organization.grafana_incident_backend_url = sync_settings.incident_backend_url + organization.grafana_url = sync_settings.grafana_url + organization.api_token = sync_settings.grafana_token + organization.last_time_synced = timezone.now() + + _sync_instance_info(organization) + + grafana_api_client = GrafanaAPIClient(api_url=organization.grafana_url, api_token=organization.api_token) + _, check_token_call_status = grafana_api_client.check_token() + if check_token_call_status["connected"]: + organization.api_token_status = Organization.API_TOKEN_STATUS_OK + organization.last_time_synced = 
timezone.now() + else: + organization.api_token_status = Organization.API_TOKEN_STATUS_FAILED + logger.warning(f"Sync not successful org={organization.pk} token_status=FAILED") + + organization.save( + update_fields=[ + "api_token", + "api_token_status", + "cluster_slug", + "stack_slug", + "org_slug", + "org_title", + "region_slug", + "grafana_url", + "last_time_synced", + "gcom_token_org_last_time_synced", + "is_rbac_permissions_enabled", + "is_grafana_incident_enabled", + "is_grafana_labels_enabled", + "grafana_incident_backend_url", + ] + ) + + +def _sync_users_data(organization: Organization, sync_users: list[SyncUser], delete_extra=False): + users_to_sync = ( + User( + organization_id=organization.pk, + user_id=user.id, + email=user.email, + name=user.name, + username=user.login, + role=getattr(LegacyAccessControlRole, user.role.upper(), LegacyAccessControlRole.NONE), + avatar_url=user.avatar_url, + permissions=user.permissions or [], + ) + for user in sync_users + ) + + existing_user_ids = set(organization.users.all().values_list("user_id", flat=True)) + kwargs = {} + if settings.DATABASE_TYPE in ("sqlite3", "postgresql"): + # unique_fields is required for sqlite and postgresql setups + kwargs["unique_fields"] = ("organization_id", "user_id", "is_active") + organization.users.bulk_create( + users_to_sync, + update_conflicts=True, + update_fields=("email", "name", "username", "role", "avatar_url", "permissions"), + batch_size=5000, + **kwargs, + ) + + # Retrieve primary keys for the newly created users + # + # If the model’s primary key is an AutoField, the primary key attribute can only be retrieved + # on certain databases (currently PostgreSQL, MariaDB 10.5+, and SQLite 3.35+). + # On other databases, it will not be set. 
+ # https://docs.djangoproject.com/en/4.1/ref/models/querysets/#django.db.models.query.QuerySet.bulk_create + created_users = organization.users.exclude(user_id__in=existing_user_ids) + + if delete_extra: + # delete removed users + existing_user_ids |= set(u.user_id for u in created_users) + user_ids_to_delete = existing_user_ids - {user.id for user in sync_users} + organization.users.filter(user_id__in=user_ids_to_delete).delete() + + +def _sync_teams_data(organization: Organization, sync_teams: list[SyncTeam]): + if sync_teams is None: + sync_teams = [] + # keep existing team names mapping to check for possible metrics cache updates + existing_team_names = {team.team_id: team.name for team in organization.teams.all()} + teams_to_sync = tuple( + Team( + organization_id=organization.pk, + team_id=team.team_id, + name=team.name, + email=team.email, + avatar_url=team.avatar_url, + ) + for team in sync_teams + ) + # create entries, update if team_id already exists in the organization + kwargs = {} + if settings.DATABASE_TYPE in ("sqlite3", "postgresql"): + # unique_fields is required for sqlite and postgresql setups + kwargs["unique_fields"] = ("organization_id", "team_id") + organization.teams.bulk_create( + teams_to_sync, + batch_size=5000, + update_conflicts=True, + update_fields=("name", "email", "avatar_url"), + **kwargs, + ) + + # create missing direct paging integrations + AlertReceiveChannel.objects.create_missing_direct_paging_integrations(organization) + + # delete removed teams and their direct paging integrations + existing_team_ids = set(organization.teams.all().values_list("team_id", flat=True)) + team_ids_to_delete = existing_team_ids - set(t.team_id for t in sync_teams) + organization.alert_receive_channels.filter( + team__team_id__in=team_ids_to_delete, integration=AlertReceiveChannel.INTEGRATION_DIRECT_PAGING + ).delete() + organization.teams.filter(team_id__in=team_ids_to_delete).delete() + + # collect teams diffs to update metrics cache + 
metrics_teams_to_update: MetricsCacheManager.TeamsDiffMap = {} + for team_id in team_ids_to_delete: + metrics_teams_to_update = MetricsCacheManager.update_team_diff(metrics_teams_to_update, team_id, deleted=True) + for team in sync_teams: + previous_name = existing_team_names.get(team.team_id) + if previous_name and previous_name != team.name: + metrics_teams_to_update = MetricsCacheManager.update_team_diff( + metrics_teams_to_update, team.team_id, new_name=team.name + ) + metrics_bulk_update_team_label_cache(metrics_teams_to_update, organization.id) + + +def _sync_teams_members_data(organization: Organization, team_members: dict[int, list[int]]): + # set team members + for team_id, members_ids in team_members.items(): + team = organization.teams.get(team_id=team_id) + team.users.set(organization.users.filter(user_id__in=members_ids)) + + +def apply_sync_data(organization: Organization, sync_data: SyncData): + # update org + settings + _sync_organization_data(organization, sync_data.settings) + # update or create users + _sync_users_data(organization, sync_data.users, delete_extra=True) + # update or create teams + direct paging integrations + _sync_teams_data(organization, sync_data.teams) + # update team members + _sync_teams_members_data(organization, sync_data.team_members) diff --git a/engine/apps/user_management/tests/test_sync.py b/engine/apps/user_management/tests/test_sync.py index afa889e7..ee278cf5 100644 --- a/engine/apps/user_management/tests/test_sync.py +++ b/engine/apps/user_management/tests/test_sync.py @@ -9,13 +9,15 @@ from django.test import override_settings from apps.alerts.models import AlertReceiveChannel from apps.api.permissions import LegacyAccessControlRole -from apps.grafana_plugin.helpers.client import GrafanaAPIClient -from apps.user_management.models import Team, User +from apps.grafana_plugin.sync_data import SyncUser +from apps.user_management.models import User from apps.user_management.sync import ( - 
_sync_grafana_incident_plugin, - _sync_grafana_labels_plugin, cleanup_organization, + get_or_create_user, sync_organization, + sync_team_members, + sync_teams, + sync_users, ) MOCK_GRAFANA_INCIDENT_BACKEND_URL = "https://grafana-incident.test" @@ -96,8 +98,9 @@ def test_sync_users_for_organization(make_organization, make_user_for_organizati } for user_id in (2, 3) ) - - User.objects.sync_for_organization(organization, api_users=api_users) + with patched_grafana_api_client(organization) as mock_grafana_api_client: + mock_grafana_api_client.get_users.return_value = api_users + sync_users(mock_grafana_api_client, organization) assert organization.users.count() == 2 @@ -137,7 +140,9 @@ def test_sync_users_for_organization_role_none(make_organization, make_user_for_ for user_id in (2, 3) ) - User.objects.sync_for_organization(organization, api_users=api_users) + with patched_grafana_api_client(organization) as mock_grafana_api_client: + mock_grafana_api_client.get_users.return_value = api_users + sync_users(mock_grafana_api_client, organization) assert organization.users.count() == 2 @@ -170,7 +175,9 @@ def test_sync_teams_for_organization(make_organization, make_team, make_alert_re for team_id in (2, 3, 4) ) - Team.objects.sync_for_organization(organization, api_teams=api_teams) + with patched_grafana_api_client(organization) as mock_grafana_api_client: + mock_grafana_api_client.get_teams.return_value = ({"teams": api_teams}, None) + sync_teams(mock_grafana_api_client, organization) assert organization.teams.count() == 3 @@ -224,15 +231,16 @@ def test_sync_users_for_team(make_organization, make_user_for_organization, make }, ) - User.objects.sync_for_team(team, api_members=api_members) + with patched_grafana_api_client(organization) as mock_grafana_api_client: + mock_grafana_api_client.get_team_members.return_value = (api_members, None) + sync_team_members(mock_grafana_api_client, organization) assert team.users.count() == 1 assert team.users.get() == users[0] 
@pytest.mark.django_db -@patch("apps.user_management.sync.org_sync_signal") -def test_sync_organization(mocked_org_sync_signal, make_organization): +def test_sync_organization(make_organization): organization = make_organization() with patched_grafana_api_client(organization): @@ -259,8 +267,6 @@ def test_sync_organization(mocked_org_sync_signal, make_organization): # check that is_grafana_labels_enabled flag is set assert organization.is_grafana_labels_enabled is True - mocked_org_sync_signal.send.assert_called_once_with(sender=None, organization=organization) - @pytest.mark.parametrize( "is_rbac_enabled_for_organization,expected", @@ -333,15 +339,26 @@ def test_duplicate_user_ids(make_organization, make_user_for_organization): organization = make_organization() user = make_user_for_organization(organization, user_id=1) - api_users = [] - - User.objects.sync_for_organization(organization, api_users=api_users) + api_users = [ + { + "userId": 2, + "email": "other@test.test", + "name": "Other", + "login": "other", + "role": "admin", + "avatarUrl": "other.test/test", + "permissions": [], + } + ] + with patched_grafana_api_client(organization) as mock_grafana_api_client: + mock_grafana_api_client.get_users.return_value = api_users + sync_users(mock_grafana_api_client, organization) user.refresh_from_db() assert user.is_active is None - assert organization.users.count() == 0 - assert User.objects.filter_with_deleted().count() == 1 + assert organization.users.count() == 1 + assert User.objects.filter_with_deleted(organization=organization).count() == 2 api_users = [ { @@ -355,11 +372,13 @@ def test_duplicate_user_ids(make_organization, make_user_for_organization): } ] - User.objects.sync_for_organization(organization, api_users=api_users) + with patched_grafana_api_client(organization) as mock_grafana_api_client: + mock_grafana_api_client.get_users.return_value = api_users + sync_users(mock_grafana_api_client, organization) assert organization.users.count() == 1 assert 
organization.users.get().email == "newtest@test.test" - assert User.objects.filter_with_deleted().count() == 2 + assert User.objects.filter_with_deleted(organization=organization).count() == 3 @pytest.mark.django_db @@ -410,7 +429,9 @@ def test_sync_organization_lock( mock_task_lock.assert_called_once_with(lock_cache_key, "random") if task_lock_acquired: - mock_grafana_api_client.assert_called_once() + # 2 calls: get client to fetch organization data, + # and then another one to check token in the refactored sync function + assert mock_grafana_api_client.call_count == 2 else: # task lock could not be acquired mock_grafana_api_client.assert_not_called() @@ -433,18 +454,14 @@ class TestSyncGrafanaLabelsPluginParams: TestSyncGrafanaLabelsPluginParams(({"enabled": False}, None), False), ], ) -@pytest.mark.django_db def test_sync_grafana_labels_plugin(make_organization, test_params: TestSyncGrafanaLabelsPluginParams): organization = make_organization() organization.is_grafana_labels_enabled = False # by default in tests it's true, so setting to false - with patch.object( - GrafanaAPIClient, - "get_grafana_labels_plugin_settings", - return_value=test_params.response, - ): - grafana_api_client = GrafanaAPIClient(api_url=organization.grafana_url, api_token=organization.api_token) - _sync_grafana_labels_plugin(organization, grafana_api_client) + with patched_grafana_api_client(organization) as mock_grafana_api_client: + mock_grafana_api_client.return_value.get_grafana_labels_plugin_settings.return_value = test_params.response + sync_organization(organization) + organization.refresh_from_db() assert organization.is_grafana_labels_enabled is test_params.expected_result @@ -472,15 +489,52 @@ class TestSyncGrafanaIncidentParams: TestSyncGrafanaIncidentParams(({"enabled": False}, None), False, None), # plugin is disabled for some reason ], ) -@pytest.mark.django_db def test_sync_grafana_incident_plugin(make_organization, test_params: TestSyncGrafanaIncidentParams): organization 
= make_organization() - with patch.object( - GrafanaAPIClient, - "get_grafana_incident_plugin_settings", - return_value=test_params.response, - ): - grafana_api_client = GrafanaAPIClient(api_url=organization.grafana_url, api_token=organization.api_token) - _sync_grafana_incident_plugin(organization, grafana_api_client) + with patched_grafana_api_client(organization) as mock_grafana_api_client: + mock_grafana_api_client.return_value.get_grafana_incident_plugin_settings.return_value = test_params.response + sync_organization(organization) + organization.refresh_from_db() assert organization.is_grafana_incident_enabled is test_params.expected_flag - assert organization.grafana_incident_backend_url is test_params.expected_url + assert organization.grafana_incident_backend_url == test_params.expected_url + + +@pytest.mark.django_db +def test_get_or_create_user(make_organization, make_team, make_user_for_organization): + organization = make_organization() + team = make_team(organization) + # add an existing_user + existing_user = make_user_for_organization(organization) + team.users.add(existing_user) + + assert organization.users.count() == 1 + assert team.users.count() == 1 + + sync_user = SyncUser( + id=42, + email="test@test.com", + name="Test", + login="test", + avatar_url="https://test.com/test", + role="admin", + permissions=[], + teams=None, + ) + + # create user + user = get_or_create_user(organization, sync_user) + + assert user.user_id == sync_user.id + assert user.name == sync_user.name + assert user.email == sync_user.email + assert user.avatar_full_url == sync_user.avatar_url + assert organization.users.count() == 2 + assert team.users.count() == 1 + + # update user + sync_user.teams = [team.team_id] + user = get_or_create_user(organization, sync_user) + + assert organization.users.count() == 2 + assert team.users.count() == 2 + assert team.users.filter(pk=user.pk).exists() diff --git a/engine/common/api_helpers/errors.py 
b/engine/common/api_helpers/errors.py new file mode 100644 index 00000000..55a27fbf --- /dev/null +++ b/engine/common/api_helpers/errors.py @@ -0,0 +1,20 @@ +from dataclasses import dataclass +from typing import Dict, List, Optional + + +@dataclass +class OnCallError: + code: int + message: str + fields: Optional[Dict[str, List[str]]] = None + + +SELF_HOSTED_ONLY_FEATURE_ERROR = OnCallError( + code=1001, message="This feature is not available in Cloud versions of OnCall" +) + +INVALID_SELF_HOSTED_ID = OnCallError(code=1001, message="Invalid stack or org id for self-hosted organization") + +CLOUD_ONLY_FEATURE_ERROR = OnCallError(code=1002, message="This feature is not available in OSS versions of OnCall") + +INSTALL_ERROR = OnCallError(code=1003, message="Install failed check /plugin/status for details") diff --git a/engine/engine/urls.py b/engine/engine/urls.py index 36cbffbe..1a2f9339 100644 --- a/engine/engine/urls.py +++ b/engine/engine/urls.py @@ -23,6 +23,7 @@ from .views import HealthCheckView, MaintenanceModeStatusView, ReadinessCheckVie paths_to_work_even_when_maintenance_mode_is_active: list[URLPattern | URLResolver] = [ path("", HealthCheckView.as_view()), path("health/", HealthCheckView.as_view()), + path("api/internal/v1/health/", HealthCheckView.as_view()), path("ready/", ReadinessCheckView.as_view()), path("startupprobe/", StartupProbeView.as_view()), path("api/internal/v1/maintenance-mode-status", MaintenanceModeStatusView.as_view()), diff --git a/engine/settings/celery_task_routes.py b/engine/settings/celery_task_routes.py index cce98b6f..03d44e10 100644 --- a/engine/settings/celery_task_routes.py +++ b/engine/settings/celery_task_routes.py @@ -153,6 +153,7 @@ CELERY_TASK_ROUTES = { "apps.grafana_plugin.tasks.sync.start_sync_regions": {"queue": "long"}, "apps.metrics_exporter.tasks.calculate_and_cache_metrics": {"queue": "long"}, "apps.metrics_exporter.tasks.calculate_and_cache_user_was_notified_metric": {"queue": "long"}, + 
"apps.grafana_plugin.tasks.sync_v2.sync_organizations_v2": {"queue": "long"}, # SLACK "apps.integrations.tasks.notify_about_integration_ratelimit_in_slack": {"queue": "slack"}, "apps.slack.helpers.alert_group_representative.on_alert_group_action_triggered_async": {"queue": "slack"},