2022-06-03 08:09:47 -06:00
|
|
|
from unittest.mock import patch
|
|
|
|
|
|
|
|
|
|
import pytest
|
|
|
|
|
from django.db.models import Max
|
|
|
|
|
from django.urls import reverse
|
|
|
|
|
from django.utils.timezone import timedelta
|
|
|
|
|
from rest_framework import status
|
|
|
|
|
from rest_framework.response import Response
|
|
|
|
|
from rest_framework.test import APIClient
|
|
|
|
|
|
|
|
|
|
from apps.alerts.models import EscalationPolicy
|
2022-11-29 09:41:56 +01:00
|
|
|
from apps.api.permissions import LegacyAccessControlRole
|
Reworked declare incident escalation step (#5130)
Reworked https://github.com/grafana/oncall/pull/5047. Main update is the
switch from FK to a [M2M
relation](https://docs.google.com/document/d/1HeulqxoFShSHtInQrZNJLL5MDlHPNT50rVGaK3zZWvw/edit?disco=AAABVLjV4W8)
(which doesn't really change the original/intended behavior, besides not
needing to alter the alert group table, and it is a bit more flexible;
the extra table shouldn't introduce issues because this is used only for
tracking purposes and the information needed in the log record is
already there).
Avoid a db migration involving alert group table:
```
--
-- Create model RelatedIncident
--
CREATE TABLE `alerts_relatedincident` (`id` bigint AUTO_INCREMENT NOT NULL PRIMARY KEY, `incident_id` varchar(50) NOT NULL, `created_at` datetime(6) NOT NULL, `is_active` bool NOT NULL, `channel_filter_id` bigint NULL, `organization_id` bigint NOT NULL);
CREATE TABLE `alerts_relatedincident_attached_alert_groups` (`id` bigint AUTO_INCREMENT NOT NULL PRIMARY KEY, `relatedincident_id` bigint NOT NULL, `alertgroup_id` bigint NOT NULL);
ALTER TABLE `alerts_relatedincident` ADD CONSTRAINT `alerts_relatedincident_organization_id_incident_id_d7fc9a4f_uniq` UNIQUE (`organization_id`, `incident_id`);
ALTER TABLE `alerts_relatedincident` ADD CONSTRAINT `alerts_relatedincide_channel_filter_id_9556c836_fk_alerts_ch` FOREIGN KEY (`channel_filter_id`) REFERENCES `alerts_channelfilter` (`id`);
ALTER TABLE `alerts_relatedincident` ADD CONSTRAINT `alerts_relatedincide_organization_id_74ed6bed_fk_user_mana` FOREIGN KEY (`organization_id`) REFERENCES `user_management_organization` (`id`);
CREATE INDEX `alerts_relatedincident_incident_id_8356a799` ON `alerts_relatedincident` (`incident_id`);
ALTER TABLE `alerts_relatedincident_attached_alert_groups` ADD CONSTRAINT `alerts_relatedincident_a_relatedincident_id_alert_3d683baa_uniq` UNIQUE (`relatedincident_id`, `alertgroup_id`);
ALTER TABLE `alerts_relatedincident_attached_alert_groups` ADD CONSTRAINT `alerts_relatedincide_relatedincident_id_3e5e7a23_fk_alerts_re` FOREIGN KEY (`relatedincident_id`) REFERENCES `alerts_relatedincident` (`id`);
ALTER TABLE `alerts_relatedincident_attached_alert_groups` ADD CONSTRAINT `alerts_relatedincide_alertgroup_id_0125deca_fk_alerts_al` FOREIGN KEY (`alertgroup_id`) REFERENCES `alerts_alertgroup` (`id`);
```
2024-10-07 16:26:10 -03:00
|
|
|
from common.incident_api.client import DEFAULT_INCIDENT_SEVERITY, IncidentAPIException
|
2022-06-03 08:09:47 -06:00
|
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.fixture()
|
|
|
|
|
def escalation_policy_internal_api_setup(
|
|
|
|
|
make_organization_and_user_with_plugin_token,
|
|
|
|
|
make_escalation_chain,
|
|
|
|
|
make_user_for_organization,
|
|
|
|
|
make_escalation_policy,
|
|
|
|
|
):
|
|
|
|
|
organization, first_user, token = make_organization_and_user_with_plugin_token()
|
|
|
|
|
second_user = make_user_for_organization(organization)
|
|
|
|
|
|
|
|
|
|
escalation_chain = make_escalation_chain(organization)
|
|
|
|
|
escalation_policy = make_escalation_policy(
|
|
|
|
|
escalation_chain=escalation_chain,
|
|
|
|
|
escalation_policy_step=EscalationPolicy.STEP_WAIT,
|
|
|
|
|
wait_delay=EscalationPolicy.ONE_MINUTE,
|
|
|
|
|
)
|
|
|
|
|
return token, escalation_chain, escalation_policy, first_user, second_user
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.django_db
|
|
|
|
|
def test_create_escalation_policy(escalation_policy_internal_api_setup, make_user_auth_headers):
|
|
|
|
|
token, escalation_chain, _, user, _ = escalation_policy_internal_api_setup
|
|
|
|
|
client = APIClient()
|
|
|
|
|
url = reverse("api-internal:escalation_policy-list")
|
|
|
|
|
|
|
|
|
|
data = {
|
|
|
|
|
"step": EscalationPolicy.STEP_WAIT,
|
2024-06-03 11:47:42 +01:00
|
|
|
"wait_delay": "60.0",
|
2022-06-03 08:09:47 -06:00
|
|
|
"escalation_chain": escalation_chain.public_primary_key,
|
|
|
|
|
"notify_to_users_queue": [],
|
|
|
|
|
"from_time": None,
|
|
|
|
|
"to_time": None,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
max_order = EscalationPolicy.objects.filter(escalation_chain=escalation_chain).aggregate(maxorder=Max("order"))[
|
|
|
|
|
"maxorder"
|
|
|
|
|
]
|
|
|
|
|
|
|
|
|
|
response = client.post(url, data, format="json", **make_user_auth_headers(user, token))
|
|
|
|
|
assert response.status_code == status.HTTP_201_CREATED
|
2023-07-18 18:17:53 +01:00
|
|
|
assert EscalationPolicy.objects.get(public_primary_key=response.data["id"]).order == max_order + 1
|
2022-06-03 08:09:47 -06:00
|
|
|
|
|
|
|
|
|
2024-06-03 14:06:47 +01:00
|
|
|
@pytest.mark.django_db
|
|
|
|
|
@pytest.mark.parametrize("wait_delay", (timedelta(seconds=59), timedelta(hours=24, seconds=1)))
|
|
|
|
|
def test_create_escalation_policy_wait_delay_invalid(
|
|
|
|
|
escalation_policy_internal_api_setup, make_user_auth_headers, wait_delay
|
|
|
|
|
):
|
|
|
|
|
token, escalation_chain, _, user, _ = escalation_policy_internal_api_setup
|
|
|
|
|
client = APIClient()
|
|
|
|
|
url = reverse("api-internal:escalation_policy-list")
|
|
|
|
|
|
|
|
|
|
data = {
|
|
|
|
|
"step": EscalationPolicy.STEP_WAIT,
|
|
|
|
|
"wait_delay": str(wait_delay.total_seconds()),
|
|
|
|
|
"escalation_chain": escalation_chain.public_primary_key,
|
|
|
|
|
"notify_to_users_queue": [],
|
|
|
|
|
"from_time": None,
|
|
|
|
|
"to_time": None,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
response = client.post(url, data, format="json", **make_user_auth_headers(user, token))
|
|
|
|
|
assert response.status_code == status.HTTP_400_BAD_REQUEST
|
|
|
|
|
|
|
|
|
|
|
2023-04-05 09:03:55 -03:00
|
|
|
@pytest.mark.django_db
|
|
|
|
|
def test_create_escalation_policy_webhook(
|
|
|
|
|
escalation_policy_internal_api_setup, make_custom_webhook, make_user_auth_headers
|
|
|
|
|
):
|
|
|
|
|
token, escalation_chain, _, user, _ = escalation_policy_internal_api_setup
|
|
|
|
|
client = APIClient()
|
|
|
|
|
url = reverse("api-internal:escalation_policy-list")
|
|
|
|
|
|
|
|
|
|
webhook = make_custom_webhook(organization=user.organization)
|
|
|
|
|
data = {
|
|
|
|
|
"step": EscalationPolicy.STEP_TRIGGER_CUSTOM_WEBHOOK,
|
|
|
|
|
"escalation_chain": escalation_chain.public_primary_key,
|
|
|
|
|
"custom_webhook": webhook.public_primary_key,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
max_order = EscalationPolicy.objects.filter(escalation_chain=escalation_chain).aggregate(maxorder=Max("order"))[
|
|
|
|
|
"maxorder"
|
|
|
|
|
]
|
|
|
|
|
|
|
|
|
|
response = client.post(url, data, format="json", **make_user_auth_headers(user, token))
|
|
|
|
|
assert response.status_code == status.HTTP_201_CREATED
|
|
|
|
|
assert response.data["custom_webhook"] == webhook.public_primary_key
|
|
|
|
|
escalation_policy = EscalationPolicy.objects.get(public_primary_key=response.data["id"])
|
2023-07-18 18:17:53 +01:00
|
|
|
assert escalation_policy.order == max_order + 1
|
2023-04-05 09:03:55 -03:00
|
|
|
assert escalation_policy.custom_webhook == webhook
|
|
|
|
|
|
|
|
|
|
|
2022-06-03 08:09:47 -06:00
|
|
|
@pytest.mark.django_db
|
|
|
|
|
def test_update_notify_multiple_users_step(escalation_policy_internal_api_setup, make_user_auth_headers):
|
|
|
|
|
token, _, escalation_policy, first_user, second_user = escalation_policy_internal_api_setup
|
|
|
|
|
client = APIClient()
|
|
|
|
|
url = reverse("api-internal:escalation_policy-detail", kwargs={"pk": escalation_policy.public_primary_key})
|
|
|
|
|
|
|
|
|
|
data = {
|
|
|
|
|
"step": EscalationPolicy.STEP_NOTIFY_MULTIPLE_USERS,
|
|
|
|
|
"notify_to_users_queue": [first_user.public_primary_key, second_user.public_primary_key],
|
|
|
|
|
}
|
|
|
|
|
response = client.put(url, data, format="json", **make_user_auth_headers(first_user, token))
|
|
|
|
|
|
|
|
|
|
assert response.status_code == status.HTTP_200_OK
|
|
|
|
|
assert response.json()["step"] == EscalationPolicy.STEP_NOTIFY_MULTIPLE_USERS
|
2023-07-12 22:41:44 +02:00
|
|
|
assert sorted(response.json()["notify_to_users_queue"]) == sorted(
|
|
|
|
|
[first_user.public_primary_key, second_user.public_primary_key]
|
|
|
|
|
)
|
2022-06-03 08:09:47 -06:00
|
|
|
|
|
|
|
|
|
2024-02-20 10:02:23 -03:00
|
|
|
@pytest.mark.django_db
|
|
|
|
|
def test_manage_escalation_policy_notify_team(escalation_policy_internal_api_setup, make_team, make_user_auth_headers):
|
|
|
|
|
token, escalation_chain, _, user, _ = escalation_policy_internal_api_setup
|
|
|
|
|
client = APIClient()
|
|
|
|
|
url = reverse("api-internal:escalation_policy-list")
|
|
|
|
|
|
|
|
|
|
team = make_team(organization=user.organization)
|
|
|
|
|
data = {
|
|
|
|
|
"step": EscalationPolicy.STEP_NOTIFY_TEAM_MEMBERS,
|
|
|
|
|
"escalation_chain": escalation_chain.public_primary_key,
|
|
|
|
|
"notify_to_team_members": team.public_primary_key,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
max_order = EscalationPolicy.objects.filter(escalation_chain=escalation_chain).aggregate(maxorder=Max("order"))[
|
|
|
|
|
"maxorder"
|
|
|
|
|
]
|
|
|
|
|
|
|
|
|
|
response = client.post(url, data, format="json", **make_user_auth_headers(user, token))
|
|
|
|
|
assert response.status_code == status.HTTP_201_CREATED
|
|
|
|
|
assert response.data["notify_to_team_members"] == team.public_primary_key
|
|
|
|
|
escalation_policy = EscalationPolicy.objects.get(public_primary_key=response.data["id"])
|
|
|
|
|
assert escalation_policy.order == max_order + 1
|
|
|
|
|
assert escalation_policy.notify_to_team_members == team
|
|
|
|
|
|
|
|
|
|
# update team in policy
|
|
|
|
|
url = reverse("api-internal:escalation_policy-detail", kwargs={"pk": escalation_policy.public_primary_key})
|
|
|
|
|
another_team = make_team(organization=user.organization)
|
|
|
|
|
data = {
|
|
|
|
|
"step": EscalationPolicy.STEP_NOTIFY_TEAM_MEMBERS,
|
|
|
|
|
"notify_to_team_members": another_team.public_primary_key,
|
|
|
|
|
}
|
|
|
|
|
response = client.put(url, data, format="json", **make_user_auth_headers(user, token))
|
|
|
|
|
assert response.status_code == status.HTTP_200_OK
|
|
|
|
|
assert response.json()["step"] == EscalationPolicy.STEP_NOTIFY_TEAM_MEMBERS
|
|
|
|
|
assert response.json()["notify_to_team_members"] == another_team.public_primary_key
|
|
|
|
|
|
|
|
|
|
|
2022-06-03 08:09:47 -06:00
|
|
|
@pytest.mark.django_db
|
|
|
|
|
def test_move_to_position(escalation_policy_internal_api_setup, make_user_auth_headers):
|
|
|
|
|
token, _, escalation_policy, user, _ = escalation_policy_internal_api_setup
|
|
|
|
|
client = APIClient()
|
|
|
|
|
|
2023-07-18 18:17:53 +01:00
|
|
|
position_to_move = 0
|
2022-06-03 08:09:47 -06:00
|
|
|
url = reverse(
|
|
|
|
|
"api-internal:escalation_policy-move-to-position", kwargs={"pk": escalation_policy.public_primary_key}
|
|
|
|
|
)
|
|
|
|
|
response = client.put(
|
|
|
|
|
f"{url}?position={position_to_move}", content_type="application/json", **make_user_auth_headers(user, token)
|
|
|
|
|
)
|
|
|
|
|
escalation_policy.refresh_from_db()
|
|
|
|
|
assert response.status_code == status.HTTP_200_OK
|
|
|
|
|
assert escalation_policy.order == position_to_move
|
|
|
|
|
|
|
|
|
|
|
2023-07-18 18:17:53 +01:00
|
|
|
@pytest.mark.django_db
|
|
|
|
|
def test_move_to_position_invalid_index(escalation_policy_internal_api_setup, make_user_auth_headers):
|
|
|
|
|
token, _, escalation_policy, user, _ = escalation_policy_internal_api_setup
|
|
|
|
|
client = APIClient()
|
|
|
|
|
|
|
|
|
|
position_to_move = 1
|
|
|
|
|
url = reverse(
|
|
|
|
|
"api-internal:escalation_policy-move-to-position", kwargs={"pk": escalation_policy.public_primary_key}
|
|
|
|
|
)
|
|
|
|
|
response = client.put(
|
|
|
|
|
f"{url}?position={position_to_move}", content_type="application/json", **make_user_auth_headers(user, token)
|
|
|
|
|
)
|
|
|
|
|
escalation_policy.refresh_from_db()
|
|
|
|
|
assert response.status_code == status.HTTP_400_BAD_REQUEST
|
|
|
|
|
|
|
|
|
|
|
2022-06-03 08:09:47 -06:00
|
|
|
@pytest.mark.django_db
|
|
|
|
|
@pytest.mark.parametrize(
|
|
|
|
|
"role,expected_status",
|
|
|
|
|
[
|
2022-11-29 09:41:56 +01:00
|
|
|
(LegacyAccessControlRole.ADMIN, status.HTTP_200_OK),
|
|
|
|
|
(LegacyAccessControlRole.EDITOR, status.HTTP_403_FORBIDDEN),
|
|
|
|
|
(LegacyAccessControlRole.VIEWER, status.HTTP_403_FORBIDDEN),
|
2023-10-19 14:39:08 -03:00
|
|
|
(LegacyAccessControlRole.NONE, status.HTTP_403_FORBIDDEN),
|
2022-06-03 08:09:47 -06:00
|
|
|
],
|
|
|
|
|
)
|
|
|
|
|
def test_escalation_policy_create_permissions(
|
|
|
|
|
make_organization_and_user_with_plugin_token,
|
|
|
|
|
make_escalation_chain,
|
|
|
|
|
make_escalation_policy,
|
|
|
|
|
make_user_auth_headers,
|
|
|
|
|
role,
|
|
|
|
|
expected_status,
|
|
|
|
|
):
|
|
|
|
|
organization, user, token = make_organization_and_user_with_plugin_token(role)
|
|
|
|
|
escalation_chain = make_escalation_chain(organization)
|
|
|
|
|
make_escalation_policy(
|
|
|
|
|
escalation_chain, escalation_policy_step=EscalationPolicy.STEP_WAIT, wait_delay=EscalationPolicy.ONE_MINUTE
|
|
|
|
|
)
|
|
|
|
|
client = APIClient()
|
|
|
|
|
|
|
|
|
|
url = reverse("api-internal:escalation_policy-list")
|
|
|
|
|
|
|
|
|
|
with patch(
|
|
|
|
|
"apps.api.views.escalation_policy.EscalationPolicyView.create",
|
|
|
|
|
return_value=Response(
|
|
|
|
|
status=status.HTTP_200_OK,
|
|
|
|
|
),
|
|
|
|
|
):
|
|
|
|
|
response = client.post(url, format="json", **make_user_auth_headers(user, token))
|
|
|
|
|
|
|
|
|
|
assert response.status_code == expected_status
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.django_db
|
|
|
|
|
@pytest.mark.parametrize(
|
|
|
|
|
"role,expected_status",
|
|
|
|
|
[
|
2022-11-29 09:41:56 +01:00
|
|
|
(LegacyAccessControlRole.ADMIN, status.HTTP_200_OK),
|
|
|
|
|
(LegacyAccessControlRole.EDITOR, status.HTTP_403_FORBIDDEN),
|
|
|
|
|
(LegacyAccessControlRole.VIEWER, status.HTTP_403_FORBIDDEN),
|
2023-10-19 14:39:08 -03:00
|
|
|
(LegacyAccessControlRole.NONE, status.HTTP_403_FORBIDDEN),
|
2022-06-03 08:09:47 -06:00
|
|
|
],
|
|
|
|
|
)
|
|
|
|
|
def test_escalation_policy_update_permissions(
|
|
|
|
|
make_organization_and_user_with_plugin_token,
|
|
|
|
|
make_escalation_chain,
|
|
|
|
|
make_escalation_policy,
|
|
|
|
|
make_user_auth_headers,
|
|
|
|
|
role,
|
|
|
|
|
expected_status,
|
|
|
|
|
):
|
|
|
|
|
organization, user, token = make_organization_and_user_with_plugin_token(role)
|
|
|
|
|
escalation_chain = make_escalation_chain(organization)
|
|
|
|
|
escalation_policy = make_escalation_policy(
|
|
|
|
|
escalation_chain, escalation_policy_step=EscalationPolicy.STEP_WAIT, wait_delay=EscalationPolicy.ONE_MINUTE
|
|
|
|
|
)
|
|
|
|
|
client = APIClient()
|
|
|
|
|
|
|
|
|
|
url = reverse("api-internal:escalation_policy-detail", kwargs={"pk": escalation_policy.public_primary_key})
|
|
|
|
|
|
|
|
|
|
with patch(
|
|
|
|
|
"apps.api.views.escalation_policy.EscalationPolicyView.update",
|
|
|
|
|
return_value=Response(
|
|
|
|
|
status=status.HTTP_200_OK,
|
|
|
|
|
),
|
|
|
|
|
):
|
|
|
|
|
response = client.put(url, format="json", **make_user_auth_headers(user, token))
|
|
|
|
|
|
|
|
|
|
assert response.status_code == expected_status
|
|
|
|
|
|
|
|
|
|
response = client.patch(url, format="json", **make_user_auth_headers(user, token))
|
|
|
|
|
|
|
|
|
|
assert response.status_code == expected_status
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.django_db
|
|
|
|
|
@pytest.mark.parametrize(
|
|
|
|
|
"role,expected_status",
|
|
|
|
|
[
|
2022-11-29 09:41:56 +01:00
|
|
|
(LegacyAccessControlRole.ADMIN, status.HTTP_200_OK),
|
|
|
|
|
(LegacyAccessControlRole.EDITOR, status.HTTP_200_OK),
|
|
|
|
|
(LegacyAccessControlRole.VIEWER, status.HTTP_200_OK),
|
2023-10-19 14:39:08 -03:00
|
|
|
(LegacyAccessControlRole.NONE, status.HTTP_403_FORBIDDEN),
|
2022-06-03 08:09:47 -06:00
|
|
|
],
|
|
|
|
|
)
|
|
|
|
|
def test_escalation_policy_list_permissions(
|
|
|
|
|
make_organization_and_user_with_plugin_token,
|
|
|
|
|
make_escalation_chain,
|
|
|
|
|
make_escalation_policy,
|
|
|
|
|
make_user_auth_headers,
|
|
|
|
|
role,
|
|
|
|
|
expected_status,
|
|
|
|
|
):
|
|
|
|
|
organization, user, token = make_organization_and_user_with_plugin_token(role)
|
|
|
|
|
escalation_chain = make_escalation_chain(organization)
|
|
|
|
|
make_escalation_policy(
|
|
|
|
|
escalation_chain, escalation_policy_step=EscalationPolicy.STEP_WAIT, wait_delay=EscalationPolicy.ONE_MINUTE
|
|
|
|
|
)
|
|
|
|
|
client = APIClient()
|
|
|
|
|
|
|
|
|
|
url = reverse("api-internal:escalation_policy-list")
|
|
|
|
|
|
|
|
|
|
with patch(
|
|
|
|
|
"apps.api.views.escalation_policy.EscalationPolicyView.list",
|
|
|
|
|
return_value=Response(
|
|
|
|
|
status=status.HTTP_200_OK,
|
|
|
|
|
),
|
|
|
|
|
):
|
|
|
|
|
response = client.get(url, format="json", **make_user_auth_headers(user, token))
|
|
|
|
|
|
|
|
|
|
assert response.status_code == expected_status
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.django_db
|
|
|
|
|
@pytest.mark.parametrize(
|
|
|
|
|
"role,expected_status",
|
|
|
|
|
[
|
2022-11-29 09:41:56 +01:00
|
|
|
(LegacyAccessControlRole.ADMIN, status.HTTP_200_OK),
|
|
|
|
|
(LegacyAccessControlRole.EDITOR, status.HTTP_200_OK),
|
|
|
|
|
(LegacyAccessControlRole.VIEWER, status.HTTP_200_OK),
|
2023-10-19 14:39:08 -03:00
|
|
|
(LegacyAccessControlRole.NONE, status.HTTP_403_FORBIDDEN),
|
2022-06-03 08:09:47 -06:00
|
|
|
],
|
|
|
|
|
)
|
|
|
|
|
def test_escalation_policy_retrieve_permissions(
|
|
|
|
|
make_organization_and_user_with_plugin_token,
|
|
|
|
|
make_escalation_chain,
|
|
|
|
|
make_escalation_policy,
|
|
|
|
|
make_user_auth_headers,
|
|
|
|
|
role,
|
|
|
|
|
expected_status,
|
|
|
|
|
):
|
|
|
|
|
organization, user, token = make_organization_and_user_with_plugin_token(role)
|
|
|
|
|
escalation_chain = make_escalation_chain(organization)
|
|
|
|
|
escalation_policy = make_escalation_policy(
|
|
|
|
|
escalation_chain, escalation_policy_step=EscalationPolicy.STEP_WAIT, wait_delay=EscalationPolicy.ONE_MINUTE
|
|
|
|
|
)
|
|
|
|
|
client = APIClient()
|
|
|
|
|
|
|
|
|
|
url = reverse("api-internal:escalation_policy-detail", kwargs={"pk": escalation_policy.public_primary_key})
|
|
|
|
|
|
|
|
|
|
with patch(
|
|
|
|
|
"apps.api.views.escalation_policy.EscalationPolicyView.retrieve",
|
|
|
|
|
return_value=Response(
|
|
|
|
|
status=status.HTTP_200_OK,
|
|
|
|
|
),
|
|
|
|
|
):
|
|
|
|
|
response = client.get(url, format="json", **make_user_auth_headers(user, token))
|
|
|
|
|
|
|
|
|
|
assert response.status_code == expected_status
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.django_db
|
|
|
|
|
@pytest.mark.parametrize(
|
|
|
|
|
"role,expected_status",
|
|
|
|
|
[
|
2022-11-29 09:41:56 +01:00
|
|
|
(LegacyAccessControlRole.ADMIN, status.HTTP_204_NO_CONTENT),
|
|
|
|
|
(LegacyAccessControlRole.EDITOR, status.HTTP_403_FORBIDDEN),
|
|
|
|
|
(LegacyAccessControlRole.VIEWER, status.HTTP_403_FORBIDDEN),
|
2023-10-19 14:39:08 -03:00
|
|
|
(LegacyAccessControlRole.NONE, status.HTTP_403_FORBIDDEN),
|
2022-06-03 08:09:47 -06:00
|
|
|
],
|
|
|
|
|
)
|
|
|
|
|
def test_escalation_policy_delete_permissions(
|
|
|
|
|
make_organization_and_user_with_plugin_token,
|
|
|
|
|
make_escalation_chain,
|
|
|
|
|
make_escalation_policy,
|
|
|
|
|
make_user_auth_headers,
|
|
|
|
|
role,
|
|
|
|
|
expected_status,
|
|
|
|
|
):
|
|
|
|
|
organization, user, token = make_organization_and_user_with_plugin_token(role)
|
|
|
|
|
escalation_chain = make_escalation_chain(organization)
|
|
|
|
|
escalation_policy = make_escalation_policy(
|
|
|
|
|
escalation_chain, escalation_policy_step=EscalationPolicy.STEP_WAIT, wait_delay=EscalationPolicy.ONE_MINUTE
|
|
|
|
|
)
|
|
|
|
|
client = APIClient()
|
|
|
|
|
|
|
|
|
|
url = reverse("api-internal:escalation_policy-detail", kwargs={"pk": escalation_policy.public_primary_key})
|
|
|
|
|
|
|
|
|
|
with patch(
|
|
|
|
|
"apps.api.views.escalation_policy.EscalationPolicyView.destroy",
|
|
|
|
|
return_value=Response(
|
|
|
|
|
status=status.HTTP_204_NO_CONTENT,
|
|
|
|
|
),
|
|
|
|
|
):
|
|
|
|
|
response = client.delete(url, format="json", **make_user_auth_headers(user, token))
|
|
|
|
|
|
|
|
|
|
assert response.status_code == expected_status
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.django_db
|
|
|
|
|
@pytest.mark.parametrize(
|
|
|
|
|
"role,expected_status",
|
|
|
|
|
[
|
2022-11-29 09:41:56 +01:00
|
|
|
(LegacyAccessControlRole.ADMIN, status.HTTP_200_OK),
|
|
|
|
|
(LegacyAccessControlRole.EDITOR, status.HTTP_200_OK),
|
|
|
|
|
(LegacyAccessControlRole.VIEWER, status.HTTP_200_OK),
|
2023-10-19 14:39:08 -03:00
|
|
|
(LegacyAccessControlRole.NONE, status.HTTP_403_FORBIDDEN),
|
2022-06-03 08:09:47 -06:00
|
|
|
],
|
|
|
|
|
)
|
|
|
|
|
def test_escalation_policy_escalation_options_permissions(
|
|
|
|
|
make_organization_and_user_with_plugin_token,
|
|
|
|
|
make_escalation_chain,
|
|
|
|
|
make_escalation_policy,
|
|
|
|
|
make_user_auth_headers,
|
|
|
|
|
role,
|
|
|
|
|
expected_status,
|
|
|
|
|
):
|
|
|
|
|
organization, user, token = make_organization_and_user_with_plugin_token(role)
|
|
|
|
|
escalation_chain = make_escalation_chain(organization)
|
|
|
|
|
make_escalation_policy(
|
|
|
|
|
escalation_chain, escalation_policy_step=EscalationPolicy.STEP_WAIT, wait_delay=EscalationPolicy.ONE_MINUTE
|
|
|
|
|
)
|
|
|
|
|
client = APIClient()
|
|
|
|
|
|
|
|
|
|
url = reverse("api-internal:escalation_policy-escalation-options")
|
|
|
|
|
|
|
|
|
|
with patch(
|
|
|
|
|
"apps.api.views.escalation_policy.EscalationPolicyView.escalation_options",
|
|
|
|
|
return_value=Response(
|
|
|
|
|
status=status.HTTP_200_OK,
|
|
|
|
|
),
|
|
|
|
|
):
|
|
|
|
|
response = client.get(url, format="json", **make_user_auth_headers(user, token))
|
|
|
|
|
|
|
|
|
|
assert response.status_code == expected_status
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.django_db
|
|
|
|
|
@pytest.mark.parametrize(
|
|
|
|
|
"role,expected_status",
|
|
|
|
|
[
|
2022-11-29 09:41:56 +01:00
|
|
|
(LegacyAccessControlRole.ADMIN, status.HTTP_200_OK),
|
|
|
|
|
(LegacyAccessControlRole.EDITOR, status.HTTP_200_OK),
|
|
|
|
|
(LegacyAccessControlRole.VIEWER, status.HTTP_200_OK),
|
2023-10-19 14:39:08 -03:00
|
|
|
(LegacyAccessControlRole.NONE, status.HTTP_403_FORBIDDEN),
|
2022-06-03 08:09:47 -06:00
|
|
|
],
|
|
|
|
|
)
|
|
|
|
|
def test_escalation_policy_delay_options_permissions(
|
|
|
|
|
make_organization_and_user_with_plugin_token,
|
|
|
|
|
make_escalation_chain,
|
|
|
|
|
make_escalation_policy,
|
|
|
|
|
make_user_auth_headers,
|
|
|
|
|
role,
|
|
|
|
|
expected_status,
|
|
|
|
|
):
|
|
|
|
|
organization, user, token = make_organization_and_user_with_plugin_token(role)
|
|
|
|
|
|
|
|
|
|
escalation_chain = make_escalation_chain(organization)
|
|
|
|
|
make_escalation_policy(
|
|
|
|
|
escalation_chain, escalation_policy_step=EscalationPolicy.STEP_WAIT, wait_delay=EscalationPolicy.ONE_MINUTE
|
|
|
|
|
)
|
|
|
|
|
client = APIClient()
|
|
|
|
|
|
|
|
|
|
url = reverse("api-internal:escalation_policy-delay-options")
|
|
|
|
|
|
|
|
|
|
with patch(
|
|
|
|
|
"apps.api.views.escalation_policy.EscalationPolicyView.delay_options",
|
|
|
|
|
return_value=Response(
|
|
|
|
|
status=status.HTTP_200_OK,
|
|
|
|
|
),
|
|
|
|
|
):
|
|
|
|
|
response = client.get(url, format="json", **make_user_auth_headers(user, token))
|
|
|
|
|
|
|
|
|
|
assert response.status_code == expected_status
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.django_db
|
|
|
|
|
@pytest.mark.parametrize(
|
|
|
|
|
"role,expected_status",
|
|
|
|
|
[
|
2022-11-29 09:41:56 +01:00
|
|
|
(LegacyAccessControlRole.ADMIN, status.HTTP_200_OK),
|
|
|
|
|
(LegacyAccessControlRole.EDITOR, status.HTTP_200_OK),
|
|
|
|
|
(LegacyAccessControlRole.VIEWER, status.HTTP_200_OK),
|
2023-10-19 14:39:08 -03:00
|
|
|
(LegacyAccessControlRole.NONE, status.HTTP_403_FORBIDDEN),
|
2022-06-03 08:09:47 -06:00
|
|
|
],
|
|
|
|
|
)
|
|
|
|
|
def test_escalation_policy_move_to_position_permissions(
|
|
|
|
|
make_organization_and_user_with_plugin_token,
|
|
|
|
|
make_escalation_chain,
|
|
|
|
|
make_escalation_policy,
|
|
|
|
|
make_user_auth_headers,
|
|
|
|
|
role,
|
|
|
|
|
expected_status,
|
|
|
|
|
):
|
|
|
|
|
organization, user, token = make_organization_and_user_with_plugin_token(role)
|
|
|
|
|
|
|
|
|
|
escalation_chain = make_escalation_chain(organization)
|
|
|
|
|
escalation_policy = make_escalation_policy(
|
|
|
|
|
escalation_chain, escalation_policy_step=EscalationPolicy.STEP_WAIT, wait_delay=EscalationPolicy.ONE_MINUTE
|
|
|
|
|
)
|
|
|
|
|
client = APIClient()
|
|
|
|
|
|
|
|
|
|
url = reverse("api-internal:escalation_policy-detail", kwargs={"pk": escalation_policy.public_primary_key})
|
|
|
|
|
|
|
|
|
|
with patch(
|
|
|
|
|
"apps.api.views.escalation_policy.EscalationPolicyView.move_to_position",
|
|
|
|
|
return_value=Response(
|
|
|
|
|
status=status.HTTP_200_OK,
|
|
|
|
|
),
|
|
|
|
|
):
|
|
|
|
|
response = client.get(url, format="json", **make_user_auth_headers(user, token))
|
|
|
|
|
|
|
|
|
|
assert response.status_code == expected_status
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.django_db
|
|
|
|
|
@pytest.mark.parametrize(
|
|
|
|
|
"important_step ,expected_default_step",
|
|
|
|
|
[
|
|
|
|
|
(EscalationPolicy.STEP_NOTIFY_GROUP_IMPORTANT, EscalationPolicy.STEP_NOTIFY_GROUP),
|
|
|
|
|
(EscalationPolicy.STEP_NOTIFY_SCHEDULE_IMPORTANT, EscalationPolicy.STEP_NOTIFY_SCHEDULE),
|
|
|
|
|
(EscalationPolicy.STEP_NOTIFY_MULTIPLE_USERS_IMPORTANT, EscalationPolicy.STEP_NOTIFY_MULTIPLE_USERS),
|
2025-01-21 17:29:36 +01:00
|
|
|
(EscalationPolicy.STEP_NOTIFY_USERS_QUEUE_IMPORTANT, EscalationPolicy.STEP_NOTIFY_USERS_QUEUE),
|
2022-06-03 08:09:47 -06:00
|
|
|
],
|
|
|
|
|
)
|
|
|
|
|
def test_escalation_policy_maps_default_to_important(
|
|
|
|
|
make_organization_and_user_with_plugin_token,
|
|
|
|
|
make_user_auth_headers,
|
|
|
|
|
make_escalation_chain,
|
|
|
|
|
make_escalation_policy,
|
|
|
|
|
important_step,
|
|
|
|
|
expected_default_step,
|
|
|
|
|
):
|
|
|
|
|
organization, user, token = make_organization_and_user_with_plugin_token()
|
|
|
|
|
|
|
|
|
|
escalation_chain = make_escalation_chain(organization)
|
|
|
|
|
escalation_policy = make_escalation_policy(
|
|
|
|
|
escalation_chain,
|
|
|
|
|
escalation_policy_step=important_step,
|
|
|
|
|
)
|
|
|
|
|
client = APIClient()
|
|
|
|
|
|
|
|
|
|
url = reverse("api-internal:escalation_policy-detail", kwargs={"pk": escalation_policy.public_primary_key})
|
|
|
|
|
|
|
|
|
|
response = client.get(url, format="json", **make_user_auth_headers(user, token))
|
|
|
|
|
|
|
|
|
|
assert response.json()["step"] == expected_default_step
|
|
|
|
|
assert response.json()["important"] is True
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.django_db
|
|
|
|
|
@pytest.mark.parametrize(
|
|
|
|
|
"default_step",
|
|
|
|
|
[
|
|
|
|
|
EscalationPolicy.STEP_NOTIFY_GROUP,
|
|
|
|
|
EscalationPolicy.STEP_NOTIFY_SCHEDULE,
|
|
|
|
|
EscalationPolicy.STEP_NOTIFY_MULTIPLE_USERS,
|
2025-01-21 17:29:36 +01:00
|
|
|
EscalationPolicy.STEP_NOTIFY_USERS_QUEUE,
|
2022-06-03 08:09:47 -06:00
|
|
|
],
|
|
|
|
|
)
|
|
|
|
|
def test_escalation_policy_default_steps_stay_default(
|
|
|
|
|
make_organization_and_user_with_plugin_token,
|
|
|
|
|
make_escalation_chain,
|
|
|
|
|
make_escalation_policy,
|
|
|
|
|
default_step,
|
|
|
|
|
make_user_auth_headers,
|
|
|
|
|
):
|
|
|
|
|
organization, user, token = make_organization_and_user_with_plugin_token()
|
|
|
|
|
|
|
|
|
|
escalation_chain = make_escalation_chain(organization)
|
|
|
|
|
escalation_policy = make_escalation_policy(
|
|
|
|
|
escalation_chain,
|
|
|
|
|
escalation_policy_step=default_step,
|
|
|
|
|
)
|
|
|
|
|
client = APIClient()
|
|
|
|
|
|
|
|
|
|
url = reverse("api-internal:escalation_policy-detail", kwargs={"pk": escalation_policy.public_primary_key})
|
|
|
|
|
|
|
|
|
|
response = client.get(url, format="json", **make_user_auth_headers(user, token))
|
|
|
|
|
|
|
|
|
|
assert response.json()["step"] == default_step
|
|
|
|
|
assert response.json()["important"] is False
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.django_db
|
|
|
|
|
@pytest.mark.parametrize(
|
|
|
|
|
"default_step ,expected_important_step",
|
|
|
|
|
[
|
|
|
|
|
(EscalationPolicy.STEP_NOTIFY_GROUP, EscalationPolicy.STEP_NOTIFY_GROUP_IMPORTANT),
|
|
|
|
|
(EscalationPolicy.STEP_NOTIFY_SCHEDULE, EscalationPolicy.STEP_NOTIFY_SCHEDULE_IMPORTANT),
|
|
|
|
|
(EscalationPolicy.STEP_NOTIFY_MULTIPLE_USERS, EscalationPolicy.STEP_NOTIFY_MULTIPLE_USERS_IMPORTANT),
|
2025-01-21 17:29:36 +01:00
|
|
|
(EscalationPolicy.STEP_NOTIFY_USERS_QUEUE, EscalationPolicy.STEP_NOTIFY_USERS_QUEUE_IMPORTANT),
|
2022-06-03 08:09:47 -06:00
|
|
|
],
|
|
|
|
|
)
|
|
|
|
|
def test_create_escalation_policy_important(
|
|
|
|
|
make_organization_and_user_with_slack_identities,
|
|
|
|
|
make_token_for_organization,
|
|
|
|
|
make_escalation_chain,
|
|
|
|
|
default_step,
|
|
|
|
|
expected_important_step,
|
|
|
|
|
make_user_auth_headers,
|
|
|
|
|
):
|
|
|
|
|
organization, user, _, _ = make_organization_and_user_with_slack_identities()
|
|
|
|
|
_, token = make_token_for_organization(organization)
|
|
|
|
|
escalation_chain = make_escalation_chain(organization)
|
|
|
|
|
|
|
|
|
|
client = APIClient()
|
|
|
|
|
data_for_creation = {
|
|
|
|
|
"escalation_chain": escalation_chain.public_primary_key,
|
|
|
|
|
"step": default_step,
|
|
|
|
|
"important": True,
|
|
|
|
|
}
|
|
|
|
|
url = reverse("api-internal:escalation_policy-list")
|
|
|
|
|
|
|
|
|
|
response = client.post(url, data=data_for_creation, format="json", **make_user_auth_headers(user, token))
|
|
|
|
|
|
|
|
|
|
assert response.status_code == status.HTTP_201_CREATED
|
|
|
|
|
public_primary_key = response.json()["id"]
|
|
|
|
|
created_escalation_policy = EscalationPolicy.objects.get(public_primary_key=public_primary_key)
|
|
|
|
|
assert created_escalation_policy.step == expected_important_step
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.django_db
|
|
|
|
|
@pytest.mark.parametrize(
|
|
|
|
|
"default_step",
|
|
|
|
|
[
|
|
|
|
|
EscalationPolicy.STEP_NOTIFY_GROUP,
|
|
|
|
|
EscalationPolicy.STEP_NOTIFY_SCHEDULE,
|
|
|
|
|
EscalationPolicy.STEP_NOTIFY_MULTIPLE_USERS,
|
2025-01-21 17:29:36 +01:00
|
|
|
EscalationPolicy.STEP_NOTIFY_USERS_QUEUE,
|
2022-06-03 08:09:47 -06:00
|
|
|
],
|
|
|
|
|
)
|
|
|
|
|
def test_create_escalation_policy_default(
|
|
|
|
|
make_organization_and_user_with_slack_identities,
|
|
|
|
|
make_token_for_organization,
|
|
|
|
|
make_escalation_chain,
|
|
|
|
|
default_step,
|
|
|
|
|
make_user_auth_headers,
|
|
|
|
|
):
|
|
|
|
|
organization, user, _, _ = make_organization_and_user_with_slack_identities()
|
|
|
|
|
_, token = make_token_for_organization(organization)
|
|
|
|
|
escalation_chain = make_escalation_chain(organization)
|
|
|
|
|
|
|
|
|
|
client = APIClient()
|
|
|
|
|
data_for_creation = {
|
|
|
|
|
"escalation_chain": escalation_chain.public_primary_key,
|
|
|
|
|
"step": default_step,
|
|
|
|
|
"important": False,
|
|
|
|
|
}
|
|
|
|
|
url = reverse("api-internal:escalation_policy-list")
|
|
|
|
|
|
|
|
|
|
response = client.post(url, data=data_for_creation, format="json", **make_user_auth_headers(user, token))
|
|
|
|
|
|
|
|
|
|
assert response.status_code == status.HTTP_201_CREATED
|
|
|
|
|
public_primary_key = response.json()["id"]
|
|
|
|
|
created_escalation_policy = EscalationPolicy.objects.get(public_primary_key=public_primary_key)
|
|
|
|
|
assert created_escalation_policy.step == default_step
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.django_db
|
|
|
|
|
@pytest.mark.parametrize("step", EscalationPolicy.STEPS_WITH_NO_IMPORTANT_VERSION_SET)
|
|
|
|
|
def test_create_escalation_policy_with_no_important_version(
|
|
|
|
|
make_organization_and_user_with_slack_identities,
|
|
|
|
|
make_token_for_organization,
|
|
|
|
|
make_escalation_chain,
|
|
|
|
|
step,
|
|
|
|
|
make_user_auth_headers,
|
Reworked declare incident escalation step (#5130)
Reworked https://github.com/grafana/oncall/pull/5047. Main update is the
switch from FK to a [M2M
relation](https://docs.google.com/document/d/1HeulqxoFShSHtInQrZNJLL5MDlHPNT50rVGaK3zZWvw/edit?disco=AAABVLjV4W8)
(which doesn't really change the original/intended behavior, besides not
needing to alter the alert group table, and it is a bit more flexible;
the extra table shouldn't introduce issues because this is used only for
tracking purposes and the information needed in the log record is
already there).
Avoid a db migration involving alert group table:
```
--
-- Create model RelatedIncident
--
CREATE TABLE `alerts_relatedincident` (`id` bigint AUTO_INCREMENT NOT NULL PRIMARY KEY, `incident_id` varchar(50) NOT NULL, `created_at` datetime(6) NOT NULL, `is_active` bool NOT NULL, `channel_filter_id` bigint NULL, `organization_id` bigint NOT NULL);
CREATE TABLE `alerts_relatedincident_attached_alert_groups` (`id` bigint AUTO_INCREMENT NOT NULL PRIMARY KEY, `relatedincident_id` bigint NOT NULL, `alertgroup_id` bigint NOT NULL);
ALTER TABLE `alerts_relatedincident` ADD CONSTRAINT `alerts_relatedincident_organization_id_incident_id_d7fc9a4f_uniq` UNIQUE (`organization_id`, `incident_id`);
ALTER TABLE `alerts_relatedincident` ADD CONSTRAINT `alerts_relatedincide_channel_filter_id_9556c836_fk_alerts_ch` FOREIGN KEY (`channel_filter_id`) REFERENCES `alerts_channelfilter` (`id`);
ALTER TABLE `alerts_relatedincident` ADD CONSTRAINT `alerts_relatedincide_organization_id_74ed6bed_fk_user_mana` FOREIGN KEY (`organization_id`) REFERENCES `user_management_organization` (`id`);
CREATE INDEX `alerts_relatedincident_incident_id_8356a799` ON `alerts_relatedincident` (`incident_id`);
ALTER TABLE `alerts_relatedincident_attached_alert_groups` ADD CONSTRAINT `alerts_relatedincident_a_relatedincident_id_alert_3d683baa_uniq` UNIQUE (`relatedincident_id`, `alertgroup_id`);
ALTER TABLE `alerts_relatedincident_attached_alert_groups` ADD CONSTRAINT `alerts_relatedincide_relatedincident_id_3e5e7a23_fk_alerts_re` FOREIGN KEY (`relatedincident_id`) REFERENCES `alerts_relatedincident` (`id`);
ALTER TABLE `alerts_relatedincident_attached_alert_groups` ADD CONSTRAINT `alerts_relatedincide_alertgroup_id_0125deca_fk_alerts_al` FOREIGN KEY (`alertgroup_id`) REFERENCES `alerts_alertgroup` (`id`);
```
2024-10-07 16:26:10 -03:00
|
|
|
settings,
|
2022-06-03 08:09:47 -06:00
|
|
|
):
|
|
|
|
|
organization, user, _, _ = make_organization_and_user_with_slack_identities()
|
Reworked declare incident escalation step (#5130)
Reworked https://github.com/grafana/oncall/pull/5047. Main update is the
switch from FK to a [M2M
relation](https://docs.google.com/document/d/1HeulqxoFShSHtInQrZNJLL5MDlHPNT50rVGaK3zZWvw/edit?disco=AAABVLjV4W8)
(which doesn't really change the original/intended behavior, besides not
needing to alter the alert group table, and it is a bit more flexible;
the extra table shouldn't introduce issues because this is used only for
tracking purposes and the information needed in the log record is
already there).
Avoid a db migration involving alert group table:
```
--
-- Create model RelatedIncident
--
CREATE TABLE `alerts_relatedincident` (`id` bigint AUTO_INCREMENT NOT NULL PRIMARY KEY, `incident_id` varchar(50) NOT NULL, `created_at` datetime(6) NOT NULL, `is_active` bool NOT NULL, `channel_filter_id` bigint NULL, `organization_id` bigint NOT NULL);
CREATE TABLE `alerts_relatedincident_attached_alert_groups` (`id` bigint AUTO_INCREMENT NOT NULL PRIMARY KEY, `relatedincident_id` bigint NOT NULL, `alertgroup_id` bigint NOT NULL);
ALTER TABLE `alerts_relatedincident` ADD CONSTRAINT `alerts_relatedincident_organization_id_incident_id_d7fc9a4f_uniq` UNIQUE (`organization_id`, `incident_id`);
ALTER TABLE `alerts_relatedincident` ADD CONSTRAINT `alerts_relatedincide_channel_filter_id_9556c836_fk_alerts_ch` FOREIGN KEY (`channel_filter_id`) REFERENCES `alerts_channelfilter` (`id`);
ALTER TABLE `alerts_relatedincident` ADD CONSTRAINT `alerts_relatedincide_organization_id_74ed6bed_fk_user_mana` FOREIGN KEY (`organization_id`) REFERENCES `user_management_organization` (`id`);
CREATE INDEX `alerts_relatedincident_incident_id_8356a799` ON `alerts_relatedincident` (`incident_id`);
ALTER TABLE `alerts_relatedincident_attached_alert_groups` ADD CONSTRAINT `alerts_relatedincident_a_relatedincident_id_alert_3d683baa_uniq` UNIQUE (`relatedincident_id`, `alertgroup_id`);
ALTER TABLE `alerts_relatedincident_attached_alert_groups` ADD CONSTRAINT `alerts_relatedincide_relatedincident_id_3e5e7a23_fk_alerts_re` FOREIGN KEY (`relatedincident_id`) REFERENCES `alerts_relatedincident` (`id`);
ALTER TABLE `alerts_relatedincident_attached_alert_groups` ADD CONSTRAINT `alerts_relatedincide_alertgroup_id_0125deca_fk_alerts_al` FOREIGN KEY (`alertgroup_id`) REFERENCES `alerts_alertgroup` (`id`);
```
2024-10-07 16:26:10 -03:00
|
|
|
# make sure declare incident step is enabled
|
|
|
|
|
settings.FEATURE_DECLARE_INCIDENT_STEP_ENABLED = True
|
|
|
|
|
organization.is_grafana_incident_enabled = True
|
|
|
|
|
organization.save()
|
2022-06-03 08:09:47 -06:00
|
|
|
_, token = make_token_for_organization(organization)
|
|
|
|
|
escalation_chain = make_escalation_chain(organization)
|
|
|
|
|
|
2024-10-03 13:51:40 -03:00
|
|
|
if step == EscalationPolicy.STEP_DECLARE_INCIDENT:
|
|
|
|
|
# declare incident step is disabled
|
|
|
|
|
return
|
|
|
|
|
|
2022-06-03 08:09:47 -06:00
|
|
|
client = APIClient()
|
|
|
|
|
data_for_creation = {
|
|
|
|
|
"escalation_chain": escalation_chain.public_primary_key,
|
|
|
|
|
"step": step,
|
|
|
|
|
}
|
|
|
|
|
url = reverse("api-internal:escalation_policy-list")
|
|
|
|
|
|
|
|
|
|
response = client.post(url, data=data_for_creation, format="json", **make_user_auth_headers(user, token))
|
|
|
|
|
|
|
|
|
|
assert response.status_code == status.HTTP_201_CREATED
|
|
|
|
|
public_primary_key = response.json()["id"]
|
|
|
|
|
created_escalation_policy = EscalationPolicy.objects.get(public_primary_key=public_primary_key)
|
|
|
|
|
assert created_escalation_policy.step == step
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.django_db
|
|
|
|
|
@pytest.mark.parametrize("step", EscalationPolicy.STEPS_WITH_NO_IMPORTANT_VERSION_SET)
|
|
|
|
|
def test_escalation_policy_can_not_create_invalid_important_step(
|
|
|
|
|
make_organization_and_user_with_slack_identities,
|
|
|
|
|
make_token_for_organization,
|
|
|
|
|
make_escalation_chain,
|
|
|
|
|
step,
|
|
|
|
|
make_user_auth_headers,
|
|
|
|
|
):
|
|
|
|
|
organization, user, _, _ = make_organization_and_user_with_slack_identities()
|
|
|
|
|
_, token = make_token_for_organization(organization)
|
|
|
|
|
escalation_chain = make_escalation_chain(organization)
|
|
|
|
|
|
|
|
|
|
client = APIClient()
|
|
|
|
|
data_for_creation = {"escalation_chain": escalation_chain.public_primary_key, "step": step, "important": True}
|
|
|
|
|
url = reverse("api-internal:escalation_policy-list")
|
|
|
|
|
|
|
|
|
|
response = client.post(url, data=data_for_creation, format="json", **make_user_auth_headers(user, token))
|
|
|
|
|
|
|
|
|
|
assert response.status_code == status.HTTP_400_BAD_REQUEST
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.django_db
|
|
|
|
|
@pytest.mark.parametrize("step", EscalationPolicy.INTERNAL_API_STEPS)
|
|
|
|
|
def test_escalation_policy_can_not_create_with_non_step_type_related_data(
|
|
|
|
|
make_organization_and_user_with_slack_identities,
|
|
|
|
|
make_token_for_organization,
|
|
|
|
|
make_escalation_chain,
|
|
|
|
|
step,
|
|
|
|
|
make_user_auth_headers,
|
|
|
|
|
):
|
|
|
|
|
organization, user, _, _ = make_organization_and_user_with_slack_identities()
|
|
|
|
|
_, token = make_token_for_organization(organization)
|
|
|
|
|
|
|
|
|
|
escalation_chain = make_escalation_chain(organization)
|
|
|
|
|
|
|
|
|
|
client = APIClient()
|
|
|
|
|
data_for_creation = {
|
|
|
|
|
"escalation_chain": escalation_chain.public_primary_key,
|
|
|
|
|
"step": step,
|
|
|
|
|
"notify_to_users_queue": [user.public_primary_key],
|
2024-06-03 11:47:42 +01:00
|
|
|
"wait_delay": "300.0",
|
2022-06-03 08:09:47 -06:00
|
|
|
"from_time": "06:50:00",
|
|
|
|
|
"to_time": "04:10:00",
|
|
|
|
|
}
|
|
|
|
|
url = reverse("api-internal:escalation_policy-list")
|
|
|
|
|
|
|
|
|
|
response = client.post(url, data=data_for_creation, format="json", **make_user_auth_headers(user, token))
|
|
|
|
|
|
|
|
|
|
assert response.status_code == status.HTTP_400_BAD_REQUEST
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.django_db
|
|
|
|
|
@pytest.mark.parametrize(
|
|
|
|
|
"step, related_fields",
|
|
|
|
|
[
|
|
|
|
|
(EscalationPolicy.STEP_WAIT, ["wait_delay"]),
|
|
|
|
|
(EscalationPolicy.STEP_FINAL_NOTIFYALL, []),
|
|
|
|
|
(EscalationPolicy.STEP_FINAL_RESOLVE, []),
|
|
|
|
|
(EscalationPolicy.STEP_NOTIFY_GROUP, ["notify_to_group"]),
|
|
|
|
|
(EscalationPolicy.STEP_NOTIFY_SCHEDULE, ["notify_schedule"]),
|
|
|
|
|
(EscalationPolicy.STEP_NOTIFY_USERS_QUEUE, ["notify_to_users_queue"]),
|
|
|
|
|
(EscalationPolicy.STEP_NOTIFY_IF_TIME, ["from_time", "to_time"]),
|
|
|
|
|
(EscalationPolicy.STEP_NOTIFY_MULTIPLE_USERS, ["notify_to_users_queue"]),
|
2023-04-05 09:03:55 -03:00
|
|
|
(EscalationPolicy.STEP_TRIGGER_CUSTOM_WEBHOOK, ["custom_webhook"]),
|
2022-06-03 08:09:47 -06:00
|
|
|
],
|
|
|
|
|
)
|
|
|
|
|
def test_escalation_policy_update_drop_non_step_type_related_data(
|
|
|
|
|
make_organization_and_user_with_slack_identities,
|
|
|
|
|
make_token_for_organization,
|
|
|
|
|
make_escalation_chain,
|
|
|
|
|
make_escalation_policy,
|
|
|
|
|
step,
|
|
|
|
|
related_fields,
|
|
|
|
|
make_user_auth_headers,
|
|
|
|
|
):
|
|
|
|
|
organization, user, _, _ = make_organization_and_user_with_slack_identities()
|
|
|
|
|
_, token = make_token_for_organization(organization)
|
|
|
|
|
|
|
|
|
|
escalation_chain = make_escalation_chain(organization)
|
|
|
|
|
|
|
|
|
|
data_for_creation = {
|
|
|
|
|
"wait_delay": timedelta(minutes=5),
|
|
|
|
|
"from_time": "06:50:00",
|
|
|
|
|
"to_time": "04:10:00",
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
escalation_policy = make_escalation_policy(
|
|
|
|
|
escalation_chain=escalation_chain, escalation_policy_step=EscalationPolicy.STEP_WAIT, **data_for_creation
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
escalation_policy.notify_to_users_queue.set([user])
|
|
|
|
|
|
|
|
|
|
data_for_update = {"step": step}
|
|
|
|
|
|
|
|
|
|
fields_to_check = [
|
|
|
|
|
"wait_delay",
|
|
|
|
|
"notify_schedule",
|
|
|
|
|
"notify_to_users_queue",
|
|
|
|
|
"notify_to_group",
|
2024-02-20 10:02:23 -03:00
|
|
|
"notify_to_team_members",
|
2022-06-03 08:09:47 -06:00
|
|
|
"from_time",
|
|
|
|
|
"to_time",
|
2023-04-05 09:03:55 -03:00
|
|
|
"custom_webhook",
|
2022-06-03 08:09:47 -06:00
|
|
|
]
|
|
|
|
|
for f in related_fields:
|
|
|
|
|
fields_to_check.remove(f)
|
|
|
|
|
|
|
|
|
|
client = APIClient()
|
|
|
|
|
|
|
|
|
|
url = reverse("api-internal:escalation_policy-detail", kwargs={"pk": escalation_policy.public_primary_key})
|
|
|
|
|
|
|
|
|
|
response = client.put(url, data=data_for_update, format="json", **make_user_auth_headers(user, token))
|
|
|
|
|
|
|
|
|
|
assert response.status_code == status.HTTP_200_OK
|
|
|
|
|
|
|
|
|
|
escalation_policy.refresh_from_db()
|
|
|
|
|
|
|
|
|
|
for f in fields_to_check:
|
|
|
|
|
if f == "notify_to_users_queue":
|
|
|
|
|
assert len(list(getattr(escalation_policy, f).all())) == 0
|
|
|
|
|
else:
|
|
|
|
|
assert getattr(escalation_policy, f) is None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.django_db
|
|
|
|
|
@pytest.mark.parametrize("step", EscalationPolicy.DEFAULT_STEPS_SET)
|
|
|
|
|
def test_escalation_policy_switch_importance(
|
|
|
|
|
make_organization_and_user_with_slack_identities,
|
|
|
|
|
make_token_for_organization,
|
|
|
|
|
make_escalation_chain,
|
|
|
|
|
make_escalation_policy,
|
|
|
|
|
step,
|
|
|
|
|
make_user_auth_headers,
|
|
|
|
|
):
|
|
|
|
|
organization, user, _, _ = make_organization_and_user_with_slack_identities()
|
|
|
|
|
_, token = make_token_for_organization(organization)
|
|
|
|
|
escalation_chain = make_escalation_chain(organization)
|
|
|
|
|
|
|
|
|
|
escalation_policy = make_escalation_policy(
|
|
|
|
|
escalation_chain=escalation_chain,
|
|
|
|
|
escalation_policy_step=step,
|
|
|
|
|
)
|
|
|
|
|
data_for_update = {
|
|
|
|
|
"id": escalation_policy.public_primary_key,
|
|
|
|
|
"step": escalation_policy.step,
|
|
|
|
|
"escalation_chain": escalation_chain.public_primary_key,
|
|
|
|
|
"notify_to_users_queue": [],
|
|
|
|
|
"from_time": None,
|
|
|
|
|
"to_time": None,
|
|
|
|
|
"num_alerts_in_window": None,
|
|
|
|
|
"num_minutes_in_window": None,
|
|
|
|
|
"slack_integration_required": escalation_policy.slack_integration_required,
|
2023-04-05 09:03:55 -03:00
|
|
|
"custom_webhook": None,
|
2022-06-03 08:09:47 -06:00
|
|
|
"notify_schedule": None,
|
|
|
|
|
"notify_to_group": None,
|
2024-02-20 10:02:23 -03:00
|
|
|
"notify_to_team_members": None,
|
Reworked declare incident escalation step (#5130)
Reworked https://github.com/grafana/oncall/pull/5047. Main update is the
switch from FK to a [M2M
relation](https://docs.google.com/document/d/1HeulqxoFShSHtInQrZNJLL5MDlHPNT50rVGaK3zZWvw/edit?disco=AAABVLjV4W8)
(which doesn't really change the original/intended behavior, besides not
needing to alter the alert group table, and it is a bit more flexible;
the extra table shouldn't introduce issues because this is used only for
tracking purposes and the information needed in the log record is
already there).
Avoid a db migration involving alert group table:
```
--
-- Create model RelatedIncident
--
CREATE TABLE `alerts_relatedincident` (`id` bigint AUTO_INCREMENT NOT NULL PRIMARY KEY, `incident_id` varchar(50) NOT NULL, `created_at` datetime(6) NOT NULL, `is_active` bool NOT NULL, `channel_filter_id` bigint NULL, `organization_id` bigint NOT NULL);
CREATE TABLE `alerts_relatedincident_attached_alert_groups` (`id` bigint AUTO_INCREMENT NOT NULL PRIMARY KEY, `relatedincident_id` bigint NOT NULL, `alertgroup_id` bigint NOT NULL);
ALTER TABLE `alerts_relatedincident` ADD CONSTRAINT `alerts_relatedincident_organization_id_incident_id_d7fc9a4f_uniq` UNIQUE (`organization_id`, `incident_id`);
ALTER TABLE `alerts_relatedincident` ADD CONSTRAINT `alerts_relatedincide_channel_filter_id_9556c836_fk_alerts_ch` FOREIGN KEY (`channel_filter_id`) REFERENCES `alerts_channelfilter` (`id`);
ALTER TABLE `alerts_relatedincident` ADD CONSTRAINT `alerts_relatedincide_organization_id_74ed6bed_fk_user_mana` FOREIGN KEY (`organization_id`) REFERENCES `user_management_organization` (`id`);
CREATE INDEX `alerts_relatedincident_incident_id_8356a799` ON `alerts_relatedincident` (`incident_id`);
ALTER TABLE `alerts_relatedincident_attached_alert_groups` ADD CONSTRAINT `alerts_relatedincident_a_relatedincident_id_alert_3d683baa_uniq` UNIQUE (`relatedincident_id`, `alertgroup_id`);
ALTER TABLE `alerts_relatedincident_attached_alert_groups` ADD CONSTRAINT `alerts_relatedincide_relatedincident_id_3e5e7a23_fk_alerts_re` FOREIGN KEY (`relatedincident_id`) REFERENCES `alerts_relatedincident` (`id`);
ALTER TABLE `alerts_relatedincident_attached_alert_groups` ADD CONSTRAINT `alerts_relatedincide_alertgroup_id_0125deca_fk_alerts_al` FOREIGN KEY (`alertgroup_id`) REFERENCES `alerts_alertgroup` (`id`);
```
2024-10-07 16:26:10 -03:00
|
|
|
"severity": None,
|
2022-06-03 08:09:47 -06:00
|
|
|
"important": True,
|
|
|
|
|
"wait_delay": None,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
client = APIClient()
|
|
|
|
|
|
|
|
|
|
url = reverse("api-internal:escalation_policy-detail", kwargs={"pk": escalation_policy.public_primary_key})
|
|
|
|
|
|
|
|
|
|
response = client.put(url, data=data_for_update, format="json", **make_user_auth_headers(user, token))
|
|
|
|
|
|
|
|
|
|
assert response.status_code == status.HTTP_200_OK
|
|
|
|
|
|
|
|
|
|
assert response.json() == data_for_update
|
|
|
|
|
|
|
|
|
|
|
2023-04-05 09:03:55 -03:00
|
|
|
@pytest.mark.django_db
|
|
|
|
|
def test_escalation_policy_escalation_options_webhooks(
|
|
|
|
|
make_organization_and_user_with_plugin_token,
|
|
|
|
|
make_user_auth_headers,
|
|
|
|
|
):
|
|
|
|
|
_, user, token = make_organization_and_user_with_plugin_token()
|
|
|
|
|
client = APIClient()
|
|
|
|
|
|
|
|
|
|
url = reverse("api-internal:escalation_policy-escalation-options")
|
|
|
|
|
|
2023-07-13 13:53:06 -06:00
|
|
|
response = client.get(url, format="json", **make_user_auth_headers(user, token))
|
2023-04-05 09:03:55 -03:00
|
|
|
|
|
|
|
|
returned_options = [option["value"] for option in response.json()]
|
2023-08-02 20:07:47 +02:00
|
|
|
|
|
|
|
|
assert EscalationPolicy.STEP_TRIGGER_CUSTOM_WEBHOOK in returned_options
|
Reworked declare incident escalation step (#5130)
Reworked https://github.com/grafana/oncall/pull/5047. Main update is the
switch from FK to a [M2M
relation](https://docs.google.com/document/d/1HeulqxoFShSHtInQrZNJLL5MDlHPNT50rVGaK3zZWvw/edit?disco=AAABVLjV4W8)
(which doesn't really change the original/intended behavior, besides not
needing to alter the alert group table, and it is a bit more flexible;
the extra table shouldn't introduce issues because this is used only for
tracking purposes and the information needed in the log record is
already there).
Avoid a db migration involving alert group table:
```
--
-- Create model RelatedIncident
--
CREATE TABLE `alerts_relatedincident` (`id` bigint AUTO_INCREMENT NOT NULL PRIMARY KEY, `incident_id` varchar(50) NOT NULL, `created_at` datetime(6) NOT NULL, `is_active` bool NOT NULL, `channel_filter_id` bigint NULL, `organization_id` bigint NOT NULL);
CREATE TABLE `alerts_relatedincident_attached_alert_groups` (`id` bigint AUTO_INCREMENT NOT NULL PRIMARY KEY, `relatedincident_id` bigint NOT NULL, `alertgroup_id` bigint NOT NULL);
ALTER TABLE `alerts_relatedincident` ADD CONSTRAINT `alerts_relatedincident_organization_id_incident_id_d7fc9a4f_uniq` UNIQUE (`organization_id`, `incident_id`);
ALTER TABLE `alerts_relatedincident` ADD CONSTRAINT `alerts_relatedincide_channel_filter_id_9556c836_fk_alerts_ch` FOREIGN KEY (`channel_filter_id`) REFERENCES `alerts_channelfilter` (`id`);
ALTER TABLE `alerts_relatedincident` ADD CONSTRAINT `alerts_relatedincide_organization_id_74ed6bed_fk_user_mana` FOREIGN KEY (`organization_id`) REFERENCES `user_management_organization` (`id`);
CREATE INDEX `alerts_relatedincident_incident_id_8356a799` ON `alerts_relatedincident` (`incident_id`);
ALTER TABLE `alerts_relatedincident_attached_alert_groups` ADD CONSTRAINT `alerts_relatedincident_a_relatedincident_id_alert_3d683baa_uniq` UNIQUE (`relatedincident_id`, `alertgroup_id`);
ALTER TABLE `alerts_relatedincident_attached_alert_groups` ADD CONSTRAINT `alerts_relatedincide_relatedincident_id_3e5e7a23_fk_alerts_re` FOREIGN KEY (`relatedincident_id`) REFERENCES `alerts_relatedincident` (`id`);
ALTER TABLE `alerts_relatedincident_attached_alert_groups` ADD CONSTRAINT `alerts_relatedincide_alertgroup_id_0125deca_fk_alerts_al` FOREIGN KEY (`alertgroup_id`) REFERENCES `alerts_alertgroup` (`id`);
```
2024-10-07 16:26:10 -03:00
|
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.django_db
|
|
|
|
|
def test_escalation_policy_severity_options(
|
|
|
|
|
make_organization_and_user_with_plugin_token,
|
|
|
|
|
make_user_auth_headers,
|
|
|
|
|
):
|
|
|
|
|
organization, user, token = make_organization_and_user_with_plugin_token()
|
|
|
|
|
organization.is_grafana_labels_enabled = False
|
|
|
|
|
organization.save()
|
|
|
|
|
|
|
|
|
|
client = APIClient()
|
|
|
|
|
url = reverse("api-internal:escalation_policy-severity-options")
|
|
|
|
|
|
|
|
|
|
# without labels enabled
|
|
|
|
|
available_severities = [
|
|
|
|
|
{"severityID": "abc", "orgID": "1", "displayLabel": "Pending", "level": -1},
|
|
|
|
|
{"severityID": "def", "orgID": "1", "displayLabel": "Critical", "level": 1},
|
|
|
|
|
]
|
|
|
|
|
with patch("common.incident_api.client.IncidentAPIClient.get_severities") as mock_get_severities:
|
|
|
|
|
mock_get_severities.return_value = available_severities, None
|
|
|
|
|
response = client.get(url, format="json", **make_user_auth_headers(user, token))
|
|
|
|
|
|
|
|
|
|
expected_options = [{"value": s["displayLabel"], "display_name": s["displayLabel"]} for s in available_severities]
|
|
|
|
|
assert response.json() == expected_options
|
|
|
|
|
|
|
|
|
|
# failing request does not break; fallback to default option only
|
|
|
|
|
with patch("common.incident_api.client.IncidentAPIClient.get_severities") as mock_get_severities:
|
|
|
|
|
mock_get_severities.side_effect = IncidentAPIException(status=404, url="some-url")
|
|
|
|
|
response = client.get(url, format="json", **make_user_auth_headers(user, token))
|
|
|
|
|
|
|
|
|
|
fallback_options = [{"value": DEFAULT_INCIDENT_SEVERITY, "display_name": DEFAULT_INCIDENT_SEVERITY}]
|
|
|
|
|
assert response.json() == fallback_options
|
|
|
|
|
|
|
|
|
|
# labels enabled
|
|
|
|
|
organization.is_grafana_labels_enabled = True
|
|
|
|
|
organization.save()
|
|
|
|
|
|
|
|
|
|
with patch("common.incident_api.client.IncidentAPIClient.get_severities") as mock_get_severities:
|
|
|
|
|
mock_get_severities.return_value = available_severities, None
|
|
|
|
|
response = client.get(url, format="json", **make_user_auth_headers(user, token))
|
|
|
|
|
# include set from label option
|
|
|
|
|
expected_options = [
|
|
|
|
|
{
|
|
|
|
|
"value": EscalationPolicy.SEVERITY_SET_FROM_LABEL,
|
|
|
|
|
"display_name": EscalationPolicy.SEVERITY_SET_FROM_LABEL_DISPLAY_VALUE,
|
|
|
|
|
}
|
|
|
|
|
] + expected_options
|
|
|
|
|
assert response.json() == expected_options
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.django_db
|
|
|
|
|
def test_create_escalation_policy_declare_incident(
|
|
|
|
|
escalation_policy_internal_api_setup, make_user_auth_headers, settings
|
|
|
|
|
):
|
|
|
|
|
token, escalation_chain, _, user, _ = escalation_policy_internal_api_setup
|
|
|
|
|
organization = escalation_chain.organization
|
|
|
|
|
client = APIClient()
|
|
|
|
|
url = reverse("api-internal:escalation_policy-list")
|
|
|
|
|
|
|
|
|
|
data = {
|
|
|
|
|
"step": EscalationPolicy.STEP_DECLARE_INCIDENT,
|
|
|
|
|
"severity": "critical",
|
|
|
|
|
"escalation_chain": escalation_chain.public_primary_key,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
response = client.post(url, data, format="json", **make_user_auth_headers(user, token))
|
|
|
|
|
assert response.status_code == status.HTTP_400_BAD_REQUEST
|
|
|
|
|
|
|
|
|
|
# make sure declare incident step is enabled
|
|
|
|
|
settings.FEATURE_DECLARE_INCIDENT_STEP_ENABLED = True
|
|
|
|
|
organization.is_grafana_incident_enabled = True
|
|
|
|
|
organization.save()
|
|
|
|
|
|
|
|
|
|
response = client.post(url, data, format="json", **make_user_auth_headers(user, token))
|
|
|
|
|
assert response.status_code == status.HTTP_201_CREATED
|
|
|
|
|
escalation_policy = EscalationPolicy.objects.get(public_primary_key=response.data["id"])
|
|
|
|
|
assert escalation_policy.step == EscalationPolicy.STEP_DECLARE_INCIDENT
|
|
|
|
|
assert escalation_policy.severity == "critical"
|
|
|
|
|
|
|
|
|
|
url = reverse("api-internal:escalation_policy-detail", kwargs={"pk": escalation_policy.public_primary_key})
|
|
|
|
|
response = client.get(url, format="json", **make_user_auth_headers(user, token))
|
|
|
|
|
response_data = response.json()
|
|
|
|
|
assert response_data["step"] == EscalationPolicy.STEP_DECLARE_INCIDENT
|
|
|
|
|
assert response_data["severity"] == "critical"
|