Inbound email improvements (#5259)

# What this PR does

* Allows to use multiple inbound email ESPs at the same time by setting
the `INBOUND_EMAIL_ESP` env variable to `amazon_ses,mailgun` for example
* Adds a new ESP `amazon_ses_validated` that performs SNS message
vaildation (`django-anymail` doesn't implement it:
[comment](35383c7140/anymail/webhooks/amazon_ses.py (L107-L108)))

## Which issue(s) this PR closes

Related to https://github.com/grafana/oncall-private/issues/2905

<!--
*Note*: If you want the issue to be auto-closed once the PR is merged,
change "Related to" to "Closes" in the line above.
If you have more than one GitHub issue that this PR closes, be sure to
preface
each issue link with a [closing
keyword](https://docs.github.com/en/get-started/writing-on-github/working-with-advanced-formatting/using-keywords-in-issues-and-pull-requests#linking-a-pull-request-to-an-issue).
This ensures that the issue(s) are auto-closed once the PR has been
merged.
-->

## Checklist

- [x] Unit, integration, and e2e (if applicable) tests updated
- [x] Documentation added (or `pr:no public docs` PR label added if not
required)
- [x] Added the relevant release notes label (see labels prefixed w/
`release:`). These labels dictate how your PR will
    show up in the autogenerated release notes.
This commit is contained in:
Vadim Stepanov 2024-11-18 09:44:32 +00:00 committed by GitHub
parent 208db9cdb7
commit 10dc454c7b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 600 additions and 33 deletions

View file

@ -1,27 +1,42 @@
import logging
from functools import cached_property
from typing import Optional, TypedDict
from anymail.exceptions import AnymailInvalidAddress, AnymailWebhookValidationFailure
from anymail.exceptions import AnymailAPIError, AnymailInvalidAddress, AnymailWebhookValidationFailure
from anymail.inbound import AnymailInboundMessage
from anymail.signals import AnymailInboundEvent
from anymail.webhooks import amazon_ses, mailgun, mailjet, mandrill, postal, postmark, sendgrid, sparkpost
from django.http import HttpResponse, HttpResponseNotAllowed
from django.utils import timezone
from rest_framework import status
from rest_framework.request import Request
from rest_framework.response import Response
from rest_framework.views import APIView
from apps.base.utils import live_settings
from apps.email.validate_amazon_sns_message import validate_amazon_sns_message
from apps.integrations.mixins import AlertChannelDefiningMixin
from apps.integrations.tasks import create_alert
logger = logging.getLogger(__name__)
class AmazonSESValidatedInboundWebhookView(amazon_ses.AmazonSESInboundWebhookView):
# disable "Your Anymail webhooks are insecure and open to anyone on the web." warning
warn_if_no_basic_auth = False
def validate_request(self, request):
"""Add SNS message validation to Amazon SES inbound webhook view, which is not implemented in Anymail."""
super().validate_request(request)
sns_message = self._parse_sns_message(request)
if not validate_amazon_sns_message(sns_message):
raise AnymailWebhookValidationFailure("SNS message validation failed")
# {<ESP name>: (<django-anymail inbound webhook view class>, <webhook secret argument name to pass to the view>), ...}
INBOUND_EMAIL_ESP_OPTIONS = {
"amazon_ses": (amazon_ses.AmazonSESInboundWebhookView, None),
"amazon_ses_validated": (AmazonSESValidatedInboundWebhookView, None),
"mailgun": (mailgun.MailgunInboundWebhookView, "webhook_signing_key"),
"mailjet": (mailjet.MailjetInboundWebhookView, "webhook_secret"),
"mandrill": (mandrill.MandrillCombinedWebhookView, "webhook_key"),
@ -62,38 +77,33 @@ class InboundEmailWebhookView(AlertChannelDefiningMixin, APIView):
return super().dispatch(request, alert_channel_key=integration_token)
def post(self, request):
timestamp = timezone.now().isoformat()
for message in self.get_messages_from_esp_request(request):
payload = self.get_alert_payload_from_email_message(message)
create_alert.delay(
title=payload["subject"],
message=payload["message"],
alert_receive_channel_pk=request.alert_receive_channel.pk,
image_url=None,
link_to_upstream_details=None,
integration_unique_data=None,
raw_request_data=payload,
received_at=timestamp,
)
payload = self.get_alert_payload_from_email_message(self.message)
create_alert.delay(
title=payload["subject"],
message=payload["message"],
alert_receive_channel_pk=request.alert_receive_channel.pk,
image_url=None,
link_to_upstream_details=None,
integration_unique_data=None,
raw_request_data=payload,
received_at=timezone.now().isoformat(),
)
return Response("OK", status=status.HTTP_200_OK)
def get_integration_token_from_request(self, request) -> Optional[str]:
messages = self.get_messages_from_esp_request(request)
if not messages:
if not self.message:
return None
message = messages[0]
# First try envelope_recipient field.
# According to AnymailInboundMessage it's provided not by all ESPs.
if message.envelope_recipient:
recipients = message.envelope_recipient.split(",")
if self.message.envelope_recipient:
recipients = self.message.envelope_recipient.split(",")
for recipient in recipients:
# if there is more than one recipient, the first matching the expected domain will be used
try:
token, domain = recipient.strip().split("@")
except ValueError:
logger.error(
f"get_integration_token_from_request: envelope_recipient field has unexpected format: {message.envelope_recipient}"
f"get_integration_token_from_request: envelope_recipient field has unexpected format: {self.message.envelope_recipient}"
)
continue
if domain == live_settings.INBOUND_EMAIL_DOMAIN:
@ -113,20 +123,27 @@ class InboundEmailWebhookView(AlertChannelDefiningMixin, APIView):
# return cc.address.split("@")[0]
return None
def get_messages_from_esp_request(self, request: Request) -> list[AnymailInboundMessage]:
view_class, secret_name = INBOUND_EMAIL_ESP_OPTIONS[live_settings.INBOUND_EMAIL_ESP]
@cached_property
def message(self) -> AnymailInboundMessage | None:
esps = live_settings.INBOUND_EMAIL_ESP.split(",")
for esp in esps:
view_class, secret_name = INBOUND_EMAIL_ESP_OPTIONS[esp]
kwargs = {secret_name: live_settings.INBOUND_EMAIL_WEBHOOK_SECRET} if secret_name else {}
view = view_class(**kwargs)
kwargs = {secret_name: live_settings.INBOUND_EMAIL_WEBHOOK_SECRET} if secret_name else {}
view = view_class(**kwargs)
try:
view.run_validators(request)
events = view.parse_events(request)
except AnymailWebhookValidationFailure as e:
logger.info(f"get_messages_from_esp_request: inbound email webhook validation failed: {e}")
return []
try:
view.run_validators(self.request)
events = view.parse_events(self.request)
except (AnymailWebhookValidationFailure, AnymailAPIError) as e:
logger.info(f"inbound email webhook validation failed for ESP {esp}: {e}")
continue
return [event.message for event in events if isinstance(event, AnymailInboundEvent)]
messages = [event.message for event in events if isinstance(event, AnymailInboundEvent)]
if messages:
return messages[0]
return None
def check_inbound_email_settings_set(self):
"""

View file

@ -1,13 +1,295 @@
import datetime
import hashlib
import hmac
import json
from base64 import b64encode
from textwrap import dedent
from unittest.mock import ANY, Mock, patch
import pytest
from anymail.inbound import AnymailInboundMessage
from cryptography import x509
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import padding, rsa
from cryptography.x509 import CertificateBuilder, NameOID
from django.conf import settings
from django.urls import reverse
from rest_framework import status
from rest_framework.test import APIClient
from apps.alerts.models import AlertReceiveChannel
from apps.email.inbound import InboundEmailWebhookView
from apps.integrations.tasks import create_alert
PRIVATE_KEY = rsa.generate_private_key(
public_exponent=65537,
key_size=2048,
)
ISSUER_NAME = x509.Name(
[
x509.NameAttribute(NameOID.COUNTRY_NAME, "US"),
x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, "Test"),
x509.NameAttribute(NameOID.LOCALITY_NAME, "Test"),
x509.NameAttribute(NameOID.ORGANIZATION_NAME, "Amazon"),
x509.NameAttribute(NameOID.COMMON_NAME, "Test"),
]
)
CERTIFICATE = (
CertificateBuilder()
.subject_name(ISSUER_NAME)
.issuer_name(ISSUER_NAME)
.public_key(PRIVATE_KEY.public_key())
.serial_number(x509.random_serial_number())
.not_valid_before(datetime.datetime.now() - datetime.timedelta(days=1))
.not_valid_after(datetime.datetime.now() + datetime.timedelta(days=10))
.sign(PRIVATE_KEY, hashes.SHA256())
.public_bytes(serialization.Encoding.PEM)
)
AMAZON_SNS_TOPIC_ARN = "arn:aws:sns:us-east-2:123456789012:test"
SIGNING_CERT_URL = "https://sns.us-east-2.amazonaws.com/SimpleNotificationService-example.pem"
def _sns_inbound_email_payload_and_headers(sender_email, to_email, subject, message):
content = (
f"From: Sender Name <{sender_email}>\n"
f"To: {to_email}\n"
f"Subject: {subject}\n"
"Date: Tue, 5 Nov 2024 16:05:39 +0000\n"
"Message-ID: <example-message-id@mail.example.com>\n\n"
f"{message}\r\n"
)
message = {
"notificationType": "Received",
"mail": {
"timestamp": "2024-11-05T16:05:52.387Z",
"source": sender_email,
"messageId": "example-message-id-5678",
"destination": [to_email],
"headersTruncated": False,
"headers": [
{"name": "Return-Path", "value": f"<{sender_email}>"},
{
"name": "Received",
"value": (
f"from mail.example.com (mail.example.com [203.0.113.1]) "
f"by inbound-smtp.us-east-2.amazonaws.com with SMTP id example-id "
f"for {to_email}; Tue, 05 Nov 2024 16:05:52 +0000 (UTC)"
),
},
{"name": "X-SES-Spam-Verdict", "value": "PASS"},
{"name": "X-SES-Virus-Verdict", "value": "PASS"},
{
"name": "Received-SPF",
"value": (
"pass (spfCheck: domain of example.com designates 203.0.113.1 as permitted sender) "
f"client-ip=203.0.113.1; envelope-from={sender_email}; helo=mail.example.com;"
),
},
{
"name": "Authentication-Results",
"value": (
"amazonses.com; spf=pass (spfCheck: domain of example.com designates 203.0.113.1 as permitted sender) "
f"client-ip=203.0.113.1; envelope-from={sender_email}; helo=mail.example.com; "
"dkim=pass header.i=@example.com; dmarc=pass header.from=example.com;"
),
},
{"name": "X-SES-RECEIPT", "value": "example-receipt-data"},
{"name": "X-SES-DKIM-SIGNATURE", "value": "example-dkim-signature"},
{
"name": "Received",
"value": (
f"by mail.example.com with SMTP id example-id for <{to_email}>; "
"Tue, 05 Nov 2024 08:05:52 -0800 (PST)"
),
},
{
"name": "DKIM-Signature",
"value": (
"v=1; a=rsa-sha256; c=relaxed/relaxed; d=example.com; s=default; t=1234567890; "
"bh=examplehash; h=From:To:Subject:Date:Message-ID; b=example-signature"
),
},
{"name": "X-Google-DKIM-Signature", "value": "example-google-dkim-signature"},
{"name": "X-Gm-Message-State", "value": "example-message-state"},
{"name": "X-Google-Smtp-Source", "value": "example-smtp-source"},
{
"name": "X-Received",
"value": "by 2002:a17:example with SMTP id example-id; Tue, 05 Nov 2024 08:05:50 -0800 (PST)",
},
{"name": "MIME-Version", "value": "1.0"},
{"name": "From", "value": f"Sender Name <{sender_email}>"},
{"name": "Date", "value": "Tue, 5 Nov 2024 16:05:39 +0000"},
{"name": "Message-ID", "value": "<example-message-id@mail.example.com>"},
{"name": "Subject", "value": subject},
{"name": "To", "value": to_email},
{
"name": "Content-Type",
"value": 'multipart/alternative; boundary="00000000000036b9f706262c9312"',
},
],
"commonHeaders": {
"returnPath": sender_email,
"from": [f"Sender Name <{sender_email}>"],
"date": "Tue, 5 Nov 2024 16:05:39 +0000",
"to": [to_email],
"messageId": "<example-message-id@mail.example.com>",
"subject": subject,
},
},
"receipt": {
"timestamp": "2024-11-05T16:05:52.387Z",
"processingTimeMillis": 638,
"recipients": [to_email],
"spamVerdict": {"status": "PASS"},
"virusVerdict": {"status": "PASS"},
"spfVerdict": {"status": "PASS"},
"dkimVerdict": {"status": "PASS"},
"dmarcVerdict": {"status": "PASS"},
"action": {
"type": "SNS",
"topicArn": "arn:aws:sns:us-east-2:123456789012:test",
"encoding": "BASE64",
},
},
"content": b64encode(content.encode()).decode(),
}
payload = {
"Type": "Notification",
"MessageId": "example-message-id-1234",
"TopicArn": AMAZON_SNS_TOPIC_ARN,
"Subject": "Amazon SES Email Receipt Notification",
"Message": json.dumps(message),
"Timestamp": "2024-11-05T16:05:53.041Z",
"SignatureVersion": "1",
"SigningCertURL": SIGNING_CERT_URL,
"UnsubscribeURL": (
"https://sns.us-east-2.amazonaws.com/?Action=Unsubscribe&SubscriptionArn="
"arn:aws:sns:us-east-2:123456789012:test:example-subscription-id"
),
}
# Sign the payload
canonical_message = "".join(
f"{key}\n{payload[key]}\n" for key in ("Message", "MessageId", "Subject", "Timestamp", "TopicArn", "Type")
)
signature = PRIVATE_KEY.sign(
canonical_message.encode(),
padding.PKCS1v15(),
hashes.SHA1(),
)
payload["Signature"] = b64encode(signature).decode()
headers = {
"X-Amz-Sns-Message-Type": "Notification",
"X-Amz-Sns-Message-Id": "example-message-id-1234",
}
return payload, headers
def _mailgun_inbound_email_payload(sender_email, to_email, subject, message):
timestamp, token = "1731341416", "example-token"
signature = hmac.new(
key=settings.INBOUND_EMAIL_WEBHOOK_SECRET.encode("ascii"),
msg="{}{}".format(timestamp, token).encode("ascii"),
digestmod=hashlib.sha256,
).hexdigest()
return {
"Content-Type": 'multipart/alternative; boundary="000000000000267130626a556e5"',
"Date": "Mon, 11 Nov 2024 16:10:03 +0000",
"Dkim-Signature": (
"v=1; a=rsa-sha256; c=relaxed/relaxed; d=example.com; s=default; "
"t=1731341415; x=1731946215; darn=example.com; "
"h=to:subject:message-id:date:from:mime-version:from:to:cc:subject "
":date:message-id:reply-to; bh=examplebh; b=exampleb"
),
"From": f"Sender Name <{sender_email}>",
"Message-Id": "<example-message-id@mail.example.com>",
"Mime-Version": "1.0",
"Received": (
f"by mail.example.com with SMTP id example-id for <{to_email}>; " "Mon, 11 Nov 2024 08:10:15 -0800 (PST)"
),
"Subject": subject,
"To": to_email,
"X-Envelope-From": sender_email,
"X-Gm-Message-State": "example-message-state",
"X-Google-Dkim-Signature": (
"v=1; a=rsa-sha256; c=relaxed/relaxed; d=1e100.net; s=20230601; "
"t=1731341415; x=1731946215; "
"h=to:subject:message-id:date:from:mime-version:x-gm-message-state "
":from:to:cc:subject:date:message-id:reply-to; bh=examplebh; b=exampleb"
),
"X-Google-Smtp-Source": "example-smtp-source",
"X-Mailgun-Incoming": "Yes",
"X-Received": "by 2002:a17:example with SMTP id example-id; Mon, 11 Nov 2024 08:10:14 -0800 (PST)",
"body-html": f'<div dir="ltr">{message}<br></div>\r\n',
"body-plain": f"{message}\r\n",
"from": f"Sender Name <{sender_email}>",
"message-headers": json.dumps(
[
["X-Mailgun-Incoming", "Yes"],
["X-Envelope-From", sender_email],
[
"Received",
(
"from mail.example.com (mail.example.com [203.0.113.1]) "
"by example.com with SMTP id example-id; "
"Mon, 11 Nov 2024 16:10:15 GMT"
),
],
[
"Received",
(
f"by mail.example.com with SMTP id example-id for <{to_email}>; "
"Mon, 11 Nov 2024 08:10:15 -0800 (PST)"
),
],
[
"Dkim-Signature",
(
"v=1; a=rsa-sha256; c=relaxed/relaxed; d=example.com; s=default; "
"t=1731341415; x=1731946215; darn=example.com; "
"h=to:subject:message-id:date:from:mime-version:from:to:cc:subject "
":date:message-id:reply-to; bh=examplebh; b=exampleb"
),
],
[
"X-Google-Dkim-Signature",
(
"v=1; a=rsa-sha256; c=relaxed/relaxed; d=1e100.net; s=20230601; "
"t=1731341415; x=1731946215; "
"h=to:subject:message-id:date:from:mime-version:x-gm-message-state "
":from:to:cc:subject:date:message-id:reply-to; bh=examplebh; b=exampleb"
),
],
["X-Gm-Message-State", "example-message-state"],
["X-Google-Smtp-Source", "example-smtp-source"],
[
"X-Received",
"by 2002:a17:example with SMTP id example-id; Mon, 11 Nov 2024 08:10:14 -0800 (PST)",
],
["Mime-Version", "1.0"],
["From", f"Sender Name <{sender_email}>"],
["Date", "Mon, 11 Nov 2024 16:10:03 +0000"],
["Message-Id", "<example-message-id@mail.example.com>"],
["Subject", subject],
["To", to_email],
[
"Content-Type",
'multipart/alternative; boundary="000000000000267130626a556e5"',
],
]
),
"recipient": to_email,
"sender": sender_email,
"signature": signature,
"stripped-html": f'<div dir="ltr">{message}<br></div>\n',
"stripped-text": f"{message}\n",
"subject": subject,
"timestamp": timestamp,
"token": token,
}
@pytest.mark.parametrize(
@ -141,3 +423,171 @@ def test_get_sender_from_email_message(sender_value, expected_result):
view = InboundEmailWebhookView()
result = view.get_sender_from_email_message(email)
assert result == expected_result
@patch.object(create_alert, "delay")
@pytest.mark.django_db
def test_amazon_ses_pass(create_alert_mock, settings, make_organization, make_alert_receive_channel):
settings.INBOUND_EMAIL_ESP = "amazon_ses,mailgun"
settings.INBOUND_EMAIL_DOMAIN = "inbound.example.com"
settings.INBOUND_EMAIL_WEBHOOK_SECRET = "secret"
organization = make_organization()
alert_receive_channel = make_alert_receive_channel(
organization,
integration=AlertReceiveChannel.INTEGRATION_INBOUND_EMAIL,
token="test-token",
)
sender_email = "sender@example.com"
to_email = "test-token@inbound.example.com"
subject = "Test email"
message = "This is a test email message body."
sns_payload, sns_headers = _sns_inbound_email_payload_and_headers(
sender_email=sender_email,
to_email=to_email,
subject=subject,
message=message,
)
client = APIClient()
response = client.post(
reverse("integrations:inbound_email_webhook"),
data=sns_payload,
headers=sns_headers,
format="json",
)
assert response.status_code == status.HTTP_200_OK
create_alert_mock.assert_called_once_with(
title=subject,
message=message,
alert_receive_channel_pk=alert_receive_channel.pk,
image_url=None,
link_to_upstream_details=None,
integration_unique_data=None,
raw_request_data={
"subject": subject,
"message": message,
"sender": sender_email,
},
received_at=ANY,
)
@patch("requests.get", return_value=Mock(content=CERTIFICATE))
@patch.object(create_alert, "delay")
@pytest.mark.django_db
def test_amazon_ses_validated_pass(
mock_create_alert, mock_requests_get, settings, make_organization, make_alert_receive_channel
):
settings.INBOUND_EMAIL_ESP = "amazon_ses_validated,mailgun"
settings.INBOUND_EMAIL_DOMAIN = "inbound.example.com"
settings.INBOUND_EMAIL_WEBHOOK_SECRET = "secret"
settings.INBOUND_EMAIL_AMAZON_SNS_TOPIC_ARN = AMAZON_SNS_TOPIC_ARN
organization = make_organization()
alert_receive_channel = make_alert_receive_channel(
organization,
integration=AlertReceiveChannel.INTEGRATION_INBOUND_EMAIL,
token="test-token",
)
sender_email = "sender@example.com"
to_email = "test-token@inbound.example.com"
subject = "Test email"
message = "This is a test email message body."
sns_payload, sns_headers = _sns_inbound_email_payload_and_headers(
sender_email=sender_email,
to_email=to_email,
subject=subject,
message=message,
)
client = APIClient()
response = client.post(
reverse("integrations:inbound_email_webhook"),
data=sns_payload,
headers=sns_headers,
format="json",
)
assert response.status_code == status.HTTP_200_OK
mock_create_alert.assert_called_once_with(
title=subject,
message=message,
alert_receive_channel_pk=alert_receive_channel.pk,
image_url=None,
link_to_upstream_details=None,
integration_unique_data=None,
raw_request_data={
"subject": subject,
"message": message,
"sender": sender_email,
},
received_at=ANY,
)
mock_requests_get.assert_called_once_with(SIGNING_CERT_URL, timeout=5)
@patch.object(create_alert, "delay")
@pytest.mark.django_db
def test_mailgun_pass(create_alert_mock, settings, make_organization, make_alert_receive_channel):
settings.INBOUND_EMAIL_ESP = "amazon_ses,mailgun"
settings.INBOUND_EMAIL_DOMAIN = "inbound.example.com"
settings.INBOUND_EMAIL_WEBHOOK_SECRET = "secret"
organization = make_organization()
alert_receive_channel = make_alert_receive_channel(
organization,
integration=AlertReceiveChannel.INTEGRATION_INBOUND_EMAIL,
token="test-token",
)
sender_email = "sender@example.com"
to_email = "test-token@inbound.example.com"
subject = "Test email"
message = "This is a test email message body."
mailgun_payload = _mailgun_inbound_email_payload(
sender_email=sender_email,
to_email=to_email,
subject=subject,
message=message,
)
client = APIClient()
response = client.post(
reverse("integrations:inbound_email_webhook"),
data=mailgun_payload,
format="multipart",
)
assert response.status_code == status.HTTP_200_OK
create_alert_mock.assert_called_once_with(
title=subject,
message=message,
alert_receive_channel_pk=alert_receive_channel.pk,
image_url=None,
link_to_upstream_details=None,
integration_unique_data=None,
raw_request_data={
"subject": subject,
"message": message,
"sender": sender_email,
},
received_at=ANY,
)
@pytest.mark.django_db
def test_multiple_esps_fail(settings):
settings.INBOUND_EMAIL_ESP = "amazon_ses,mailgun"
settings.INBOUND_EMAIL_DOMAIN = "example.com"
settings.INBOUND_EMAIL_WEBHOOK_SECRET = "secret"
client = APIClient()
response = client.post(reverse("integrations:inbound_email_webhook"), data={})
assert response.status_code == status.HTTP_400_BAD_REQUEST

View file

@ -0,0 +1,99 @@
import logging
import re
from base64 import b64decode
from urllib.parse import urlparse
import requests
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives.asymmetric.padding import PKCS1v15
from cryptography.hazmat.primitives.hashes import SHA1, SHA256
from cryptography.x509 import NameOID, load_pem_x509_certificate
from django.conf import settings
logger = logging.getLogger(__name__)
HOST_PATTERN = re.compile(r"^sns\.[a-zA-Z0-9\-]{3,}\.amazonaws\.com(\.cn)?$")
REQUIRED_KEYS = (
"Message",
"MessageId",
"Timestamp",
"TopicArn",
"Type",
"Signature",
"SigningCertURL",
"SignatureVersion",
)
SIGNING_KEYS_NOTIFICATION = ("Message", "MessageId", "Subject", "Timestamp", "TopicArn", "Type")
SIGNING_KEYS_SUBSCRIPTION = ("Message", "MessageId", "SubscribeURL", "Timestamp", "Token", "TopicArn", "Type")
def validate_amazon_sns_message(message: dict) -> bool:
"""
Validate an AWS SNS message. Based on:
- https://docs.aws.amazon.com/sns/latest/dg/sns-verify-signature-of-message.html
- https://github.com/aws/aws-js-sns-message-validator/blob/a6ba4d646dc60912653357660301f3b25f94d686/index.js
- https://github.com/aws/aws-php-sns-message-validator/blob/3cee0fc1aee5538e1bd677654b09fad811061d0b/src/MessageValidator.php
"""
# Check if the message has all the required keys
if not all(key in message for key in REQUIRED_KEYS):
logger.warning("Missing required keys in the message, got: %s", message.keys())
return False
# Check TopicArn
if message["TopicArn"] != settings.INBOUND_EMAIL_AMAZON_SNS_TOPIC_ARN:
logger.warning("Invalid TopicArn: %s", message["TopicArn"])
return False
# Construct the canonical message
if message["Type"] == "Notification":
signing_keys = SIGNING_KEYS_NOTIFICATION
elif message["Type"] in ("SubscriptionConfirmation", "UnsubscribeConfirmation"):
signing_keys = SIGNING_KEYS_SUBSCRIPTION
else:
logger.warning("Invalid message type: %s", message["Type"])
return False
canonical_message = "".join(f"{key}\n{message[key]}\n" for key in signing_keys if key in message).encode()
# Check if SigningCertURL is a valid SNS URL
signing_cert_url = message["SigningCertURL"]
parsed_url = urlparse(signing_cert_url)
if (
parsed_url.scheme != "https"
or not HOST_PATTERN.match(parsed_url.netloc)
or not parsed_url.path.endswith(".pem")
):
logger.warning("Invalid SigningCertURL: %s", signing_cert_url)
return False
# Fetch the certificate
try:
response = requests.get(signing_cert_url, timeout=5)
response.raise_for_status()
certificate_bytes = response.content
except requests.RequestException as e:
logger.warning("Failed to fetch the certificate from %s: %s", signing_cert_url, e)
return False
# Verify the certificate issuer
certificate = load_pem_x509_certificate(certificate_bytes)
if certificate.issuer.get_attributes_for_oid(NameOID.ORGANIZATION_NAME)[0].value != "Amazon":
logger.warning("Invalid certificate issuer: %s", certificate.issuer)
return False
# Verify the signature
signature = b64decode(message["Signature"])
if message["SignatureVersion"] == "1":
hash_algorithm = SHA1()
elif message["SignatureVersion"] == "2":
hash_algorithm = SHA256()
else:
logger.warning("Invalid SignatureVersion: %s", message["SignatureVersion"])
return False
try:
certificate.public_key().verify(signature, canonical_message, PKCS1v15(), hash_algorithm)
except InvalidSignature:
logger.warning("Invalid signature")
return False
return True

View file

@ -867,6 +867,7 @@ if FEATURE_EMAIL_INTEGRATION_ENABLED:
INBOUND_EMAIL_ESP = os.getenv("INBOUND_EMAIL_ESP")
INBOUND_EMAIL_DOMAIN = os.getenv("INBOUND_EMAIL_DOMAIN")
INBOUND_EMAIL_WEBHOOK_SECRET = os.getenv("INBOUND_EMAIL_WEBHOOK_SECRET")
INBOUND_EMAIL_AMAZON_SNS_TOPIC_ARN = os.getenv("INBOUND_EMAIL_AMAZON_SNS_TOPIC_ARN")
INSTALLED_ONCALL_INTEGRATIONS = [
# Featured