# What this PR does

Fixed an issue where `User Settings Reader` was missing permission to list users.

## Which issue(s) this PR fixes

## Checklist

- [x] Unit, integration, and e2e (if applicable) tests updated
- [x] Documentation added (or `pr:no public docs` PR label added if not required)
- [x] `CHANGELOG.md` updated (or `pr:no changelog` PR label added if not required)
706 lines
29 KiB
Python
706 lines
29 KiB
Python
import logging
|
|
|
|
import pytz
|
|
from django.conf import settings
|
|
from django.core.exceptions import ObjectDoesNotExist
|
|
from django.db.utils import IntegrityError
|
|
from django.urls import reverse
|
|
from django.utils import timezone
|
|
from django.utils.functional import cached_property
|
|
from django_filters import rest_framework as filters
|
|
from rest_framework import mixins, status, viewsets
|
|
from rest_framework.decorators import action
|
|
from rest_framework.exceptions import NotFound
|
|
from rest_framework.filters import SearchFilter
|
|
from rest_framework.permissions import IsAuthenticated
|
|
from rest_framework.response import Response
|
|
from rest_framework.views import APIView
|
|
|
|
from apps.api.permissions import (
|
|
ALL_PERMISSION_CHOICES,
|
|
IsOwnerOrHasRBACPermissions,
|
|
LegacyAccessControlRole,
|
|
RBACPermission,
|
|
get_permission_from_permission_string,
|
|
user_is_authorized,
|
|
)
|
|
from apps.api.serializers.team import TeamSerializer
|
|
from apps.api.serializers.user import (
|
|
CurrentUserSerializer,
|
|
FilterUserSerializer,
|
|
UserHiddenFieldsSerializer,
|
|
UserIsCurrentlyOnCallSerializer,
|
|
UserSerializer,
|
|
)
|
|
from apps.api.throttlers import (
|
|
GetPhoneVerificationCodeThrottlerPerOrg,
|
|
GetPhoneVerificationCodeThrottlerPerUser,
|
|
TestCallThrottler,
|
|
VerifyPhoneNumberThrottlerPerOrg,
|
|
VerifyPhoneNumberThrottlerPerUser,
|
|
)
|
|
from apps.api.throttlers.test_call_throttler import TestPushThrottler
|
|
from apps.auth_token.auth import PluginAuthentication
|
|
from apps.auth_token.constants import SCHEDULE_EXPORT_TOKEN_NAME
|
|
from apps.auth_token.models import UserScheduleExportAuthToken
|
|
from apps.base.messaging import get_messaging_backend_from_id
|
|
from apps.base.utils import live_settings
|
|
from apps.mobile_app.auth import MobileAppAuthTokenAuthentication
|
|
from apps.mobile_app.demo_push import send_test_push
|
|
from apps.mobile_app.exceptions import DeviceNotSet
|
|
from apps.phone_notifications.exceptions import (
|
|
BaseFailed,
|
|
FailedToFinishVerification,
|
|
FailedToMakeCall,
|
|
FailedToStartVerification,
|
|
NumberAlreadyVerified,
|
|
NumberNotVerified,
|
|
ProviderNotSupports,
|
|
)
|
|
from apps.phone_notifications.phone_backend import PhoneBackend
|
|
from apps.schedules.ical_utils import get_cached_oncall_users_for_multiple_schedules
|
|
from apps.schedules.models import OnCallSchedule
|
|
from apps.telegram.client import TelegramClient
|
|
from apps.telegram.models import TelegramVerificationCode
|
|
from apps.user_management.models import Team, User
|
|
from common.api_helpers.exceptions import Conflict
|
|
from common.api_helpers.mixins import FilterSerializerMixin, PublicPrimaryKeyMixin
|
|
from common.api_helpers.paginators import HundredPageSizePaginator
|
|
from common.api_helpers.utils import create_engine_url
|
|
from common.insight_log import (
|
|
ChatOpsEvent,
|
|
ChatOpsTypePlug,
|
|
EntityEvent,
|
|
write_chatops_insight_log,
|
|
write_resource_insight_log,
|
|
)
|
|
from common.recaptcha import check_recaptcha_internal_api
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Object-level permission checks built from IsOwnerOrHasRBACPermissions, one per
# RBAC permission (user settings admin / user settings read).
IsOwnerOrHasUserSettingsAdminPermission = IsOwnerOrHasRBACPermissions(
    [RBACPermission.Permissions.USER_SETTINGS_ADMIN]
)
IsOwnerOrHasUserSettingsReadPermission = IsOwnerOrHasRBACPermissions(
    [RBACPermission.Permissions.USER_SETTINGS_READ]
)

