Add integration status_options endpoint listing backsync possible statuses (#4061)

Related to https://github.com/grafana/oncall-private/issues/2542

Requires https://github.com/grafana/oncall/pull/4050
This commit is contained in:
Matias Bordese 2024-03-18 09:11:40 -03:00 committed by GitHub
parent 477062bb0c
commit 432c477ee2
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 75 additions and 12 deletions

View file

@ -10,7 +10,7 @@ from rest_framework.test import APIClient
from apps.alerts.models import AlertReceiveChannel, EscalationPolicy
from apps.api.permissions import LegacyAccessControlRole
from apps.labels.models import LabelKeyCache, LabelValueCache
from common.exceptions import TestConnectionError
from common.exceptions import BacksyncIntegrationRequestError
class AdditionalSettingsTestSerializer(serializers.Serializer):
@ -1962,7 +1962,7 @@ def test_alert_receive_channel_test_connection(
# test error
def testing_error(instance):
raise TestConnectionError(error_msg="Error!")
raise BacksyncIntegrationRequestError(error_msg="Error!")
with patch.object(integration_config, "test_connection", side_effect=testing_error, create=True):
response = client.post(url, data, format="json", **make_user_auth_headers(user, token))
@ -1976,6 +1976,45 @@ def test_alert_receive_channel_test_connection(
assert response.json() == {"team": ["Object does not exist"]}
@pytest.mark.django_db
def test_alert_receive_channel_status_options(
make_organization_and_user_with_plugin_token,
make_alert_receive_channel,
make_user_auth_headers,
):
organization, user, token = make_organization_and_user_with_plugin_token()
integration_config = AlertReceiveChannel._config[0]
alert_receive_channel = make_alert_receive_channel(organization, integration=integration_config.slug)
client = APIClient()
url = reverse(
"api-internal:alert_receive_channel-status-options",
kwargs={"pk": alert_receive_channel.public_primary_key},
)
# no status options setup
response = client.get(url, format="json", **make_user_auth_headers(user, token))
assert response.status_code == status.HTTP_200_OK
assert response.json() == []
# test ok
def testing_ok(instance):
return [("The option", "value"), ("Another", "another")]
with patch.object(integration_config, "status_options", side_effect=testing_ok, create=True):
response = client.get(url, format="json", **make_user_auth_headers(user, token))
assert response.status_code == status.HTTP_200_OK
assert response.json() == [["The option", "value"], ["Another", "another"]]
# test error
def testing_error(instance):
raise BacksyncIntegrationRequestError(error_msg="Error!")
with patch.object(integration_config, "status_options", side_effect=testing_error, create=True):
response = client.get(url, format="json", **make_user_auth_headers(user, token))
assert response.status_code == status.HTTP_400_BAD_REQUEST
assert response.json() == {"detail": "Error!"}
def _webhook_data(webhook_id=ANY, webhook_name=ANY, webhook_url=ANY, alert_receive_channel_id=ANY):
return {
"authorization_header": None,

View file

@ -51,9 +51,9 @@ from common.api_helpers.mixins import (
)
from common.api_helpers.paginators import FifteenPageSizePaginator
from common.exceptions import (
BacksyncIntegrationRequestError,
MaintenanceCouldNotBeStartedError,
TeamCanNotBeChangedError,
TestConnectionError,
UnableToSendDemoAlert,
)
from common.insight_log import EntityEvent, write_resource_insight_log
@ -173,6 +173,7 @@ class AlertReceiveChannelView(
"create_contact_point": [RBACPermission.Permissions.INTEGRATIONS_WRITE],
"disconnect_contact_point": [RBACPermission.Permissions.INTEGRATIONS_WRITE],
"test_connection": [RBACPermission.Permissions.INTEGRATIONS_WRITE],
"status_options": [RBACPermission.Permissions.INTEGRATIONS_READ],
"webhooks_get": [RBACPermission.Permissions.INTEGRATIONS_READ],
"webhooks_post": [RBACPermission.Permissions.INTEGRATIONS_WRITE],
"webhooks_put": [RBACPermission.Permissions.INTEGRATIONS_WRITE],
@ -280,6 +281,14 @@ class AlertReceiveChannelView(
return Response(status=status.HTTP_200_OK)
def _backsync_integration_request(self, instance, func_name):
integration_func = getattr(instance.config, func_name, None)
if integration_func:
try:
return integration_func(instance)
except BacksyncIntegrationRequestError as e:
raise BadRequest(detail=e.error_msg)
@extend_schema(request=AlertReceiveChannelSerializer)
@action(detail=False, methods=["post"])
def test_connection(self, request):
@ -290,15 +299,30 @@ class AlertReceiveChannelView(
serializer = self.get_serializer(data=request.data)
serializer.is_valid(raise_exception=True)
instance = AlertReceiveChannel(**serializer.validated_data)
test_connection_func = getattr(instance.config, "test_connection", None)
if test_connection_func:
try:
test_connection_func(instance)
except TestConnectionError as e:
raise BadRequest(detail=e.error_msg)
# will raise if there are errors
self._backsync_integration_request(instance, "test_connection")
return Response(status=status.HTTP_200_OK)
@extend_schema(
request=inline_serializer(
name="AlertReceiveChannelBacksyncStatusOptions",
fields={
"value": serializers.CharField(),
"display_name": serializers.CharField(),
},
many=True,
),
)
@action(detail=True, methods=["get"])
def status_options(self, request, pk):
instance = self.get_object()
choices = self._backsync_integration_request(instance, "status_options")
if choices is None:
choices = []
return Response(choices)
@extend_schema(
responses=inline_serializer(
name="AlertReceiveChannelIntegrationOptions",

View file

@ -1,7 +1,7 @@
from .exceptions import ( # noqa: F401
BacksyncIntegrationRequestError,
MaintenanceCouldNotBeStartedError,
TeamCanNotBeChangedError,
TestConnectionError,
UnableToSendDemoAlert,
UserNotificationPolicyCouldNotBeDeleted,
)

View file

@ -23,8 +23,8 @@ class UserNotificationPolicyCouldNotBeDeleted(OperationCouldNotBePerformedError)
pass
class TestConnectionError(Exception):
"""Error testing alert receive channel connection."""
class BacksyncIntegrationRequestError(Exception):
"""Error making request to alert receive channel backsync connection."""
def __init__(self, *args, **kwargs):
self.error_msg = kwargs.pop("error_msg", None)