import logging from urllib.parse import urljoin from django.apps import apps from django.conf import settings from django.core.exceptions import ObjectDoesNotExist from django.db.utils import IntegrityError from django.urls import reverse from django_filters import rest_framework as filters from rest_framework import mixins, status, viewsets from rest_framework.decorators import action from rest_framework.exceptions import NotFound from rest_framework.filters import SearchFilter from rest_framework.permissions import IsAuthenticated from rest_framework.response import Response from rest_framework.views import APIView from apps.api.permissions import ( MODIFY_ACTIONS, READ_ACTIONS, ActionPermission, AnyRole, IsAdminOrEditor, IsOwnerOrAdmin, ) from apps.api.serializers.user import FilterUserSerializer, UserHiddenFieldsSerializer, UserSerializer from apps.auth_token.auth import ( MobileAppAuthTokenAuthentication, MobileAppVerificationTokenAuthentication, PluginAuthentication, ) from apps.auth_token.constants import SCHEDULE_EXPORT_TOKEN_NAME from apps.auth_token.models import UserScheduleExportAuthToken from apps.auth_token.models.mobile_app_auth_token import MobileAppAuthToken from apps.auth_token.models.mobile_app_verification_token import MobileAppVerificationToken from apps.base.messaging import get_messaging_backend_from_id from apps.base.utils import live_settings from apps.telegram.client import TelegramClient from apps.telegram.models import TelegramVerificationCode from apps.twilioapp.phone_manager import PhoneManager from apps.twilioapp.twilio_client import twilio_client from apps.user_management.models import User from apps.user_management.organization_log_creator import OrganizationLogType, create_organization_log from common.api_helpers.exceptions import Conflict from common.api_helpers.mixins import FilterSerializerMixin, PublicPrimaryKeyMixin from common.api_helpers.paginators import HundredPageSizePaginator from common.constants.role import Role logger = logging.getLogger(__name__) class CurrentUserView(APIView): authentication_classes = ( MobileAppAuthTokenAuthentication, PluginAuthentication, ) permission_classes = (IsAuthenticated,) def get(self, request): context = {"request": self.request, "format": self.format_kwarg, "view": self} if settings.OSS_INSTALLATION and live_settings.GRAFANA_CLOUD_NOTIFICATIONS_ENABLED: from apps.oss_installation.models import CloudConnector, CloudUserIdentity connector = CloudConnector.objects.first() if connector is not None: cloud_identities = list(CloudUserIdentity.objects.filter(email__in=[request.user.email])) cloud_identities = {cloud_identity.email: cloud_identity for cloud_identity in cloud_identities} context["cloud_identities"] = cloud_identities context["connector"] = connector serializer = UserSerializer(request.user, context=context) return Response(serializer.data) def put(self, request): serializer = UserSerializer(request.user, data=self.request.data, context={"request": self.request}) serializer.is_valid(raise_exception=True) serializer.save() return Response(serializer.data) class UserFilter(filters.FilterSet): """ https://django-filter.readthedocs.io/en/master/guide/rest_framework.html """ email = filters.CharFilter(field_name="email", lookup_expr="icontains") roles = filters.MultipleChoiceFilter(field_name="role", choices=Role.choices()) class Meta: model = User fields = ["email", "roles"] class UserView( PublicPrimaryKeyMixin, FilterSerializerMixin, mixins.RetrieveModelMixin, mixins.UpdateModelMixin, mixins.ListModelMixin, viewsets.GenericViewSet, ): authentication_classes = (PluginAuthentication,) permission_classes = (IsAuthenticated, ActionPermission) # Non-admin users are allowed to list and retrieve users # The overridden get_serializer_class will return # another Serializer for non-admin users with sensitive information hidden action_permissions = { IsAdminOrEditor: ( *MODIFY_ACTIONS, "list", "metadata", "verify_number", "forget_number", "get_verification_code", "get_backend_verification_code", "get_telegram_verification_code", "unlink_telegram", "unlink_backend", "make_test_call", "export_token", "mobile_app_verification_token", "mobile_app_auth_token", ), AnyRole: ("retrieve",), } action_object_permissions = { IsOwnerOrAdmin: ( *MODIFY_ACTIONS, *READ_ACTIONS, "verify_number", "forget_number", "get_verification_code", "get_backend_verification_code", "get_telegram_verification_code", "unlink_telegram", "unlink_backend", "make_test_call", "export_token", "mobile_app_verification_token", "mobile_app_auth_token", ), } filter_serializer_class = FilterUserSerializer pagination_class = HundredPageSizePaginator filter_backends = (SearchFilter, filters.DjangoFilterBackend) # NB start search params # '^' Starts-with search. # '=' Exact matches. # '@' Full-text search. (Currently only supported Django's MySQL backend.) # '$' Regex search. search_fields = ( "^email", "^username", "^slack_user_identity__cached_slack_login", "^slack_user_identity__cached_name", ) filterset_class = UserFilter def get_serializer_class(self): is_filters_request = self.request.query_params.get("filters", "false") == "true" if self.action in ["list"] and is_filters_request: return self.get_filter_serializer_class() is_user_retrieves_own_data = ( self.action == "retrieve" and self.kwargs.get("pk") is not None and self.kwargs.get("pk") == self.request.user.public_primary_key ) if is_user_retrieves_own_data or self.request.user.role == Role.ADMIN: return UserSerializer return UserHiddenFieldsSerializer def get_queryset(self): slack_identity = self.request.query_params.get("slack_identity", None) == "true" queryset = User.objects.filter(organization=self.request.user.organization) if self.request.user.current_team is not None: queryset = queryset.filter(teams=self.request.user.current_team).distinct() queryset = self.get_serializer_class().setup_eager_loading(queryset) if slack_identity: queryset = queryset.filter(slack_user_identity__isnull=False).distinct() return queryset.order_by("id") def list(self, request, *args, **kwargs): queryset = self.filter_queryset(self.get_queryset()) page = self.paginate_queryset(queryset) if page is not None: context = {"request": self.request, "format": self.format_kwarg, "view": self} if settings.OSS_INSTALLATION: if live_settings.GRAFANA_CLOUD_NOTIFICATIONS_ENABLED: from apps.oss_installation.models import CloudConnector, CloudUserIdentity connector = CloudConnector.objects.first() if connector is not None: emails = list(queryset.values_list("email", flat=True)) cloud_identities = list(CloudUserIdentity.objects.filter(email__in=emails)) cloud_identities = {cloud_identity.email: cloud_identity for cloud_identity in cloud_identities} context["cloud_identities"] = cloud_identities context["connector"] = connector serializer = self.get_serializer(page, many=True, context=context) return self.get_paginated_response(serializer.data) serializer = self.get_serializer(queryset, many=True) return Response(serializer.data) def retrieve(self, request, *args, **kwargs): context = {"request": self.request, "format": self.format_kwarg, "view": self} instance = self.get_object() if settings.OSS_INSTALLATION and live_settings.GRAFANA_CLOUD_NOTIFICATIONS_ENABLED: from apps.oss_installation.models import CloudConnector, CloudUserIdentity connector = CloudConnector.objects.first() if connector is not None: cloud_identities = list(CloudUserIdentity.objects.filter(email__in=[instance.email])) cloud_identities = {cloud_identity.email: cloud_identity for cloud_identity in cloud_identities} context["cloud_identities"] = cloud_identities context["connector"] = connector serializer = self.get_serializer(instance, context=context) return Response(serializer.data) def current(self, request): serializer = UserSerializer(self.get_queryset().get(pk=self.request.user.pk)) return Response(serializer.data) @action(detail=True, methods=["get"]) def get_verification_code(self, request, pk): user = self.get_object() phone_manager = PhoneManager(user) code_sent = phone_manager.send_verification_code() if not code_sent: return Response(status=status.HTTP_400_BAD_REQUEST) return Response(status=status.HTTP_200_OK) @action(detail=True, methods=["put"]) def verify_number(self, request, pk): target_user = self.get_object() code = request.query_params.get("token", None) old_state = target_user.repr_settings_for_client_side_logging phone_manager = PhoneManager(target_user) verified, error = phone_manager.verify_phone_number(code) if not verified: return Response(error, status=status.HTTP_400_BAD_REQUEST) organization = request.auth.organization new_state = target_user.repr_settings_for_client_side_logging description = f"User settings for user {target_user.username} was changed from:\n{old_state}\nto:\n{new_state}" create_organization_log( organization, request.user, OrganizationLogType.TYPE_USER_SETTINGS_CHANGED, description, ) return Response(status=status.HTTP_200_OK) @action(detail=True, methods=["put"]) def forget_number(self, request, pk): target_user = self.get_object() old_state = target_user.repr_settings_for_client_side_logging phone_manager = PhoneManager(target_user) forget = phone_manager.forget_phone_number() if forget: organization = request.auth.organization new_state = target_user.repr_settings_for_client_side_logging description = ( f"User settings for user {target_user.username} was changed from:\n{old_state}\nto:\n{new_state}" ) create_organization_log( organization, request.user, OrganizationLogType.TYPE_USER_SETTINGS_CHANGED, description, ) return Response(status=status.HTTP_200_OK) @action(detail=True, methods=["post"]) def make_test_call(self, request, pk): user = self.get_object() phone_number = user.verified_phone_number if phone_number is None: return Response(status=status.HTTP_400_BAD_REQUEST) try: twilio_client.make_test_call(to=phone_number) except Exception as e: logger.error(f"Unable to make a test call due to {e}") return Response( data="Something went wrong while making a test call", status=status.HTTP_500_INTERNAL_SERVER_ERROR ) return Response(status=status.HTTP_200_OK) @action(detail=True, methods=["get"]) def get_backend_verification_code(self, request, pk): backend_id = request.query_params.get("backend") backend = get_messaging_backend_from_id(backend_id) if backend is None: return Response(status=status.HTTP_400_BAD_REQUEST) user = self.get_object() code = backend.generate_user_verification_code(user) return Response(code) @action(detail=True, methods=["get"]) def get_telegram_verification_code(self, request, pk): user = self.get_object() if not user.is_telegram_connected: return Response(status=status.HTTP_400_BAD_REQUEST) try: existing_verification_code = user.telegram_verification_code existing_verification_code.delete() except TelegramVerificationCode.DoesNotExist: pass new_code = TelegramVerificationCode(user=user) new_code.save() telegram_client = TelegramClient() bot_username = telegram_client.api_client.username bot_link = f"https://t.me/{bot_username}" return Response({"telegram_code": str(new_code.uuid), "bot_link": bot_link}, status=status.HTTP_200_OK) @action(detail=True, methods=["post"]) def unlink_telegram(self, request, pk): user = self.get_object() TelegramToUserConnector = apps.get_model("telegram", "TelegramToUserConnector") try: connector = TelegramToUserConnector.objects.get(user=user) connector.delete() except TelegramToUserConnector.DoesNotExist: return Response(status=status.HTTP_400_BAD_REQUEST) description = f"Telegram account of user {user.username} was disconnected" create_organization_log( user.organization, user, OrganizationLogType.TYPE_TELEGRAM_FROM_USER_DISCONNECTED, description, ) return Response(status=status.HTTP_200_OK) @action(detail=True, methods=["post"]) def unlink_backend(self, request, pk): backend_id = request.query_params.get("backend") backend = get_messaging_backend_from_id(backend_id) if backend is None: return Response(status=status.HTTP_400_BAD_REQUEST) user = self.get_object() try: backend.unlink_user(user) except ObjectDoesNotExist: return Response(status=status.HTTP_400_BAD_REQUEST) description = f"{backend.label} account of user {user.username} was disconnected" create_organization_log( user.organization, user, OrganizationLogType.TYPE_MESSAGING_BACKEND_USER_DISCONNECTED, description, ) return Response(status=status.HTTP_200_OK) @action(detail=True, methods=["get", "post", "delete"]) def export_token(self, request, pk): user = self.get_object() if self.request.method == "GET": try: token = UserScheduleExportAuthToken.objects.get(user=user) except UserScheduleExportAuthToken.DoesNotExist: raise NotFound response = { "created_at": token.created_at, "revoked_at": token.revoked_at, "active": token.active, } return Response(response, status=status.HTTP_200_OK) if self.request.method == "POST": try: instance, token = UserScheduleExportAuthToken.create_auth_token(user, user.organization) except IntegrityError: raise Conflict("Schedule export token for user already exists") export_url = urljoin( settings.BASE_URL, reverse("api-public:users-schedule-export", kwargs={"pk": user.public_primary_key}) + f"?{SCHEDULE_EXPORT_TOKEN_NAME}={token}", ) data = {"token": token, "created_at": instance.created_at, "export_url": export_url} return Response(data, status=status.HTTP_201_CREATED) if self.request.method == "DELETE": try: token = UserScheduleExportAuthToken.objects.get(user=user) token.delete() except UserScheduleExportAuthToken.DoesNotExist: raise NotFound return Response(status=status.HTTP_204_NO_CONTENT) @action(detail=True, methods=["get", "post", "delete"]) def mobile_app_verification_token(self, request, pk): DynamicSetting = apps.get_model("base", "DynamicSetting") if not settings.MOBILE_APP_PUSH_NOTIFICATIONS_ENABLED: return Response(status=status.HTTP_404_NOT_FOUND) mobile_app_settings = DynamicSetting.objects.get_or_create( name="mobile_app_settings", defaults={ "json_value": { "org_ids": [], } }, )[0] if self.request.auth.organization.pk not in mobile_app_settings.json_value["org_ids"]: return Response(status=status.HTTP_404_NOT_FOUND) user = self.get_object() if self.request.method == "GET": try: token = MobileAppVerificationToken.objects.get(user=user) except MobileAppVerificationToken.DoesNotExist: raise NotFound response = { "token_id": token.id, "user_id": token.user_id, "organization_id": token.organization_id, "created_at": token.created_at, "revoked_at": token.revoked_at, } return Response(response, status=status.HTTP_200_OK) if self.request.method == "POST": # If token already exists revoke it try: token = MobileAppVerificationToken.objects.get(user=user) token.delete() except MobileAppVerificationToken.DoesNotExist: pass instance, token = MobileAppVerificationToken.create_auth_token(user, user.organization) data = {"id": instance.pk, "token": token, "created_at": instance.created_at} return Response(data, status=status.HTTP_201_CREATED) if self.request.method == "DELETE": try: token = MobileAppVerificationToken.objects.get(user=user) token.delete() except MobileAppVerificationToken.DoesNotExist: raise NotFound return Response(status=status.HTTP_204_NO_CONTENT) @action( methods=["get", "post", "delete"], detail=False, authentication_classes=(MobileAppVerificationTokenAuthentication,), ) def mobile_app_auth_token(self, request): DynamicSetting = apps.get_model("base", "DynamicSetting") if not settings.MOBILE_APP_PUSH_NOTIFICATIONS_ENABLED: return Response(status=status.HTTP_404_NOT_FOUND) mobile_app_settings = DynamicSetting.objects.get_or_create( name="mobile_app_settings", defaults={ "json_value": { "org_ids": [], } }, )[0] if self.request.auth.organization.pk not in mobile_app_settings.json_value["org_ids"]: return Response(status=status.HTTP_404_NOT_FOUND) if self.request.method == "GET": try: token = MobileAppAuthToken.objects.get(user=self.request.user) except MobileAppAuthToken.DoesNotExist: raise NotFound response = { "token_id": token.id, "user_id": token.user_id, "organization_id": token.organization_id, "created_at": token.created_at, "revoked_at": token.revoked_at, } return Response(response, status=status.HTTP_200_OK) if self.request.method == "POST": # If token already exists revoke it try: token = MobileAppAuthToken.objects.get(user=self.request.user) token.delete() except MobileAppAuthToken.DoesNotExist: pass instance, token = MobileAppAuthToken.create_auth_token(self.request.user, self.request.user.organization) data = {"id": instance.pk, "token": token, "created_at": instance.created_at} return Response(data, status=status.HTTP_201_CREATED) if self.request.method == "DELETE": try: token = MobileAppAuthToken.objects.get(user=self.request.user) token.delete() except MobileAppVerificationToken.DoesNotExist: raise NotFound return Response(status=status.HTTP_204_NO_CONTENT)