# Default and upper bound (in days) for the `days` query parameter of the
# `upcoming_shifts` action.
UPCOMING_SHIFTS_DEFAULT_DAYS = 7
UPCOMING_SHIFTS_MAX_DAYS = 65
|
|
|
|
|
|
class CurrentUserView(APIView):
    """Endpoint for the authenticated user's own record.

    GET returns the user serialized with CurrentUserSerializer; PUT validates the
    request payload with the same serializer and saves it.
    """

    authentication_classes = (MobileAppAuthTokenAuthentication, PluginAuthentication)
    permission_classes = (IsAuthenticated,)

    def get(self, request):
        """Return the serialized current user."""
        serializer_context = {"request": self.request, "format": self.format_kwarg, "view": self}

        cloud_notifications_on = settings.IS_OPEN_SOURCE and live_settings.GRAFANA_CLOUD_NOTIFICATIONS_ENABLED
        if cloud_notifications_on:
            # Imported lazily, only when the open-source cloud-notifications path is taken.
            from apps.oss_installation.models import CloudConnector, CloudUserIdentity

            cloud_connector = CloudConnector.objects.first()
            if cloud_connector is not None:
                # Hand the serializer the user's cloud identity, keyed by email.
                identities_by_email = {
                    identity.email: identity
                    for identity in CloudUserIdentity.objects.filter(email__in=[request.user.email])
                }
                serializer_context["cloud_identities"] = identities_by_email
                serializer_context["connector"] = cloud_connector

        return Response(CurrentUserSerializer(request.user, context=serializer_context).data)

    def put(self, request):
        """Validate the request payload against the current user and persist it."""
        serializer = CurrentUserSerializer(
            request.user,
            data=self.request.data,
            context={"request": self.request},
        )
        serializer.is_valid(raise_exception=True)
        serializer.save()
        return Response(serializer.data)
|
|
|
|
|
|
class UserFilter(filters.FilterSet):
    """
    Filter set for the users list: by email (case-insensitive substring),
    by legacy role and by RBAC permission.

    https://django-filter.readthedocs.io/en/master/guide/rest_framework.html
    """

    email = filters.CharFilter(field_name="email", lookup_expr="icontains")
    # TODO: remove "roles" in next version
    roles = filters.MultipleChoiceFilter(field_name="role", choices=LegacyAccessControlRole.choices())
    permission = filters.ChoiceFilter(method="filter_by_permission", choices=ALL_PERMISSION_CHOICES)

    class Meta:
        model = User
        # TODO: remove "roles" in next version
        fields = ["email", "roles", "permission"]

    def filter_by_permission(self, queryset, name, value):
        """Restrict `queryset` to users holding the permission named by `value`.

        If the permission string cannot be resolved, the queryset is returned unfiltered.
        """
        resolved_permission = get_permission_from_permission_string(value)
        if resolved_permission:
            organization = self.request.user.organization
            return queryset.filter(**User.build_permissions_query(resolved_permission, organization))

        # TODO: maybe raise a 400 here?
        return queryset
