oncall-engine/engine/apps/api/views/user.py
Innokentii Konstantinov 1f786e8d2a
Phone provider refactoring (#1713)
# What this PR does
This PR moves phone notification logic into separate object PhoneBackend
and introduces PhoneProvider interface to hide actual implementation of
external phone services provider. It should allow add new phone
providers just by implementing one class (See SimplePhoneProvider for
example).
# Why 
[Asterisk PR](https://github.com/grafana/oncall/pull/1282) showed that
our phone notification system is not flexible. However this is one of
the most frequent community questions - how to add "X" phone provider.
Also, this refactoring move us one step closer to unifying all
notification backends, since with PhoneBackend all phone notification
logic is collected in one place and independent from concrete
realisation.
# Highligts
1. PhoneBackend object - contains all phone notifications business
logic.
2. PhoneProvider - interface to  external phone services provider.
3. TwilioPhoneProvider and SimplePhoneProvider - two examples of
PhoneProvider implementation.
4. PhoneCallRecord and SMSRecord models. I introduced these models to
keep phone notification limits logic decoupled from external providers.
Existing TwilioPhoneCall and TwilioSMS objects will be migrated to the
new table to not to reset limits counter. To be able to receive status
callbacks and gather from Twilio TwilioPhoneCall and TwilioSMS still
exists, but they are linked to PhoneCallRecord and SMSRecord via fk, to
not to leat twilio logic into core code.

---------

Co-authored-by: Yulia Shanyrova <yulia.shanyrova@grafana.com>
2023-05-24 06:27:48 +00:00

652 lines
27 KiB
Python

import logging
import pytz
from django.apps import apps
from django.conf import settings
from django.core.exceptions import ObjectDoesNotExist
from django.db.utils import IntegrityError
from django.urls import reverse
from django_filters import rest_framework as filters
from rest_framework import mixins, status, viewsets
from rest_framework.decorators import action
from rest_framework.exceptions import NotFound
from rest_framework.filters import SearchFilter
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
from rest_framework.views import APIView
from apps.alerts.paging import check_user_availability
from apps.api.permissions import (
ALL_PERMISSION_CHOICES,
IsOwnerOrHasRBACPermissions,
LegacyAccessControlRole,
RBACPermission,
get_permission_from_permission_string,
user_is_authorized,
)
from apps.api.serializers.team import TeamSerializer
from apps.api.serializers.user import FilterUserSerializer, UserHiddenFieldsSerializer, UserSerializer
from apps.api.throttlers import (
GetPhoneVerificationCodeThrottlerPerOrg,
GetPhoneVerificationCodeThrottlerPerUser,
TestCallThrottler,
VerifyPhoneNumberThrottlerPerOrg,
VerifyPhoneNumberThrottlerPerUser,
)
from apps.api.throttlers.test_call_throttler import TestPushThrottler
from apps.auth_token.auth import PluginAuthentication
from apps.auth_token.constants import SCHEDULE_EXPORT_TOKEN_NAME
from apps.auth_token.models import UserScheduleExportAuthToken
from apps.base.messaging import get_messaging_backend_from_id
from apps.base.utils import live_settings
from apps.mobile_app.auth import MobileAppAuthTokenAuthentication
from apps.mobile_app.demo_push import send_test_push
from apps.mobile_app.exceptions import DeviceNotSet
from apps.phone_notifications.exceptions import (
FailedToFinishVerification,
FailedToMakeCall,
FailedToStartVerification,
NumberAlreadyVerified,
NumberNotVerified,
ProviderNotSupports,
)
from apps.phone_notifications.phone_backend import PhoneBackend
from apps.schedules.models import OnCallSchedule
from apps.telegram.client import TelegramClient
from apps.telegram.models import TelegramVerificationCode
from apps.user_management.models import Team, User
from common.api_helpers.exceptions import Conflict
from common.api_helpers.mixins import FilterSerializerMixin, PublicPrimaryKeyMixin
from common.api_helpers.paginators import HundredPageSizePaginator
from common.api_helpers.utils import create_engine_url
from common.insight_log import (
ChatOpsEvent,
ChatOpsTypePlug,
EntityEvent,
write_chatops_insight_log,
write_resource_insight_log,
)
from common.recaptcha import check_recaptcha_internal_api
logger = logging.getLogger(__name__)
IsOwnerOrHasUserSettingsAdminPermission = IsOwnerOrHasRBACPermissions([RBACPermission.Permissions.USER_SETTINGS_ADMIN])
IsOwnerOrHasUserSettingsReadPermission = IsOwnerOrHasRBACPermissions([RBACPermission.Permissions.USER_SETTINGS_READ])
UPCOMING_SHIFTS_DEFAULT_DAYS = 7
UPCOMING_SHIFTS_MAX_DAYS = 65
class CurrentUserView(APIView):
authentication_classes = (
MobileAppAuthTokenAuthentication,
PluginAuthentication,
)
permission_classes = (IsAuthenticated,)
def get(self, request):
context = {"request": self.request, "format": self.format_kwarg, "view": self}
if settings.IS_OPEN_SOURCE and live_settings.GRAFANA_CLOUD_NOTIFICATIONS_ENABLED:
from apps.oss_installation.models import CloudConnector, CloudUserIdentity
connector = CloudConnector.objects.first()
if connector is not None:
cloud_identities = list(CloudUserIdentity.objects.filter(email__in=[request.user.email]))
cloud_identities = {cloud_identity.email: cloud_identity for cloud_identity in cloud_identities}
context["cloud_identities"] = cloud_identities
context["connector"] = connector
serializer = UserSerializer(request.user, context=context)
return Response(serializer.data)
def put(self, request):
data = self.request.data
serializer = UserSerializer(request.user, data=data, context={"request": self.request})
serializer.is_valid(raise_exception=True)
serializer.save()
return Response(serializer.data)
class UserFilter(filters.FilterSet):
"""
https://django-filter.readthedocs.io/en/master/guide/rest_framework.html
"""
email = filters.CharFilter(field_name="email", lookup_expr="icontains")
# TODO: remove "roles" in next version
roles = filters.MultipleChoiceFilter(field_name="role", choices=LegacyAccessControlRole.choices())
permission = filters.ChoiceFilter(method="filter_by_permission", choices=ALL_PERMISSION_CHOICES)
class Meta:
model = User
# TODO: remove "roles" in next version
fields = ["email", "roles", "permission"]
def filter_by_permission(self, queryset, name, value):
rbac_permission = get_permission_from_permission_string(value)
if not rbac_permission:
# TODO: maybe raise a 400 here?
return queryset
return queryset.filter(
**User.build_permissions_query(rbac_permission, self.request.user.organization),
)
class UserView(
PublicPrimaryKeyMixin,
FilterSerializerMixin,
mixins.RetrieveModelMixin,
mixins.UpdateModelMixin,
mixins.ListModelMixin,
viewsets.GenericViewSet,
):
authentication_classes = (
MobileAppAuthTokenAuthentication,
PluginAuthentication,
)
permission_classes = (IsAuthenticated, RBACPermission)
rbac_permissions = {
"retrieve": [RBACPermission.Permissions.USER_SETTINGS_READ],
"timezone_options": [RBACPermission.Permissions.USER_SETTINGS_READ],
"check_availability": [RBACPermission.Permissions.USER_SETTINGS_READ],
"metadata": [RBACPermission.Permissions.USER_SETTINGS_WRITE],
"list": [RBACPermission.Permissions.USER_SETTINGS_WRITE],
"update": [RBACPermission.Permissions.USER_SETTINGS_WRITE],
"partial_update": [RBACPermission.Permissions.USER_SETTINGS_WRITE],
"verify_number": [RBACPermission.Permissions.USER_SETTINGS_WRITE],
"forget_number": [RBACPermission.Permissions.USER_SETTINGS_WRITE],
"get_verification_code": [RBACPermission.Permissions.USER_SETTINGS_WRITE],
"get_verification_call": [RBACPermission.Permissions.USER_SETTINGS_WRITE],
"get_backend_verification_code": [RBACPermission.Permissions.USER_SETTINGS_WRITE],
"get_telegram_verification_code": [RBACPermission.Permissions.USER_SETTINGS_WRITE],
"unlink_slack": [RBACPermission.Permissions.USER_SETTINGS_WRITE],
"unlink_telegram": [RBACPermission.Permissions.USER_SETTINGS_WRITE],
"unlink_backend": [RBACPermission.Permissions.USER_SETTINGS_WRITE],
"make_test_call": [RBACPermission.Permissions.USER_SETTINGS_WRITE],
"send_test_push": [RBACPermission.Permissions.USER_SETTINGS_WRITE],
"send_test_sms": [RBACPermission.Permissions.USER_SETTINGS_WRITE],
"export_token": [RBACPermission.Permissions.USER_SETTINGS_WRITE],
"upcoming_shifts": [RBACPermission.Permissions.USER_SETTINGS_WRITE],
}
rbac_object_permissions = {
IsOwnerOrHasUserSettingsAdminPermission: [
"metadata",
"list",
"retrieve",
"update",
"partial_update",
"destroy",
"verify_number",
"forget_number",
"get_verification_code",
"get_verification_call",
"get_backend_verification_code",
"get_telegram_verification_code",
"unlink_slack",
"unlink_telegram",
"unlink_backend",
"make_test_call",
"send_test_sms",
"send_test_push",
"export_token",
"upcoming_shifts",
],
IsOwnerOrHasUserSettingsReadPermission: [
"check_availability",
],
}
filter_serializer_class = FilterUserSerializer
pagination_class = HundredPageSizePaginator
filter_backends = (SearchFilter, filters.DjangoFilterBackend)
# NB start search params
# '^' Starts-with search.
# '=' Exact matches.
# '@' Full-text search. (Currently only supported Django's MySQL backend.)
# '$' Regex search.
search_fields = (
"^email",
"^username",
"^slack_user_identity__cached_slack_login",
"^slack_user_identity__cached_name",
)
filterset_class = UserFilter
def get_serializer_class(self):
request = self.request
user = request.user
kwargs = self.kwargs
is_filters_request = request.query_params.get("filters", "false") == "true"
if self.action in ["list"] and is_filters_request:
return self.get_filter_serializer_class()
is_users_own_data = kwargs.get("pk") is not None and kwargs.get("pk") == user.public_primary_key
has_admin_permission = user_is_authorized(user, [RBACPermission.Permissions.USER_SETTINGS_ADMIN])
if is_users_own_data or has_admin_permission:
return UserSerializer
return UserHiddenFieldsSerializer
def get_queryset(self):
slack_identity = self.request.query_params.get("slack_identity", None) == "true"
queryset = User.objects.filter(organization=self.request.user.organization)
queryset = self.get_serializer_class().setup_eager_loading(queryset)
if slack_identity:
queryset = queryset.filter(slack_user_identity__isnull=False).distinct()
return queryset.order_by("id")
def list(self, request, *args, **kwargs):
queryset = self.filter_queryset(self.get_queryset())
page = self.paginate_queryset(queryset)
if page is not None:
context = {"request": self.request, "format": self.format_kwarg, "view": self}
if settings.IS_OPEN_SOURCE:
if live_settings.GRAFANA_CLOUD_NOTIFICATIONS_ENABLED:
from apps.oss_installation.models import CloudConnector, CloudUserIdentity
connector = CloudConnector.objects.first()
if connector is not None:
emails = list(queryset.values_list("email", flat=True))
cloud_identities = list(CloudUserIdentity.objects.filter(email__in=emails))
cloud_identities = {cloud_identity.email: cloud_identity for cloud_identity in cloud_identities}
context["cloud_identities"] = cloud_identities
context["connector"] = connector
serializer = self.get_serializer(page, many=True, context=context)
return self.get_paginated_response(serializer.data)
serializer = self.get_serializer(queryset, many=True)
return Response(serializer.data)
def retrieve(self, request, *args, **kwargs):
context = {"request": self.request, "format": self.format_kwarg, "view": self}
try:
instance = self.get_object()
except NotFound:
return self.wrong_team_response()
if settings.IS_OPEN_SOURCE and live_settings.GRAFANA_CLOUD_NOTIFICATIONS_ENABLED:
from apps.oss_installation.models import CloudConnector, CloudUserIdentity
connector = CloudConnector.objects.first()
if connector is not None:
cloud_identities = list(CloudUserIdentity.objects.filter(email__in=[instance.email]))
cloud_identities = {cloud_identity.email: cloud_identity for cloud_identity in cloud_identities}
context["cloud_identities"] = cloud_identities
context["connector"] = connector
serializer = self.get_serializer(instance, context=context)
return Response(serializer.data)
def wrong_team_response(self):
"""
This method returns 403 and {"error_code": "wrong_team", "owner_team": {"name", "id", "email", "avatar_url"}}.
Used in case if a requested instance doesn't belong to user's current_team.
Used instead of TeamFilteringMixin because of m2m teams field (mixin doesn't work correctly with this)
and overridden retrieve method in UserView.
"""
queryset = User.objects.filter(organization=self.request.user.organization).order_by("id")
queryset = self.filter_queryset(queryset)
try:
queryset.get(public_primary_key=self.kwargs["pk"])
except ObjectDoesNotExist:
raise NotFound
general_team = Team(public_primary_key=None, name="General", email=None, avatar_url=None)
return Response(
data={"error_code": "wrong_team", "owner_team": TeamSerializer(general_team).data},
status=status.HTTP_403_FORBIDDEN,
)
def current(self, request):
serializer = UserSerializer(self.get_queryset().get(pk=self.request.user.pk))
return Response(serializer.data)
@action(detail=False, methods=["get"])
def timezone_options(self, request):
return Response(pytz.common_timezones)
@action(
detail=True,
methods=["get"],
throttle_classes=[GetPhoneVerificationCodeThrottlerPerUser, GetPhoneVerificationCodeThrottlerPerOrg],
)
def get_verification_code(self, request, pk):
logger.info("get_verification_code: validating reCAPTCHA code")
valid = check_recaptcha_internal_api(request, "mobile_verification_code")
if not valid:
logger.warning(f"get_verification_code: invalid reCAPTCHA validation")
return Response("failed reCAPTCHA check", status=status.HTTP_400_BAD_REQUEST)
logger.info('get_verification_code: pass reCAPTCHA validation"')
user = self.get_object()
phone_backend = PhoneBackend()
try:
phone_backend.send_verification_sms(user)
except NumberAlreadyVerified:
return Response("Phone number already verified", status=status.HTTP_400_BAD_REQUEST)
except FailedToStartVerification:
return Response("Something went wrong while sending code", status=status.HTTP_500_INTERNAL_SERVER_ERROR)
except ProviderNotSupports:
return Response(
"Phone provider not supports sms verification", status=status.HTTP_500_INTERNAL_SERVER_ERROR
)
return Response(status=status.HTTP_200_OK)
@action(
detail=True,
methods=["get"],
throttle_classes=[GetPhoneVerificationCodeThrottlerPerUser, GetPhoneVerificationCodeThrottlerPerOrg],
)
def get_verification_call(self, request, pk):
logger.info("get_verification_code_via_call: validating reCAPTCHA code")
valid = check_recaptcha_internal_api(request, "mobile_verification_code")
if not valid:
logger.warning(f"get_verification_code_via_call: invalid reCAPTCHA validation")
return Response("failed reCAPTCHA check", status=status.HTTP_400_BAD_REQUEST)
logger.info('get_verification_code_via_call: pass reCAPTCHA validation"')
user = self.get_object()
phone_backend = PhoneBackend()
try:
phone_backend.make_verification_call(user)
except NumberAlreadyVerified:
return Response("Phone number already verified", status=status.HTTP_400_BAD_REQUEST)
except FailedToStartVerification:
return Response("Something went wrong while calling", status=status.HTTP_500_INTERNAL_SERVER_ERROR)
except ProviderNotSupports:
return Response(
"Phone provider not supports call verification", status=status.HTTP_500_INTERNAL_SERVER_ERROR
)
return Response(status=status.HTTP_200_OK)
@action(
detail=True,
methods=["put"],
throttle_classes=[VerifyPhoneNumberThrottlerPerUser, VerifyPhoneNumberThrottlerPerOrg],
)
def verify_number(self, request, pk):
target_user = self.get_object()
code = request.query_params.get("token", None)
if not code:
return Response("Invalid verification code", status=status.HTTP_400_BAD_REQUEST)
prev_state = target_user.insight_logs_serialized
phone_backend = PhoneBackend()
try:
verified = phone_backend.verify_phone_number(target_user, code)
except FailedToFinishVerification:
return Response("Something went wrong while verifying code", status=status.HTTP_503_SERVICE_UNAVAILABLE)
if verified:
new_state = target_user.insight_logs_serialized
write_resource_insight_log(
instance=target_user,
author=self.request.user,
event=EntityEvent.UPDATED,
prev_state=prev_state,
new_state=new_state,
)
return Response(status=status.HTTP_200_OK)
else:
return Response("Verification code is not correct", status=status.HTTP_400_BAD_REQUEST)
@action(detail=True, methods=["put"])
def forget_number(self, request, pk):
target_user = self.get_object()
prev_state = target_user.insight_logs_serialized
phone_backend = PhoneBackend()
removed = phone_backend.forget_number(target_user)
if removed:
new_state = target_user.insight_logs_serialized
write_resource_insight_log(
instance=target_user,
author=self.request.user,
event=EntityEvent.UPDATED,
prev_state=prev_state,
new_state=new_state,
)
return Response(status=status.HTTP_200_OK)
@action(detail=True, methods=["post"], throttle_classes=[TestCallThrottler])
def make_test_call(self, request, pk):
user = self.get_object()
try:
phone_backend = PhoneBackend()
phone_backend.make_test_call(user)
except NumberNotVerified:
return Response("Phone number is not verified", status=status.HTTP_400_BAD_REQUEST)
except FailedToMakeCall:
return Response(
"Something went wrong while making a test call", status=status.HTTP_500_INTERNAL_SERVER_ERROR
)
except ProviderNotSupports:
return Response("Phone provider not supports phone calls", status=status.HTTP_500_INTERNAL_SERVER_ERROR)
return Response(status=status.HTTP_200_OK)
@action(detail=True, methods=["post"], throttle_classes=[TestCallThrottler])
def send_test_sms(self, request, pk):
user = self.get_object()
try:
phone_backend = PhoneBackend()
phone_backend.send_test_sms(user)
except NumberNotVerified:
return Response("Phone number is not verified", status=status.HTTP_400_BAD_REQUEST)
except FailedToMakeCall:
return Response(
"Something went wrong while making a test call", status=status.HTTP_500_INTERNAL_SERVER_ERROR
)
except ProviderNotSupports:
return Response("Phone provider not supports phone calls", status=status.HTTP_500_INTERNAL_SERVER_ERROR)
return Response(status=status.HTTP_200_OK)
@action(detail=True, methods=["post"], throttle_classes=[TestPushThrottler])
def send_test_push(self, request, pk):
user = self.get_object()
critical = request.query_params.get("critical", "false") == "true"
try:
send_test_push(user, critical)
except DeviceNotSet:
return Response(
data="Mobile device not connected",
status=status.HTTP_400_BAD_REQUEST,
)
except Exception as e:
logger.info(f"UserView.send_test_push: Unable to send test push due to {e}")
return Response(
data="Something went wrong while sending a test push", status=status.HTTP_500_INTERNAL_SERVER_ERROR
)
return Response(status=status.HTTP_200_OK)
@action(detail=True, methods=["get"])
def get_backend_verification_code(self, request, pk):
backend_id = request.query_params.get("backend")
backend = get_messaging_backend_from_id(backend_id)
if backend is None:
return Response(status=status.HTTP_400_BAD_REQUEST)
user = self.get_object()
code = backend.generate_user_verification_code(user)
return Response(code)
@action(detail=True, methods=["get"])
def get_telegram_verification_code(self, request, pk):
user = self.get_object()
if not user.is_telegram_connected:
return Response(status=status.HTTP_400_BAD_REQUEST)
try:
existing_verification_code = user.telegram_verification_code
existing_verification_code.delete()
except TelegramVerificationCode.DoesNotExist:
pass
new_code = TelegramVerificationCode(user=user)
new_code.save()
telegram_client = TelegramClient()
bot_username = telegram_client.api_client.username
bot_link = f"https://t.me/{bot_username}"
return Response(
{"telegram_code": str(new_code.uuid_with_org_uuid), "bot_link": bot_link}, status=status.HTTP_200_OK
)
@action(detail=True, methods=["post"])
def unlink_slack(self, request, pk):
user = self.get_object()
user.slack_user_identity = None
user.save(update_fields=["slack_user_identity"])
write_chatops_insight_log(
author=request.user,
event_name=ChatOpsEvent.USER_UNLINKED,
chatops_type=ChatOpsTypePlug.SLACK.value,
linked_user=user.username,
linked_user_id=user.public_primary_key,
)
return Response(status=status.HTTP_200_OK)
@action(detail=True, methods=["post"])
def unlink_telegram(self, request, pk):
user = self.get_object()
TelegramToUserConnector = apps.get_model("telegram", "TelegramToUserConnector")
try:
connector = TelegramToUserConnector.objects.get(user=user)
connector.delete()
write_chatops_insight_log(
author=request.user,
event_name=ChatOpsEvent.USER_UNLINKED,
chatops_type=ChatOpsTypePlug.TELEGRAM.value,
linked_user=user.username,
linked_user_id=user.public_primary_key,
)
except TelegramToUserConnector.DoesNotExist:
return Response(status=status.HTTP_400_BAD_REQUEST)
return Response(status=status.HTTP_200_OK)
@action(detail=True, methods=["post"])
def unlink_backend(self, request, pk):
# TODO: insight logs support
backend_id = request.query_params.get("backend")
backend = get_messaging_backend_from_id(backend_id)
if backend is None:
return Response(status=status.HTTP_400_BAD_REQUEST)
user = self.get_object()
try:
backend.unlink_user(user)
write_chatops_insight_log(
author=request.user,
event_name=ChatOpsEvent.USER_UNLINKED,
chatops_type=backend.backend_id,
linked_user=user.username,
linked_user_id=user.public_primary_key,
)
except ObjectDoesNotExist:
return Response(status=status.HTTP_400_BAD_REQUEST)
return Response(status=status.HTTP_200_OK)
@action(detail=True, methods=["get"])
def upcoming_shifts(self, request, pk):
user = self.get_object()
try:
days = int(request.query_params.get("days", UPCOMING_SHIFTS_DEFAULT_DAYS))
except ValueError:
return Response(status=status.HTTP_400_BAD_REQUEST)
if days <= 0 or days > UPCOMING_SHIFTS_MAX_DAYS:
return Response(status=status.HTTP_400_BAD_REQUEST)
# filter user-related schedules
schedules = OnCallSchedule.objects.related_to_user(user)
# check upcoming shifts
upcoming = []
for schedule in schedules:
current_shift, upcoming_shift = schedule.upcoming_shift_for_user(user, days=days)
if current_shift or upcoming_shift:
upcoming.append(
{
"schedule_id": schedule.public_primary_key,
"schedule_name": schedule.name,
"is_oncall": current_shift is not None,
"current_shift": current_shift,
"next_shift": upcoming_shift,
}
)
# sort entries by start timestamp
def sorting_key(entry):
shift = entry["current_shift"] if entry["current_shift"] else entry["next_shift"]
return shift["start"]
upcoming.sort(key=sorting_key)
return Response(upcoming, status=status.HTTP_200_OK)
@action(detail=True, methods=["get", "post", "delete"])
def export_token(self, request, pk):
user = self.get_object()
if self.request.method == "GET":
try:
token = UserScheduleExportAuthToken.objects.get(user=user)
except UserScheduleExportAuthToken.DoesNotExist:
raise NotFound
response = {
"created_at": token.created_at,
"revoked_at": token.revoked_at,
"active": token.active,
}
return Response(response, status=status.HTTP_200_OK)
if self.request.method == "POST":
try:
instance, token = UserScheduleExportAuthToken.create_auth_token(user, user.organization)
write_resource_insight_log(instance=instance, author=self.request.user, event=EntityEvent.CREATED)
except IntegrityError:
raise Conflict("Schedule export token for user already exists")
export_url = create_engine_url(
reverse("api-public:users-schedule-export", kwargs={"pk": user.public_primary_key})
+ f"?{SCHEDULE_EXPORT_TOKEN_NAME}={token}"
)
data = {"token": token, "created_at": instance.created_at, "export_url": export_url}
return Response(data, status=status.HTTP_201_CREATED)
if self.request.method == "DELETE":
try:
token = UserScheduleExportAuthToken.objects.get(user=user)
write_resource_insight_log(instance=token, author=self.request.user, event=EntityEvent.DELETED)
token.delete()
except UserScheduleExportAuthToken.DoesNotExist:
raise NotFound
return Response(status=status.HTTP_204_NO_CONTENT)
@action(detail=True, methods=["get"])
def check_availability(self, request, pk):
user = self.get_object()
warnings = check_user_availability(user=user, team=request.user.current_team)
return Response(data={"warnings": warnings}, status=status.HTTP_200_OK)