Related to https://github.com/grafana/oncall-private/issues/2826 Continuing work started in https://github.com/grafana/oncall/pull/5211, this adds support for Grafana service accounts tokens for API authentication (except alert group actions which will still require a user behind). Next steps would be updating the go client and the terraform provider to allow service account token auth for OnCall resources. Following proposal 1.1 from [doc](https://docs.google.com/document/d/1I3nFbsUEkiNPphBXT-kWefIeramTY71qqZ1OA06Kmls/edit?usp=sharing).
100 lines
4 KiB
Python
100 lines
4 KiB
Python
from django_filters.rest_framework import DjangoFilterBackend
|
|
from rest_framework import status
|
|
from rest_framework.exceptions import NotFound
|
|
from rest_framework.permissions import IsAuthenticated
|
|
from rest_framework.response import Response
|
|
from rest_framework.viewsets import ModelViewSet
|
|
|
|
from apps.alerts.models import ChannelFilter
|
|
from apps.api.permissions import RBACPermission
|
|
from apps.auth_token.auth import ApiTokenAuthentication, GrafanaServiceAccountAuthentication
|
|
from apps.public_api.serializers import ChannelFilterSerializer, ChannelFilterUpdateSerializer
|
|
from apps.public_api.throttlers.user_throttle import UserThrottle
|
|
from common.api_helpers.exceptions import BadRequest
|
|
from common.api_helpers.mixins import RateLimitHeadersMixin, UpdateSerializerMixin
|
|
from common.api_helpers.paginators import TwentyFivePageSizePaginator
|
|
from common.insight_log import EntityEvent, write_resource_insight_log
|
|
|
|
|
|
class ChannelFilterView(RateLimitHeadersMixin, UpdateSerializerMixin, ModelViewSet):
|
|
authentication_classes = (GrafanaServiceAccountAuthentication, ApiTokenAuthentication)
|
|
permission_classes = (IsAuthenticated, RBACPermission)
|
|
|
|
rbac_permissions = {
|
|
"list": [RBACPermission.Permissions.INTEGRATIONS_READ],
|
|
"retrieve": [RBACPermission.Permissions.INTEGRATIONS_READ],
|
|
"create": [RBACPermission.Permissions.INTEGRATIONS_WRITE],
|
|
"update": [RBACPermission.Permissions.INTEGRATIONS_WRITE],
|
|
"partial_update": [RBACPermission.Permissions.INTEGRATIONS_WRITE],
|
|
"destroy": [RBACPermission.Permissions.INTEGRATIONS_WRITE],
|
|
}
|
|
|
|
throttle_classes = [UserThrottle]
|
|
|
|
model = ChannelFilter
|
|
serializer_class = ChannelFilterSerializer
|
|
update_serializer_class = ChannelFilterUpdateSerializer
|
|
|
|
pagination_class = TwentyFivePageSizePaginator
|
|
|
|
filter_backends = [DjangoFilterBackend]
|
|
filterset_fields = ["alert_receive_channel"]
|
|
|
|
def get_queryset(self):
|
|
integration_id = self.request.query_params.get("integration_id", None)
|
|
routing_regex = self.request.query_params.get("routing_regex", None)
|
|
|
|
queryset = ChannelFilter.objects.filter(
|
|
alert_receive_channel__organization=self.request.auth.organization, alert_receive_channel__deleted_at=None
|
|
)
|
|
queryset = self.serializer_class.setup_eager_loading(queryset)
|
|
|
|
if integration_id:
|
|
queryset = queryset.filter(alert_receive_channel__public_primary_key=integration_id)
|
|
if routing_regex:
|
|
queryset = queryset.filter(filtering_term=routing_regex)
|
|
return queryset
|
|
|
|
def get_object(self):
|
|
public_primary_key = self.kwargs["pk"]
|
|
|
|
try:
|
|
return ChannelFilter.objects.filter(
|
|
alert_receive_channel__organization=self.request.auth.organization,
|
|
alert_receive_channel__deleted_at=None,
|
|
).get(public_primary_key=public_primary_key)
|
|
except ChannelFilter.DoesNotExist:
|
|
raise NotFound
|
|
|
|
def destroy(self, request, *args, **kwargs):
|
|
instance = self.get_object()
|
|
if instance.is_default:
|
|
raise BadRequest(detail="Unable to delete default filter")
|
|
else:
|
|
write_resource_insight_log(
|
|
instance=instance,
|
|
author=self.request.user,
|
|
event=EntityEvent.DELETED,
|
|
)
|
|
self.perform_destroy(instance)
|
|
return Response(status=status.HTTP_204_NO_CONTENT)
|
|
|
|
def perform_create(self, serializer):
|
|
serializer.save()
|
|
write_resource_insight_log(
|
|
instance=serializer.instance,
|
|
author=self.request.user,
|
|
event=EntityEvent.CREATED,
|
|
)
|
|
|
|
def perform_update(self, serializer):
|
|
prev_state = serializer.instance.insight_logs_serialized
|
|
serializer.save()
|
|
new_state = serializer.instance.insight_logs_serialized
|
|
write_resource_insight_log(
|
|
instance=serializer.instance,
|
|
author=self.request.user,
|
|
event=EntityEvent.UPDATED,
|
|
prev_state=prev_state,
|
|
new_state=new_state,
|
|
)
|