|
|
|
|
|
|
class UserView(
    PublicPrimaryKeyMixin,
    FilterSerializerMixin,
    mixins.RetrieveModelMixin,
    mixins.UpdateModelMixin,
    mixins.ListModelMixin,
    viewsets.GenericViewSet,
):
    """Viewset for users: retrieve, update and list, plus per-user actions
    (phone verification, test notifications, chat-ops linking, schedule export
    token, upcoming shifts).
    """

    authentication_classes = (MobileAppAuthTokenAuthentication, PluginAuthentication)

    permission_classes = (IsAuthenticated, RBACPermission)

    # RBAC permission(s) required per view action.
    # NOTE(review): "get_backend_verification_code", "unlink_backend" and
    # "send_test_push" only require USER_SETTINGS_READ, while their sibling actions
    # ("get_telegram_verification_code", "unlink_slack"/"unlink_telegram",
    # "make_test_call"/"send_test_sms") require USER_SETTINGS_WRITE — confirm this
    # asymmetry is intentional.
    rbac_permissions = {
        "retrieve": [RBACPermission.Permissions.USER_SETTINGS_READ],
        "timezone_options": [RBACPermission.Permissions.USER_SETTINGS_READ],
        "check_availability": [RBACPermission.Permissions.USER_SETTINGS_READ],
        "metadata": [RBACPermission.Permissions.USER_SETTINGS_WRITE],
        "list": [RBACPermission.Permissions.USER_SETTINGS_READ],
        "update": [RBACPermission.Permissions.USER_SETTINGS_WRITE],
        "partial_update": [RBACPermission.Permissions.USER_SETTINGS_WRITE],
        "verify_number": [RBACPermission.Permissions.USER_SETTINGS_WRITE],
        "forget_number": [RBACPermission.Permissions.USER_SETTINGS_WRITE],
        "get_verification_code": [RBACPermission.Permissions.USER_SETTINGS_WRITE],
        "get_verification_call": [RBACPermission.Permissions.USER_SETTINGS_WRITE],
        "get_backend_verification_code": [RBACPermission.Permissions.USER_SETTINGS_READ],
        "get_telegram_verification_code": [RBACPermission.Permissions.USER_SETTINGS_WRITE],
        "unlink_slack": [RBACPermission.Permissions.USER_SETTINGS_WRITE],
        "unlink_telegram": [RBACPermission.Permissions.USER_SETTINGS_WRITE],
        "unlink_backend": [RBACPermission.Permissions.USER_SETTINGS_READ],
        "make_test_call": [RBACPermission.Permissions.USER_SETTINGS_WRITE],
        "send_test_push": [RBACPermission.Permissions.USER_SETTINGS_READ],
        "send_test_sms": [RBACPermission.Permissions.USER_SETTINGS_WRITE],
        "export_token": [RBACPermission.Permissions.USER_SETTINGS_WRITE],
        "upcoming_shifts": [RBACPermission.Permissions.USER_SETTINGS_READ],
    }

    # Object-level permission check -> actions it is listed for
    # (presumably consumed by RBACPermission — confirm).
    rbac_object_permissions = {
        IsOwnerOrHasUserSettingsAdminPermission: [
            "metadata",
            "list",
            "retrieve",
            "update",
            "partial_update",
            "destroy",
            "verify_number",
            "forget_number",
            "get_verification_code",
            "get_verification_call",
            "get_backend_verification_code",
            "get_telegram_verification_code",
            "unlink_slack",
            "unlink_telegram",
            "unlink_backend",
            "make_test_call",
            "send_test_sms",
            "send_test_push",
            "export_token",
            "upcoming_shifts",
        ],
        IsOwnerOrHasUserSettingsReadPermission: [
            "check_availability",
        ],
    }

    filter_serializer_class = FilterUserSerializer

    pagination_class = HundredPageSizePaginator

    filter_backends = (SearchFilter, filters.DjangoFilterBackend)
    # NB start search params
    # '^' Starts-with search.
    # '=' Exact matches.
    # '@' Full-text search. (Currently only supported Django's MySQL backend.)
    # '$' Regex search.
    search_fields = (
        "^email",
        "^username",
        "^slack_user_identity__cached_slack_login",
        "^slack_user_identity__cached_name",
        "^teams__name",
        "=public_primary_key",
    )

    filterset_class = UserFilter
|
|
|
|
@cached_property
def schedules_with_oncall_users(self):
    """
    On-call users for all schedules of the requesting user's organization.

    Computed once per view instance (cached_property) and reused afterwards, since
    self.get_serializer_context() is called multiple times for every instance in the queryset.
    """
    organization_schedules = self.request.user.organization.oncall_schedules.all()
    return get_cached_oncall_users_for_multiple_schedules(organization_schedules)
|
|
|
|
def _get_is_currently_oncall_query_param(self) -> str:
    """Return the `is_currently_oncall` query parameter lower-cased ("" when absent)."""
    raw_value = self.request.query_params.get("is_currently_oncall", "")
    return raw_value.lower()
|
|
|
|
def _is_currently_oncall_request(self) -> bool:
    """Tell whether the request carries a recognised `is_currently_oncall` value."""
    recognised_values = ("true", "false", "all")
    return self._get_is_currently_oncall_query_param() in recognised_values
|
|
|
|
def get_serializer_context(self):
    """Extend the default serializer context with `schedules_with_oncall_users`.

    The on-call lookup is only included for `is_currently_oncall` requests;
    otherwise an empty dict is passed.
    """
    context = super().get_serializer_context()

    if self._is_currently_oncall_request():
        oncall_users_by_schedule = self.schedules_with_oncall_users
    else:
        oncall_users_by_schedule = {}

    context.update({"schedules_with_oncall_users": oncall_users_by_schedule})
    return context
