# What this PR does Adds `important` version of `Round-robin` escalation step <img width="1090" alt="Screenshot 2025-01-20 at 11 18 54" src="https://github.com/user-attachments/assets/add6f9e8-fc6c-40a8-a177-d727cc385651" /> ## Which issue(s) this PR closes Related to https://github.com/grafana/oncall/issues/1184 ## Checklist - [x] Unit, integration, and e2e (if applicable) tests updated - [x] Documentation added (or `pr:no public docs` PR label added if not required) - [x] Added the relevant release notes label (see labels prefixed w/ `release:`). These labels dictate how your PR will show up in the autogenerated release notes.
745 lines
32 KiB
Python
745 lines
32 KiB
Python
from unittest.mock import call, patch
|
|
|
|
import pytest
|
|
from django.utils import timezone
|
|
|
|
from apps.alerts.constants import NEXT_ESCALATION_DELAY
|
|
from apps.alerts.escalation_snapshot.serializers.escalation_policy_snapshot import EscalationPolicySnapshotSerializer
|
|
from apps.alerts.escalation_snapshot.snapshot_classes import EscalationPolicySnapshot
|
|
from apps.alerts.escalation_snapshot.utils import eta_for_escalation_step_notify_if_time
|
|
from apps.alerts.models import AlertGroupLogRecord, EscalationPolicy
|
|
from apps.api.permissions import LegacyAccessControlRole
|
|
from apps.schedules.ical_utils import list_users_to_notify_from_ical
|
|
from apps.schedules.models import CustomOnCallShift, OnCallScheduleCalendar
|
|
|
|
|
|
def get_escalation_policy_snapshot_from_model(escalation_policy):
|
|
raw_escalation_policy_data = EscalationPolicySnapshotSerializer(escalation_policy).data
|
|
escalation_policy_data = EscalationPolicySnapshotSerializer().to_internal_value(raw_escalation_policy_data)
|
|
escalation_policy_snapshot = EscalationPolicySnapshot(**escalation_policy_data)
|
|
return escalation_policy_snapshot
|
|
|
|
|
|
@pytest.fixture()
|
|
def escalation_step_test_setup(
|
|
make_organization_and_user,
|
|
make_alert_receive_channel,
|
|
make_channel_filter,
|
|
make_escalation_chain,
|
|
make_alert_group,
|
|
):
|
|
organization, user = make_organization_and_user()
|
|
alert_receive_channel = make_alert_receive_channel(organization)
|
|
|
|
escalation_chain = make_escalation_chain(organization=organization)
|
|
channel_filter = make_channel_filter(alert_receive_channel, escalation_chain=escalation_chain)
|
|
|
|
alert_group = make_alert_group(alert_receive_channel, channel_filter=channel_filter)
|
|
reason = "test escalation step"
|
|
return organization, user, alert_receive_channel, channel_filter, alert_group, reason
|
|
|
|
|
|
@patch("apps.alerts.escalation_snapshot.snapshot_classes.EscalationPolicySnapshot._execute_tasks", return_value=None)
|
|
@pytest.mark.django_db
|
|
def test_escalation_step_wait(
|
|
mocked_execute_tasks,
|
|
escalation_step_test_setup,
|
|
make_escalation_policy,
|
|
):
|
|
_, _, _, channel_filter, alert_group, reason = escalation_step_test_setup
|
|
wait_delay = EscalationPolicy.FIFTEEN_MINUTES
|
|
wait_step = make_escalation_policy(
|
|
escalation_chain=channel_filter.escalation_chain,
|
|
escalation_policy_step=EscalationPolicy.STEP_WAIT,
|
|
wait_delay=wait_delay,
|
|
)
|
|
escalation_policy_snapshot = get_escalation_policy_snapshot_from_model(wait_step)
|
|
now = timezone.now()
|
|
result = escalation_policy_snapshot.execute(alert_group, reason)
|
|
|
|
assert result.eta is not None
|
|
assert wait_delay + timezone.timedelta(minutes=1) > result.eta - now >= wait_delay
|
|
assert result.stop_escalation is False and result.pause_escalation is False and result.start_from_beginning is False
|
|
assert wait_step.log_records.filter(type=AlertGroupLogRecord.TYPE_ESCALATION_TRIGGERED).exists()
|
|
assert not mocked_execute_tasks.called
|
|
|
|
|
|
@patch("apps.alerts.escalation_snapshot.snapshot_classes.EscalationPolicySnapshot._execute_tasks", return_value=None)
|
|
@pytest.mark.django_db
|
|
def test_escalation_step_notify_all(
|
|
mocked_execute_tasks,
|
|
escalation_step_test_setup,
|
|
make_escalation_policy,
|
|
):
|
|
_, _, _, channel_filter, alert_group, reason = escalation_step_test_setup
|
|
|
|
notify_all_step = make_escalation_policy(
|
|
escalation_chain=channel_filter.escalation_chain,
|
|
escalation_policy_step=EscalationPolicy.STEP_FINAL_NOTIFYALL,
|
|
)
|
|
escalation_policy_snapshot = get_escalation_policy_snapshot_from_model(notify_all_step)
|
|
expected_eta = timezone.now() + timezone.timedelta(seconds=NEXT_ESCALATION_DELAY)
|
|
|
|
result = escalation_policy_snapshot.execute(alert_group, reason)
|
|
expected_result = EscalationPolicySnapshot.StepExecutionResultData(
|
|
eta=result.eta,
|
|
stop_escalation=False,
|
|
pause_escalation=False,
|
|
start_from_beginning=False,
|
|
)
|
|
assert expected_eta + timezone.timedelta(seconds=15) > result.eta > expected_eta - timezone.timedelta(seconds=15)
|
|
assert result == expected_result
|
|
assert mocked_execute_tasks.called
|
|
|
|
|
|
@patch("apps.alerts.escalation_snapshot.snapshot_classes.EscalationPolicySnapshot._execute_tasks", return_value=None)
|
|
@pytest.mark.parametrize(
|
|
"step", [EscalationPolicy.STEP_NOTIFY_USERS_QUEUE, EscalationPolicy.STEP_NOTIFY_USERS_QUEUE_IMPORTANT]
|
|
)
|
|
@pytest.mark.django_db
|
|
def test_escalation_step_notify_users_queue(
|
|
mocked_execute_tasks,
|
|
step,
|
|
make_user_for_organization,
|
|
escalation_step_test_setup,
|
|
make_escalation_policy,
|
|
):
|
|
organization, user, _, channel_filter, alert_group, reason = escalation_step_test_setup
|
|
user_2 = make_user_for_organization(organization)
|
|
|
|
notify_queue_step = make_escalation_policy(
|
|
escalation_chain=channel_filter.escalation_chain,
|
|
escalation_policy_step=step,
|
|
)
|
|
notify_queue_step.notify_to_users_queue.set([user, user_2])
|
|
escalation_policy_snapshot = get_escalation_policy_snapshot_from_model(notify_queue_step)
|
|
|
|
assert escalation_policy_snapshot.next_user_in_sorted_queue == escalation_policy_snapshot.sorted_users_queue[0]
|
|
|
|
expected_eta = timezone.now() + timezone.timedelta(seconds=NEXT_ESCALATION_DELAY)
|
|
result = escalation_policy_snapshot.execute(alert_group, reason)
|
|
expected_result = EscalationPolicySnapshot.StepExecutionResultData(
|
|
eta=result.eta,
|
|
stop_escalation=False,
|
|
pause_escalation=False,
|
|
start_from_beginning=False,
|
|
)
|
|
assert expected_eta + timezone.timedelta(seconds=15) > result.eta > expected_eta - timezone.timedelta(seconds=15)
|
|
assert result == expected_result
|
|
assert escalation_policy_snapshot.next_user_in_sorted_queue == escalation_policy_snapshot.sorted_users_queue[1]
|
|
assert notify_queue_step.log_records.filter(type=AlertGroupLogRecord.TYPE_ESCALATION_TRIGGERED).exists()
|
|
assert mocked_execute_tasks.called
|
|
|
|
|
|
@patch("apps.alerts.escalation_snapshot.snapshot_classes.EscalationPolicySnapshot._execute_tasks", return_value=None)
|
|
@pytest.mark.django_db
|
|
def test_escalation_step_notify_multiple_users(
|
|
mocked_execute_tasks,
|
|
escalation_step_test_setup,
|
|
make_escalation_policy,
|
|
):
|
|
_, user, _, channel_filter, alert_group, reason = escalation_step_test_setup
|
|
|
|
notify_users_step = make_escalation_policy(
|
|
escalation_chain=channel_filter.escalation_chain,
|
|
escalation_policy_step=EscalationPolicy.STEP_NOTIFY_MULTIPLE_USERS,
|
|
)
|
|
notify_users_step.notify_to_users_queue.set([user])
|
|
escalation_policy_snapshot = get_escalation_policy_snapshot_from_model(notify_users_step)
|
|
|
|
expected_eta = timezone.now() + timezone.timedelta(seconds=NEXT_ESCALATION_DELAY)
|
|
result = escalation_policy_snapshot.execute(alert_group, reason)
|
|
expected_result = EscalationPolicySnapshot.StepExecutionResultData(
|
|
eta=result.eta,
|
|
stop_escalation=False,
|
|
pause_escalation=False,
|
|
start_from_beginning=False,
|
|
)
|
|
assert expected_eta + timezone.timedelta(seconds=15) > result.eta > expected_eta - timezone.timedelta(seconds=15)
|
|
assert result == expected_result
|
|
assert notify_users_step.log_records.filter(type=AlertGroupLogRecord.TYPE_ESCALATION_TRIGGERED).exists()
|
|
assert mocked_execute_tasks.called
|
|
|
|
|
|
@patch("apps.alerts.escalation_snapshot.snapshot_classes.EscalationPolicySnapshot._execute_tasks", return_value=None)
|
|
@pytest.mark.django_db
|
|
def test_escalation_step_notify_on_call_schedule(
|
|
mocked_execute_tasks,
|
|
escalation_step_test_setup,
|
|
make_escalation_policy,
|
|
make_schedule,
|
|
make_on_call_shift,
|
|
):
|
|
organization, user, _, channel_filter, alert_group, reason = escalation_step_test_setup
|
|
|
|
schedule = make_schedule(organization, schedule_class=OnCallScheduleCalendar)
|
|
# create on_call_shift with user to notify
|
|
start_date = timezone.now().replace(microsecond=0)
|
|
data = {
|
|
"start": start_date,
|
|
"rotation_start": start_date,
|
|
"duration": timezone.timedelta(seconds=7200),
|
|
}
|
|
on_call_shift = make_on_call_shift(
|
|
organization=organization, shift_type=CustomOnCallShift.TYPE_SINGLE_EVENT, **data
|
|
)
|
|
on_call_shift.users.add(user)
|
|
schedule.custom_on_call_shifts.add(on_call_shift)
|
|
|
|
notify_schedule_step = make_escalation_policy(
|
|
escalation_chain=channel_filter.escalation_chain,
|
|
escalation_policy_step=EscalationPolicy.STEP_NOTIFY_SCHEDULE,
|
|
notify_schedule=schedule,
|
|
)
|
|
escalation_policy_snapshot = get_escalation_policy_snapshot_from_model(notify_schedule_step)
|
|
expected_eta = timezone.now() + timezone.timedelta(seconds=NEXT_ESCALATION_DELAY)
|
|
result = escalation_policy_snapshot.execute(alert_group, reason)
|
|
expected_result = EscalationPolicySnapshot.StepExecutionResultData(
|
|
eta=result.eta,
|
|
stop_escalation=False,
|
|
pause_escalation=False,
|
|
start_from_beginning=False,
|
|
)
|
|
assert expected_eta + timezone.timedelta(seconds=15) > result.eta > expected_eta - timezone.timedelta(seconds=15)
|
|
assert result == expected_result
|
|
assert notify_schedule_step.log_records.filter(type=AlertGroupLogRecord.TYPE_ESCALATION_TRIGGERED).exists()
|
|
assert list(escalation_policy_snapshot.notify_to_users_queue) == list(list_users_to_notify_from_ical(schedule))
|
|
assert mocked_execute_tasks.called
|
|
|
|
|
|
@patch("apps.alerts.escalation_snapshot.snapshot_classes.EscalationPolicySnapshot._execute_tasks", return_value=None)
|
|
@pytest.mark.django_db
|
|
def test_escalation_step_notify_on_call_schedule_viewer_user(
|
|
mocked_execute_tasks,
|
|
escalation_step_test_setup,
|
|
make_user_for_organization,
|
|
make_escalation_policy,
|
|
make_schedule,
|
|
make_on_call_shift,
|
|
):
|
|
organization, _, _, channel_filter, alert_group, reason = escalation_step_test_setup
|
|
viewer = make_user_for_organization(organization=organization, role=LegacyAccessControlRole.VIEWER)
|
|
|
|
schedule = make_schedule(organization, schedule_class=OnCallScheduleCalendar)
|
|
# create on_call_shift with user to notify
|
|
start_date = timezone.now().replace(microsecond=0)
|
|
data = {
|
|
"start": start_date,
|
|
"rotation_start": start_date,
|
|
"duration": timezone.timedelta(seconds=7200),
|
|
}
|
|
on_call_shift = make_on_call_shift(
|
|
organization=organization, shift_type=CustomOnCallShift.TYPE_SINGLE_EVENT, **data
|
|
)
|
|
on_call_shift.users.add(viewer)
|
|
schedule.custom_on_call_shifts.add(on_call_shift)
|
|
|
|
notify_schedule_step = make_escalation_policy(
|
|
escalation_chain=channel_filter.escalation_chain,
|
|
escalation_policy_step=EscalationPolicy.STEP_NOTIFY_SCHEDULE,
|
|
notify_schedule=schedule,
|
|
)
|
|
escalation_policy_snapshot = get_escalation_policy_snapshot_from_model(notify_schedule_step)
|
|
expected_eta = timezone.now() + timezone.timedelta(seconds=NEXT_ESCALATION_DELAY)
|
|
result = escalation_policy_snapshot.execute(alert_group, reason)
|
|
expected_result = EscalationPolicySnapshot.StepExecutionResultData(
|
|
eta=result.eta,
|
|
stop_escalation=False,
|
|
pause_escalation=False,
|
|
start_from_beginning=False,
|
|
)
|
|
assert expected_eta + timezone.timedelta(seconds=15) > result.eta > expected_eta - timezone.timedelta(seconds=15)
|
|
assert result == expected_result
|
|
assert notify_schedule_step.log_records.filter(type=AlertGroupLogRecord.TYPE_ESCALATION_FAILED).exists()
|
|
assert list(escalation_policy_snapshot.notify_to_users_queue) == []
|
|
assert mocked_execute_tasks.called
|
|
|
|
|
|
@patch("apps.alerts.escalation_snapshot.snapshot_classes.EscalationPolicySnapshot._execute_tasks", return_value=None)
|
|
@pytest.mark.django_db
|
|
def test_escalation_step_notify_user_group(
|
|
mocked_execute_tasks,
|
|
escalation_step_test_setup,
|
|
make_slack_team_identity,
|
|
make_slack_user_group,
|
|
make_escalation_policy,
|
|
):
|
|
organization, _, _, channel_filter, alert_group, reason = escalation_step_test_setup
|
|
slack_team_identity = make_slack_team_identity()
|
|
organization.slack_team_identity = slack_team_identity
|
|
organization.save()
|
|
user_group = make_slack_user_group(slack_team_identity)
|
|
|
|
notify_user_group_step = make_escalation_policy(
|
|
escalation_chain=channel_filter.escalation_chain,
|
|
escalation_policy_step=EscalationPolicy.STEP_NOTIFY_GROUP,
|
|
notify_to_group=user_group,
|
|
)
|
|
escalation_policy_snapshot = get_escalation_policy_snapshot_from_model(notify_user_group_step)
|
|
expected_eta = timezone.now() + timezone.timedelta(seconds=NEXT_ESCALATION_DELAY)
|
|
result = escalation_policy_snapshot.execute(alert_group, reason)
|
|
expected_result = EscalationPolicySnapshot.StepExecutionResultData(
|
|
eta=result.eta,
|
|
stop_escalation=False,
|
|
pause_escalation=False,
|
|
start_from_beginning=False,
|
|
)
|
|
assert expected_eta + timezone.timedelta(seconds=15) > result.eta > expected_eta - timezone.timedelta(seconds=15)
|
|
assert result == expected_result
|
|
assert mocked_execute_tasks.called
|
|
|
|
|
|
@patch("apps.alerts.escalation_snapshot.snapshot_classes.EscalationPolicySnapshot._execute_tasks", return_value=None)
|
|
@pytest.mark.django_db
|
|
def test_escalation_step_notify_if_time(
|
|
mocked_execute_tasks,
|
|
escalation_step_test_setup,
|
|
make_escalation_policy,
|
|
):
|
|
_, _, _, channel_filter, alert_group, reason = escalation_step_test_setup
|
|
|
|
# current time is not between from_time and to_time, step returns eta
|
|
now = timezone.now()
|
|
from_time = (now - timezone.timedelta(hours=2)).time()
|
|
to_time = (now - timezone.timedelta(hours=1)).time()
|
|
notify_if_time_step_1 = make_escalation_policy(
|
|
escalation_chain=channel_filter.escalation_chain,
|
|
escalation_policy_step=EscalationPolicy.STEP_NOTIFY_IF_TIME,
|
|
from_time=from_time,
|
|
to_time=to_time,
|
|
)
|
|
|
|
escalation_policy_snapshot = get_escalation_policy_snapshot_from_model(notify_if_time_step_1)
|
|
estimated_time_of_arrival = eta_for_escalation_step_notify_if_time(from_time, to_time)
|
|
expected_result = EscalationPolicySnapshot.StepExecutionResultData(
|
|
eta=estimated_time_of_arrival,
|
|
stop_escalation=False,
|
|
pause_escalation=False,
|
|
start_from_beginning=False,
|
|
)
|
|
assert estimated_time_of_arrival is not None
|
|
|
|
result = escalation_policy_snapshot.execute(alert_group, reason)
|
|
|
|
assert result == expected_result
|
|
assert notify_if_time_step_1.log_records.filter(type=AlertGroupLogRecord.TYPE_ESCALATION_TRIGGERED).exists()
|
|
assert not mocked_execute_tasks.called
|
|
|
|
# current time is between from_time and to_time, eta is None
|
|
from_time = (now - timezone.timedelta(hours=2)).time()
|
|
to_time = (now + timezone.timedelta(hours=1)).time()
|
|
notify_if_time_step_2 = make_escalation_policy(
|
|
escalation_chain=channel_filter.escalation_chain,
|
|
escalation_policy_step=EscalationPolicy.STEP_NOTIFY_IF_TIME,
|
|
from_time=from_time,
|
|
to_time=to_time,
|
|
)
|
|
|
|
escalation_policy_snapshot = get_escalation_policy_snapshot_from_model(notify_if_time_step_2)
|
|
estimated_time_of_arrival = eta_for_escalation_step_notify_if_time(from_time, to_time)
|
|
assert estimated_time_of_arrival is None
|
|
|
|
expected_eta = timezone.now() + timezone.timedelta(seconds=NEXT_ESCALATION_DELAY)
|
|
result = escalation_policy_snapshot.execute(alert_group, reason)
|
|
expected_result = EscalationPolicySnapshot.StepExecutionResultData(
|
|
eta=result.eta,
|
|
stop_escalation=False,
|
|
pause_escalation=False,
|
|
start_from_beginning=False,
|
|
)
|
|
assert expected_eta + timezone.timedelta(seconds=15) > result.eta > expected_eta - timezone.timedelta(seconds=15)
|
|
|
|
assert result == expected_result
|
|
assert notify_if_time_step_2.log_records.filter(type=AlertGroupLogRecord.TYPE_ESCALATION_TRIGGERED).exists()
|
|
assert not mocked_execute_tasks.called
|
|
|
|
|
|
@patch("apps.alerts.escalation_snapshot.snapshot_classes.EscalationPolicySnapshot._execute_tasks", return_value=None)
|
|
@pytest.mark.django_db
|
|
def test_escalation_step_notify_if_num_alerts_in_window(
|
|
mocked_execute_tasks, escalation_step_test_setup, make_escalation_policy, make_alert
|
|
):
|
|
_, _, _, channel_filter, alert_group, reason = escalation_step_test_setup
|
|
|
|
make_alert(alert_group=alert_group, raw_request_data={})
|
|
make_alert(alert_group=alert_group, raw_request_data={})
|
|
|
|
notify_if_3_alerts_per_1_minute = make_escalation_policy(
|
|
escalation_chain=channel_filter.escalation_chain,
|
|
escalation_policy_step=EscalationPolicy.STEP_NOTIFY_IF_NUM_ALERTS_IN_TIME_WINDOW,
|
|
num_alerts_in_window=3,
|
|
num_minutes_in_window=1,
|
|
)
|
|
|
|
escalation_policy_snapshot = get_escalation_policy_snapshot_from_model(notify_if_3_alerts_per_1_minute)
|
|
expected_eta = None # eta is None if escalation was paused
|
|
result = escalation_policy_snapshot.execute(alert_group, reason)
|
|
expected_result = EscalationPolicySnapshot.StepExecutionResultData(
|
|
eta=expected_eta,
|
|
stop_escalation=False,
|
|
pause_escalation=True,
|
|
start_from_beginning=False,
|
|
)
|
|
assert result.eta == expected_eta
|
|
assert result == expected_result
|
|
assert notify_if_3_alerts_per_1_minute.log_records.filter(
|
|
type=AlertGroupLogRecord.TYPE_ESCALATION_TRIGGERED
|
|
).exists()
|
|
assert not mocked_execute_tasks.called
|
|
|
|
_, _, _, channel_filter, alert_group, reason = escalation_step_test_setup
|
|
|
|
make_alert(alert_group=alert_group, raw_request_data={})
|
|
|
|
notify_if_1_alert = make_escalation_policy(
|
|
escalation_chain=channel_filter.escalation_chain,
|
|
escalation_policy_step=EscalationPolicy.STEP_NOTIFY_IF_NUM_ALERTS_IN_TIME_WINDOW,
|
|
num_alerts_in_window=1,
|
|
num_minutes_in_window=2,
|
|
)
|
|
|
|
escalation_policy_snapshot = get_escalation_policy_snapshot_from_model(notify_if_1_alert)
|
|
expected_eta = timezone.now() + timezone.timedelta(seconds=NEXT_ESCALATION_DELAY)
|
|
result = escalation_policy_snapshot.execute(alert_group, reason)
|
|
expected_result = EscalationPolicySnapshot.StepExecutionResultData(
|
|
eta=result.eta,
|
|
stop_escalation=False,
|
|
pause_escalation=False,
|
|
start_from_beginning=False,
|
|
)
|
|
assert expected_eta + timezone.timedelta(seconds=15) > result.eta > expected_eta - timezone.timedelta(seconds=15)
|
|
assert result == expected_result
|
|
assert not mocked_execute_tasks.called
|
|
|
|
|
|
@pytest.mark.django_db
|
|
def test_escalation_step_notify_if_num_alerts_in_window_deleted_escalation_policy(
|
|
escalation_step_test_setup, make_escalation_policy, make_alert
|
|
):
|
|
_, _, _, channel_filter, alert_group, reason = escalation_step_test_setup
|
|
|
|
make_alert(alert_group=alert_group, raw_request_data={})
|
|
|
|
notify_if_2_alerts_per_1_minute = make_escalation_policy(
|
|
escalation_chain=channel_filter.escalation_chain,
|
|
escalation_policy_step=EscalationPolicy.STEP_NOTIFY_IF_NUM_ALERTS_IN_TIME_WINDOW,
|
|
num_alerts_in_window=2,
|
|
num_minutes_in_window=1,
|
|
)
|
|
|
|
escalation_policy_snapshot = get_escalation_policy_snapshot_from_model(notify_if_2_alerts_per_1_minute)
|
|
notify_if_2_alerts_per_1_minute.delete()
|
|
|
|
with pytest.raises(EscalationPolicy.DoesNotExist):
|
|
notify_if_2_alerts_per_1_minute.refresh_from_db()
|
|
|
|
assert not alert_group.log_records.filter(
|
|
type=AlertGroupLogRecord.TYPE_ESCALATION_TRIGGERED,
|
|
escalation_policy_step=EscalationPolicy.STEP_NOTIFY_IF_NUM_ALERTS_IN_TIME_WINDOW,
|
|
).exists()
|
|
|
|
result = escalation_policy_snapshot.execute(alert_group, reason)
|
|
expected_result = EscalationPolicySnapshot.StepExecutionResultData(
|
|
eta=None,
|
|
stop_escalation=False,
|
|
pause_escalation=True,
|
|
start_from_beginning=False,
|
|
)
|
|
assert result == expected_result
|
|
assert alert_group.log_records.filter(
|
|
type=AlertGroupLogRecord.TYPE_ESCALATION_TRIGGERED,
|
|
escalation_policy_step=EscalationPolicy.STEP_NOTIFY_IF_NUM_ALERTS_IN_TIME_WINDOW,
|
|
).exists()
|
|
|
|
|
|
@patch("apps.alerts.escalation_snapshot.snapshot_classes.EscalationPolicySnapshot._execute_tasks", return_value=None)
|
|
@pytest.mark.django_db
|
|
def test_escalation_step_trigger_custom_webhook(
|
|
mocked_execute_tasks,
|
|
escalation_step_test_setup,
|
|
make_custom_webhook,
|
|
make_escalation_policy,
|
|
):
|
|
organization, _, _, channel_filter, alert_group, reason = escalation_step_test_setup
|
|
|
|
custom_webhook = make_custom_webhook(organization=organization)
|
|
|
|
trigger_custom_webhook_step = make_escalation_policy(
|
|
escalation_chain=channel_filter.escalation_chain,
|
|
escalation_policy_step=EscalationPolicy.STEP_TRIGGER_CUSTOM_WEBHOOK,
|
|
custom_webhook=custom_webhook,
|
|
)
|
|
escalation_policy_snapshot = get_escalation_policy_snapshot_from_model(trigger_custom_webhook_step)
|
|
expected_eta = timezone.now() + timezone.timedelta(seconds=NEXT_ESCALATION_DELAY)
|
|
result = escalation_policy_snapshot.execute(alert_group, reason)
|
|
expected_result = EscalationPolicySnapshot.StepExecutionResultData(
|
|
eta=result.eta,
|
|
stop_escalation=False,
|
|
pause_escalation=False,
|
|
start_from_beginning=False,
|
|
)
|
|
assert expected_eta + timezone.timedelta(seconds=15) > result.eta > expected_eta - timezone.timedelta(seconds=15)
|
|
assert result == expected_result
|
|
assert mocked_execute_tasks.called
|
|
|
|
with patch(
|
|
"apps.alerts.escalation_snapshot.snapshot_classes.EscalationPolicySnapshot._escalation_step_trigger_custom_webhook"
|
|
) as mock_webhook_escalation_step:
|
|
escalation_policy_snapshot.execute(alert_group, reason)
|
|
|
|
mock_webhook_escalation_step.assert_called_once_with(alert_group, reason)
|
|
|
|
|
|
@patch("apps.alerts.escalation_snapshot.snapshot_classes.EscalationPolicySnapshot._execute_tasks", return_value=None)
|
|
@pytest.mark.django_db
|
|
def test_escalation_step_trigger_disabled_custom_webhook(
|
|
mocked_execute_tasks,
|
|
escalation_step_test_setup,
|
|
make_custom_webhook,
|
|
make_escalation_policy,
|
|
):
|
|
organization, _, _, channel_filter, alert_group, reason = escalation_step_test_setup
|
|
|
|
custom_webhook = make_custom_webhook(organization=organization, is_webhook_enabled=False)
|
|
|
|
trigger_custom_webhook_step = make_escalation_policy(
|
|
escalation_chain=channel_filter.escalation_chain,
|
|
escalation_policy_step=EscalationPolicy.STEP_TRIGGER_CUSTOM_WEBHOOK,
|
|
custom_webhook=custom_webhook,
|
|
)
|
|
escalation_policy_snapshot = get_escalation_policy_snapshot_from_model(trigger_custom_webhook_step)
|
|
escalation_policy_snapshot.execute(alert_group, reason)
|
|
assert call([]) in mocked_execute_tasks.call_args_list
|
|
|
|
log_record = AlertGroupLogRecord.objects.get(
|
|
alert_group_id=alert_group.id, escalation_policy=trigger_custom_webhook_step
|
|
)
|
|
assert log_record.escalation_error_code == AlertGroupLogRecord.ERROR_ESCALATION_TRIGGER_WEBHOOK_IS_DISABLED
|
|
|
|
|
|
@patch("apps.alerts.escalation_snapshot.snapshot_classes.EscalationPolicySnapshot._execute_tasks", return_value=None)
|
|
@pytest.mark.django_db
|
|
def test_escalation_step_repeat_escalation_n_times(
|
|
mocked_execute_tasks,
|
|
escalation_step_test_setup,
|
|
make_escalation_policy,
|
|
):
|
|
_, _, _, channel_filter, alert_group, reason = escalation_step_test_setup
|
|
|
|
repeat_escalation_step = make_escalation_policy(
|
|
escalation_chain=channel_filter.escalation_chain,
|
|
escalation_policy_step=EscalationPolicy.STEP_REPEAT_ESCALATION_N_TIMES,
|
|
)
|
|
escalation_policy_snapshot = get_escalation_policy_snapshot_from_model(repeat_escalation_step)
|
|
|
|
assert escalation_policy_snapshot.escalation_counter == 0
|
|
|
|
expected_eta = timezone.now() + timezone.timedelta(seconds=NEXT_ESCALATION_DELAY)
|
|
result = escalation_policy_snapshot.execute(alert_group, reason)
|
|
expected_result = EscalationPolicySnapshot.StepExecutionResultData(
|
|
eta=result.eta,
|
|
stop_escalation=False,
|
|
pause_escalation=False,
|
|
start_from_beginning=True,
|
|
)
|
|
assert expected_eta + timezone.timedelta(seconds=15) > result.eta > expected_eta - timezone.timedelta(seconds=15)
|
|
assert escalation_policy_snapshot.escalation_counter == 1
|
|
assert result == expected_result
|
|
assert repeat_escalation_step.log_records.filter(type=AlertGroupLogRecord.TYPE_ESCALATION_TRIGGERED).exists()
|
|
assert not mocked_execute_tasks.called
|
|
|
|
|
|
@patch("apps.alerts.escalation_snapshot.snapshot_classes.EscalationPolicySnapshot._execute_tasks", return_value=None)
|
|
@pytest.mark.django_db
|
|
def test_escalation_step_resolve(
|
|
mocked_execute_tasks,
|
|
escalation_step_test_setup,
|
|
make_escalation_policy,
|
|
):
|
|
_, _, _, channel_filter, alert_group, reason = escalation_step_test_setup
|
|
|
|
resolve_step = make_escalation_policy(
|
|
escalation_chain=channel_filter.escalation_chain,
|
|
escalation_policy_step=EscalationPolicy.STEP_FINAL_RESOLVE,
|
|
)
|
|
escalation_policy_snapshot = get_escalation_policy_snapshot_from_model(resolve_step)
|
|
expected_eta = timezone.now() + timezone.timedelta(seconds=NEXT_ESCALATION_DELAY)
|
|
result = escalation_policy_snapshot.execute(alert_group, reason)
|
|
expected_result = EscalationPolicySnapshot.StepExecutionResultData(
|
|
eta=result.eta,
|
|
stop_escalation=True,
|
|
pause_escalation=False,
|
|
start_from_beginning=False,
|
|
)
|
|
assert expected_eta + timezone.timedelta(seconds=15) > result.eta > expected_eta - timezone.timedelta(seconds=15)
|
|
assert result == expected_result
|
|
assert resolve_step.log_records.filter(type=AlertGroupLogRecord.TYPE_ESCALATION_TRIGGERED).exists()
|
|
assert mocked_execute_tasks.called
|
|
|
|
|
|
@patch("apps.alerts.escalation_snapshot.snapshot_classes.EscalationPolicySnapshot._execute_tasks", return_value=None)
|
|
@pytest.mark.django_db
|
|
def test_escalation_step_is_not_configured(
|
|
mocked_execute_tasks,
|
|
escalation_step_test_setup,
|
|
make_escalation_policy,
|
|
):
|
|
_, _, _, channel_filter, alert_group, reason = escalation_step_test_setup
|
|
|
|
not_configured_step = make_escalation_policy(
|
|
escalation_chain=channel_filter.escalation_chain,
|
|
escalation_policy_step=None,
|
|
)
|
|
escalation_policy_snapshot = get_escalation_policy_snapshot_from_model(not_configured_step)
|
|
expected_eta = timezone.now() + timezone.timedelta(seconds=NEXT_ESCALATION_DELAY)
|
|
result = escalation_policy_snapshot.execute(alert_group, reason)
|
|
expected_result = EscalationPolicySnapshot.StepExecutionResultData(
|
|
eta=result.eta,
|
|
stop_escalation=False,
|
|
pause_escalation=False,
|
|
start_from_beginning=False,
|
|
)
|
|
assert expected_eta + timezone.timedelta(seconds=15) > result.eta > expected_eta - timezone.timedelta(seconds=15)
|
|
assert result == expected_result
|
|
assert not_configured_step.log_records.filter(type=AlertGroupLogRecord.TYPE_ESCALATION_FAILED).exists()
|
|
assert not mocked_execute_tasks.called
|
|
|
|
|
|
@pytest.mark.django_db
|
|
def test_escalation_step_with_deleted_user(
|
|
escalation_step_test_setup,
|
|
make_user_for_organization,
|
|
make_escalation_policy,
|
|
):
|
|
"""
|
|
Test that deleted user in escalation policy snapshot will be simply ignored instead of ValidationError
|
|
"""
|
|
organization, user, _, channel_filter, _, _ = escalation_step_test_setup
|
|
inactive_user = make_user_for_organization(organization=organization, is_active=False)
|
|
|
|
escalation_policy = make_escalation_policy(
|
|
escalation_chain=channel_filter.escalation_chain,
|
|
escalation_policy_step=EscalationPolicy.STEP_NOTIFY_MULTIPLE_USERS,
|
|
)
|
|
escalation_policy.notify_to_users_queue.set([user, inactive_user])
|
|
raw_snapshot = {
|
|
"id": escalation_policy.pk,
|
|
"order": 0,
|
|
"step": escalation_policy.step,
|
|
"wait_delay": None,
|
|
"notify_to_users_queue": [user.pk, inactive_user.pk],
|
|
"last_notified_user": None,
|
|
"from_time": None,
|
|
"to_time": None,
|
|
"num_alerts_in_window": None,
|
|
"num_minutes_in_window": None,
|
|
"notify_schedule": None,
|
|
"notify_to_group": None,
|
|
"escalation_counter": 0,
|
|
"passed_last_time": None,
|
|
"pause_escalation": False,
|
|
}
|
|
|
|
deserialized_escalation_snapshot = EscalationPolicySnapshotSerializer().to_internal_value(raw_snapshot)
|
|
assert deserialized_escalation_snapshot["notify_to_users_queue"] == [user]
|
|
|
|
|
|
@patch("apps.alerts.escalation_snapshot.snapshot_classes.EscalationPolicySnapshot._execute_tasks", return_value=None)
|
|
@pytest.mark.django_db
|
|
@pytest.mark.parametrize(
|
|
"step",
|
|
(EscalationPolicy.STEP_NOTIFY_TEAM_MEMBERS, EscalationPolicy.STEP_NOTIFY_TEAM_MEMBERS_IMPORTANT),
|
|
)
|
|
def test_notify_team_members(
|
|
mocked_execute_tasks, escalation_step_test_setup, make_user, make_team, make_escalation_policy, step
|
|
):
|
|
organization, user, _, channel_filter, alert_group, reason = escalation_step_test_setup
|
|
user_1 = make_user(organization=organization)
|
|
user_2 = make_user(organization=organization)
|
|
team_1 = make_team(organization=organization)
|
|
team_1.users.add(user_1)
|
|
team_1.users.add(user_2)
|
|
|
|
notify_team_members_step = make_escalation_policy(
|
|
escalation_chain=channel_filter.escalation_chain,
|
|
escalation_policy_step=step,
|
|
notify_to_team_members=team_1,
|
|
)
|
|
escalation_policy_snapshot = get_escalation_policy_snapshot_from_model(notify_team_members_step)
|
|
expected_eta = timezone.now() + timezone.timedelta(seconds=NEXT_ESCALATION_DELAY)
|
|
with patch(
|
|
"apps.alerts.escalation_snapshot.snapshot_classes.escalation_policy_snapshot.notify_user_task"
|
|
) as mock_execute:
|
|
result = escalation_policy_snapshot.execute(alert_group, reason)
|
|
|
|
expected_result = EscalationPolicySnapshot.StepExecutionResultData(
|
|
eta=result.eta,
|
|
stop_escalation=False,
|
|
pause_escalation=False,
|
|
start_from_beginning=False,
|
|
)
|
|
assert expected_eta + timezone.timedelta(seconds=15) > result.eta > expected_eta - timezone.timedelta(seconds=15)
|
|
assert result == expected_result
|
|
assert notify_team_members_step.log_records.filter(type=AlertGroupLogRecord.TYPE_ESCALATION_TRIGGERED).exists()
|
|
assert list(escalation_policy_snapshot.notify_to_users_queue) == list(team_1.users.all())
|
|
assert mocked_execute_tasks.called
|
|
expected_kwargs = {
|
|
"reason": f"user belongs to team {team_1.name}",
|
|
"important": step == EscalationPolicy.STEP_NOTIFY_TEAM_MEMBERS_IMPORTANT,
|
|
}
|
|
assert mock_execute.signature.call_args_list[0] == call(
|
|
(user_1.pk, alert_group.pk), expected_kwargs, immutable=True
|
|
)
|
|
assert mock_execute.signature.call_args_list[1] == call(
|
|
(user_2.pk, alert_group.pk), expected_kwargs, immutable=True
|
|
)
|
|
assert mock_execute.signature.call_count == 2
|
|
|
|
|
|
@pytest.mark.django_db
|
|
def test_escalation_step_declare_incident(
|
|
escalation_step_test_setup,
|
|
make_escalation_policy,
|
|
):
|
|
organization, _, _, channel_filter, alert_group, reason = escalation_step_test_setup
|
|
|
|
declare_incident_step = make_escalation_policy(
|
|
escalation_chain=channel_filter.escalation_chain,
|
|
escalation_policy_step=EscalationPolicy.STEP_DECLARE_INCIDENT,
|
|
)
|
|
escalation_policy_snapshot = get_escalation_policy_snapshot_from_model(declare_incident_step)
|
|
expected_eta = timezone.now() + timezone.timedelta(seconds=NEXT_ESCALATION_DELAY)
|
|
with patch.object(EscalationPolicySnapshot, "_execute_tasks") as mocked_execute_tasks:
|
|
with patch(
|
|
"apps.alerts.escalation_snapshot.snapshot_classes.escalation_policy_snapshot.is_declare_incident_step_enabled",
|
|
return_value=True,
|
|
):
|
|
result = escalation_policy_snapshot.execute(alert_group, reason)
|
|
expected_result = EscalationPolicySnapshot.StepExecutionResultData(
|
|
eta=result.eta,
|
|
stop_escalation=False,
|
|
pause_escalation=False,
|
|
start_from_beginning=False,
|
|
)
|
|
assert (
|
|
expected_eta + timezone.timedelta(seconds=15)
|
|
> result.eta
|
|
> expected_eta - timezone.timedelta(seconds=15)
|
|
)
|
|
assert result == expected_result
|
|
assert not alert_group.log_records.exists()
|
|
mocked_execute_tasks.assert_called_once()
|
|
with patch.object(EscalationPolicySnapshot, "_execute_tasks") as mocked_execute_tasks:
|
|
with patch(
|
|
"apps.alerts.escalation_snapshot.snapshot_classes.escalation_policy_snapshot.is_declare_incident_step_enabled",
|
|
return_value=False,
|
|
):
|
|
escalation_policy_snapshot.execute(alert_group, reason)
|
|
mocked_execute_tasks.assert_not_called()
|
|
assert alert_group.log_records.exists()
|
|
log_record = alert_group.log_records.get()
|
|
assert log_record.type == AlertGroupLogRecord.TYPE_ESCALATION_FAILED
|
|
assert (
|
|
log_record.escalation_error_code
|
|
== AlertGroupLogRecord.ERROR_ESCALATION_DECLARE_INCIDENT_STEP_IS_NOT_ENABLED
|
|
)
|