oncall-engine/engine/apps/phone_notifications/phone_backend.py
Yulya Artyukhina 35ddfab0e4
Add method to send notification bundle by SMS (#4624)
# What this PR does
Adds a method to render and send a notification bundle by SMS.

Example of SMS message:
```
Grafana OnCall: Alert groups #1, #2, #3 and 2 more 
from stack: TestOrganization, 
integrations: Grafana Alerting and 1 more.
```

Should be merged with https://github.com/grafana/oncall/pull/4457

## Which issue(s) this PR closes
https://github.com/grafana/oncall-private/issues/2713

## Checklist

- [x] Unit, integration, and e2e (if applicable) tests updated
- [x] Documentation added (or `pr:no public docs` PR label added if not
required)
- [x] Added the relevant release notes label (see labels prefixed w/
`release:`). These labels dictate how your PR will
    show up in the autogenerated release notes.
2024-07-16 14:20:16 +00:00

480 lines
21 KiB
Python

import logging
from typing import Optional, Tuple
import requests
from django.conf import settings
from apps.alerts.incident_appearance.renderers.phone_call_renderer import AlertGroupPhoneCallRenderer
from apps.alerts.incident_appearance.renderers.sms_renderer import AlertGroupSMSBundleRenderer, AlertGroupSmsRenderer
from apps.alerts.signals import user_notification_action_triggered_signal
from apps.base.utils import live_settings
from common.api_helpers.utils import create_engine_url
from common.custom_celery_tasks import shared_dedicated_queue_retry_task
from common.utils import clean_markup
from .exceptions import (
CallsLimitExceeded,
FailedToMakeCall,
FailedToSendSMS,
NumberAlreadyVerified,
NumberNotVerified,
ProviderNotSupports,
SMSLimitExceeded,
)
from .models import PhoneCallRecord, ProviderPhoneCall, ProviderSMS, SMSRecord
from .models.banned_phone_number import check_banned_phone_number
from .phone_provider import PhoneProvider, get_phone_provider
logger = logging.getLogger(__name__)
@shared_dedicated_queue_retry_task(
    autoretry_for=(Exception,),
    retry_backoff=True,
    max_retries=0 if settings.DEBUG else None,
)
def notify_by_sms_bundle_async_task(user_id, bundle_uuid):
    """
    Task that sends a notification bundle by sms to the user with the given id.

    The user is looked up by `user_id`; when no such user exists the task does nothing.
    Otherwise the work is delegated to PhoneBackend.notify_by_sms_bundle.
    """
    # NOTE(review): function-scope import, presumably to avoid a circular import - confirm.
    from apps.user_management.models import User

    recipient = User.objects.filter(id=user_id).first()
    if recipient:
        PhoneBackend().notify_by_sms_bundle(recipient, bundle_uuid)
class PhoneBackend:
    """
    PhoneBackend holds the business logic around phone notifications: notification calls and sms
    (through the configured phone provider or a connected Grafana Cloud instance), relaying of
    calls/sms requested by OSS instances, phone number verification and test calls/sms.
    """

    def __init__(self):
        self.phone_provider: PhoneProvider = self._get_phone_provider()

    def _get_phone_provider(self) -> PhoneProvider:
        # wrapper to simplify mocking
        return get_phone_provider()

    def notify_by_call(self, user, alert_group, notification_policy):
        """
        notify_by_call makes a notification call to a user using configured phone provider or cloud notifications.
        It handles all business logic related to the call.

        A PhoneCallRecord is saved when the call was made or when the calls limit was exceeded.
        On any handled failure a UserNotificationPolicyLogRecord is created and
        user_notification_action_triggered_signal is sent.
        """
        from apps.base.models import UserNotificationPolicyLogRecord

        log_record_error_code = None

        message = AlertGroupPhoneCallRenderer(alert_group).render()

        record = PhoneCallRecord(
            represents_alert_group=alert_group,
            receiver=user,
            notification_policy=notification_policy,
            exceeded_limit=False,
        )

        try:
            if live_settings.GRAFANA_CLOUD_NOTIFICATIONS_ENABLED and settings.IS_OPEN_SOURCE:
                self._notify_by_cloud_call(user, message)
                record.save()
            else:
                provider_call = self._notify_by_provider_call(user, message)
                # it is important that record is saved here, so it is possible to execute link_and_save
                record.save()
                if provider_call:
                    provider_call.link_and_save(record)
        except (FailedToMakeCall, ProviderNotSupports):
            log_record_error_code = UserNotificationPolicyLogRecord.ERROR_NOTIFICATION_NOT_ABLE_TO_CALL
        except CallsLimitExceeded:
            # the record is still stored, flagged as a limit violation
            record.exceeded_limit = True
            record.save()
            log_record_error_code = UserNotificationPolicyLogRecord.ERROR_NOTIFICATION_PHONE_CALLS_LIMIT_EXCEEDED
        except NumberNotVerified:
            log_record_error_code = UserNotificationPolicyLogRecord.ERROR_NOTIFICATION_PHONE_NUMBER_IS_NOT_VERIFIED

        if log_record_error_code is not None:
            self._create_failed_log_record(
                user, alert_group, notification_policy, log_record_error_code, sender=PhoneBackend.notify_by_call
            )

    def _create_failed_log_record(self, user, alert_group, notification_policy, error_code, sender):
        """
        _create_failed_log_record saves a "personal notification failed" UserNotificationPolicyLogRecord
        with the given error code and sends user_notification_action_triggered_signal for it.
        `sender` is passed to the signal as is. `notification_policy` may be None.
        """
        from apps.base.models import UserNotificationPolicyLogRecord

        log_record = UserNotificationPolicyLogRecord(
            author=user,
            type=UserNotificationPolicyLogRecord.TYPE_PERSONAL_NOTIFICATION_FAILED,
            notification_policy=notification_policy,
            alert_group=alert_group,
            notification_error_code=error_code,
            notification_step=notification_policy.step if notification_policy else None,
            notification_channel=notification_policy.notify_by if notification_policy else None,
        )
        log_record.save()
        user_notification_action_triggered_signal.send(sender=sender, log_record=log_record)

    def _notify_by_provider_call(self, user, message) -> Optional[ProviderPhoneCall]:
        """
        _notify_by_provider_call makes a notification call using configured phone provider.

        Raises NumberNotVerified if user has no verified phone number and CallsLimitExceeded
        if there are no calls left. When less than 3 calls are left a warning is appended to the message.
        """
        if not self._validate_user_number(user):
            raise NumberNotVerified

        calls_left = self._validate_phone_calls_left(user)
        if calls_left <= 0:
            raise CallsLimitExceeded
        if calls_left < 3:
            message = self._add_call_limit_warning(calls_left, message)

        return self.phone_provider.make_notification_call(user.verified_phone_number, message)

    @staticmethod
    def _get_cloud_error_code(response):
        """
        _get_cloud_error_code returns the value of the "error" field of a 400 response received from cloud.
        It returns None for any other status code, and also when the body is not valid JSON or is not
        a JSON object, so that such responses are handled as a generic failure instead of raising
        an unexpected decoding error.
        """
        if response.status_code != 400:
            return None
        try:
            payload = response.json()
        except ValueError:
            # JSON decoding errors are ValueError subclasses
            return None
        if not isinstance(payload, dict):
            return None
        return payload.get("error")

    def _notify_by_cloud_call(self, user, message):
        """
        _notify_by_cloud_call makes a call using connected Grafana Cloud Instance.
        This method should be used only in OSS instances.

        Raises CallsLimitExceeded or NumberNotVerified when cloud reports the corresponding error,
        and FailedToMakeCall for request errors and any other non-200 response.
        """
        url = create_engine_url("api/v1/make_call", override_base=settings.GRAFANA_CLOUD_ONCALL_API_URL)
        auth = {"Authorization": live_settings.GRAFANA_CLOUD_ONCALL_TOKEN}
        data = {
            "email": user.email,
            "message": message,
        }
        try:
            response = requests.post(url, headers=auth, data=data, timeout=5)
        except requests.exceptions.RequestException as e:
            logger.error(f"PhoneBackend._notify_by_cloud_call: request exception {str(e)}")
            raise FailedToMakeCall from e

        if response.status_code == 200:
            logger.info("PhoneBackend._notify_by_cloud_call: OK")
            return

        error_code = self._get_cloud_error_code(response)
        if error_code == "limit-exceeded":
            logger.info("PhoneBackend._notify_by_cloud_call: phone calls limit exceeded")
            raise CallsLimitExceeded
        if error_code == "number-not-verified":
            logger.info("PhoneBackend._notify_by_cloud_call: cloud number not verified")
            raise NumberNotVerified
        if response.status_code == 404:
            logger.info(f"PhoneBackend._notify_by_cloud_call: user not found id={user.id} email={user.email}")
            raise FailedToMakeCall

        logger.error(f"PhoneBackend._notify_by_cloud_call: unexpected response code {response.status_code}")
        raise FailedToMakeCall

    def _add_call_limit_warning(self, calls_left, message):
        """_add_call_limit_warning appends a "calls left" warning to the message."""
        return f"{message} {calls_left} phone calls left. Contact your admin."

    def _validate_phone_calls_left(self, user) -> int:
        """_validate_phone_calls_left returns the value of user.organization.phone_calls_left for the user."""
        return user.organization.phone_calls_left(user)

    def notify_by_sms(self, user, alert_group, notification_policy):
        """
        notify_by_sms sends a notification sms to a user using configured phone provider.
        It handles business logic - limits, cloud notifications and UserNotificationPolicyLogRecord creation
        SMS itself is handled by phone provider.
        """
        message = AlertGroupSmsRenderer(alert_group).render()
        _, log_record_error_code = self._send_sms(
            user=user,
            alert_group=alert_group,
            notification_policy=notification_policy,
            message=message,
        )

        if log_record_error_code is not None:
            self._create_failed_log_record(
                user, alert_group, notification_policy, log_record_error_code, sender=PhoneBackend.notify_by_sms
            )

    @staticmethod
    def notify_by_sms_bundle_async(user, bundle_uuid):
        """notify_by_sms_bundle_async schedules notify_by_sms_bundle_async_task for the user and bundle."""
        notify_by_sms_bundle_async_task.apply_async((user.id, bundle_uuid))

    def notify_by_sms_bundle(self, user, bundle_uuid):
        """
        notify_by_sms_bundle sends an sms notification bundle to a user using configured phone provider.
        It handles business logic - limits, cloud notifications and UserNotificationPolicyLogRecord creation.
        It creates UserNotificationPolicyLogRecord for every notification in bundle, but only one SMSRecord.
        SMS itself is handled by phone provider.
        """
        from apps.alerts.models import BundledNotification
        from apps.base.models import UserNotificationPolicy, UserNotificationPolicyLogRecord

        notifications = BundledNotification.objects.filter(bundle_uuid=bundle_uuid).select_related("alert_group")
        if not notifications:
            logger.info("Notification bundle is empty, related alert groups might have been deleted")
            return

        message = AlertGroupSMSBundleRenderer(notifications).render()
        _, log_record_error_code = self._send_sms(user=user, message=message, bundle_uuid=bundle_uuid)
        if log_record_error_code is None:
            return

        # one failure log record per bundled notification
        log_records_to_create = [
            UserNotificationPolicyLogRecord(
                author=user,
                type=UserNotificationPolicyLogRecord.TYPE_PERSONAL_NOTIFICATION_FAILED,
                notification_policy=notification.notification_policy,
                alert_group=notification.alert_group,
                notification_error_code=log_record_error_code,
                notification_step=UserNotificationPolicy.Step.NOTIFY,
                notification_channel=UserNotificationPolicy.NotificationChannel.SMS,
            )
            for notification in notifications
        ]
        if log_records_to_create:
            if log_record_error_code in UserNotificationPolicyLogRecord.ERRORS_TO_SEND_IN_SLACK_CHANNEL:
                # create last log record outside of the bulk_create to get it as an object to send
                # the user_notification_action_triggered_signal
                log_record = log_records_to_create.pop()
                log_record.save()
                user_notification_action_triggered_signal.send(
                    sender=PhoneBackend.notify_by_sms_bundle, log_record=log_record
                )
            UserNotificationPolicyLogRecord.objects.bulk_create(log_records_to_create, batch_size=5000)

    def _send_sms(
        self, user, message, alert_group=None, notification_policy=None, bundle_uuid=None
    ) -> Tuple[bool, Optional[int]]:
        """
        _send_sms sends the message by sms using cloud notifications or configured phone provider.
        An SMSRecord is saved when the sms was sent or when the sms limit was exceeded.

        Returns a tuple (is_successful, error_code), where error_code is one of
        UserNotificationPolicyLogRecord.ERROR_NOTIFICATION_* values or None on success.
        """
        from apps.base.models import UserNotificationPolicyLogRecord

        log_record_error_code = None
        record = SMSRecord(
            represents_alert_group=alert_group,
            receiver=user,
            notification_policy=notification_policy,
            exceeded_limit=False,
            represents_bundle_uuid=bundle_uuid,
        )

        try:
            if live_settings.GRAFANA_CLOUD_NOTIFICATIONS_ENABLED and settings.IS_OPEN_SOURCE:
                self._notify_by_cloud_sms(user, message)
                record.save()
            else:
                provider_sms = self._notify_by_provider_sms(user, message)
                # record is saved before linking, link_and_save receives the saved record
                record.save()
                if provider_sms:
                    provider_sms.link_and_save(record)
        except (FailedToSendSMS, ProviderNotSupports):
            log_record_error_code = UserNotificationPolicyLogRecord.ERROR_NOTIFICATION_NOT_ABLE_TO_SEND_SMS
        except SMSLimitExceeded:
            # the record is still stored, flagged as a limit violation
            record.exceeded_limit = True
            record.save()
            log_record_error_code = UserNotificationPolicyLogRecord.ERROR_NOTIFICATION_SMS_LIMIT_EXCEEDED
        except NumberNotVerified:
            log_record_error_code = UserNotificationPolicyLogRecord.ERROR_NOTIFICATION_PHONE_NUMBER_IS_NOT_VERIFIED

        return log_record_error_code is None, log_record_error_code

    def _notify_by_provider_sms(self, user, message) -> Optional[ProviderSMS]:
        """
        _notify_by_provider_sms sends a notification sms using configured phone provider.

        Raises NumberNotVerified if user has no verified phone number and SMSLimitExceeded
        if there are no sms left. When less than 3 sms are left a warning is appended to the message.
        """
        if not self._validate_user_number(user):
            raise NumberNotVerified

        sms_left = self._validate_sms_left(user)
        if sms_left <= 0:
            raise SMSLimitExceeded
        if sms_left < 3:
            message = self._add_sms_limit_warning(sms_left, message)

        return self.phone_provider.send_notification_sms(user.verified_phone_number, message)

    def _notify_by_cloud_sms(self, user, message):
        """
        _notify_by_cloud_sms sends a sms using connected Grafana Cloud Instance.
        This method is used only in OSS instances.

        Raises SMSLimitExceeded or NumberNotVerified when cloud reports the corresponding error,
        and FailedToSendSMS for request errors and any other non-200 response.
        """
        url = create_engine_url("api/v1/send_sms", override_base=settings.GRAFANA_CLOUD_ONCALL_API_URL)
        auth = {"Authorization": live_settings.GRAFANA_CLOUD_ONCALL_TOKEN}
        data = {
            "email": user.email,
            "message": message,
        }
        try:
            response = requests.post(url, headers=auth, data=data, timeout=5)
        except requests.exceptions.RequestException as e:
            logger.error(f"Unable to send SMS through cloud. Request exception {str(e)}")
            raise FailedToSendSMS from e

        if response.status_code == 200:
            logger.info("Sent cloud sms successfully")
            return

        error_code = self._get_cloud_error_code(response)
        if error_code == "limit-exceeded":
            raise SMSLimitExceeded
        if error_code == "number-not-verified":
            raise NumberNotVerified
        # 404 (user not found) and any other response are reported as a generic failure
        raise FailedToSendSMS

    def _validate_sms_left(self, user) -> int:
        """_validate_sms_left returns the value of user.organization.sms_left for the user."""
        return user.organization.sms_left(user)

    def _add_sms_limit_warning(self, calls_left, message):
        """_add_sms_limit_warning appends a "sms left" warning to the message."""
        return f"{message} {calls_left} sms left. Contact your admin."

    def _validate_user_number(self, user):
        """_validate_user_number returns True if user.verified_phone_number is set (is not None)."""
        return user.verified_phone_number is not None

    # relay calls/sms from oss related code
    def relay_oss_call(self, user, message):
        """
        relay_oss_call make phone call received from oss instance.
        Caller should handle exceptions raised by phone_provider.make_call.

        The difference between relay_oss_call and notify_by_call is that relay_oss_call uses phone_provider.make_call
        to only make call, not track status, gather digits or create logs.
        """
        if not self._validate_user_number(user):
            raise NumberNotVerified

        calls_left = self._validate_phone_calls_left(user)
        if calls_left <= 0:
            PhoneCallRecord.objects.create(
                receiver=user,
                exceeded_limit=True,
                grafana_cloud_notification=True,
            )
            raise CallsLimitExceeded
        if calls_left < 3:
            message = self._add_call_limit_warning(calls_left, message)

        # additional cleaning, since message come from api call and wasn't cleaned by our renderer
        message = clean_markup(message).replace('"', "")

        self.phone_provider.make_call(user.verified_phone_number, message)
        # create PhoneCallRecord to track limits for calls from oss instances
        PhoneCallRecord.objects.create(
            receiver=user,
            exceeded_limit=False,
            grafana_cloud_notification=True,
        )

    def relay_oss_sms(self, user, message):
        """
        relay_oss_sms send sms received from oss instance.
        Caller should handle exceptions raised by phone_provider.send_sms.

        The difference between relay_oss_sms and notify_by_sms is that relay_oss_sms uses phone_provider.send_sms
        to only send, not track status or create logs.
        """
        if not self._validate_user_number(user):
            raise NumberNotVerified

        sms_left = self._validate_sms_left(user)
        if sms_left <= 0:
            SMSRecord.objects.create(
                receiver=user,
                exceeded_limit=True,
                grafana_cloud_notification=True,
            )
            raise SMSLimitExceeded
        if sms_left < 3:
            message = self._add_sms_limit_warning(sms_left, message)

        self.phone_provider.send_sms(user.verified_phone_number, message)
        # create SMSRecord after the sms is sent, flagged as a cloud notification
        SMSRecord.objects.create(
            receiver=user,
            exceeded_limit=False,
            grafana_cloud_notification=True,
        )

    # Number verification related code
    def send_verification_sms(self, user):
        """
        send_verification_sms sends a verification code to a user.
        Caller should handle exceptions raised by phone_provider.send_verification_sms.

        Raises NumberAlreadyVerified if user already has a verified phone number.
        """
        logger.info(f"PhoneBackend.send_verification_sms: start verification for user {user.id}")
        if self._validate_user_number(user):
            logger.info(f"PhoneBackend.send_verification_sms: number already verified for user {user.id}")
            raise NumberAlreadyVerified
        check_banned_phone_number(user.unverified_phone_number)
        self.phone_provider.send_verification_sms(user.unverified_phone_number)

    def make_verification_call(self, user):
        """
        make_verification_call makes a verification call to a user.
        Caller should handle exceptions raised by phone_provider.make_verification_call

        Raises NumberAlreadyVerified if user already has a verified phone number.
        """
        logger.info(f"PhoneBackend.make_verification_call: start verification user_id={user.id}")
        if self._validate_user_number(user):
            logger.info(f"PhoneBackend.make_verification_call: number already verified user_id={user.id}")
            raise NumberAlreadyVerified
        check_banned_phone_number(user.unverified_phone_number)
        self.phone_provider.make_verification_call(user.unverified_phone_number)

    def verify_phone_number(self, user, code) -> bool:
        """
        verify_phone_number finishes verification of user's unverified phone number with the given code.
        On success the new number is saved as verified, previously verified number (if any) is notified
        about disconnection, the new number is notified about connection and True is returned.
        Returns False if verification failed.
        """
        prev_number = user.verified_phone_number
        new_number = self.phone_provider.finish_verification(user.unverified_phone_number, code)
        if not new_number:
            logger.info(f"PhoneBackend.verify_phone_number: verification failed user_id={user.id}")
            return False

        user.save_verified_phone_number(new_number)
        # TODO: move this to async task
        if prev_number:
            self._notify_disconnected_number(user, prev_number)
        self._notify_connected_number(user)
        logger.info(f"PhoneBackend.verify_phone_number: verified user_id={user.id}")
        return True

    def forget_number(self, user) -> bool:
        """
        forget_number clears user's phone numbers.
        Returns True (and notifies the removed number) if user had a verified phone number, False otherwise.
        """
        prev_number = user.verified_phone_number
        user.clear_phone_numbers()
        if prev_number:
            self._notify_disconnected_number(user, prev_number)
            return True
        return False

    def make_test_call(self, user):
        """
        make_test_call makes a test call to user's verified phone number
        Caller should handle exceptions raised by phone_provider.make_call.
        """
        text = "It is a test call from Grafana OnCall"
        if not user.verified_phone_number:
            raise NumberNotVerified
        self.phone_provider.make_call(user.verified_phone_number, text)

    def send_test_sms(self, user):
        """
        send_test_sms sends a test sms to user's verified phone number
        Caller should handle exceptions raised by phone_provider.send_sms.
        """
        text = "It is a test sms from Grafana OnCall"
        if not user.verified_phone_number:
            raise NumberNotVerified
        self.phone_provider.send_sms(user.verified_phone_number, text)

    def _notify_connected_number(self, user):
        """
        _notify_connected_number sends an sms about connected phone number to user's verified phone number.
        FailedToSendSMS and ProviderNotSupports are logged and not propagated.
        """
        text = (
            "This phone number has been connected to Grafana OnCall team "
            f'"{user.organization.stack_slug}"\nYour Grafana OnCall <3'
        )
        try:
            if not user.verified_phone_number:
                logger.error("PhoneBackend._notify_connected_number: number not verified")
                return
            self.phone_provider.send_sms(user.verified_phone_number, text)
        except FailedToSendSMS:
            logger.error("PhoneBackend._notify_connected_number: failed")
        except ProviderNotSupports:
            logger.info("PhoneBackend._notify_connected_number: provider not supports sms")

    def _notify_disconnected_number(self, user, number):
        """
        _notify_disconnected_number sends an sms about disconnected phone number to the given number.
        FailedToSendSMS and ProviderNotSupports are logged and not propagated.
        """
        text = (
            "This phone number has been disconnected from Grafana OnCall team "
            f'"{user.organization.stack_slug}"\nYour Grafana OnCall <3'
        )
        try:
            self.phone_provider.send_sms(number, text)
        except FailedToSendSMS:
            logger.error("PhoneBackend._notify_disconnected_number: failed")
        except ProviderNotSupports:
            logger.info("PhoneBackend._notify_disconnected_number: provider not supports sms")