oncall-engine/engine/apps/oss_installation/views/cloud_users.py
Vadim Stepanov d0904ca405
Improve OpenAPI schema coverage (#3629)
# What this PR does

Improves OpenAPI schema coverage for internal API:

- Fixes/Improves `alert group` and `feature` endpoints
- Adds `integration` and `user` endpoints

## Which issue(s) this PR fixes

https://github.com/grafana/oncall/issues/3444

## Checklist

- [x] Unit, integration, and e2e (if applicable) tests updated
- [x] Documentation added (or `pr:no public docs` PR label added if not
required)
- [x] `CHANGELOG.md` updated (or `pr:no changelog` PR label added if not
required)
2024-01-12 15:11:22 +00:00

119 lines
4.5 KiB
Python

from rest_framework import mixins, status, viewsets
from rest_framework.decorators import action
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
from rest_framework.views import APIView
from apps.api.permissions import IsOwnerOrHasRBACPermissions, RBACPermission
from apps.auth_token.auth import PluginAuthentication
from apps.oss_installation.models import CloudConnector, CloudUserIdentity
from apps.oss_installation.serializers import CloudUserSerializer
from apps.oss_installation.utils import cloud_user_identity_status
from apps.user_management.models import User
from common.api_helpers.mixins import PublicPrimaryKeyMixin
from common.api_helpers.paginators import HundredPageSizePaginator, PaginatedData
PERMISSIONS = [RBACPermission.Permissions.OTHER_SETTINGS_WRITE]
class CloudUsersPagination(HundredPageSizePaginator):
# the override ignore here is expected. The parent classes' get_paginated_response method does not
# take a matched_users_count argument. This is fine in this case
def get_paginated_response(self, data: PaginatedData, matched_users_count: int) -> Response: # type: ignore[override]
response = super().get_paginated_response(data)
response.data["matched_users_count"] = matched_users_count
return response
class CloudUsersView(CloudUsersPagination, APIView):
authentication_classes = (PluginAuthentication,)
permission_classes = (IsAuthenticated, RBACPermission)
rbac_permissions = {
"get": PERMISSIONS,
"post": PERMISSIONS,
}
def get(self, request):
organization = request.user.organization
queryset = User.objects.filter(
organization=organization,
**User.build_permissions_query(RBACPermission.Permissions.NOTIFICATIONS_READ, organization),
)
if request.user.current_team is not None:
queryset = queryset.filter(teams=request.user.current_team).distinct()
emails = list(queryset.values_list("email", flat=True))
results = self.paginate_queryset(queryset, request, view=self)
cloud_identities = list(CloudUserIdentity.objects.filter(email__in=emails))
cloud_identities = {cloud_identity.email: cloud_identity for cloud_identity in cloud_identities}
data = []
connector = CloudConnector.objects.first()
for user in results:
cloud_identity = cloud_identities.get(user.email, None)
status, link = cloud_user_identity_status(connector, cloud_identity)
data.append(
{
"id": user.public_primary_key,
"email": user.email,
"username": user.username,
"cloud_data": {"status": status, "link": link},
}
)
return self.get_paginated_response(data, len(cloud_identities))
def post(self, request):
connector = CloudConnector.objects.first()
if connector is not None:
sync_status, err = connector.sync_users_with_cloud()
return Response(status=status.HTTP_200_OK, data={"status": sync_status, "error": err})
else:
return Response(status=status.HTTP_400_BAD_REQUEST, data={"detail": "Grafana Cloud is not connected"})
class CloudUserView(
PublicPrimaryKeyMixin,
mixins.RetrieveModelMixin,
viewsets.GenericViewSet,
):
authentication_classes = (PluginAuthentication,)
permission_classes = (IsAuthenticated, RBACPermission)
rbac_permissions = {
"retrieve": PERMISSIONS,
"sync": PERMISSIONS,
}
IsOwnerOrHasUserSettingsAdminPermission = IsOwnerOrHasRBACPermissions(
[RBACPermission.Permissions.USER_SETTINGS_ADMIN]
)
rbac_object_permissions = {
IsOwnerOrHasUserSettingsAdminPermission: [
"retrieve",
"sync",
],
}
serializer_class = CloudUserSerializer
def get_queryset(self):
queryset = User.objects.filter(organization=self.request.user.organization)
return queryset
@action(detail=True, methods=["post"])
def sync(self, request, pk):
user = self.get_object()
connector = CloudConnector.objects.first()
if connector is not None:
sync_status, err = connector.sync_user_with_cloud(user)
return Response(status=status.HTTP_200_OK, data={"status": sync_status, "error": err})
else:
return Response(status=status.HTTP_400_BAD_REQUEST, data={"detail": "Grafana Cloud is not connected"})