|
|
|
|
def get_serializer_class(self):
    """Pick the serializer for the current request.

    - list + `filters=true`          -> the filter serializer
    - list + `is_currently_oncall`   -> UserIsCurrentlyOnCallSerializer
    - own record or settings admin   -> UserSerializer
    - anything else                  -> UserHiddenFieldsSerializer
    """
    current_user = self.request.user

    if self.action in ["list"]:
        wants_filters = self.request.query_params.get("filters", "false") == "true"
        if wants_filters:
            return self.get_filter_serializer_class()
        if self._is_currently_oncall_request():
            return UserIsCurrentlyOnCallSerializer

    requested_pk = self.kwargs.get("pk")
    is_users_own_data = requested_pk is not None and requested_pk == current_user.public_primary_key
    has_admin_permission = user_is_authorized(current_user, [RBACPermission.Permissions.USER_SETTINGS_ADMIN])

    if not (is_users_own_data or has_admin_permission):
        return UserHiddenFieldsSerializer
    return UserSerializer
|
|
|
|
def get_queryset(self):
    """Return the organization's users ordered by id.

    Eager loading is delegated to the serializer class selected for this request.
    With `slack_identity=true` only users that have a Slack identity are kept.
    """
    only_with_slack_identity = self.request.query_params.get("slack_identity", None) == "true"

    users = User.objects.filter(organization=self.request.user.organization)
    users = self.get_serializer_class().setup_eager_loading(users)

    if only_with_slack_identity:
        users = users.filter(slack_user_identity__isnull=False).distinct()

    return users.order_by("id")
|
|
|
|
def list(self, request, *args, **kwargs) -> Response:
    """List the organization's users.

    The `is_currently_oncall` query parameter changes the result:
    - "true":  keep only users that are currently on-call
    - "false": drop users that are currently on-call
    - "all":   return every user in a single, unpaginated response

    For paginated responses on open-source installs with Grafana Cloud
    notifications enabled, the cloud connector and the cloud identities of the
    users on the page are added to the serializer context.
    """
    queryset = self.filter_queryset(self.get_queryset())

    def _get_oncall_user_ids():
        # Flatten the per-schedule on-call users into a set of primary keys.
        return {user.pk for users in self.schedules_with_oncall_users.values() for user in users}

    paginate_results = True

    if (is_currently_oncall_query_param := self._get_is_currently_oncall_query_param()) == "true":
        # client explicitly wants only the users that are on-call
        queryset = queryset.filter(pk__in=_get_oncall_user_ids())
    elif is_currently_oncall_query_param == "false":
        # client explicitly wants to filter out on-call users
        queryset = queryset.exclude(pk__in=_get_oncall_user_ids())
    elif is_currently_oncall_query_param == "all":
        # return all users, don't paginate
        paginate_results = False

    context = self.get_serializer_context()

    if paginate_results and (page := self.paginate_queryset(queryset)) is not None:
        if settings.IS_OPEN_SOURCE and live_settings.GRAFANA_CLOUD_NOTIFICATIONS_ENABLED:
            from apps.oss_installation.models import CloudConnector, CloudUserIdentity

            if (connector := CloudConnector.objects.first()) is not None:
                # Only the users on this page get serialized, so only their identities
                # are needed (previously the emails of the whole queryset were loaded).
                emails = [user.email for user in page]
                cloud_identities = {
                    cloud_identity.email: cloud_identity
                    for cloud_identity in CloudUserIdentity.objects.filter(email__in=emails)
                }
                context["cloud_identities"] = cloud_identities
                context["connector"] = connector

        serializer = self.get_serializer(page, many=True, context=context)
        return self.get_paginated_response(serializer.data)

    serializer = self.get_serializer(queryset, many=True, context=context)
    return Response(serializer.data)
|
|
|
|
def retrieve(self, request, *args, **kwargs) -> Response:
    """Return a single serialized user.

    When the object lookup raises NotFound, the result of wrong_team_response()
    is returned instead. On open-source installs with Grafana Cloud notifications
    enabled, the connector and the user's cloud identity are added to the
    serializer context.
    """
    context = self.get_serializer_context()

    try:
        instance = self.get_object()
    except NotFound:
        return self.wrong_team_response()

    cloud_notifications_on = settings.IS_OPEN_SOURCE and live_settings.GRAFANA_CLOUD_NOTIFICATIONS_ENABLED
    if cloud_notifications_on:
        from apps.oss_installation.models import CloudConnector, CloudUserIdentity

        if (cloud_connector := CloudConnector.objects.first()) is not None:
            identities_by_email = {
                identity.email: identity
                for identity in CloudUserIdentity.objects.filter(email__in=[instance.email])
            }
            context["cloud_identities"] = identities_by_email
            context["connector"] = cloud_connector

    return Response(self.get_serializer(instance, context=context).data)
