from unittest.mock import patch import pytest from django.db.models import Max from django.urls import reverse from django.utils.timezone import timedelta from rest_framework import status from rest_framework.response import Response from rest_framework.test import APIClient from apps.alerts.models import EscalationPolicy from apps.api.permissions import LegacyAccessControlRole from common.incident_api.client import DEFAULT_INCIDENT_SEVERITY, IncidentAPIException @pytest.fixture() def escalation_policy_internal_api_setup( make_organization_and_user_with_plugin_token, make_escalation_chain, make_user_for_organization, make_escalation_policy, ): organization, first_user, token = make_organization_and_user_with_plugin_token() second_user = make_user_for_organization(organization) escalation_chain = make_escalation_chain(organization) escalation_policy = make_escalation_policy( escalation_chain=escalation_chain, escalation_policy_step=EscalationPolicy.STEP_WAIT, wait_delay=EscalationPolicy.ONE_MINUTE, ) return token, escalation_chain, escalation_policy, first_user, second_user @pytest.mark.django_db def test_create_escalation_policy(escalation_policy_internal_api_setup, make_user_auth_headers): token, escalation_chain, _, user, _ = escalation_policy_internal_api_setup client = APIClient() url = reverse("api-internal:escalation_policy-list") data = { "step": EscalationPolicy.STEP_WAIT, "wait_delay": "60.0", "escalation_chain": escalation_chain.public_primary_key, "notify_to_users_queue": [], "from_time": None, "to_time": None, } max_order = EscalationPolicy.objects.filter(escalation_chain=escalation_chain).aggregate(maxorder=Max("order"))[ "maxorder" ] response = client.post(url, data, format="json", **make_user_auth_headers(user, token)) assert response.status_code == status.HTTP_201_CREATED assert EscalationPolicy.objects.get(public_primary_key=response.data["id"]).order == max_order + 1 @pytest.mark.django_db @pytest.mark.parametrize("wait_delay", (timedelta(seconds=59), timedelta(hours=24, seconds=1))) def test_create_escalation_policy_wait_delay_invalid( escalation_policy_internal_api_setup, make_user_auth_headers, wait_delay ): token, escalation_chain, _, user, _ = escalation_policy_internal_api_setup client = APIClient() url = reverse("api-internal:escalation_policy-list") data = { "step": EscalationPolicy.STEP_WAIT, "wait_delay": str(wait_delay.total_seconds()), "escalation_chain": escalation_chain.public_primary_key, "notify_to_users_queue": [], "from_time": None, "to_time": None, } response = client.post(url, data, format="json", **make_user_auth_headers(user, token)) assert response.status_code == status.HTTP_400_BAD_REQUEST @pytest.mark.django_db def test_create_escalation_policy_webhook( escalation_policy_internal_api_setup, make_custom_webhook, make_user_auth_headers ): token, escalation_chain, _, user, _ = escalation_policy_internal_api_setup client = APIClient() url = reverse("api-internal:escalation_policy-list") webhook = make_custom_webhook(organization=user.organization) data = { "step": EscalationPolicy.STEP_TRIGGER_CUSTOM_WEBHOOK, "escalation_chain": escalation_chain.public_primary_key, "custom_webhook": webhook.public_primary_key, } max_order = EscalationPolicy.objects.filter(escalation_chain=escalation_chain).aggregate(maxorder=Max("order"))[ "maxorder" ] response = client.post(url, data, format="json", **make_user_auth_headers(user, token)) assert response.status_code == status.HTTP_201_CREATED assert response.data["custom_webhook"] == webhook.public_primary_key escalation_policy = EscalationPolicy.objects.get(public_primary_key=response.data["id"]) assert escalation_policy.order == max_order + 1 assert escalation_policy.custom_webhook == webhook @pytest.mark.django_db def test_update_notify_multiple_users_step(escalation_policy_internal_api_setup, make_user_auth_headers): token, _, escalation_policy, first_user, second_user = escalation_policy_internal_api_setup client = APIClient() url = reverse("api-internal:escalation_policy-detail", kwargs={"pk": escalation_policy.public_primary_key}) data = { "step": EscalationPolicy.STEP_NOTIFY_MULTIPLE_USERS, "notify_to_users_queue": [first_user.public_primary_key, second_user.public_primary_key], } response = client.put(url, data, format="json", **make_user_auth_headers(first_user, token)) assert response.status_code == status.HTTP_200_OK assert response.json()["step"] == EscalationPolicy.STEP_NOTIFY_MULTIPLE_USERS assert sorted(response.json()["notify_to_users_queue"]) == sorted( [first_user.public_primary_key, second_user.public_primary_key] ) @pytest.mark.django_db def test_manage_escalation_policy_notify_team(escalation_policy_internal_api_setup, make_team, make_user_auth_headers): token, escalation_chain, _, user, _ = escalation_policy_internal_api_setup client = APIClient() url = reverse("api-internal:escalation_policy-list") team = make_team(organization=user.organization) data = { "step": EscalationPolicy.STEP_NOTIFY_TEAM_MEMBERS, "escalation_chain": escalation_chain.public_primary_key, "notify_to_team_members": team.public_primary_key, } max_order = EscalationPolicy.objects.filter(escalation_chain=escalation_chain).aggregate(maxorder=Max("order"))[ "maxorder" ] response = client.post(url, data, format="json", **make_user_auth_headers(user, token)) assert response.status_code == status.HTTP_201_CREATED assert response.data["notify_to_team_members"] == team.public_primary_key escalation_policy = EscalationPolicy.objects.get(public_primary_key=response.data["id"]) assert escalation_policy.order == max_order + 1 assert escalation_policy.notify_to_team_members == team # update team in policy url = reverse("api-internal:escalation_policy-detail", kwargs={"pk": escalation_policy.public_primary_key}) another_team = make_team(organization=user.organization) data = { "step": EscalationPolicy.STEP_NOTIFY_TEAM_MEMBERS, "notify_to_team_members": another_team.public_primary_key, } response = client.put(url, data, format="json", **make_user_auth_headers(user, token)) assert response.status_code == status.HTTP_200_OK assert response.json()["step"] == EscalationPolicy.STEP_NOTIFY_TEAM_MEMBERS assert response.json()["notify_to_team_members"] == another_team.public_primary_key @pytest.mark.django_db def test_move_to_position(escalation_policy_internal_api_setup, make_user_auth_headers): token, _, escalation_policy, user, _ = escalation_policy_internal_api_setup client = APIClient() position_to_move = 0 url = reverse( "api-internal:escalation_policy-move-to-position", kwargs={"pk": escalation_policy.public_primary_key} ) response = client.put( f"{url}?position={position_to_move}", content_type="application/json", **make_user_auth_headers(user, token) ) escalation_policy.refresh_from_db() assert response.status_code == status.HTTP_200_OK assert escalation_policy.order == position_to_move @pytest.mark.django_db def test_move_to_position_invalid_index(escalation_policy_internal_api_setup, make_user_auth_headers): token, _, escalation_policy, user, _ = escalation_policy_internal_api_setup client = APIClient() position_to_move = 1 url = reverse( "api-internal:escalation_policy-move-to-position", kwargs={"pk": escalation_policy.public_primary_key} ) response = client.put( f"{url}?position={position_to_move}", content_type="application/json", **make_user_auth_headers(user, token) ) escalation_policy.refresh_from_db() assert response.status_code == status.HTTP_400_BAD_REQUEST @pytest.mark.django_db @pytest.mark.parametrize( "role,expected_status", [ (LegacyAccessControlRole.ADMIN, status.HTTP_200_OK), (LegacyAccessControlRole.EDITOR, status.HTTP_403_FORBIDDEN), (LegacyAccessControlRole.VIEWER, status.HTTP_403_FORBIDDEN), (LegacyAccessControlRole.NONE, status.HTTP_403_FORBIDDEN), ], ) def test_escalation_policy_create_permissions( make_organization_and_user_with_plugin_token, make_escalation_chain, make_escalation_policy, make_user_auth_headers, role, expected_status, ): organization, user, token = make_organization_and_user_with_plugin_token(role) escalation_chain = make_escalation_chain(organization) make_escalation_policy( escalation_chain, escalation_policy_step=EscalationPolicy.STEP_WAIT, wait_delay=EscalationPolicy.ONE_MINUTE ) client = APIClient() url = reverse("api-internal:escalation_policy-list") with patch( "apps.api.views.escalation_policy.EscalationPolicyView.create", return_value=Response( status=status.HTTP_200_OK, ), ): response = client.post(url, format="json", **make_user_auth_headers(user, token)) assert response.status_code == expected_status @pytest.mark.django_db @pytest.mark.parametrize( "role,expected_status", [ (LegacyAccessControlRole.ADMIN, status.HTTP_200_OK), (LegacyAccessControlRole.EDITOR, status.HTTP_403_FORBIDDEN), (LegacyAccessControlRole.VIEWER, status.HTTP_403_FORBIDDEN), (LegacyAccessControlRole.NONE, status.HTTP_403_FORBIDDEN), ], ) def test_escalation_policy_update_permissions( make_organization_and_user_with_plugin_token, make_escalation_chain, make_escalation_policy, make_user_auth_headers, role, expected_status, ): organization, user, token = make_organization_and_user_with_plugin_token(role) escalation_chain = make_escalation_chain(organization) escalation_policy = make_escalation_policy( escalation_chain, escalation_policy_step=EscalationPolicy.STEP_WAIT, wait_delay=EscalationPolicy.ONE_MINUTE ) client = APIClient() url = reverse("api-internal:escalation_policy-detail", kwargs={"pk": escalation_policy.public_primary_key}) with patch( "apps.api.views.escalation_policy.EscalationPolicyView.update", return_value=Response( status=status.HTTP_200_OK, ), ): response = client.put(url, format="json", **make_user_auth_headers(user, token)) assert response.status_code == expected_status response = client.patch(url, format="json", **make_user_auth_headers(user, token)) assert response.status_code == expected_status @pytest.mark.django_db @pytest.mark.parametrize( "role,expected_status", [ (LegacyAccessControlRole.ADMIN, status.HTTP_200_OK), (LegacyAccessControlRole.EDITOR, status.HTTP_200_OK), (LegacyAccessControlRole.VIEWER, status.HTTP_200_OK), (LegacyAccessControlRole.NONE, status.HTTP_403_FORBIDDEN), ], ) def test_escalation_policy_list_permissions( make_organization_and_user_with_plugin_token, make_escalation_chain, make_escalation_policy, make_user_auth_headers, role, expected_status, ): organization, user, token = make_organization_and_user_with_plugin_token(role) escalation_chain = make_escalation_chain(organization) make_escalation_policy( escalation_chain, escalation_policy_step=EscalationPolicy.STEP_WAIT, wait_delay=EscalationPolicy.ONE_MINUTE ) client = APIClient() url = reverse("api-internal:escalation_policy-list") with patch( "apps.api.views.escalation_policy.EscalationPolicyView.list", return_value=Response( status=status.HTTP_200_OK, ), ): response = client.get(url, format="json", **make_user_auth_headers(user, token)) assert response.status_code == expected_status @pytest.mark.django_db @pytest.mark.parametrize( "role,expected_status", [ (LegacyAccessControlRole.ADMIN, status.HTTP_200_OK), (LegacyAccessControlRole.EDITOR, status.HTTP_200_OK), (LegacyAccessControlRole.VIEWER, status.HTTP_200_OK), (LegacyAccessControlRole.NONE, status.HTTP_403_FORBIDDEN), ], ) def test_escalation_policy_retrieve_permissions( make_organization_and_user_with_plugin_token, make_escalation_chain, make_escalation_policy, make_user_auth_headers, role, expected_status, ): organization, user, token = make_organization_and_user_with_plugin_token(role) escalation_chain = make_escalation_chain(organization) escalation_policy = make_escalation_policy( escalation_chain, escalation_policy_step=EscalationPolicy.STEP_WAIT, wait_delay=EscalationPolicy.ONE_MINUTE ) client = APIClient() url = reverse("api-internal:escalation_policy-detail", kwargs={"pk": escalation_policy.public_primary_key}) with patch( "apps.api.views.escalation_policy.EscalationPolicyView.retrieve", return_value=Response( status=status.HTTP_200_OK, ), ): response = client.get(url, format="json", **make_user_auth_headers(user, token)) assert response.status_code == expected_status @pytest.mark.django_db @pytest.mark.parametrize( "role,expected_status", [ (LegacyAccessControlRole.ADMIN, status.HTTP_204_NO_CONTENT), (LegacyAccessControlRole.EDITOR, status.HTTP_403_FORBIDDEN), (LegacyAccessControlRole.VIEWER, status.HTTP_403_FORBIDDEN), (LegacyAccessControlRole.NONE, status.HTTP_403_FORBIDDEN), ], ) def test_escalation_policy_delete_permissions( make_organization_and_user_with_plugin_token, make_escalation_chain, make_escalation_policy, make_user_auth_headers, role, expected_status, ): organization, user, token = make_organization_and_user_with_plugin_token(role) escalation_chain = make_escalation_chain(organization) escalation_policy = make_escalation_policy( escalation_chain, escalation_policy_step=EscalationPolicy.STEP_WAIT, wait_delay=EscalationPolicy.ONE_MINUTE ) client = APIClient() url = reverse("api-internal:escalation_policy-detail", kwargs={"pk": escalation_policy.public_primary_key}) with patch( "apps.api.views.escalation_policy.EscalationPolicyView.destroy", return_value=Response( status=status.HTTP_204_NO_CONTENT, ), ): response = client.delete(url, format="json", **make_user_auth_headers(user, token)) assert response.status_code == expected_status @pytest.mark.django_db @pytest.mark.parametrize( "role,expected_status", [ (LegacyAccessControlRole.ADMIN, status.HTTP_200_OK), (LegacyAccessControlRole.EDITOR, status.HTTP_200_OK), (LegacyAccessControlRole.VIEWER, status.HTTP_200_OK), (LegacyAccessControlRole.NONE, status.HTTP_403_FORBIDDEN), ], ) def test_escalation_policy_escalation_options_permissions( make_organization_and_user_with_plugin_token, make_escalation_chain, make_escalation_policy, make_user_auth_headers, role, expected_status, ): organization, user, token = make_organization_and_user_with_plugin_token(role) escalation_chain = make_escalation_chain(organization) make_escalation_policy( escalation_chain, escalation_policy_step=EscalationPolicy.STEP_WAIT, wait_delay=EscalationPolicy.ONE_MINUTE ) client = APIClient() url = reverse("api-internal:escalation_policy-escalation-options") with patch( "apps.api.views.escalation_policy.EscalationPolicyView.escalation_options", return_value=Response( status=status.HTTP_200_OK, ), ): response = client.get(url, format="json", **make_user_auth_headers(user, token)) assert response.status_code == expected_status @pytest.mark.django_db @pytest.mark.parametrize( "role,expected_status", [ (LegacyAccessControlRole.ADMIN, status.HTTP_200_OK), (LegacyAccessControlRole.EDITOR, status.HTTP_200_OK), (LegacyAccessControlRole.VIEWER, status.HTTP_200_OK), (LegacyAccessControlRole.NONE, status.HTTP_403_FORBIDDEN), ], ) def test_escalation_policy_delay_options_permissions( make_organization_and_user_with_plugin_token, make_escalation_chain, make_escalation_policy, make_user_auth_headers, role, expected_status, ): organization, user, token = make_organization_and_user_with_plugin_token(role) escalation_chain = make_escalation_chain(organization) make_escalation_policy( escalation_chain, escalation_policy_step=EscalationPolicy.STEP_WAIT, wait_delay=EscalationPolicy.ONE_MINUTE ) client = APIClient() url = reverse("api-internal:escalation_policy-delay-options") with patch( "apps.api.views.escalation_policy.EscalationPolicyView.delay_options", return_value=Response( status=status.HTTP_200_OK, ), ): response = client.get(url, format="json", **make_user_auth_headers(user, token)) assert response.status_code == expected_status @pytest.mark.django_db @pytest.mark.parametrize( "role,expected_status", [ (LegacyAccessControlRole.ADMIN, status.HTTP_200_OK), (LegacyAccessControlRole.EDITOR, status.HTTP_200_OK), (LegacyAccessControlRole.VIEWER, status.HTTP_200_OK), (LegacyAccessControlRole.NONE, status.HTTP_403_FORBIDDEN), ], ) def test_escalation_policy_move_to_position_permissions( make_organization_and_user_with_plugin_token, make_escalation_chain, make_escalation_policy, make_user_auth_headers, role, expected_status, ): organization, user, token = make_organization_and_user_with_plugin_token(role) escalation_chain = make_escalation_chain(organization) escalation_policy = make_escalation_policy( escalation_chain, escalation_policy_step=EscalationPolicy.STEP_WAIT, wait_delay=EscalationPolicy.ONE_MINUTE ) client = APIClient() url = reverse("api-internal:escalation_policy-detail", kwargs={"pk": escalation_policy.public_primary_key}) with patch( "apps.api.views.escalation_policy.EscalationPolicyView.move_to_position", return_value=Response( status=status.HTTP_200_OK, ), ): response = client.get(url, format="json", **make_user_auth_headers(user, token)) assert response.status_code == expected_status @pytest.mark.django_db @pytest.mark.parametrize( "important_step ,expected_default_step", [ (EscalationPolicy.STEP_NOTIFY_GROUP_IMPORTANT, EscalationPolicy.STEP_NOTIFY_GROUP), (EscalationPolicy.STEP_NOTIFY_SCHEDULE_IMPORTANT, EscalationPolicy.STEP_NOTIFY_SCHEDULE), (EscalationPolicy.STEP_NOTIFY_MULTIPLE_USERS_IMPORTANT, EscalationPolicy.STEP_NOTIFY_MULTIPLE_USERS), (EscalationPolicy.STEP_NOTIFY_USERS_QUEUE_IMPORTANT, EscalationPolicy.STEP_NOTIFY_USERS_QUEUE), ], ) def test_escalation_policy_maps_default_to_important( make_organization_and_user_with_plugin_token, make_user_auth_headers, make_escalation_chain, make_escalation_policy, important_step, expected_default_step, ): organization, user, token = make_organization_and_user_with_plugin_token() escalation_chain = make_escalation_chain(organization) escalation_policy = make_escalation_policy( escalation_chain, escalation_policy_step=important_step, ) client = APIClient() url = reverse("api-internal:escalation_policy-detail", kwargs={"pk": escalation_policy.public_primary_key}) response = client.get(url, format="json", **make_user_auth_headers(user, token)) assert response.json()["step"] == expected_default_step assert response.json()["important"] is True @pytest.mark.django_db @pytest.mark.parametrize( "default_step", [ EscalationPolicy.STEP_NOTIFY_GROUP, EscalationPolicy.STEP_NOTIFY_SCHEDULE, EscalationPolicy.STEP_NOTIFY_MULTIPLE_USERS, EscalationPolicy.STEP_NOTIFY_USERS_QUEUE, ], ) def test_escalation_policy_default_steps_stay_default( make_organization_and_user_with_plugin_token, make_escalation_chain, make_escalation_policy, default_step, make_user_auth_headers, ): organization, user, token = make_organization_and_user_with_plugin_token() escalation_chain = make_escalation_chain(organization) escalation_policy = make_escalation_policy( escalation_chain, escalation_policy_step=default_step, ) client = APIClient() url = reverse("api-internal:escalation_policy-detail", kwargs={"pk": escalation_policy.public_primary_key}) response = client.get(url, format="json", **make_user_auth_headers(user, token)) assert response.json()["step"] == default_step assert response.json()["important"] is False @pytest.mark.django_db @pytest.mark.parametrize( "default_step ,expected_important_step", [ (EscalationPolicy.STEP_NOTIFY_GROUP, EscalationPolicy.STEP_NOTIFY_GROUP_IMPORTANT), (EscalationPolicy.STEP_NOTIFY_SCHEDULE, EscalationPolicy.STEP_NOTIFY_SCHEDULE_IMPORTANT), (EscalationPolicy.STEP_NOTIFY_MULTIPLE_USERS, EscalationPolicy.STEP_NOTIFY_MULTIPLE_USERS_IMPORTANT), (EscalationPolicy.STEP_NOTIFY_USERS_QUEUE, EscalationPolicy.STEP_NOTIFY_USERS_QUEUE_IMPORTANT), ], ) def test_create_escalation_policy_important( make_organization_and_user_with_slack_identities, make_token_for_organization, make_escalation_chain, default_step, expected_important_step, make_user_auth_headers, ): organization, user, _, _ = make_organization_and_user_with_slack_identities() _, token = make_token_for_organization(organization) escalation_chain = make_escalation_chain(organization) client = APIClient() data_for_creation = { "escalation_chain": escalation_chain.public_primary_key, "step": default_step, "important": True, } url = reverse("api-internal:escalation_policy-list") response = client.post(url, data=data_for_creation, format="json", **make_user_auth_headers(user, token)) assert response.status_code == status.HTTP_201_CREATED public_primary_key = response.json()["id"] created_escalation_policy = EscalationPolicy.objects.get(public_primary_key=public_primary_key) assert created_escalation_policy.step == expected_important_step @pytest.mark.django_db @pytest.mark.parametrize( "default_step", [ EscalationPolicy.STEP_NOTIFY_GROUP, EscalationPolicy.STEP_NOTIFY_SCHEDULE, EscalationPolicy.STEP_NOTIFY_MULTIPLE_USERS, EscalationPolicy.STEP_NOTIFY_USERS_QUEUE, ], ) def test_create_escalation_policy_default( make_organization_and_user_with_slack_identities, make_token_for_organization, make_escalation_chain, default_step, make_user_auth_headers, ): organization, user, _, _ = make_organization_and_user_with_slack_identities() _, token = make_token_for_organization(organization) escalation_chain = make_escalation_chain(organization) client = APIClient() data_for_creation = { "escalation_chain": escalation_chain.public_primary_key, "step": default_step, "important": False, } url = reverse("api-internal:escalation_policy-list") response = client.post(url, data=data_for_creation, format="json", **make_user_auth_headers(user, token)) assert response.status_code == status.HTTP_201_CREATED public_primary_key = response.json()["id"] created_escalation_policy = EscalationPolicy.objects.get(public_primary_key=public_primary_key) assert created_escalation_policy.step == default_step @pytest.mark.django_db @pytest.mark.parametrize("step", EscalationPolicy.STEPS_WITH_NO_IMPORTANT_VERSION_SET) def test_create_escalation_policy_with_no_important_version( make_organization_and_user_with_slack_identities, make_token_for_organization, make_escalation_chain, step, make_user_auth_headers, settings, ): organization, user, _, _ = make_organization_and_user_with_slack_identities() # make sure declare incident step is enabled settings.FEATURE_DECLARE_INCIDENT_STEP_ENABLED = True organization.is_grafana_incident_enabled = True organization.save() _, token = make_token_for_organization(organization) escalation_chain = make_escalation_chain(organization) if step == EscalationPolicy.STEP_DECLARE_INCIDENT: # declare incident step is disabled return client = APIClient() data_for_creation = { "escalation_chain": escalation_chain.public_primary_key, "step": step, } url = reverse("api-internal:escalation_policy-list") response = client.post(url, data=data_for_creation, format="json", **make_user_auth_headers(user, token)) assert response.status_code == status.HTTP_201_CREATED public_primary_key = response.json()["id"] created_escalation_policy = EscalationPolicy.objects.get(public_primary_key=public_primary_key) assert created_escalation_policy.step == step @pytest.mark.django_db @pytest.mark.parametrize("step", EscalationPolicy.STEPS_WITH_NO_IMPORTANT_VERSION_SET) def test_escalation_policy_can_not_create_invalid_important_step( make_organization_and_user_with_slack_identities, make_token_for_organization, make_escalation_chain, step, make_user_auth_headers, ): organization, user, _, _ = make_organization_and_user_with_slack_identities() _, token = make_token_for_organization(organization) escalation_chain = make_escalation_chain(organization) client = APIClient() data_for_creation = {"escalation_chain": escalation_chain.public_primary_key, "step": step, "important": True} url = reverse("api-internal:escalation_policy-list") response = client.post(url, data=data_for_creation, format="json", **make_user_auth_headers(user, token)) assert response.status_code == status.HTTP_400_BAD_REQUEST @pytest.mark.django_db @pytest.mark.parametrize("step", EscalationPolicy.INTERNAL_API_STEPS) def test_escalation_policy_can_not_create_with_non_step_type_related_data( make_organization_and_user_with_slack_identities, make_token_for_organization, make_escalation_chain, step, make_user_auth_headers, ): organization, user, _, _ = make_organization_and_user_with_slack_identities() _, token = make_token_for_organization(organization) escalation_chain = make_escalation_chain(organization) client = APIClient() data_for_creation = { "escalation_chain": escalation_chain.public_primary_key, "step": step, "notify_to_users_queue": [user.public_primary_key], "wait_delay": "300.0", "from_time": "06:50:00", "to_time": "04:10:00", } url = reverse("api-internal:escalation_policy-list") response = client.post(url, data=data_for_creation, format="json", **make_user_auth_headers(user, token)) assert response.status_code == status.HTTP_400_BAD_REQUEST @pytest.mark.django_db @pytest.mark.parametrize( "step, related_fields", [ (EscalationPolicy.STEP_WAIT, ["wait_delay"]), (EscalationPolicy.STEP_FINAL_NOTIFYALL, []), (EscalationPolicy.STEP_FINAL_RESOLVE, []), (EscalationPolicy.STEP_NOTIFY_GROUP, ["notify_to_group"]), (EscalationPolicy.STEP_NOTIFY_SCHEDULE, ["notify_schedule"]), (EscalationPolicy.STEP_NOTIFY_USERS_QUEUE, ["notify_to_users_queue"]), (EscalationPolicy.STEP_NOTIFY_IF_TIME, ["from_time", "to_time"]), (EscalationPolicy.STEP_NOTIFY_MULTIPLE_USERS, ["notify_to_users_queue"]), (EscalationPolicy.STEP_TRIGGER_CUSTOM_WEBHOOK, ["custom_webhook"]), ], ) def test_escalation_policy_update_drop_non_step_type_related_data( make_organization_and_user_with_slack_identities, make_token_for_organization, make_escalation_chain, make_escalation_policy, step, related_fields, make_user_auth_headers, ): organization, user, _, _ = make_organization_and_user_with_slack_identities() _, token = make_token_for_organization(organization) escalation_chain = make_escalation_chain(organization) data_for_creation = { "wait_delay": timedelta(minutes=5), "from_time": "06:50:00", "to_time": "04:10:00", } escalation_policy = make_escalation_policy( escalation_chain=escalation_chain, escalation_policy_step=EscalationPolicy.STEP_WAIT, **data_for_creation ) escalation_policy.notify_to_users_queue.set([user]) data_for_update = {"step": step} fields_to_check = [ "wait_delay", "notify_schedule", "notify_to_users_queue", "notify_to_group", "notify_to_team_members", "from_time", "to_time", "custom_webhook", ] for f in related_fields: fields_to_check.remove(f) client = APIClient() url = reverse("api-internal:escalation_policy-detail", kwargs={"pk": escalation_policy.public_primary_key}) response = client.put(url, data=data_for_update, format="json", **make_user_auth_headers(user, token)) assert response.status_code == status.HTTP_200_OK escalation_policy.refresh_from_db() for f in fields_to_check: if f == "notify_to_users_queue": assert len(list(getattr(escalation_policy, f).all())) == 0 else: assert getattr(escalation_policy, f) is None @pytest.mark.django_db @pytest.mark.parametrize("step", EscalationPolicy.DEFAULT_STEPS_SET) def test_escalation_policy_switch_importance( make_organization_and_user_with_slack_identities, make_token_for_organization, make_escalation_chain, make_escalation_policy, step, make_user_auth_headers, ): organization, user, _, _ = make_organization_and_user_with_slack_identities() _, token = make_token_for_organization(organization) escalation_chain = make_escalation_chain(organization) escalation_policy = make_escalation_policy( escalation_chain=escalation_chain, escalation_policy_step=step, ) data_for_update = { "id": escalation_policy.public_primary_key, "step": escalation_policy.step, "escalation_chain": escalation_chain.public_primary_key, "notify_to_users_queue": [], "from_time": None, "to_time": None, "num_alerts_in_window": None, "num_minutes_in_window": None, "slack_integration_required": escalation_policy.slack_integration_required, "custom_webhook": None, "notify_schedule": None, "notify_to_group": None, "notify_to_team_members": None, "severity": None, "important": True, "wait_delay": None, } client = APIClient() url = reverse("api-internal:escalation_policy-detail", kwargs={"pk": escalation_policy.public_primary_key}) response = client.put(url, data=data_for_update, format="json", **make_user_auth_headers(user, token)) assert response.status_code == status.HTTP_200_OK assert response.json() == data_for_update @pytest.mark.django_db def test_escalation_policy_escalation_options_webhooks( make_organization_and_user_with_plugin_token, make_user_auth_headers, ): _, user, token = make_organization_and_user_with_plugin_token() client = APIClient() url = reverse("api-internal:escalation_policy-escalation-options") response = client.get(url, format="json", **make_user_auth_headers(user, token)) returned_options = [option["value"] for option in response.json()] assert EscalationPolicy.STEP_TRIGGER_CUSTOM_WEBHOOK in returned_options @pytest.mark.django_db def test_escalation_policy_severity_options( make_organization_and_user_with_plugin_token, make_user_auth_headers, ): organization, user, token = make_organization_and_user_with_plugin_token() organization.is_grafana_labels_enabled = False organization.save() client = APIClient() url = reverse("api-internal:escalation_policy-severity-options") # without labels enabled available_severities = [ {"severityID": "abc", "orgID": "1", "displayLabel": "Pending", "level": -1}, {"severityID": "def", "orgID": "1", "displayLabel": "Critical", "level": 1}, ] with patch("common.incident_api.client.IncidentAPIClient.get_severities") as mock_get_severities: mock_get_severities.return_value = available_severities, None response = client.get(url, format="json", **make_user_auth_headers(user, token)) expected_options = [{"value": s["displayLabel"], "display_name": s["displayLabel"]} for s in available_severities] assert response.json() == expected_options # failing request does not break; fallback to default option only with patch("common.incident_api.client.IncidentAPIClient.get_severities") as mock_get_severities: mock_get_severities.side_effect = IncidentAPIException(status=404, url="some-url") response = client.get(url, format="json", **make_user_auth_headers(user, token)) fallback_options = [{"value": DEFAULT_INCIDENT_SEVERITY, "display_name": DEFAULT_INCIDENT_SEVERITY}] assert response.json() == fallback_options # labels enabled organization.is_grafana_labels_enabled = True organization.save() with patch("common.incident_api.client.IncidentAPIClient.get_severities") as mock_get_severities: mock_get_severities.return_value = available_severities, None response = client.get(url, format="json", **make_user_auth_headers(user, token)) # include set from label option expected_options = [ { "value": EscalationPolicy.SEVERITY_SET_FROM_LABEL, "display_name": EscalationPolicy.SEVERITY_SET_FROM_LABEL_DISPLAY_VALUE, } ] + expected_options assert response.json() == expected_options @pytest.mark.django_db def test_create_escalation_policy_declare_incident( escalation_policy_internal_api_setup, make_user_auth_headers, settings ): token, escalation_chain, _, user, _ = escalation_policy_internal_api_setup organization = escalation_chain.organization client = APIClient() url = reverse("api-internal:escalation_policy-list") data = { "step": EscalationPolicy.STEP_DECLARE_INCIDENT, "severity": "critical", "escalation_chain": escalation_chain.public_primary_key, } response = client.post(url, data, format="json", **make_user_auth_headers(user, token)) assert response.status_code == status.HTTP_400_BAD_REQUEST # make sure declare incident step is enabled settings.FEATURE_DECLARE_INCIDENT_STEP_ENABLED = True organization.is_grafana_incident_enabled = True organization.save() response = client.post(url, data, format="json", **make_user_auth_headers(user, token)) assert response.status_code == status.HTTP_201_CREATED escalation_policy = EscalationPolicy.objects.get(public_primary_key=response.data["id"]) assert escalation_policy.step == EscalationPolicy.STEP_DECLARE_INCIDENT assert escalation_policy.severity == "critical" url = reverse("api-internal:escalation_policy-detail", kwargs={"pk": escalation_policy.public_primary_key}) response = client.get(url, format="json", **make_user_auth_headers(user, token)) response_data = response.json() assert response_data["step"] == EscalationPolicy.STEP_DECLARE_INCIDENT assert response_data["severity"] == "critical"