oncall-engine/engine/apps/webhooks/tasks/trigger_webhook.py
Michael Derynck 3d74cbf3f5
Webhook 2 improvements and fixes (#1829)
- Rename Firing to Alert Group Created to reduce confusion as to why the
event only fires once and not when unresolve or unacknowledge returns
the alert group to the firing state.
- Increase password field length
- Do not filter webhook execution by team; team is now only used for
filtering ownership
- Do not log webhook triggers in alert group escalation log if the
webhook does not trigger (Status/response will still be stored)
- Fix formatting for response content and data fields on the Status page
- Add a content length limit for responses being stored (50000
characters)
2023-04-26 15:55:08 -06:00

223 lines
8 KiB
Python

import json
import logging
from json import JSONDecodeError
from celery.utils.log import get_task_logger
from django.apps import apps
from django.conf import settings
from django.db.models import Prefetch
from apps.alerts.models import AlertGroup, AlertGroupLogRecord, EscalationPolicy
from apps.base.models import UserNotificationPolicyLogRecord
from apps.user_management.models import User
from apps.webhooks.models import Webhook, WebhookResponse
from apps.webhooks.utils import (
InvalidWebhookData,
InvalidWebhookHeaders,
InvalidWebhookTrigger,
InvalidWebhookUrl,
serialize_event,
)
from common.custom_celery_tasks import shared_dedicated_queue_retry_task
from settings.base import WEBHOOK_RESPONSE_LIMIT
logger = get_task_logger(__name__)
logger.setLevel(logging.DEBUG)

# Stored as a webhook response's request trigger when the alert group's
# integration is not one of the integrations the webhook is filtered to.
NOT_FROM_SELECTED_INTEGRATION = "Alert group was not from a selected integration"

# Label for each webhook trigger type. It is used as the event "type" in the
# payload sent to the webhook and as the "trigger" recorded in alert group
# log records.
TRIGGER_TYPE_TO_LABEL = {
    Webhook.TRIGGER_ALERT_GROUP_CREATED: "alert group created",
    Webhook.TRIGGER_ACKNOWLEDGE: "acknowledge",
    Webhook.TRIGGER_RESOLVE: "resolve",
    Webhook.TRIGGER_SILENCE: "silence",
    Webhook.TRIGGER_UNSILENCE: "unsilence",
    Webhook.TRIGGER_UNRESOLVE: "unresolve",
    Webhook.TRIGGER_ESCALATION_STEP: "escalation",
    Webhook.TRIGGER_UNACKNOWLEDGE: "unacknowledge",
}
@shared_dedicated_queue_retry_task(
    autoretry_for=(Exception,), retry_backoff=True, max_retries=1 if settings.DEBUG else None
)
def send_webhook_event(trigger_type, alert_group_id, organization_id=None, user_id=None):
    """Schedule every enabled webhook of an organization that listens for ``trigger_type``.

    This task only fans out. Each matching webhook gets its own
    ``execute_webhook`` task, and that task builds the payload and makes
    the request.

    :param trigger_type: one of the ``Webhook.TRIGGER_*`` values.
    :param alert_group_id: primary key of the alert group the event concerns.
    :param organization_id: organization whose webhooks are considered.
    :param user_id: optional primary key of the user the event is attributed to.
    """
    Webhooks = apps.get_model("webhooks", "Webhook")
    # exclude(...=False) rather than filter(...=True): with Django this also
    # keeps rows whose flag is NULL. Presumably intentional if the column is
    # nullable (NOTE(review): confirm).
    webhooks_qs = Webhooks.objects.filter(
        trigger_type=trigger_type,
        organization_id=organization_id,
    ).exclude(is_webhook_enabled=False)
    for webhook in webhooks_qs:
        # Previously a stray print(); go through the task logger instead so the
        # message lands with the worker's logs.
        logger.debug(
            "Scheduling webhook %s (%s) for alert group %s", webhook.pk, webhook.name, alert_group_id
        )
        execute_webhook.apply_async((webhook.pk, alert_group_id, user_id, None))
def _isoformat_date(date_value):
    """Return ``date_value.isoformat()``, or ``None`` when the value is falsy (e.g. unset)."""
    if not date_value:
        return None
    return date_value.isoformat()
def _build_payload(webhook, alert_group, user):
    """Build the input data for ``webhook`` about ``alert_group``.

    The event dict always has a ``"type"`` label taken from
    ``TRIGGER_TYPE_TO_LABEL``. It also carries a ``"time"`` for the created,
    acknowledge, resolve and silence triggers, and an ``"until"`` for
    silence.

    The latest stored response of every *other* webhook for this alert group
    is included as well, keyed by that webhook's public primary key. Each
    response is parsed via ``json()`` when possible and kept as its raw
    ``content`` otherwise.

    :param webhook: the webhook being executed.
    :param alert_group: the alert group the event concerns.
    :param user: the user the event is attributed to, or ``None``.
    :returns: whatever ``serialize_event`` builds from these pieces.
    """
    trigger_type = webhook.trigger_type
    event = {
        "type": TRIGGER_TYPE_TO_LABEL[trigger_type],
    }
    if trigger_type == Webhook.TRIGGER_ALERT_GROUP_CREATED:
        event["time"] = _isoformat_date(alert_group.started_at)
    elif trigger_type == Webhook.TRIGGER_ACKNOWLEDGE:
        event["time"] = _isoformat_date(alert_group.acknowledged_at)
    elif trigger_type == Webhook.TRIGGER_RESOLVE:
        event["time"] = _isoformat_date(alert_group.resolved_at)
    elif trigger_type == Webhook.TRIGGER_SILENCE:
        event["time"] = _isoformat_date(alert_group.silenced_at)
        event["until"] = _isoformat_date(alert_group.silenced_until)

    # Include the latest response data per webhook in the event input data,
    # excluding past responses from the webhook being executed.
    responses_data = {}
    responses = (
        alert_group.webhook_responses.all()
        .exclude(webhook__public_primary_key=webhook.public_primary_key)
        # r.webhook is dereferenced for every row below. Join it here instead
        # of issuing one extra query per stored response.
        .select_related("webhook")
        .order_by("-timestamp")
    )
    for r in responses:
        webhook_key = r.webhook.public_primary_key
        if webhook_key in responses_data:
            # Rows are newest-first, so the first one seen per webhook is the latest.
            continue
        try:
            response_data = r.json()
        except JSONDecodeError:
            response_data = r.content
        responses_data[webhook_key] = response_data

    return serialize_event(event, alert_group, user, responses_data)
def make_request(webhook, alert_group, data):
    """Evaluate ``webhook`` against ``alert_group`` and send the request if it triggers.

    :param webhook: the webhook to evaluate and, if triggered, call.
    :param alert_group: checked against the webhook's integration filter.
    :param data: the event input data (see ``_build_payload``).
    :returns: a ``(triggered, status, error, exception)`` tuple.

        - ``status`` holds the fields used to create a ``WebhookResponse``.
        - ``error`` is a message describing a failure, or ``None``.
        - ``exception`` is an unexpected exception the caller is expected
          to re-raise, or ``None``.

        When an error occurs, ``triggered`` is reported as ``True`` so the
        failure gets logged.
    """
    status = {
        "url": None,
        "request_trigger": None,
        "request_headers": None,
        "request_data": data,
        "status_code": None,
        "content": None,
        "webhook": webhook,
    }
    exception = error = None
    try:
        if not webhook.check_integration_filter(alert_group):
            status["request_trigger"] = NOT_FROM_SELECTED_INTEGRATION
            return False, status, None, None

        triggered, status["request_trigger"] = webhook.check_trigger(data)
        if triggered:
            status["url"] = webhook.build_url(data)
            request_kwargs = webhook.build_request_kwargs(data, raise_data_errors=True)
            status["request_headers"] = json.dumps(request_kwargs.get("headers", {}))
            if "json" in request_kwargs:
                status["request_data"] = json.dumps(request_kwargs["json"])
            else:
                status["request_data"] = request_kwargs.get("data")
            response = webhook.make_request(status["url"], request_kwargs)
            status["status_code"] = response.status_code
            # Measure the received body instead of trusting Content-Length:
            # - The header is absent for chunked responses, which used to be
            #   reported as "exceeds limit" even when tiny.
            # - A malformed value made int() raise and the task retry.
            # Checking bytes is conservative: UTF-8 text never has more
            # characters than bytes.
            if len(response.content) < WEBHOOK_RESPONSE_LIMIT:
                try:
                    status["content"] = json.dumps(response.json())
                except ValueError:
                    # json / simplejson / requests JSONDecodeError all subclass ValueError.
                    # errors="replace" keeps a non-UTF-8 body from failing (and retrying) the task.
                    status["content"] = response.content.decode("utf-8", errors="replace")
            else:
                status["content"] = f"Response content exceeds {WEBHOOK_RESPONSE_LIMIT} character limit"
        return triggered, status, None, None
    except InvalidWebhookUrl as e:
        status["url"] = error = e.message
    except InvalidWebhookTrigger as e:
        status["request_trigger"] = error = e.message
    except InvalidWebhookHeaders as e:
        status["request_headers"] = error = e.message
    except InvalidWebhookData as e:
        status["request_data"] = error = e.message
    except Exception as e:
        # Unexpected failure: record it and hand it back so the caller can re-raise.
        status["content"] = error = str(e)
        exception = e
    return True, status, error, exception
@shared_dedicated_queue_retry_task(
    autoretry_for=(Exception,), retry_backoff=True, max_retries=1 if settings.DEBUG else None
)
def execute_webhook(webhook_pk, alert_group_id, user_id, escalation_policy_id):
    """Run one webhook for one alert group and record the outcome.

    The task proceeds in these steps:

    1. Build the payload and evaluate/send the request via ``make_request``.
    2. Always store a ``WebhookResponse``.
    3. Only when the webhook triggered, add an ``AlertGroupLogRecord``.
    4. If ``make_request`` reported an unexpected exception, re-raise it
       after recording, so the task's ``autoretry_for=(Exception,)`` retries it.

    If the webhook or the alert group cannot be found, the task logs a
    warning and returns without doing anything.

    :param webhook_pk: primary key of the webhook to run.
    :param alert_group_id: primary key of the alert group, looked up through
        ``AlertGroup.unarchived_objects``.
    :param user_id: optional primary key of the user the event is attributed to.
    :param escalation_policy_id: set when fired from an escalation step; links
        the log record to that policy.
    """
    Webhooks = apps.get_model("webhooks", "Webhook")
    try:
        webhook = Webhooks.objects.get(pk=webhook_pk)
    except Webhooks.DoesNotExist:
        # Logger.warn is a deprecated alias of Logger.warning.
        logger.warning(f"Webhook {webhook_pk} does not exist")
        return

    try:
        # Prefetch the successful personal notifications (with authors) onto
        # alert_group.sent_notifications. NOTE(review): presumably consumed by
        # serialize_event when building the payload; confirm.
        personal_log_records = UserNotificationPolicyLogRecord.objects.filter(
            alert_group_id=alert_group_id,
            author__isnull=False,
            type=UserNotificationPolicyLogRecord.TYPE_PERSONAL_NOTIFICATION_SUCCESS,
        ).select_related("author")
        alert_group = (
            AlertGroup.unarchived_objects.prefetch_related(
                Prefetch("personal_log_records", queryset=personal_log_records, to_attr="sent_notifications")
            )
            .select_related("channel")
            .get(pk=alert_group_id)
        )
    except AlertGroup.DoesNotExist:
        # Previously dropped silently; leave a trace of why nothing was sent.
        logger.warning(f"Alert group {alert_group_id} not found, skipping webhook {webhook_pk}")
        return

    user = None
    if user_id is not None:
        user = User.objects.filter(pk=user_id).first()

    data = _build_payload(webhook, alert_group, user)
    triggered, status, error, exception = make_request(webhook, alert_group, data)

    # Create the response entry; this happens even when the webhook did not trigger.
    WebhookResponse.objects.create(
        alert_group=alert_group,
        trigger_type=webhook.trigger_type,
        **status,
    )

    escalation_policy = step = None
    if escalation_policy_id:
        escalation_policy = EscalationPolicy.objects.filter(pk=escalation_policy_id).first()
        step = EscalationPolicy.STEP_TRIGGER_CUSTOM_WEBHOOK

    # Create the log record.
    error_code = None
    # reuse existing webhooks record type (TODO: rename after migration)
    log_type = AlertGroupLogRecord.TYPE_CUSTOM_BUTTON_TRIGGERED
    reason = str(status["status_code"])
    if error is not None:
        log_type = AlertGroupLogRecord.TYPE_ESCALATION_FAILED
        error_code = AlertGroupLogRecord.ERROR_ESCALATION_TRIGGER_CUSTOM_WEBHOOK_ERROR
        reason = error

    # Only webhooks that actually triggered (or failed trying) show up in the alert group log.
    if triggered:
        AlertGroupLogRecord.objects.create(
            type=log_type,
            alert_group=alert_group,
            author=user,
            reason=reason,
            step_specific_info={
                "webhook_name": webhook.name,
                "webhook_id": webhook.public_primary_key,
                "trigger": TRIGGER_TYPE_TO_LABEL[webhook.trigger_type],
            },
            escalation_policy=escalation_policy,
            escalation_policy_step=step,
            escalation_error_code=error_code,
        )

    if exception:
        # Re-raise after recording so the task's autoretry mechanism retries it.
        raise exception