|
|
|
|
def wrong_team_response(self) -> Response:
    """
    Build a 403 response: {"error_code": "wrong_team", "owner_team": {"name", "id", "email", "avatar_url"}}.

    Used when a requested instance doesn't belong to the user's current_team.
    Used instead of TeamFilteringMixin because of the m2m teams field (the mixin doesn't work
    correctly with it) and the overridden retrieve method in UserView.

    Raises NotFound when no user with the requested public primary key exists in
    the (filtered) organization queryset.
    """
    org_users = User.objects.filter(organization=self.request.user.organization).order_by("id")
    org_users = self.filter_queryset(org_users)

    # The lookup result is unused; it only verifies that the user exists.
    try:
        org_users.get(public_primary_key=self.kwargs["pk"])
    except ObjectDoesNotExist:
        raise NotFound

    # Unsaved placeholder team used only to produce the serialized payload.
    placeholder_team = Team(public_primary_key=None, name="General", email=None, avatar_url=None)
    payload = {"error_code": "wrong_team", "owner_team": TeamSerializer(placeholder_team).data}

    return Response(data=payload, status=status.HTTP_403_FORBIDDEN)
|
|
|
|
def current(self, request) -> Response:
    """Return the requesting user, fetched through get_queryset(), serialized with UserSerializer."""
    requesting_user = self.get_queryset().get(pk=self.request.user.pk)
    return Response(UserSerializer(requesting_user).data)
|
|
|
|
@action(detail=False, methods=["get"])
def timezone_options(self, request) -> Response:
    """Return the list of common timezone names known to pytz."""
    available_timezones = pytz.common_timezones
    return Response(available_timezones)
|
|
|
|
@action(
    detail=True,
    methods=["get"],
    throttle_classes=[GetPhoneVerificationCodeThrottlerPerUser, GetPhoneVerificationCodeThrottlerPerOrg],
)
def get_verification_code(self, request, pk) -> Response:
    """Send a phone verification SMS to the target user.

    The request must pass the reCAPTCHA check first (400 otherwise). Returns 200
    on success, 400 if the number is already verified, the result of
    handle_phone_notificator_failed() if starting verification failed, and 500
    if the provider does not support SMS verification.
    """
    logger.info("get_verification_code: validating reCAPTCHA code")
    if not check_recaptcha_internal_api(request, "mobile_verification_code"):
        logger.warning("get_verification_code: invalid reCAPTCHA validation")
        return Response("failed reCAPTCHA check", status=status.HTTP_400_BAD_REQUEST)
    logger.info('get_verification_code: pass reCAPTCHA validation"')

    target_user = self.get_object()
    backend = PhoneBackend()
    try:
        backend.send_verification_sms(target_user)
    except NumberAlreadyVerified:
        response = Response("Phone number already verified", status=status.HTTP_400_BAD_REQUEST)
    except FailedToStartVerification as error:
        response = handle_phone_notificator_failed(error)
    except ProviderNotSupports:
        response = Response(
            "Phone provider not supports sms verification", status=status.HTTP_500_INTERNAL_SERVER_ERROR
        )
    else:
        response = Response(status=status.HTTP_200_OK)
    return response
|
|
|
|
@action(
    detail=True,
    methods=["get"],
    throttle_classes=[GetPhoneVerificationCodeThrottlerPerUser, GetPhoneVerificationCodeThrottlerPerOrg],
)
def get_verification_call(self, request, pk) -> Response:
    """Start phone verification for the target user via a phone call.

    The request must pass the reCAPTCHA check first (400 otherwise). Returns 200
    on success, 400 if the number is already verified, the result of
    handle_phone_notificator_failed() if starting verification failed, and 500
    if the provider does not support call verification.
    """
    logger.info("get_verification_code_via_call: validating reCAPTCHA code")
    if not check_recaptcha_internal_api(request, "mobile_verification_code"):
        logger.warning("get_verification_code_via_call: invalid reCAPTCHA validation")
        return Response("failed reCAPTCHA check", status=status.HTTP_400_BAD_REQUEST)
    logger.info('get_verification_code_via_call: pass reCAPTCHA validation"')

    target_user = self.get_object()
    backend = PhoneBackend()
    try:
        backend.make_verification_call(target_user)
    except NumberAlreadyVerified:
        response = Response("Phone number already verified", status=status.HTTP_400_BAD_REQUEST)
    except FailedToStartVerification as error:
        response = handle_phone_notificator_failed(error)
    except ProviderNotSupports:
        response = Response(
            "Phone provider not supports call verification", status=status.HTTP_500_INTERNAL_SERVER_ERROR
        )
    else:
        response = Response(status=status.HTTP_200_OK)
    return response
