Add webhooks app and initial models (#1101)
This commit is contained in:
parent
9709cfbc73
commit
2048e783ba
12 changed files with 721 additions and 0 deletions
10
engine/apps/webhooks/admin.py
Normal file
10
engine/apps/webhooks/admin.py
Normal file
|
|
@ -0,0 +1,10 @@
|
|||
from django.contrib import admin
|
||||
|
||||
from common.admin import CustomModelAdmin
|
||||
|
||||
from .models import Webhook
|
||||
|
||||
|
||||
@admin.register(Webhook)
|
||||
class WebhookAdmin(CustomModelAdmin):
|
||||
list_display = ("id", "public_primary_key", "organization", "name", "url")
|
||||
5
engine/apps/webhooks/apps.py
Normal file
5
engine/apps/webhooks/apps.py
Normal file
|
|
@ -0,0 +1,5 @@
|
|||
from django.apps import AppConfig
|
||||
|
||||
|
||||
class WebhooksConfig(AppConfig):
|
||||
name = "apps.webhooks"
|
||||
60
engine/apps/webhooks/migrations/0001_initial.py
Normal file
60
engine/apps/webhooks/migrations/0001_initial.py
Normal file
|
|
@ -0,0 +1,60 @@
|
|||
# Generated by Django 3.2.18 on 2023-03-09 18:25
|
||||
|
||||
import apps.webhooks.models.webhook
|
||||
import django.core.validators
|
||||
from django.db import migrations, models
|
||||
import django.db.models.deletion
|
||||
import mirage.fields
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
initial = True
|
||||
|
||||
dependencies = [
|
||||
('user_management', '0009_organization_cluster_slug'),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.CreateModel(
|
||||
name='Webhook',
|
||||
fields=[
|
||||
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
|
||||
('public_primary_key', models.CharField(default=apps.webhooks.models.webhook.generate_public_primary_key_for_webhook, max_length=20, unique=True, validators=[django.core.validators.MinLengthValidator(13)])),
|
||||
('created_at', models.DateTimeField(auto_now_add=True)),
|
||||
('deleted_at', models.DateTimeField(blank=True, null=True)),
|
||||
('name', models.CharField(default=None, max_length=100, null=True)),
|
||||
('username', models.CharField(default=None, max_length=100, null=True)),
|
||||
('password', mirage.fields.EncryptedCharField(default=None, max_length=200, null=True)),
|
||||
('authorization_header', mirage.fields.EncryptedCharField(default=None, max_length=1000, null=True)),
|
||||
('trigger_template', models.TextField(default=None, null=True)),
|
||||
('headers', models.TextField(default=None, null=True)),
|
||||
('url', models.TextField(default=None, null=True)),
|
||||
('data', models.TextField(default=None, null=True)),
|
||||
('forward_all', models.BooleanField(default=True)),
|
||||
('http_method', models.CharField(default='POST', max_length=32)),
|
||||
('trigger_type', models.IntegerField(choices=[(0, 'Escalation step'), (1, 'Triggered'), (2, 'Acknowledged'), (3, 'Resolved'), (4, 'Silenced'), (5, 'Unsilenced'), (6, 'Unresolved')], default=None, null=True)),
|
||||
('organization', models.ForeignKey(default=None, null=True, on_delete=django.db.models.deletion.CASCADE, related_name='webhooks', to='user_management.organization')),
|
||||
('team', models.ForeignKey(default=None, null=True, on_delete=django.db.models.deletion.CASCADE, related_name='webhooks', to='user_management.team')),
|
||||
('user', models.ForeignKey(default=None, null=True, on_delete=django.db.models.deletion.CASCADE, related_name='webhooks', to='user_management.user')),
|
||||
],
|
||||
options={
|
||||
'unique_together': {('name', 'organization')},
|
||||
},
|
||||
),
|
||||
migrations.CreateModel(
|
||||
name='WebhookLog',
|
||||
fields=[
|
||||
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
|
||||
('last_run_at', models.DateTimeField(blank=True, null=True)),
|
||||
('input_data', models.JSONField(default=None)),
|
||||
('url', models.TextField(default=None, null=True)),
|
||||
('trigger', models.TextField(default=None, null=True)),
|
||||
('headers', models.TextField(default=None, null=True)),
|
||||
('data', models.TextField(default=None, null=True)),
|
||||
('response_status', models.CharField(default=None, max_length=100, null=True)),
|
||||
('response', models.TextField(default=None, null=True)),
|
||||
('webhook', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='logs', to='webhooks.webhook')),
|
||||
],
|
||||
),
|
||||
]
|
||||
0
engine/apps/webhooks/migrations/__init__.py
Normal file
0
engine/apps/webhooks/migrations/__init__.py
Normal file
1
engine/apps/webhooks/models/__init__.py
Normal file
1
engine/apps/webhooks/models/__init__.py
Normal file
|
|
@ -0,0 +1 @@
|
|||
from .webhook import Webhook, WebhookLog # noqa: F401
|
||||
262
engine/apps/webhooks/models/webhook.py
Normal file
262
engine/apps/webhooks/models/webhook.py
Normal file
|
|
@ -0,0 +1,262 @@
|
|||
import json
|
||||
from json import JSONDecodeError
|
||||
|
||||
import requests
|
||||
from django.conf import settings
|
||||
from django.core.validators import MinLengthValidator
|
||||
from django.db import models
|
||||
from django.db.models import F
|
||||
from django.utils import timezone
|
||||
from mirage import fields as mirage_fields
|
||||
from requests.auth import HTTPBasicAuth
|
||||
|
||||
from apps.alerts.utils import OUTGOING_WEBHOOK_TIMEOUT
|
||||
from apps.webhooks.utils import (
|
||||
InvalidWebhookData,
|
||||
InvalidWebhookHeaders,
|
||||
InvalidWebhookTrigger,
|
||||
InvalidWebhookUrl,
|
||||
apply_jinja_template_for_json,
|
||||
parse_url,
|
||||
)
|
||||
from common.jinja_templater import apply_jinja_template
|
||||
from common.jinja_templater.apply_jinja_template import JinjaTemplateError, JinjaTemplateWarning
|
||||
from common.public_primary_keys import generate_public_primary_key, increase_public_primary_key_length
|
||||
|
||||
|
||||
def generate_public_primary_key_for_webhook():
|
||||
prefix = "WH"
|
||||
new_public_primary_key = generate_public_primary_key(prefix)
|
||||
|
||||
failure_counter = 0
|
||||
while Webhook.objects.filter(public_primary_key=new_public_primary_key).exists():
|
||||
new_public_primary_key = increase_public_primary_key_length(
|
||||
failure_counter=failure_counter, prefix=prefix, model_name="Webhook"
|
||||
)
|
||||
failure_counter += 1
|
||||
|
||||
return new_public_primary_key
|
||||
|
||||
|
||||
class WebhookQueryset(models.QuerySet):
|
||||
def delete(self):
|
||||
self.update(deleted_at=timezone.now(), name=F("name") + "_deleted_" + F("public_primary_key"))
|
||||
|
||||
|
||||
class WebhookManager(models.Manager):
|
||||
def get_queryset(self):
|
||||
return WebhookQueryset(self.model, using=self._db).filter(deleted_at=None)
|
||||
|
||||
def hard_delete(self):
|
||||
return self.get_queryset().hard_delete()
|
||||
|
||||
|
||||
class Webhook(models.Model):
|
||||
objects = WebhookManager()
|
||||
objects_with_deleted = models.Manager()
|
||||
|
||||
(
|
||||
TRIGGER_ESCALATION_STEP,
|
||||
TRIGGER_NEW,
|
||||
TRIGGER_ACKNOWLEDGE,
|
||||
TRIGGER_RESOLVE,
|
||||
TRIGGER_SILENCE,
|
||||
TRIGGER_UNSILENCE,
|
||||
TRIGGER_UNRESOLVE,
|
||||
) = range(7)
|
||||
|
||||
# Must be the same order as previous
|
||||
TRIGGER_TYPES = (
|
||||
(TRIGGER_ESCALATION_STEP, "Escalation step"),
|
||||
(TRIGGER_NEW, "Triggered"),
|
||||
(TRIGGER_ACKNOWLEDGE, "Acknowledged"),
|
||||
(TRIGGER_RESOLVE, "Resolved"),
|
||||
(TRIGGER_SILENCE, "Silenced"),
|
||||
(TRIGGER_UNSILENCE, "Unsilenced"),
|
||||
(TRIGGER_UNRESOLVE, "Unresolved"),
|
||||
)
|
||||
|
||||
public_primary_key = models.CharField(
|
||||
max_length=20,
|
||||
validators=[MinLengthValidator(settings.PUBLIC_PRIMARY_KEY_MIN_LENGTH + 1)],
|
||||
unique=True,
|
||||
default=generate_public_primary_key_for_webhook,
|
||||
)
|
||||
|
||||
organization = models.ForeignKey(
|
||||
"user_management.Organization", null=True, on_delete=models.CASCADE, related_name="webhooks", default=None
|
||||
)
|
||||
|
||||
team = models.ForeignKey(
|
||||
"user_management.Team", null=True, on_delete=models.CASCADE, related_name="webhooks", default=None
|
||||
)
|
||||
|
||||
user = models.ForeignKey(
|
||||
"user_management.User", null=True, on_delete=models.CASCADE, related_name="webhooks", default=None
|
||||
)
|
||||
|
||||
created_at = models.DateTimeField(auto_now_add=True)
|
||||
deleted_at = models.DateTimeField(blank=True, null=True)
|
||||
name = models.CharField(max_length=100, null=True, default=None)
|
||||
username = models.CharField(max_length=100, null=True, default=None)
|
||||
password = mirage_fields.EncryptedCharField(max_length=200, null=True, default=None)
|
||||
authorization_header = mirage_fields.EncryptedCharField(max_length=1000, null=True, default=None)
|
||||
trigger_template = models.TextField(null=True, default=None)
|
||||
headers = models.TextField(null=True, default=None)
|
||||
url = models.TextField(null=True, default=None)
|
||||
data = models.TextField(null=True, default=None)
|
||||
forward_all = models.BooleanField(default=True)
|
||||
http_method = models.CharField(max_length=32, default="POST")
|
||||
trigger_type = models.IntegerField(choices=TRIGGER_TYPES, default=None, null=True)
|
||||
|
||||
class Meta:
|
||||
unique_together = ("name", "organization")
|
||||
|
||||
def __str__(self):
|
||||
return str(self.name)
|
||||
|
||||
def delete(self):
|
||||
# TODO: delete related escalation policies on delete, once implemented
|
||||
# self.escalation_policies.all().delete()
|
||||
self.deleted_at = timezone.now()
|
||||
# 100 - 22 = 78. 100 is max len of name field, and 22 is len of suffix _deleted_<public_primary_key>
|
||||
# So for case when user created an entry with maximum length name it is needed to trim it to 78 chars
|
||||
# to be able to add suffix.
|
||||
self.name = f"{self.name[:78]}_deleted_{self.public_primary_key}"
|
||||
self.save()
|
||||
|
||||
def hard_delete(self):
|
||||
super().delete()
|
||||
|
||||
def build_request_kwargs(self, event_data, raise_data_errors=False):
|
||||
request_kwargs = {}
|
||||
if self.username and self.password:
|
||||
request_kwargs["auth"] = HTTPBasicAuth(self.username, self.password)
|
||||
|
||||
request_kwargs["headers"] = {}
|
||||
if self.headers:
|
||||
try:
|
||||
rendered_headers = apply_jinja_template_for_json(
|
||||
self.headers,
|
||||
event_data,
|
||||
)
|
||||
request_kwargs["headers"] = json.loads(rendered_headers)
|
||||
except (JinjaTemplateError, JinjaTemplateWarning) as e:
|
||||
raise InvalidWebhookHeaders(e.fallback_message)
|
||||
except JSONDecodeError:
|
||||
raise InvalidWebhookHeaders("Template did not result in json/dict")
|
||||
|
||||
if self.authorization_header:
|
||||
request_kwargs["headers"]["Authorization"] = self.authorization_header
|
||||
|
||||
if self.http_method in ["POST", "PUT"]:
|
||||
if self.forward_all:
|
||||
request_kwargs["json"] = event_data
|
||||
elif self.data:
|
||||
try:
|
||||
rendered_data = apply_jinja_template_for_json(
|
||||
self.data,
|
||||
event_data,
|
||||
)
|
||||
try:
|
||||
request_kwargs["json"] = json.loads(rendered_data)
|
||||
except (JSONDecodeError, TypeError):
|
||||
request_kwargs["data"] = rendered_data
|
||||
except (JinjaTemplateError, JinjaTemplateWarning) as e:
|
||||
if raise_data_errors:
|
||||
raise InvalidWebhookData(e.fallback_message)
|
||||
else:
|
||||
request_kwargs["json"] = {"error": e.fallback_message}
|
||||
|
||||
return request_kwargs
|
||||
|
||||
def build_url(self, event_data):
|
||||
try:
|
||||
url = apply_jinja_template(
|
||||
self.url,
|
||||
**event_data,
|
||||
)
|
||||
except (JinjaTemplateError, JinjaTemplateWarning) as e:
|
||||
raise InvalidWebhookUrl(e.fallback_message)
|
||||
|
||||
# raise if URL is not valid
|
||||
parse_url(url)
|
||||
|
||||
return url
|
||||
|
||||
def check_trigger(self, event_data):
|
||||
if not self.trigger_template:
|
||||
return True, ""
|
||||
|
||||
try:
|
||||
result = apply_jinja_template(self.trigger_template, **event_data)
|
||||
return result.lower() in ["true", "1"], result
|
||||
except (JinjaTemplateError, JinjaTemplateWarning) as e:
|
||||
raise InvalidWebhookTrigger(e.fallback_message)
|
||||
|
||||
def make_request(self, url, request_kwargs):
|
||||
if self.http_method == "GET":
|
||||
r = requests.get(url, timeout=OUTGOING_WEBHOOK_TIMEOUT, **request_kwargs)
|
||||
elif self.http_method == "POST":
|
||||
r = requests.post(url, timeout=OUTGOING_WEBHOOK_TIMEOUT, **request_kwargs)
|
||||
elif self.http_method == "PUT":
|
||||
r = requests.put(url, timeout=OUTGOING_WEBHOOK_TIMEOUT, **request_kwargs)
|
||||
elif self.http_method == "DELETE":
|
||||
r = requests.delete(url, timeout=OUTGOING_WEBHOOK_TIMEOUT, **request_kwargs)
|
||||
elif self.http_method == "OPTIONS":
|
||||
r = requests.options(url, timeout=OUTGOING_WEBHOOK_TIMEOUT, **request_kwargs)
|
||||
else:
|
||||
raise Exception(f"Unsupported http method: {self.http_method}")
|
||||
return r
|
||||
|
||||
# Insight logs
|
||||
@property
|
||||
def insight_logs_type_verbal(self):
|
||||
return "webhook"
|
||||
|
||||
@property
|
||||
def insight_logs_verbal(self):
|
||||
return self.name
|
||||
|
||||
def _insight_log_team(self):
|
||||
result = {"team": "General"}
|
||||
if self.team:
|
||||
result["team"] = self.team.name
|
||||
result["team_id"] = self.team.public_primary_key
|
||||
return result
|
||||
|
||||
@property
|
||||
def insight_logs_serialized(self):
|
||||
result = {
|
||||
"name": self.name,
|
||||
"trigger_type": self.trigger_type,
|
||||
"url": self.url,
|
||||
"data": self.data,
|
||||
"forward_all": self.forward_all,
|
||||
}
|
||||
result.update(self._insight_log_team())
|
||||
return result
|
||||
|
||||
@property
|
||||
def insight_logs_metadata(self):
|
||||
result = {}
|
||||
result.update(self._insight_log_team())
|
||||
return result
|
||||
|
||||
|
||||
class WebhookLog(models.Model):
|
||||
last_run_at = models.DateTimeField(blank=True, null=True)
|
||||
input_data = models.JSONField(default=None)
|
||||
url = models.TextField(null=True, default=None)
|
||||
trigger = models.TextField(null=True, default=None)
|
||||
headers = models.TextField(null=True, default=None)
|
||||
data = models.TextField(null=True, default=None)
|
||||
response_status = models.CharField(max_length=100, null=True, default=None)
|
||||
response = models.TextField(null=True, default=None)
|
||||
webhook = models.ForeignKey(
|
||||
to="webhooks.Webhook",
|
||||
on_delete=models.CASCADE,
|
||||
related_name="logs",
|
||||
blank=False,
|
||||
null=False,
|
||||
)
|
||||
0
engine/apps/webhooks/tests/__init__.py
Normal file
0
engine/apps/webhooks/tests/__init__.py
Normal file
13
engine/apps/webhooks/tests/factories.py
Normal file
13
engine/apps/webhooks/tests/factories.py
Normal file
|
|
@ -0,0 +1,13 @@
|
|||
import factory
|
||||
|
||||
from apps.webhooks.models import Webhook
|
||||
from common.utils import UniqueFaker
|
||||
|
||||
|
||||
class CustomWebhookFactory(factory.DjangoModelFactory):
|
||||
|
||||
url = factory.Faker("url")
|
||||
name = UniqueFaker("word")
|
||||
|
||||
class Meta:
|
||||
model = Webhook
|
||||
246
engine/apps/webhooks/tests/test_webhook.py
Normal file
246
engine/apps/webhooks/tests/test_webhook.py
Normal file
|
|
@ -0,0 +1,246 @@
|
|||
from unittest.mock import call, patch
|
||||
|
||||
import pytest
|
||||
from requests.auth import HTTPBasicAuth
|
||||
|
||||
from apps.alerts.utils import OUTGOING_WEBHOOK_TIMEOUT
|
||||
from apps.webhooks.models import Webhook
|
||||
from apps.webhooks.utils import InvalidWebhookData, InvalidWebhookHeaders, InvalidWebhookTrigger, InvalidWebhookUrl
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_soft_delete(make_organization, make_custom_webhook):
|
||||
organization = make_organization()
|
||||
webhook = make_custom_webhook(organization=organization)
|
||||
assert webhook.deleted_at is None
|
||||
webhook.delete()
|
||||
|
||||
webhook.refresh_from_db()
|
||||
assert webhook.deleted_at is not None
|
||||
|
||||
assert Webhook.objects.all().count() == 0
|
||||
assert Webhook.objects_with_deleted.all().count() == 1
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_hard_delete(make_organization, make_custom_webhook):
|
||||
organization = make_organization()
|
||||
webhook = make_custom_webhook(organization=organization)
|
||||
assert webhook.pk is not None
|
||||
webhook.hard_delete()
|
||||
|
||||
assert webhook.pk is None
|
||||
assert Webhook.objects.all().count() == 0
|
||||
assert Webhook.objects_with_deleted.all().count() == 0
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_build_request_kwargs_none(make_organization, make_custom_webhook):
|
||||
organization = make_organization()
|
||||
webhook = make_custom_webhook(organization=organization)
|
||||
request_kwargs = webhook.build_request_kwargs({})
|
||||
|
||||
assert request_kwargs == {"headers": {}, "json": {}}
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_build_request_kwargs_http_auth(make_organization, make_custom_webhook):
|
||||
organization = make_organization()
|
||||
webhook = make_custom_webhook(organization=organization, username="foo", password="bar")
|
||||
request_kwargs = webhook.build_request_kwargs({})
|
||||
|
||||
expected = HTTPBasicAuth("foo", "bar")
|
||||
assert request_kwargs == {"headers": {}, "json": {}, "auth": expected}
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_build_request_kwargs_headers(make_organization, make_custom_webhook):
|
||||
organization = make_organization()
|
||||
|
||||
# non json
|
||||
headers = "non-json"
|
||||
webhook = make_custom_webhook(organization=organization, headers=headers)
|
||||
with pytest.raises(InvalidWebhookHeaders):
|
||||
webhook.build_request_kwargs({})
|
||||
|
||||
# template error
|
||||
headers = "{{{foo|invalid}}}"
|
||||
webhook = make_custom_webhook(organization=organization, headers=headers)
|
||||
with pytest.raises(InvalidWebhookHeaders):
|
||||
webhook.build_request_kwargs({})
|
||||
|
||||
# ok (using event data)
|
||||
headers = '{"{{foo}}": "bar"}'
|
||||
webhook = make_custom_webhook(organization=organization, headers=headers)
|
||||
assert webhook.forward_all
|
||||
request_kwargs = webhook.build_request_kwargs({"foo": "bar"})
|
||||
assert request_kwargs == {"headers": {"bar": "bar"}, "json": {"foo": "bar"}}
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_build_request_kwargs_authorization_header(make_organization, make_custom_webhook):
|
||||
organization = make_organization()
|
||||
webhook = make_custom_webhook(organization=organization, authorization_header="some-token")
|
||||
request_kwargs = webhook.build_request_kwargs({})
|
||||
|
||||
assert request_kwargs == {"headers": {"Authorization": "some-token"}, "json": {}}
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_build_request_kwargs_http_get(make_organization, make_custom_webhook):
|
||||
organization = make_organization()
|
||||
webhook = make_custom_webhook(organization=organization, http_method="GET")
|
||||
request_kwargs = webhook.build_request_kwargs({})
|
||||
|
||||
assert request_kwargs == {"headers": {}}
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_build_request_kwargs_custom_data(make_organization, make_custom_webhook):
|
||||
organization = make_organization()
|
||||
webhook = make_custom_webhook(organization=organization, data="{{foo}}", forward_all=False)
|
||||
request_kwargs = webhook.build_request_kwargs({"foo": "bar", "something": "else"})
|
||||
|
||||
assert request_kwargs == {"headers": {}, "data": "bar"}
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_build_request_kwargs_custom_data_error(make_organization, make_custom_webhook):
|
||||
organization = make_organization()
|
||||
webhook = make_custom_webhook(organization=organization, data="{{foo|invalid}}", forward_all=False)
|
||||
|
||||
# raise
|
||||
with pytest.raises(InvalidWebhookData):
|
||||
webhook.build_request_kwargs({"foo": "bar", "something": "else"}, raise_data_errors=True)
|
||||
|
||||
# do not raise
|
||||
request_kwargs = webhook.build_request_kwargs({"foo": "bar", "something": "else"})
|
||||
assert request_kwargs == {"headers": {}, "json": {"error": "Template Error: No filter named 'invalid'."}}
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_build_url_invalid_template(make_organization, make_custom_webhook):
|
||||
organization = make_organization()
|
||||
webhook = make_custom_webhook(organization=organization, url="{{foo|invalid}}")
|
||||
|
||||
with pytest.raises(InvalidWebhookUrl):
|
||||
webhook.build_url({})
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_build_url_invalid_url(make_organization, make_custom_webhook):
|
||||
organization = make_organization()
|
||||
webhook = make_custom_webhook(organization=organization, url="{{foo}}")
|
||||
|
||||
with pytest.raises(InvalidWebhookUrl):
|
||||
webhook.build_url({"foo": "invalid-url"})
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_build_url_private_raises(make_organization, make_custom_webhook):
|
||||
organization = make_organization()
|
||||
webhook = make_custom_webhook(organization=organization, url="{{foo}}")
|
||||
|
||||
with pytest.raises(InvalidWebhookUrl):
|
||||
with patch("apps.webhooks.utils.socket.gethostbyname") as mock_gethostbyname:
|
||||
mock_gethostbyname.return_value = "127.0.0.1"
|
||||
webhook.build_url({"foo": "http://oncall.url"})
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_build_url_ok(make_organization, make_custom_webhook):
|
||||
organization = make_organization()
|
||||
webhook = make_custom_webhook(organization=organization, url="{{foo}}")
|
||||
|
||||
with patch("apps.webhooks.utils.socket.gethostbyname") as mock_gethostbyname:
|
||||
mock_gethostbyname.return_value = "8.8.8.8"
|
||||
url = webhook.build_url({"foo": "http://oncall.url"})
|
||||
|
||||
assert url == "http://oncall.url"
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_check_trigger_empty(make_organization, make_custom_webhook):
|
||||
organization = make_organization()
|
||||
webhook = make_custom_webhook(organization=organization)
|
||||
|
||||
ok, result = webhook.check_trigger({})
|
||||
assert ok
|
||||
assert result == ""
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_check_trigger_template_error(make_organization, make_custom_webhook):
|
||||
organization = make_organization()
|
||||
webhook = make_custom_webhook(organization=organization, trigger_template="{{foo|invalid}}")
|
||||
|
||||
with pytest.raises(InvalidWebhookTrigger):
|
||||
webhook.check_trigger({"foo": "bar"})
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_check_trigger_template_ok(make_organization, make_custom_webhook):
|
||||
organization = make_organization()
|
||||
webhook = make_custom_webhook(organization=organization, trigger_template="{{ foo }}")
|
||||
|
||||
ok, result = webhook.check_trigger({"foo": "true"})
|
||||
assert ok
|
||||
assert result == "true"
|
||||
|
||||
ok, result = webhook.check_trigger({"foo": "bar"})
|
||||
assert not ok
|
||||
assert result == "bar"
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_make_request(make_organization, make_custom_webhook):
|
||||
organization = make_organization()
|
||||
|
||||
with patch("apps.webhooks.models.webhook.requests") as mock_requests:
|
||||
for method in ("GET", "POST", "PUT", "DELETE", "OPTIONS"):
|
||||
webhook = make_custom_webhook(organization=organization, http_method=method)
|
||||
webhook.make_request("url", {"foo": "bar"})
|
||||
expected_call = getattr(mock_requests, method.lower())
|
||||
assert expected_call.called
|
||||
assert expected_call.call_args == call("url", timeout=OUTGOING_WEBHOOK_TIMEOUT, foo="bar")
|
||||
|
||||
# invalid
|
||||
with pytest.raises(Exception):
|
||||
webhook = make_custom_webhook(organization=organization, http_method="NOT")
|
||||
webhook.make_request("url", {"foo": "bar"})
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_escaping_payload_with_double_quotes(make_organization, make_custom_webhook):
|
||||
organization = make_organization()
|
||||
webhook = make_custom_webhook(
|
||||
organization=organization,
|
||||
data='{\n "text" : "{{ alert_payload.text }}"\n}',
|
||||
forward_all=False,
|
||||
)
|
||||
|
||||
payload = {
|
||||
"alert_payload": {
|
||||
"text": '"Hello world"',
|
||||
}
|
||||
}
|
||||
request_kwargs = webhook.build_request_kwargs(payload)
|
||||
assert request_kwargs == {"headers": {}, "json": {"text": '"Hello world"'}}
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_escaping_payload_with_single_quote_in_string(make_organization, make_custom_webhook):
|
||||
organization = make_organization()
|
||||
webhook = make_custom_webhook(
|
||||
organization=organization,
|
||||
data='{"data" : "{{ alert_payload }}"}',
|
||||
forward_all=False,
|
||||
)
|
||||
|
||||
payload = {
|
||||
"alert_payload": {
|
||||
"text": "Hi, it's alert",
|
||||
}
|
||||
}
|
||||
request_kwargs = webhook.build_request_kwargs(payload)
|
||||
assert request_kwargs == {"headers": {}, "json": {"data": "{'text': \"Hi, it's alert\"}"}}
|
||||
113
engine/apps/webhooks/utils.py
Normal file
113
engine/apps/webhooks/utils.py
Normal file
|
|
@ -0,0 +1,113 @@
|
|||
import ipaddress
|
||||
import json
|
||||
import re
|
||||
import socket
|
||||
from urllib.parse import urlparse
|
||||
|
||||
from django.conf import settings
|
||||
|
||||
from apps.base.utils import live_settings
|
||||
from common.jinja_templater import apply_jinja_template
|
||||
|
||||
|
||||
class InvalidWebhookUrl(Exception):
|
||||
def __init__(self, message):
|
||||
self.message = f"URL - {message}"
|
||||
|
||||
|
||||
class InvalidWebhookTrigger(Exception):
|
||||
def __init__(self, message):
|
||||
self.message = f"Trigger - {message}"
|
||||
|
||||
|
||||
class InvalidWebhookHeaders(Exception):
|
||||
def __init__(self, message):
|
||||
self.message = f"Headers - {message}"
|
||||
|
||||
|
||||
class InvalidWebhookData(Exception):
|
||||
def __init__(self, message):
|
||||
self.message = f"Data - {message}"
|
||||
|
||||
|
||||
def parse_url(url):
|
||||
parsed_url = urlparse(url)
|
||||
# ensure the url looks like url
|
||||
if parsed_url.scheme not in ["http", "https"] or not parsed_url.netloc:
|
||||
raise InvalidWebhookUrl("Malformed url")
|
||||
|
||||
if settings.BASE_URL in url:
|
||||
raise InvalidWebhookUrl("Potential self-reference")
|
||||
|
||||
if not live_settings.DANGEROUS_WEBHOOKS_ENABLED:
|
||||
# Get the ip address of the webhook url and check if it belongs to the private network
|
||||
try:
|
||||
webhook_url_ip_address = socket.gethostbyname(parsed_url.hostname)
|
||||
except socket.gaierror:
|
||||
raise InvalidWebhookUrl("Cannot resolve name in url")
|
||||
if ipaddress.ip_address(socket.gethostbyname(webhook_url_ip_address)).is_private:
|
||||
raise InvalidWebhookUrl("This url is not supported for outgoing webhooks")
|
||||
|
||||
return parsed_url
|
||||
|
||||
|
||||
def apply_jinja_template_for_json(template, payload):
|
||||
escaped_payload = escape_payload(payload)
|
||||
return apply_jinja_template(template, **escaped_payload)
|
||||
|
||||
|
||||
def escape_payload(payload: dict):
|
||||
if isinstance(payload, dict):
|
||||
escaped_payload = EscapeDoubleQuotesDict()
|
||||
for key in payload.keys():
|
||||
escaped_payload[key] = escape_payload(payload[key])
|
||||
elif isinstance(payload, list):
|
||||
escaped_payload = []
|
||||
for value in payload:
|
||||
escaped_payload.append(escape_payload(value))
|
||||
elif isinstance(payload, str):
|
||||
escaped_payload = escape_string(payload)
|
||||
else:
|
||||
escaped_payload = payload
|
||||
return escaped_payload
|
||||
|
||||
|
||||
def escape_string(string: str):
|
||||
"""
|
||||
Escapes string to use in json.loads() method.
|
||||
json.dumps is the simples way to escape all special characters in string.
|
||||
First and last chars are quotes from json.dumps(), we don't need them, only escaping.
|
||||
"""
|
||||
return json.dumps(string)[1:-1]
|
||||
|
||||
|
||||
class EscapeDoubleQuotesDict(dict):
|
||||
"""
|
||||
Warning: Please, do not use this dict anywhere except CustomButton._escape_alert_payload.
|
||||
This custom dict escapes double quotes to produce string which is safe to pass to json.loads()
|
||||
It fixes case when CustomButton.build_post_kwargs failing on payloads which contains string with single quote.
|
||||
In this case built-in dict's str method will surround value with double quotes.
|
||||
|
||||
For example:
|
||||
|
||||
alert_payload = {
|
||||
"text": "Hi, it's alert",
|
||||
}
|
||||
template = '{"data" : "{{ alert_payload }}"}'
|
||||
rendered = '{"data" : "{\'text\': "Hi, it\'s alert"}"}'
|
||||
# and json.loads(rendered) will fail due to unescaped double quotes
|
||||
|
||||
# Now with EscapeDoubleQuotesDict.
|
||||
|
||||
alert_payload = EscapeDoubleQuotesDict({
|
||||
"text": "Hi, it's alert",
|
||||
})
|
||||
rendered = '{"data" : "{\'text\': \\"Hi, it\'s alert\\"}"}'
|
||||
# and json.loads(rendered) works.
|
||||
"""
|
||||
|
||||
def __str__(self):
|
||||
original_str = super().__str__()
|
||||
if '"' in original_str:
|
||||
return re.sub('(?<!\\\\)"', '\\\\"', original_str)
|
||||
return original_str
|
||||
|
|
@ -81,6 +81,7 @@ from apps.telegram.tests.factories import (
|
|||
from apps.twilioapp.tests.factories import PhoneCallFactory, SMSFactory
|
||||
from apps.user_management.models.user import User, listen_for_user_model_save
|
||||
from apps.user_management.tests.factories import OrganizationFactory, RegionFactory, TeamFactory, UserFactory
|
||||
from apps.webhooks.tests.factories import CustomWebhookFactory
|
||||
|
||||
register(OrganizationFactory)
|
||||
register(UserFactory)
|
||||
|
|
@ -616,6 +617,15 @@ def make_custom_action():
|
|||
return _make_custom_action
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def make_custom_webhook():
|
||||
def _make_custom_webhook(organization, **kwargs):
|
||||
custom_webhook = CustomWebhookFactory(organization=organization, **kwargs)
|
||||
return custom_webhook
|
||||
|
||||
return _make_custom_webhook
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def make_slack_user_group():
|
||||
def _make_slack_user_group(slack_team_identity, **kwargs):
|
||||
|
|
|
|||
|
|
@ -214,6 +214,7 @@ INSTALLED_APPS = [
|
|||
"apps.auth_token",
|
||||
"apps.public_api",
|
||||
"apps.grafana_plugin",
|
||||
"apps.webhooks",
|
||||
"corsheaders",
|
||||
"debug_toolbar",
|
||||
"social_django",
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue