121 lines
4.8 KiB
Python
121 lines
4.8 KiB
Python
from rest_framework import status
|
|
from rest_framework.exceptions import NotFound
|
|
from rest_framework.permissions import IsAuthenticated
|
|
from rest_framework.response import Response
|
|
from rest_framework.viewsets import ModelViewSet
|
|
|
|
from apps.api.permissions import RBACPermission
|
|
from apps.auth_token.auth import ApiTokenAuthentication
|
|
from apps.base.models import UserNotificationPolicy
|
|
from apps.public_api.serializers import PersonalNotificationRuleSerializer, PersonalNotificationRuleUpdateSerializer
|
|
from apps.public_api.throttlers.user_throttle import UserThrottle
|
|
from apps.user_management.models import User
|
|
from common.api_helpers.exceptions import BadRequest
|
|
from common.api_helpers.mixins import RateLimitHeadersMixin, UpdateSerializerMixin
|
|
from common.api_helpers.paginators import FiftyPageSizePaginator
|
|
from common.insight_log import EntityEvent, write_resource_insight_log
|
|
|
|
|
|
class PersonalNotificationView(RateLimitHeadersMixin, UpdateSerializerMixin, ModelViewSet):
|
|
authentication_classes = (ApiTokenAuthentication,)
|
|
permission_classes = (IsAuthenticated, RBACPermission)
|
|
|
|
rbac_permissions = {
|
|
"list": [RBACPermission.Permissions.USER_SETTINGS_READ],
|
|
"retrieve": [RBACPermission.Permissions.USER_SETTINGS_READ],
|
|
"create": [RBACPermission.Permissions.USER_SETTINGS_WRITE],
|
|
"update": [RBACPermission.Permissions.USER_SETTINGS_WRITE],
|
|
"partial_update": [RBACPermission.Permissions.USER_SETTINGS_WRITE],
|
|
"destroy": [RBACPermission.Permissions.USER_SETTINGS_WRITE],
|
|
}
|
|
|
|
throttle_classes = [UserThrottle]
|
|
|
|
model = UserNotificationPolicy
|
|
serializer_class = PersonalNotificationRuleSerializer
|
|
update_serializer_class = PersonalNotificationRuleUpdateSerializer
|
|
|
|
pagination_class = FiftyPageSizePaginator
|
|
|
|
def get_queryset(self):
|
|
user_id = self.request.query_params.get("user_id", None)
|
|
important = self.request.query_params.get("important", None)
|
|
organization = self.request.auth.organization
|
|
if user_id is not None:
|
|
if user_id != self.request.user.public_primary_key:
|
|
try:
|
|
User.objects.get(
|
|
public_primary_key=user_id,
|
|
organization=organization,
|
|
)
|
|
except User.DoesNotExist:
|
|
raise BadRequest(detail="User not found.")
|
|
queryset = UserNotificationPolicy.objects.filter(
|
|
user__public_primary_key=user_id,
|
|
user__organization=organization,
|
|
)
|
|
else:
|
|
queryset = UserNotificationPolicy.objects.filter(user__organization=organization).distinct()
|
|
if important is not None:
|
|
if important == "true":
|
|
queryset = queryset.filter(important=True)
|
|
elif important == "false":
|
|
queryset = queryset.filter(important=False)
|
|
else:
|
|
raise BadRequest(detail="Important is not bool")
|
|
|
|
queryset = self.serializer_class.setup_eager_loading(queryset)
|
|
|
|
return queryset.order_by("user", "important", "order")
|
|
|
|
def get_object(self):
|
|
public_primary_key = self.kwargs["pk"]
|
|
queryset = self.filter_queryset(self.get_queryset())
|
|
try:
|
|
return queryset.get(public_primary_key=public_primary_key)
|
|
except UserNotificationPolicy.DoesNotExist:
|
|
raise NotFound
|
|
|
|
def destroy(self, request, *args, **kwargs):
|
|
instance = self.get_object()
|
|
self.perform_destroy(instance)
|
|
return Response(status=status.HTTP_204_NO_CONTENT)
|
|
|
|
def perform_destroy(self, instance):
|
|
user = self.request.user
|
|
prev_state = user.insight_logs_serialized
|
|
instance.delete()
|
|
new_state = user.insight_logs_serialized
|
|
write_resource_insight_log(
|
|
instance=user,
|
|
author=self.request.user,
|
|
event=EntityEvent.UPDATED,
|
|
prev_state=prev_state,
|
|
new_state=new_state,
|
|
)
|
|
|
|
def perform_create(self, serializer):
|
|
user = serializer.validated_data["user"]
|
|
prev_state = user.insight_logs_serialized
|
|
serializer.save()
|
|
new_state = user.insight_logs_serialized
|
|
write_resource_insight_log(
|
|
instance=user,
|
|
author=self.request.user,
|
|
event=EntityEvent.UPDATED,
|
|
prev_state=prev_state,
|
|
new_state=new_state,
|
|
)
|
|
|
|
def perform_update(self, serializer):
|
|
user = self.request.user
|
|
prev_state = user.insight_logs_serialized
|
|
serializer.save()
|
|
new_state = user.insight_logs_serialized
|
|
write_resource_insight_log(
|
|
instance=user,
|
|
author=self.request.user,
|
|
event=EntityEvent.UPDATED,
|
|
prev_state=prev_state,
|
|
new_state=new_state,
|
|
)
|