|
|
|
|
@action(
    detail=True,
    methods=["put"],
    throttle_classes=[VerifyPhoneNumberThrottlerPerUser, VerifyPhoneNumberThrottlerPerOrg],
)
def verify_number(self, request, pk) -> Response:
    """Verify the target user's phone number with the code from the `token` query parameter.

    Returns 400 when the code is missing or incorrect, the result of
    handle_phone_notificator_failed() when verification could not be finished,
    and 200 (after writing an insight log entry) when the number was verified.
    """
    target_user = self.get_object()

    verification_code = request.query_params.get("token", None)
    if not verification_code:
        return Response("Invalid verification code", status=status.HTTP_400_BAD_REQUEST)

    # Snapshot taken before verification so the insight log can record both states.
    state_before = target_user.insight_logs_serialized

    backend = PhoneBackend()
    try:
        is_verified = backend.verify_phone_number(target_user, verification_code)
    except FailedToFinishVerification as error:
        return handle_phone_notificator_failed(error)

    if not is_verified:
        return Response("Verification code is not correct", status=status.HTTP_400_BAD_REQUEST)

    state_after = target_user.insight_logs_serialized
    write_resource_insight_log(
        instance=target_user,
        author=self.request.user,
        event=EntityEvent.UPDATED,
        prev_state=state_before,
        new_state=state_after,
    )
    return Response(status=status.HTTP_200_OK)
|
|
|
|
@action(detail=True, methods=["put"])
def forget_number(self, request, pk) -> Response:
    """Remove the target user's phone number.

    An insight log entry is written only when the backend reports that a number
    was removed; the response is 200 either way.
    """
    target_user = self.get_object()
    state_before = target_user.insight_logs_serialized

    was_removed = PhoneBackend().forget_number(target_user)
    if was_removed:
        state_after = target_user.insight_logs_serialized
        write_resource_insight_log(
            instance=target_user,
            author=self.request.user,
            event=EntityEvent.UPDATED,
            prev_state=state_before,
            new_state=state_after,
        )

    return Response(status=status.HTTP_200_OK)
|
|
|
|
@action(detail=True, methods=["post"], throttle_classes=[TestCallThrottler])
def make_test_call(self, request, pk) -> Response:
    """Place a test phone call to the target user.

    Returns 200 on success, 400 if the number is not verified, the result of
    handle_phone_notificator_failed() if the call failed, and 500 if the
    provider does not support phone calls.
    """
    target_user = self.get_object()
    try:
        PhoneBackend().make_test_call(target_user)
    except NumberNotVerified:
        response = Response("Phone number is not verified", status=status.HTTP_400_BAD_REQUEST)
    except FailedToMakeCall as error:
        response = handle_phone_notificator_failed(error)
    except ProviderNotSupports:
        response = Response("Phone provider not supports phone calls", status=status.HTTP_500_INTERNAL_SERVER_ERROR)
    else:
        response = Response(status=status.HTTP_200_OK)
    return response
|
|
|
|
@action(detail=True, methods=["post"], throttle_classes=[TestCallThrottler])
def send_test_sms(self, request, pk) -> Response:
    """Send a test SMS to the target user.

    Returns 200 on success, 400 if the number is not verified, the result of
    handle_phone_notificator_failed() on a handled provider failure, and 500 if
    the provider does not support SMS.
    """
    user = self.get_object()
    try:
        phone_backend = PhoneBackend()
        phone_backend.send_test_sms(user)
    except NumberNotVerified:
        return Response("Phone number is not verified", status=status.HTTP_400_BAD_REQUEST)
    # NOTE(review): FailedToMakeCall looks copied from make_test_call; if the phone
    # backend raises an SMS-specific failure here it is not handled — confirm.
    except FailedToMakeCall as e:
        return handle_phone_notificator_failed(e)
    except ProviderNotSupports:
        # The message previously said "phone calls" although this endpoint sends an SMS.
        return Response("Phone provider not supports sms", status=status.HTTP_500_INTERNAL_SERVER_ERROR)

    return Response(status=status.HTTP_200_OK)
|
|
|
|
@action(detail=True, methods=["post"], throttle_classes=[TestPushThrottler])
def send_test_push(self, request, pk) -> Response:
    """Send a test push notification to the target user.

    `critical=true` in the query string requests a critical push. Returns 200 on
    success, 400 when no mobile device is set, and 500 on any other error (which
    is logged).
    """
    target_user = self.get_object()
    is_critical = request.query_params.get("critical", "false") == "true"

    try:
        send_test_push(target_user, is_critical)
    except DeviceNotSet:
        response = Response(
            data="Mobile device not connected",
            status=status.HTTP_400_BAD_REQUEST,
        )
    except Exception as e:
        # Deliberately broad: any failure is reported to the client as a generic 500.
        logger.info(f"UserView.send_test_push: Unable to send test push due to {e}")
        response = Response(
            data="Something went wrong while sending a test push", status=status.HTTP_500_INTERNAL_SERVER_ERROR
        )
    else:
        response = Response(status=status.HTTP_200_OK)
    return response
