oncall-engine/engine/apps/alerts/tests/conftest.py

63 lines
2.1 KiB
Python
Raw Permalink Normal View History

modify check_escalation_finished_task task (#1266) # What this PR does This PR: - modifies the `check_escalation_finished_task` celery task to: - do stricter escalation validation based on the alert group's escalation snapshot (see the `audit_alert_group_escalation` method in `engine/apps/alerts/tasks/check_escalation_finished.py` for the validation logic) - use a read-only database for querying alert-groups if one is configured, otherwise use the "default" one - ping a configurable heartbeat (new env var `ALERT_GROUP_ESCALATION_AUDITOR_CELERY_TASK_HEARTBEAT_URL` added) - increase the task frequency from every 10 to every 13 minutes (this can be configured via an env variable) - adds public documentation on how to configure this auditor task - modifies the local celery startup command to properly take into consideration all celery related env vars (similar to the ones we use in `engine/celery_with_exporter.sh`; this made it easier to enable `celery beat` locally for testing) - removes the following code: - removes references to `AlertGroup.estimate_escalation_finish_time` and marks the model field as deprecated using the [`django-deprecate-fields` library](https://pypi.org/project/django-deprecate-fields/). This field was only used for the previous version of this validation task - `EscalationSnapshotMixin.calculate_eta_for_finish_escalation` was only used to calculate the value for `AlertGroup.estimate_escalation_finish_time` - `calculate_escalation_finish_time` celery task ## Which issue(s) this PR fixes https://github.com/grafana/oncall-private/issues/1558 ## Checklist - [x] Tests updated - [x] Documentation added - [x] `CHANGELOG.md` updated
2023-03-17 11:14:08 +01:00
import datetime
import pytest
from apps.alerts.incident_appearance.templaters import AlertSlackTemplater
modify check_escalation_finished_task task (#1266) # What this PR does This PR: - modifies the `check_escalation_finished_task` celery task to: - do stricter escalation validation based on the alert group's escalation snapshot (see the `audit_alert_group_escalation` method in `engine/apps/alerts/tasks/check_escalation_finished.py` for the validation logic) - use a read-only database for querying alert-groups if one is configured, otherwise use the "default" one - ping a configurable heartbeat (new env var `ALERT_GROUP_ESCALATION_AUDITOR_CELERY_TASK_HEARTBEAT_URL` added) - increase the task frequency from every 10 to every 13 minutes (this can be configured via an env variable) - adds public documentation on how to configure this auditor task - modifies the local celery startup command to properly take into consideration all celery related env vars (similar to the ones we use in `engine/celery_with_exporter.sh`; this made it easier to enable `celery beat` locally for testing) - removes the following code: - removes references to `AlertGroup.estimate_escalation_finish_time` and marks the model field as deprecated using the [`django-deprecate-fields` library](https://pypi.org/project/django-deprecate-fields/). This field was only used for the previous version of this validation task - `EscalationSnapshotMixin.calculate_eta_for_finish_escalation` was only used to calculate the value for `AlertGroup.estimate_escalation_finish_time` - `calculate_escalation_finish_time` celery task ## Which issue(s) this PR fixes https://github.com/grafana/oncall-private/issues/1558 ## Checklist - [x] Tests updated - [x] Documentation added - [x] `CHANGELOG.md` updated
2023-03-17 11:14:08 +01:00
from apps.alerts.models import EscalationPolicy
@pytest.fixture()
def mock_alert_renderer_render_for(monkeypatch):
def mock_render_for(*args, **kwargs):
return "invalid_render_for"
monkeypatch.setattr(AlertSlackTemplater, "_render_for", mock_render_for)
modify check_escalation_finished_task task (#1266) # What this PR does This PR: - modifies the `check_escalation_finished_task` celery task to: - do stricter escalation validation based on the alert group's escalation snapshot (see the `audit_alert_group_escalation` method in `engine/apps/alerts/tasks/check_escalation_finished.py` for the validation logic) - use a read-only database for querying alert-groups if one is configured, otherwise use the "default" one - ping a configurable heartbeat (new env var `ALERT_GROUP_ESCALATION_AUDITOR_CELERY_TASK_HEARTBEAT_URL` added) - increase the task frequency from every 10 to every 13 minutes (this can be configured via an env variable) - adds public documentation on how to configure this auditor task - modifies the local celery startup command to properly take into consideration all celery related env vars (similar to the ones we use in `engine/celery_with_exporter.sh`; this made it easier to enable `celery beat` locally for testing) - removes the following code: - removes references to `AlertGroup.estimate_escalation_finish_time` and marks the model field as deprecated using the [`django-deprecate-fields` library](https://pypi.org/project/django-deprecate-fields/). This field was only used for the previous version of this validation task - `EscalationSnapshotMixin.calculate_eta_for_finish_escalation` was only used to calculate the value for `AlertGroup.estimate_escalation_finish_time` - `calculate_escalation_finish_time` celery task ## Which issue(s) this PR fixes https://github.com/grafana/oncall-private/issues/1558 ## Checklist - [x] Tests updated - [x] Documentation added - [x] `CHANGELOG.md` updated
2023-03-17 11:14:08 +01:00
@pytest.fixture()
def escalation_snapshot_test_setup(
make_organization_and_user,
make_user_for_organization,
make_alert_receive_channel,
make_channel_filter,
make_escalation_chain,
make_escalation_policy,
make_alert_group,
):
organization, user_1 = make_organization_and_user()
user_2 = make_user_for_organization(organization)
alert_receive_channel = make_alert_receive_channel(organization)
escalation_chain = make_escalation_chain(organization)
channel_filter = make_channel_filter(
alert_receive_channel,
escalation_chain=escalation_chain,
notification_backends={"BACKEND": {"channel_id": "abc123"}},
)
notify_to_multiple_users_step = make_escalation_policy(
escalation_chain=channel_filter.escalation_chain,
escalation_policy_step=EscalationPolicy.STEP_NOTIFY_MULTIPLE_USERS,
)
notify_to_multiple_users_step.notify_to_users_queue.set([user_1, user_2])
wait_step = make_escalation_policy(
escalation_chain=channel_filter.escalation_chain,
escalation_policy_step=EscalationPolicy.STEP_WAIT,
wait_delay=EscalationPolicy.FIFTEEN_MINUTES,
)
# random time for test
from_time = datetime.time(10, 30)
to_time = datetime.time(18, 45)
notify_if_time_step = make_escalation_policy(
escalation_chain=channel_filter.escalation_chain,
escalation_policy_step=EscalationPolicy.STEP_NOTIFY_IF_TIME,
from_time=from_time,
to_time=to_time,
)
alert_group = make_alert_group(alert_receive_channel, channel_filter=channel_filter)
alert_group.raw_escalation_snapshot = alert_group.build_raw_escalation_snapshot()
alert_group.save()
return alert_group, notify_to_multiple_users_step, wait_step, notify_if_time_step