Enable test_connection for existing integrations reusing data (#4091)

Related to https://github.com/grafana/oncall-private/issues/2542
Make it possible to test connection for an existing integration, without
needing to explicitly pass the password.
This commit is contained in:
Matias Bordese 2024-03-21 11:04:35 -03:00 committed by GitHub
parent 5df15926e9
commit 73cfaf25c0
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 68 additions and 12 deletions

View file

@ -340,7 +340,7 @@ class AlertReceiveChannelSerializer(
_additional_settings_serializer_from_type(self.instance.config.slug) if self.instance else None
)
if settings_serializer_cls:
additional_settings_data = data.get("additional_settings")
additional_settings_data = data.get("additional_settings", self.instance.additional_settings)
settings_serializer = settings_serializer_cls(self.instance, data=additional_settings_data)
settings_serializer.is_valid()
if settings_serializer.errors:

View file

@ -1941,7 +1941,7 @@ def test_alert_receive_channel_test_connection(
):
_, user, token = make_organization_and_user_with_plugin_token()
client = APIClient()
url = reverse("api-internal:alert_receive_channel-test-connection")
url = reverse("api-internal:alert_receive_channel-test-connection-create")
integration_config = AlertReceiveChannel._config[0]
data = {
"integration": integration_config.slug,
@ -1976,6 +1976,33 @@ def test_alert_receive_channel_test_connection(
assert response.json() == {"team": ["Object does not exist"]}
@pytest.mark.django_db
def test_alert_receive_channel_test_connection_existing_integration(
make_organization_and_user_with_plugin_token,
make_user_auth_headers,
make_alert_receive_channel,
):
organization, user, token = make_organization_and_user_with_plugin_token()
client = APIClient()
integration_config = AlertReceiveChannel._config[0]
integration = make_alert_receive_channel(organization, integration=integration_config.slug, description_short="ok")
def testing_connection(instance):
if instance.description_short != "ok":
raise BacksyncIntegrationRequestError(error_msg="Error!")
url = reverse("api-internal:alert_receive_channel-test-connection", kwargs={"pk": integration.public_primary_key})
with patch.object(integration_config, "test_connection", side_effect=testing_connection, create=True):
data = {} # no updates, use original data
response = client.post(url, data, format="json", **make_user_auth_headers(user, token))
assert response.status_code == status.HTTP_200_OK
data = {"description_short": "notok"}
response = client.post(url, data, format="json", **make_user_auth_headers(user, token))
assert response.status_code == status.HTTP_400_BAD_REQUEST
assert response.json() == {"detail": "Error!"}
@pytest.mark.django_db
def test_alert_receive_channel_status_options(
make_organization_and_user_with_plugin_token,

View file

@ -172,6 +172,7 @@ class AlertReceiveChannelView(
"connect_contact_point": [RBACPermission.Permissions.INTEGRATIONS_WRITE],
"create_contact_point": [RBACPermission.Permissions.INTEGRATIONS_WRITE],
"disconnect_contact_point": [RBACPermission.Permissions.INTEGRATIONS_WRITE],
"test_connection_create": [RBACPermission.Permissions.INTEGRATIONS_WRITE],
"test_connection": [RBACPermission.Permissions.INTEGRATIONS_WRITE],
"status_options": [RBACPermission.Permissions.INTEGRATIONS_READ],
"webhooks_get": [RBACPermission.Permissions.INTEGRATIONS_READ],
@ -290,25 +291,53 @@ class AlertReceiveChannelView(
except BacksyncIntegrationRequestError as e:
raise BadRequest(detail=e.error_msg)
@extend_schema(
request=AlertReceiveChannelSerializer,
responses={status.HTTP_200_OK: None},
)
@action(detail=False, methods=["post"])
def test_connection(self, request):
# create in-memory instance to test with the (possible) unsaved data
def _test_connection(self, request, pk=None):
instance = None
data = request.data
# clear name while testing connection (to avoid name already used validation error)
data["verbal_name"] = None
serializer = self.get_serializer(data=request.data)
if pk is not None:
instance = self.get_object()
serializer = self.update_serializer_class(
instance,
data=data,
partial=True,
context={"request": request},
)
else:
serializer = self.create_serializer_class(data=data, context={"request": request})
# check we have all the required information
serializer.is_valid(raise_exception=True)
instance = AlertReceiveChannel(**serializer.validated_data)
if instance is None:
serializer.validated_data.pop("create_default_webhooks", None)
# create in-memory instance to test with the (possible) unsaved data
instance = AlertReceiveChannel(**serializer.validated_data)
else:
# update instance with the validated data
for attr, val in serializer.validated_data.items():
setattr(instance, attr, val)
# will raise if there are errors
self._backsync_integration_request(instance, "test_connection")
return Response(status=status.HTTP_200_OK)
@extend_schema(
request=AlertReceiveChannelSerializer,
responses={status.HTTP_200_OK: None},
)
@action(detail=False, methods=["post"], url_path="test_connection")
def test_connection_create(self, request):
return self._test_connection(request)
@extend_schema(
request=AlertReceiveChannelUpdateSerializer,
responses={status.HTTP_200_OK: None},
)
@action(detail=True, methods=["post"])
def test_connection(self, request, pk):
return self._test_connection(request, pk=pk)
@extend_schema(
responses=inline_serializer(
name="AlertReceiveChannelBacksyncStatusOptions",