|
|
|
|
@action(detail=True, methods=["get"])
def get_backend_verification_code(self, request, pk) -> Response:
    """Generate a verification code for the messaging backend named by the `backend` query parameter.

    Returns 400 when the backend id does not resolve to a messaging backend.
    """
    target_user = self.get_object()

    messaging_backend = get_messaging_backend_from_id(request.query_params.get("backend"))
    if messaging_backend is None:
        return Response(status=status.HTTP_400_BAD_REQUEST)

    return Response(messaging_backend.generate_user_verification_code(target_user))
|
|
|
|
@action(detail=True, methods=["get"])
def get_telegram_verification_code(self, request, pk) -> Response:
    """Issue a fresh Telegram verification code for the target user.

    Any existing verification code of the user is deleted and replaced. The
    response carries the new code ("telegram_code") and a link to the bot
    ("bot_link"). Returns 400 when the user is already connected to Telegram.
    """
    user = self.get_object()

    # A verification code is only useful for linking an account, so refuse users
    # that are already connected. (The check used to be inverted, rejecting
    # exactly the users that still needed a code.)
    if user.is_telegram_connected:
        return Response(status=status.HTTP_400_BAD_REQUEST)

    # Drop a previously issued code, if there is one, before creating the new one.
    try:
        existing_verification_code = user.telegram_verification_code
        existing_verification_code.delete()
    except TelegramVerificationCode.DoesNotExist:
        pass

    new_code = TelegramVerificationCode(user=user)
    new_code.save()

    telegram_client = TelegramClient()
    bot_username = telegram_client.api_client.username
    bot_link = f"https://t.me/{bot_username}"

    return Response(
        {"telegram_code": str(new_code.uuid_with_org_uuid), "bot_link": bot_link}, status=status.HTTP_200_OK
    )
|
|
|
|
@action(detail=True, methods=["post"])
def unlink_slack(self, request, pk) -> Response:
    """Detach the Slack identity from the target user and record it in the chat-ops insight log."""
    target_user = self.get_object()

    target_user.slack_user_identity = None
    target_user.save(update_fields=["slack_user_identity"])

    write_chatops_insight_log(
        author=request.user,
        event_name=ChatOpsEvent.USER_UNLINKED,
        chatops_type=ChatOpsTypePlug.SLACK.value,
        linked_user=target_user.username,
        linked_user_id=target_user.public_primary_key,
    )
    return Response(status=status.HTTP_200_OK)
|
|
|
|
@action(detail=True, methods=["post"])
def unlink_telegram(self, request, pk) -> Response:
    """Delete the target user's Telegram connector and record it in the chat-ops insight log.

    Returns 400 when a TelegramToUserConnector.DoesNotExist is raised.
    """
    target_user = self.get_object()
    from apps.telegram.models import TelegramToUserConnector

    try:
        TelegramToUserConnector.objects.get(user=target_user).delete()
        write_chatops_insight_log(
            author=request.user,
            event_name=ChatOpsEvent.USER_UNLINKED,
            chatops_type=ChatOpsTypePlug.TELEGRAM.value,
            linked_user=target_user.username,
            linked_user_id=target_user.public_primary_key,
        )
    except TelegramToUserConnector.DoesNotExist:
        return Response(status=status.HTTP_400_BAD_REQUEST)

    return Response(status=status.HTTP_200_OK)
|
|
|
|
@action(detail=True, methods=["post"])
def unlink_backend(self, request, pk) -> Response:
    """Unlink the target user from the messaging backend named by the `backend` query parameter.

    Returns 400 when the backend id does not resolve to a messaging backend or
    when unlinking raises ObjectDoesNotExist; 200 otherwise.
    """
    # TODO: insight logs support
    target_user = self.get_object()

    messaging_backend = get_messaging_backend_from_id(request.query_params.get("backend"))
    if messaging_backend is None:
        return Response(status=status.HTTP_400_BAD_REQUEST)

    try:
        messaging_backend.unlink_user(target_user)
        write_chatops_insight_log(
            author=request.user,
            event_name=ChatOpsEvent.USER_UNLINKED,
            chatops_type=messaging_backend.backend_id,
            linked_user=target_user.username,
            linked_user_id=target_user.public_primary_key,
        )
    except ObjectDoesNotExist:
        return Response(status=status.HTTP_400_BAD_REQUEST)

    return Response(status=status.HTTP_200_OK)
|
|
|
|
@action(detail=True, methods=["get"])
def upcoming_shifts(self, request, pk) -> Response:
    """Return the target user's current and next shift for each related schedule.

    The `days` query parameter (default UPCOMING_SHIFTS_DEFAULT_DAYS) sets how far
    ahead to look; it must be an integer in 1..UPCOMING_SHIFTS_MAX_DAYS, otherwise
    400 is returned. Schedules without a current or upcoming shift are omitted.
    Entries are sorted by the start of the current shift, or of the next shift
    when there is no current one.
    """
    target_user = self.get_object()

    try:
        days = int(request.query_params.get("days", UPCOMING_SHIFTS_DEFAULT_DAYS))
    except ValueError:
        return Response(status=status.HTTP_400_BAD_REQUEST)

    if not 0 < days <= UPCOMING_SHIFTS_MAX_DAYS:
        return Response(status=status.HTTP_400_BAD_REQUEST)

    window_start = timezone.now()

    entries = []
    # Only schedules related to the user are inspected.
    for schedule in OnCallSchedule.objects.related_to_user(target_user):
        _, current_shifts, next_shifts = schedule.shifts_for_user(
            target_user, datetime_start=window_start, days=days
        )
        if not (current_shifts or next_shifts):
            continue
        entries.append(
            {
                "schedule_id": schedule.public_primary_key,
                "schedule_name": schedule.name,
                "is_oncall": len(current_shifts) > 0,
                "current_shift": current_shifts[0] if current_shifts else None,
                "next_shift": next_shifts[0] if next_shifts else None,
            }
        )

    # Sort by start timestamp; every entry has at least one of the two shifts.
    entries.sort(key=lambda entry: (entry["current_shift"] or entry["next_shift"])["start"])

    return Response(entries, status=status.HTTP_200_OK)
|
|
|
|
@action(detail=True, methods=["get", "post", "delete"])
def export_token(self, request, pk) -> Response:
    """Inspect, create or delete the target user's schedule export token.

    - GET:    200 with created_at / revoked_at / active; NotFound if no token exists.
    - POST:   201 with the token, its creation time and the export URL;
              Conflict if creating the token raises IntegrityError.
    - DELETE: 204 after deleting the token; NotFound if no token exists.
    Any other method yields 405.
    """
    target_user = self.get_object()
    http_method = self.request.method

    if http_method == "GET":
        try:
            existing_token = UserScheduleExportAuthToken.objects.get(user=target_user)
        except UserScheduleExportAuthToken.DoesNotExist:
            raise NotFound

        token_info = {
            "created_at": existing_token.created_at,
            "revoked_at": existing_token.revoked_at,
            "active": existing_token.active,
        }
        return Response(token_info, status=status.HTTP_200_OK)

    elif http_method == "POST":
        try:
            instance, token = UserScheduleExportAuthToken.create_auth_token(target_user, target_user.organization)
            write_resource_insight_log(instance=instance, author=self.request.user, event=EntityEvent.CREATED)
        except IntegrityError:
            raise Conflict("Schedule export token for user already exists")

        # Public export URL with the token appended as a query parameter.
        export_path = reverse("api-public:users-schedule-export", kwargs={"pk": target_user.public_primary_key})
        export_url = create_engine_url(export_path + f"?{SCHEDULE_EXPORT_TOKEN_NAME}={token}")

        created = {"token": token, "created_at": instance.created_at, "export_url": export_url}
        return Response(created, status=status.HTTP_201_CREATED)

    elif http_method == "DELETE":
        try:
            existing_token = UserScheduleExportAuthToken.objects.get(user=target_user)
            # Logged before deletion, while the token instance still exists.
            write_resource_insight_log(instance=existing_token, author=self.request.user, event=EntityEvent.DELETED)
            existing_token.delete()
        except UserScheduleExportAuthToken.DoesNotExist:
            raise NotFound
        return Response(status=status.HTTP_204_NO_CONTENT)

    return Response(status=status.HTTP_405_METHOD_NOT_ALLOWED)
|
|
|
|
|
|
def handle_phone_notificator_failed(exc: BaseFailed) -> Response:
    """Convert a phone-notification failure into an HTTP response.

    A failure carrying a `graceful_msg` becomes a 400 with that message;
    otherwise a generic 503 is returned.
    """
    if not exc.graceful_msg:
        return Response("Something went wrong", status=status.HTTP_503_SERVICE_UNAVAILABLE)
    return Response(exc.graceful_msg, status=status.HTTP_400_BAD_REQUEST)
|