refactor apps.mobile_app.tasks into individual task files (#2888)

# What this PR does

Closes #2722 

## Checklist

- [x] Unit, integration, and e2e (if applicable) tests updated
- [x] Documentation added (or `pr:no public docs` PR label added if not
required)
- [x] `CHANGELOG.md` updated (or `pr:no changelog` PR label added if not
required)
This commit is contained in:
Joey Orlando 2023-08-29 11:34:09 +02:00 committed by GitHub
parent fee0a90e56
commit cba4ccb8a9
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 996 additions and 889 deletions

View file

@ -3,7 +3,7 @@ import json
from django.conf import settings
from apps.base.messaging import BaseMessagingBackend
from apps.mobile_app.tasks import notify_user_async
from apps.mobile_app.tasks.new_alert_group import notify_user_async
class MobileAppBackend(BaseMessagingBackend):

View file

@ -1,4 +1,5 @@
import json
import logging
import random
import string
import typing
@ -6,13 +7,16 @@ import typing
from firebase_admin.messaging import APNSPayload, Aps, ApsAlert, CriticalSound, Message
from apps.mobile_app.exceptions import DeviceNotSet
from apps.mobile_app.tasks import _construct_fcm_message, _send_push_notification, logger
from apps.mobile_app.types import FCMMessageData, MessageType, Platform
from apps.mobile_app.utils import construct_fcm_message, send_push_notification
from apps.user_management.models import User
if typing.TYPE_CHECKING:
from apps.mobile_app.models import FCMDevice
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
def send_test_push(user, critical=False):
from apps.mobile_app.models import FCMDevice
@ -22,11 +26,11 @@ def send_test_push(user, critical=False):
logger.info(f"send_test_push: fcm_device not found user_id={user.id}")
raise DeviceNotSet
message = _get_test_escalation_fcm_message(user, device_to_notify, critical)
_send_push_notification(device_to_notify, message)
send_push_notification(device_to_notify, message)
def _get_test_escalation_fcm_message(user: User, device_to_notify: "FCMDevice", critical: bool) -> Message:
# TODO: this method is copied from _get_alert_group_escalation_fcm_message
# TODO: this method is copied from apps.mobile_app.tasks.new_alert_group._get_fcm_message
# to have same notification/sound/overrideDND logic. Ideally this logic should be abstracted, not repeated.
from apps.mobile_app.models import MobileAppUserSettings
@ -80,7 +84,7 @@ def _get_test_escalation_fcm_message(user: User, device_to_notify: "FCMDevice",
),
)
return _construct_fcm_message(message_type, device_to_notify, thread_id, fcm_message_data, apns_payload)
return construct_fcm_message(message_type, device_to_notify, thread_id, fcm_message_data, apns_payload)
def get_test_push_title(critical: bool) -> str:

View file

@ -1,659 +0,0 @@
import datetime
import json
import logging
import math
import typing
import humanize
import pytz
import requests
from celery.utils.log import get_task_logger
from django.conf import settings
from django.core.cache import cache
from django.utils import timezone
from firebase_admin.exceptions import FirebaseError
from firebase_admin.messaging import AndroidConfig, APNSConfig, APNSPayload, Aps, ApsAlert, CriticalSound, Message
from requests import HTTPError
from rest_framework import status
from apps.alerts.models import AlertGroup
from apps.base.utils import live_settings
from apps.mobile_app.alert_rendering import get_push_notification_subtitle
from apps.mobile_app.types import FCMMessageData, MessageType, Platform
from apps.schedules.models import ShiftSwapRequest
from apps.schedules.models.on_call_schedule import OnCallSchedule, ScheduleEvent
from apps.user_management.models import User
from common.api_helpers.utils import create_engine_url
from common.custom_celery_tasks import shared_dedicated_queue_retry_task
from common.l10n import format_localized_datetime, format_localized_time
if typing.TYPE_CHECKING:
from apps.mobile_app.models import FCMDevice, MobileAppUserSettings
MAX_RETRIES = 1 if settings.DEBUG else 10
logger = get_task_logger(__name__)
logger.setLevel(logging.DEBUG)
def send_push_notification_to_fcm_relay(message: Message) -> requests.Response:
"""
Send push notification to FCM relay on cloud instance: apps.mobile_app.fcm_relay.FCMRelayView
"""
url = create_engine_url("mobile_app/v1/fcm_relay", override_base=settings.GRAFANA_CLOUD_ONCALL_API_URL)
response = requests.post(
url, headers={"Authorization": live_settings.GRAFANA_CLOUD_ONCALL_TOKEN}, json=json.loads(str(message))
)
response.raise_for_status()
return response
def _send_push_notification(
device_to_notify: "FCMDevice", message: Message, error_cb: typing.Optional[typing.Callable[..., None]] = None
) -> None:
logger.debug(f"Sending push notification to device type {device_to_notify.type} with message: {message}")
def _error_cb():
if error_cb:
error_cb()
if settings.IS_OPEN_SOURCE:
# FCM relay uses cloud connection to send push notifications
from apps.oss_installation.models import CloudConnector
if not CloudConnector.objects.exists():
_error_cb()
logger.error("Error while sending a mobile push notification: not connected to cloud")
return
try:
response = send_push_notification_to_fcm_relay(message)
logger.debug(f"FCM relay response: {response}")
except HTTPError as e:
if status.HTTP_400_BAD_REQUEST <= e.response.status_code < status.HTTP_500_INTERNAL_SERVER_ERROR:
# do not retry on HTTP client errors (4xx errors)
_error_cb()
logger.error(
f"Error while sending a mobile push notification: HTTP client error {e.response.status_code}"
)
return
else:
raise
else:
# https://firebase.google.com/docs/cloud-messaging/http-server-ref#interpret-downstream
response = device_to_notify.send_message(message)
logger.debug(f"FCM response: {response}")
if isinstance(response, FirebaseError):
raise response
def _construct_fcm_message(
message_type: MessageType,
device_to_notify: "FCMDevice",
thread_id: str,
data: FCMMessageData,
apns_payload: typing.Optional[APNSPayload] = None,
) -> Message:
apns_config_kwargs = {}
if apns_payload is not None:
apns_config_kwargs["payload"] = apns_payload
return Message(
token=device_to_notify.registration_id,
data={
# from the docs..
# A dictionary of data fields (optional). All keys and values in the dictionary must be strings
**data,
"type": message_type,
"thread_id": thread_id,
},
android=AndroidConfig(
# from the docs
# https://firebase.google.com/docs/cloud-messaging/concept-options#setting-the-priority-of-a-message
#
# Normal priority.
# Normal priority messages are delivered immediately when the app is in the foreground.
# For backgrounded apps, delivery may be delayed. For less time-sensitive messages, such as notifications
# of new email, keeping your UI in sync, or syncing app data in the background, choose normal delivery
# priority.
#
# High priority.
# FCM attempts to deliver high priority messages immediately even if the device is in Doze mode.
# High priority messages are for time-sensitive, user visible content.
priority="high",
),
apns=APNSConfig(
**apns_config_kwargs,
headers={
# From the docs
# https://firebase.google.com/docs/cloud-messaging/concept-options#setting-the-priority-of-a-message
"apns-priority": "10",
},
),
)
def _get_alert_group_escalation_fcm_message(
alert_group: AlertGroup, user: User, device_to_notify: "FCMDevice", critical: bool
) -> Message:
# avoid circular import
from apps.mobile_app.models import MobileAppUserSettings
thread_id = f"{alert_group.channel.organization.public_primary_key}:{alert_group.public_primary_key}"
alert_title = "New Important Alert" if critical else "New Alert"
alert_subtitle = get_push_notification_subtitle(alert_group)
mobile_app_user_settings, _ = MobileAppUserSettings.objects.get_or_create(user=user)
# critical defines the type of notification.
# we use overrideDND to establish if the notification should sound even if DND is on
overrideDND = critical and mobile_app_user_settings.important_notification_override_dnd
# APNS only allows to specify volume for critical notifications
apns_volume = mobile_app_user_settings.important_notification_volume if critical else None
message_type = MessageType.IMPORTANT if critical else MessageType.DEFAULT
apns_sound_name = mobile_app_user_settings.get_notification_sound_name(message_type, Platform.IOS)
fcm_message_data: FCMMessageData = {
"title": alert_title,
"subtitle": alert_subtitle,
"orgId": alert_group.channel.organization.public_primary_key,
"orgName": alert_group.channel.organization.stack_slug,
"alertGroupId": alert_group.public_primary_key,
# alert_group.status is an int so it must be casted...
"status": str(alert_group.status),
# Pass user settings, so the Android app can use them to play the correct sound and volume
"default_notification_sound_name": mobile_app_user_settings.get_notification_sound_name(
MessageType.DEFAULT, Platform.ANDROID
),
"default_notification_volume_type": mobile_app_user_settings.default_notification_volume_type,
"default_notification_volume": str(mobile_app_user_settings.default_notification_volume),
"default_notification_volume_override": json.dumps(
mobile_app_user_settings.default_notification_volume_override
),
"important_notification_sound_name": mobile_app_user_settings.get_notification_sound_name(
MessageType.IMPORTANT, Platform.ANDROID
),
"important_notification_volume_type": mobile_app_user_settings.important_notification_volume_type,
"important_notification_volume": str(mobile_app_user_settings.important_notification_volume),
"important_notification_volume_override": json.dumps(
mobile_app_user_settings.important_notification_volume_override
),
"important_notification_override_dnd": json.dumps(mobile_app_user_settings.important_notification_override_dnd),
}
number_of_alerts = alert_group.alerts.count()
apns_payload = APNSPayload(
aps=Aps(
thread_id=thread_id,
badge=number_of_alerts,
alert=ApsAlert(title=alert_title, subtitle=alert_subtitle),
sound=CriticalSound(
# The notification shouldn't be critical if the user has disabled "override DND" setting
critical=overrideDND,
name=apns_sound_name,
volume=apns_volume,
),
custom_data={
"interruption-level": "critical" if overrideDND else "time-sensitive",
},
),
)
return _construct_fcm_message(message_type, device_to_notify, thread_id, fcm_message_data, apns_payload)
def _get_youre_going_oncall_notification_title(seconds_until_going_oncall: int) -> str:
return f"Your on-call shift starts in {humanize.naturaldelta(seconds_until_going_oncall)}"
def _get_youre_going_oncall_notification_subtitle(
schedule: OnCallSchedule,
schedule_event: ScheduleEvent,
mobile_app_user_settings: "MobileAppUserSettings",
) -> str:
shift_start = schedule_event["start"]
shift_end = schedule_event["end"]
shift_starts_and_ends_on_same_day = shift_start.date() == shift_end.date()
dt_formatter_func = format_localized_time if shift_starts_and_ends_on_same_day else format_localized_datetime
def _format_datetime(dt: datetime.datetime) -> str:
"""
1. Convert the shift datetime to the user's mobile device's timezone
2. Display the timezone aware datetime as a formatted string that is based on the user's configured mobile
app locale, otherwise fallback to "en"
"""
localized_dt = dt.astimezone(pytz.timezone(mobile_app_user_settings.time_zone))
return dt_formatter_func(localized_dt, mobile_app_user_settings.locale)
formatted_shift = f"{_format_datetime(shift_start)} - {_format_datetime(shift_end)}"
return f"{formatted_shift}\nSchedule {schedule.name}"
def _get_youre_going_oncall_fcm_message(
user: User,
schedule: OnCallSchedule,
device_to_notify: "FCMDevice",
seconds_until_going_oncall: int,
schedule_event: ScheduleEvent,
) -> Message:
# avoid circular import
from apps.mobile_app.models import MobileAppUserSettings
thread_id = f"{schedule.public_primary_key}:{user.public_primary_key}:going-oncall"
mobile_app_user_settings, _ = MobileAppUserSettings.objects.get_or_create(user=user)
notification_title = _get_youre_going_oncall_notification_title(seconds_until_going_oncall)
notification_subtitle = _get_youre_going_oncall_notification_subtitle(
schedule, schedule_event, mobile_app_user_settings
)
data: FCMMessageData = {
"title": notification_title,
"subtitle": notification_subtitle,
"info_notification_sound_name": mobile_app_user_settings.get_notification_sound_name(
MessageType.INFO, Platform.ANDROID
),
"info_notification_volume_type": mobile_app_user_settings.info_notification_volume_type,
"info_notification_volume": str(mobile_app_user_settings.info_notification_volume),
"info_notification_volume_override": json.dumps(mobile_app_user_settings.info_notification_volume_override),
}
apns_payload = APNSPayload(
aps=Aps(
thread_id=thread_id,
alert=ApsAlert(title=notification_title, subtitle=notification_subtitle),
sound=CriticalSound(
critical=False,
name=mobile_app_user_settings.get_notification_sound_name(MessageType.INFO, Platform.IOS),
),
custom_data={
"interruption-level": "time-sensitive",
},
),
)
return _construct_fcm_message(MessageType.INFO, device_to_notify, thread_id, data, apns_payload)
@shared_dedicated_queue_retry_task(autoretry_for=(Exception,), retry_backoff=True, max_retries=MAX_RETRIES)
def notify_user_async(user_pk, alert_group_pk, notification_policy_pk, critical):
# avoid circular import
from apps.base.models import UserNotificationPolicy, UserNotificationPolicyLogRecord
from apps.mobile_app.models import FCMDevice
try:
user = User.objects.get(pk=user_pk)
except User.DoesNotExist:
logger.warning(f"User {user_pk} does not exist")
return
try:
alert_group = AlertGroup.objects.get(pk=alert_group_pk)
except AlertGroup.DoesNotExist:
logger.warning(f"Alert group {alert_group_pk} does not exist")
return
try:
notification_policy = UserNotificationPolicy.objects.get(pk=notification_policy_pk)
except UserNotificationPolicy.DoesNotExist:
logger.warning(f"User notification policy {notification_policy_pk} does not exist")
return
def _create_error_log_record():
"""
Utility method to create a UserNotificationPolicyLogRecord with error
"""
UserNotificationPolicyLogRecord.objects.create(
author=user,
type=UserNotificationPolicyLogRecord.TYPE_PERSONAL_NOTIFICATION_FAILED,
notification_policy=notification_policy,
alert_group=alert_group,
reason="Mobile push notification error",
notification_step=notification_policy.step,
notification_channel=notification_policy.notify_by,
)
device_to_notify = FCMDevice.get_active_device_for_user(user)
# create an error log in case user has no devices set up
if not device_to_notify:
_create_error_log_record()
logger.error(f"Error while sending a mobile push notification: user {user_pk} has no device set up")
return
message = _get_alert_group_escalation_fcm_message(alert_group, user, device_to_notify, critical)
_send_push_notification(device_to_notify, message, _create_error_log_record)
def _shift_starts_within_range(
timing_window_lower: int, timing_window_upper: int, seconds_until_shift_starts: int
) -> bool:
return timing_window_lower <= seconds_until_shift_starts <= timing_window_upper
def should_we_send_going_oncall_push_notification(
now: datetime.datetime, user_settings: "MobileAppUserSettings", schedule_event: ScheduleEvent
) -> typing.Optional[int]:
"""
If the user should be set a "you're going oncall" push notification, return the number of seconds
until they will be going oncall.
If no notification should be sent, return None.
Currently we will send notifications for the following scenarios:
- schedule is starting in user's "configured notification timing preference" +/- a 4 minute buffer
- schedule is starting within the next fifteen minutes
Returns `None` if conditions are not met for the user to receive a push notification. Otherwise returns
an `int` which represents the # of seconds until the oncall shift starts.
"""
NOTIFICATION_TIMING_BUFFER = 7 * 60 # 7 minutes in seconds
FIFTEEN_MINUTES_IN_SECONDS = 15 * 60
# this _should_ always be positive since final_events is returning only events in the future
seconds_until_shift_starts = math.floor((schedule_event["start"] - now).total_seconds())
user_wants_to_receive_info_notifications = user_settings.info_notifications_enabled
# int representing num of seconds before the shift starts that the user wants to be notified
user_notification_timing_preference = user_settings.going_oncall_notification_timing
if not user_wants_to_receive_info_notifications:
logger.info("not sending going oncall push notification because info_notifications_enabled is false")
return None
# 14 minute window where the notification could be sent (7 mins before or 7 mins after)
timing_window_lower = user_notification_timing_preference - NOTIFICATION_TIMING_BUFFER
timing_window_upper = user_notification_timing_preference + NOTIFICATION_TIMING_BUFFER
shift_starts_within_users_notification_timing_preference = _shift_starts_within_range(
timing_window_lower, timing_window_upper, seconds_until_shift_starts
)
shift_starts_within_fifteen_minutes = _shift_starts_within_range(
0, FIFTEEN_MINUTES_IN_SECONDS, seconds_until_shift_starts
)
timing_logging_msg = (
f"seconds_until_shift_starts: {seconds_until_shift_starts}\n"
f"user_notification_timing_preference: {user_notification_timing_preference}\n"
f"timing_window_lower: {timing_window_lower}\n"
f"timing_window_upper: {timing_window_upper}\n"
f"shift_starts_within_users_notification_timing_preference: {shift_starts_within_users_notification_timing_preference}\n"
f"shift_starts_within_fifteen_minutes: {shift_starts_within_fifteen_minutes}"
)
# Temporary remove `shift_starts_within_users_notification_timing_preference` from condition to send notification only 15 minutes before the shift starts
# TODO: Return it once mobile app ready and default value is changed (https://github.com/grafana/oncall/issues/1999)
if shift_starts_within_fifteen_minutes:
logger.info(f"timing is right to send going oncall push notification\n{timing_logging_msg}")
return seconds_until_shift_starts
logger.info(f"timing is not right to send going oncall push notification\n{timing_logging_msg}")
return None
def _generate_going_oncall_push_notification_cache_key(user_pk: str, schedule_event: ScheduleEvent) -> str:
return f"going_oncall_push_notification:{user_pk}:{schedule_event['shift']['pk']}"
@shared_dedicated_queue_retry_task(autoretry_for=(Exception,), retry_backoff=True, max_retries=MAX_RETRIES)
def conditionally_send_going_oncall_push_notifications_for_schedule(schedule_pk) -> None:
# avoid circular import
from apps.mobile_app.models import FCMDevice, MobileAppUserSettings
PUSH_NOTIFICATION_TRACKING_CACHE_KEY_TTL = 60 * 60 # 60 minutes
user_cache: typing.Dict[str, User] = {}
device_cache: typing.Dict[str, "FCMDevice"] = {}
logger.info(f"Start calculate_going_oncall_push_notifications_for_schedule for schedule {schedule_pk}")
try:
schedule: OnCallSchedule = OnCallSchedule.objects.get(pk=schedule_pk)
except OnCallSchedule.DoesNotExist:
logger.info(f"Tried to notify user about going on-call for non-existing schedule {schedule_pk}")
return
now = timezone.now()
datetime_end = now + datetime.timedelta(days=7)
schedule_final_events = schedule.final_events(now, datetime_end)
relevant_cache_keys = [
_generate_going_oncall_push_notification_cache_key(user["pk"], schedule_event)
for schedule_event in schedule_final_events
for user in schedule_event["users"]
]
relevant_notifications_already_sent = cache.get_many(relevant_cache_keys)
for schedule_event in schedule_final_events:
users = schedule_event["users"]
for user in users:
user_pk = user["pk"]
user = user_cache.get(user_pk, None)
if user is None:
try:
user = User.objects.get(public_primary_key=user_pk)
user_cache[user_pk] = user
except User.DoesNotExist:
logger.warning(f"User {user_pk} does not exist")
continue
device_to_notify = device_cache.get(user_pk, None)
if device_to_notify is None:
device_to_notify = FCMDevice.get_active_device_for_user(user)
if not device_to_notify:
continue
else:
device_cache[user_pk] = device_to_notify
mobile_app_user_settings, _ = MobileAppUserSettings.objects.get_or_create(user=user)
cache_key = _generate_going_oncall_push_notification_cache_key(user_pk, schedule_event)
already_sent_this_push_notification = cache_key in relevant_notifications_already_sent
seconds_until_going_oncall = should_we_send_going_oncall_push_notification(
now, mobile_app_user_settings, schedule_event
)
if seconds_until_going_oncall is not None and not already_sent_this_push_notification:
message = _get_youre_going_oncall_fcm_message(
user, schedule, device_to_notify, seconds_until_going_oncall, schedule_event
)
_send_push_notification(device_to_notify, message)
cache.set(cache_key, True, PUSH_NOTIFICATION_TRACKING_CACHE_KEY_TTL)
else:
logger.info(
f"Skipping sending going oncall push notification for user {user_pk} and shift {schedule_event['shift']['pk']}. "
f"Already sent: {already_sent_this_push_notification}"
)
@shared_dedicated_queue_retry_task()
def conditionally_send_going_oncall_push_notifications_for_all_schedules() -> None:
for schedule in OnCallSchedule.objects.all():
conditionally_send_going_oncall_push_notifications_for_schedule.apply_async((schedule.pk,))
# TODO: break down tasks.py into multiple files
@shared_dedicated_queue_retry_task()
def notify_shift_swap_requests() -> None:
"""
A periodic task that notifies users about shift swap requests.
"""
for shift_swap_request, timeout in _get_shift_swap_requests_to_notify(timezone.now()):
notify_shift_swap_request.delay(shift_swap_request.pk, timeout)
def _get_shift_swap_requests_to_notify(now: datetime.datetime) -> list[tuple[ShiftSwapRequest, int]]:
"""
Returns shifts swap requests that are open and are in the notification window.
This method can return the same shift swap request multiple times while it's in the notification window,
but users are only notified once per shift swap request (see _mark_shift_swap_request_notified_for_user).
"""
shift_swap_requests_in_notification_window = []
for shift_swap_request in ShiftSwapRequest.objects.get_open_requests(now):
for idx, offset in enumerate(ShiftSwapRequest.FOLLOWUP_OFFSETS):
next_offset = (
ShiftSwapRequest.FOLLOWUP_OFFSETS[idx + 1]
if idx + 1 < len(ShiftSwapRequest.FOLLOWUP_OFFSETS)
else datetime.timedelta(0)
)
window = offset - next_offset - timezone.timedelta(microseconds=1) # check SSRs up to the next offset
notification_window_start = shift_swap_request.swap_start - offset
notification_window_end = notification_window_start + window
if notification_window_start <= now <= notification_window_end:
next_notification_dt = shift_swap_request.swap_start - next_offset
timeout = math.ceil((next_notification_dt - now).total_seconds()) # don't send notifications twice
shift_swap_requests_in_notification_window.append((shift_swap_request, timeout))
break
return shift_swap_requests_in_notification_window
@shared_dedicated_queue_retry_task(autoretry_for=(Exception,), retry_backoff=True, max_retries=MAX_RETRIES)
def notify_shift_swap_request(shift_swap_request_pk: int, timeout: int) -> None:
"""
Notify relevant users for an individual shift swap request.
"""
try:
shift_swap_request = ShiftSwapRequest.objects.get(pk=shift_swap_request_pk)
except ShiftSwapRequest.DoesNotExist:
logger.info(f"ShiftSwapRequest {shift_swap_request_pk} does not exist")
return
now = timezone.now()
for user in shift_swap_request.possible_benefactors:
if _should_notify_user_about_shift_swap_request(shift_swap_request, user, now):
notify_user_about_shift_swap_request.delay(shift_swap_request.pk, user.pk)
_mark_shift_swap_request_notified_for_user(shift_swap_request, user, timeout)
@shared_dedicated_queue_retry_task(autoretry_for=(Exception,), retry_backoff=True, max_retries=MAX_RETRIES)
def notify_user_about_shift_swap_request(shift_swap_request_pk: int, user_pk: int) -> None:
"""
Send a push notification about a shift swap request to an individual user.
"""
# avoid circular import
from apps.mobile_app.models import FCMDevice, MobileAppUserSettings
try:
shift_swap_request = ShiftSwapRequest.objects.get(pk=shift_swap_request_pk)
except ShiftSwapRequest.DoesNotExist:
logger.info(f"ShiftSwapRequest {shift_swap_request_pk} does not exist")
return
try:
user = User.objects.get(pk=user_pk)
except User.DoesNotExist:
logger.info(f"User {user_pk} does not exist")
return
device_to_notify = FCMDevice.get_active_device_for_user(user)
if not device_to_notify:
logger.info(f"FCMDevice does not exist for user {user_pk}")
return
try:
mobile_app_user_settings = MobileAppUserSettings.objects.get(user=user)
except MobileAppUserSettings.DoesNotExist:
logger.info(f"MobileAppUserSettings does not exist for user {user_pk}")
return
if not mobile_app_user_settings.info_notifications_enabled:
logger.info(f"Info notifications are not enabled for user {user_pk}")
return
if not shift_swap_request.is_open:
logger.info(f"Shift swap request {shift_swap_request_pk} is not open anymore")
return
message = _shift_swap_request_fcm_message(shift_swap_request, user, device_to_notify, mobile_app_user_settings)
_send_push_notification(device_to_notify, message)
def _should_notify_user_about_shift_swap_request(
shift_swap_request: ShiftSwapRequest, user: User, now: datetime.datetime
) -> bool:
# avoid circular import
from apps.mobile_app.models import MobileAppUserSettings
try:
mobile_app_user_settings = MobileAppUserSettings.objects.get(user=user)
except MobileAppUserSettings.DoesNotExist:
return False # don't notify if the app is not configured
return user.is_in_working_hours( # user must be in working hours
now, mobile_app_user_settings.time_zone
) and not _has_user_been_notified_for_shift_swap_request( # don't notify twice
shift_swap_request, user
)
def _mark_shift_swap_request_notified_for_user(shift_swap_request: ShiftSwapRequest, user: User, timeout: int) -> None:
key = _shift_swap_request_cache_key(shift_swap_request, user)
cache.set(key, True, timeout=timeout)
def _has_user_been_notified_for_shift_swap_request(shift_swap_request: ShiftSwapRequest, user: User) -> bool:
key = _shift_swap_request_cache_key(shift_swap_request, user)
return cache.get(key) is True
def _shift_swap_request_cache_key(shift_swap_request: ShiftSwapRequest, user: User) -> str:
return f"ssr_push:{shift_swap_request.pk}:{user.pk}"
def _shift_swap_request_fcm_message(
shift_swap_request: ShiftSwapRequest,
user: User,
device_to_notify: "FCMDevice",
mobile_app_user_settings: "MobileAppUserSettings",
) -> Message:
thread_id = f"{shift_swap_request.public_primary_key}:{user.public_primary_key}:ssr"
notification_title = "New shift swap request"
beneficiary_name = shift_swap_request.beneficiary.name or shift_swap_request.beneficiary.username
notification_subtitle = f"{beneficiary_name}, {shift_swap_request.schedule.name}"
# The mobile app will use this route to open the shift swap request
route = f"/schedules/{shift_swap_request.schedule.public_primary_key}/ssrs/{shift_swap_request.public_primary_key}"
data: FCMMessageData = {
"title": notification_title,
"subtitle": notification_subtitle,
"route": route,
"info_notification_sound_name": mobile_app_user_settings.get_notification_sound_name(
MessageType.INFO, Platform.ANDROID
),
"info_notification_volume_type": mobile_app_user_settings.info_notification_volume_type,
"info_notification_volume": str(mobile_app_user_settings.info_notification_volume),
"info_notification_volume_override": json.dumps(mobile_app_user_settings.info_notification_volume_override),
}
apns_payload = APNSPayload(
aps=Aps(
thread_id=thread_id,
alert=ApsAlert(title=notification_title, subtitle=notification_subtitle),
sound=CriticalSound(
critical=False,
name=mobile_app_user_settings.get_notification_sound_name(MessageType.INFO, Platform.IOS),
),
custom_data={
"interruption-level": "time-sensitive",
},
),
)
return _construct_fcm_message(MessageType.INFO, device_to_notify, thread_id, data, apns_payload)

View file

@ -0,0 +1,10 @@
from .going_oncall_notification import ( # noqa:F401
conditionally_send_going_oncall_push_notifications_for_all_schedules,
conditionally_send_going_oncall_push_notifications_for_schedule,
)
from .new_alert_group import notify_user_async # noqa:F401
from .new_shift_swap_request import ( # noqa:F401
notify_shift_swap_request,
notify_shift_swap_requests,
notify_user_about_shift_swap_request,
)

View file

@ -0,0 +1,245 @@
import datetime
import json
import logging
import math
import typing
import humanize
import pytz
from celery.utils.log import get_task_logger
from django.core.cache import cache
from django.utils import timezone
from firebase_admin.messaging import APNSPayload, Aps, ApsAlert, CriticalSound, Message
from apps.mobile_app.types import FCMMessageData, MessageType, Platform
from apps.mobile_app.utils import MAX_RETRIES, construct_fcm_message, send_push_notification
from apps.schedules.models.on_call_schedule import OnCallSchedule, ScheduleEvent
from apps.user_management.models import User
from common.custom_celery_tasks import shared_dedicated_queue_retry_task
from common.l10n import format_localized_datetime, format_localized_time
if typing.TYPE_CHECKING:
from apps.mobile_app.models import FCMDevice, MobileAppUserSettings
logger = get_task_logger(__name__)
logger.setLevel(logging.DEBUG)
def _get_notification_title(seconds_until_going_oncall: int) -> str:
return f"Your on-call shift starts in {humanize.naturaldelta(seconds_until_going_oncall)}"
def _get_notification_subtitle(
schedule: OnCallSchedule,
schedule_event: ScheduleEvent,
mobile_app_user_settings: "MobileAppUserSettings",
) -> str:
shift_start = schedule_event["start"]
shift_end = schedule_event["end"]
shift_starts_and_ends_on_same_day = shift_start.date() == shift_end.date()
dt_formatter_func = format_localized_time if shift_starts_and_ends_on_same_day else format_localized_datetime
def _format_datetime(dt: datetime.datetime) -> str:
"""
1. Convert the shift datetime to the user's mobile device's timezone
2. Display the timezone aware datetime as a formatted string that is based on the user's configured mobile
app locale, otherwise fallback to "en"
"""
localized_dt = dt.astimezone(pytz.timezone(mobile_app_user_settings.time_zone))
return dt_formatter_func(localized_dt, mobile_app_user_settings.locale)
formatted_shift = f"{_format_datetime(shift_start)} - {_format_datetime(shift_end)}"
return f"{formatted_shift}\nSchedule {schedule.name}"
def _get_fcm_message(
user: User,
schedule: OnCallSchedule,
device_to_notify: "FCMDevice",
seconds_until_going_oncall: int,
schedule_event: ScheduleEvent,
) -> Message:
# avoid circular import
from apps.mobile_app.models import MobileAppUserSettings
thread_id = f"{schedule.public_primary_key}:{user.public_primary_key}:going-oncall"
mobile_app_user_settings, _ = MobileAppUserSettings.objects.get_or_create(user=user)
notification_title = _get_notification_title(seconds_until_going_oncall)
notification_subtitle = _get_notification_subtitle(schedule, schedule_event, mobile_app_user_settings)
data: FCMMessageData = {
"title": notification_title,
"subtitle": notification_subtitle,
"info_notification_sound_name": mobile_app_user_settings.get_notification_sound_name(
MessageType.INFO, Platform.ANDROID
),
"info_notification_volume_type": mobile_app_user_settings.info_notification_volume_type,
"info_notification_volume": str(mobile_app_user_settings.info_notification_volume),
"info_notification_volume_override": json.dumps(mobile_app_user_settings.info_notification_volume_override),
}
apns_payload = APNSPayload(
aps=Aps(
thread_id=thread_id,
alert=ApsAlert(title=notification_title, subtitle=notification_subtitle),
sound=CriticalSound(
critical=False,
name=mobile_app_user_settings.get_notification_sound_name(MessageType.INFO, Platform.IOS),
),
custom_data={
"interruption-level": "time-sensitive",
},
),
)
return construct_fcm_message(MessageType.INFO, device_to_notify, thread_id, data, apns_payload)
def _shift_starts_within_range(
timing_window_lower: int, timing_window_upper: int, seconds_until_shift_starts: int
) -> bool:
return timing_window_lower <= seconds_until_shift_starts <= timing_window_upper
def _should_we_send_push_notification(
now: datetime.datetime, user_settings: "MobileAppUserSettings", schedule_event: ScheduleEvent
) -> typing.Optional[int]:
"""
If the user should be set a "you're going oncall" push notification, return the number of seconds
until they will be going oncall.
If no notification should be sent, return None.
Currently we will send notifications for the following scenarios:
- schedule is starting in user's "configured notification timing preference" +/- a 4 minute buffer
- schedule is starting within the next fifteen minutes
Returns `None` if conditions are not met for the user to receive a push notification. Otherwise returns
an `int` which represents the # of seconds until the oncall shift starts.
"""
NOTIFICATION_TIMING_BUFFER = 7 * 60 # 7 minutes in seconds
FIFTEEN_MINUTES_IN_SECONDS = 15 * 60
# this _should_ always be positive since final_events is returning only events in the future
seconds_until_shift_starts = math.floor((schedule_event["start"] - now).total_seconds())
user_wants_to_receive_info_notifications = user_settings.info_notifications_enabled
# int representing num of seconds before the shift starts that the user wants to be notified
user_notification_timing_preference = user_settings.going_oncall_notification_timing
if not user_wants_to_receive_info_notifications:
logger.info("not sending going oncall push notification because info_notifications_enabled is false")
return None
# 14 minute window where the notification could be sent (7 mins before or 7 mins after)
timing_window_lower = user_notification_timing_preference - NOTIFICATION_TIMING_BUFFER
timing_window_upper = user_notification_timing_preference + NOTIFICATION_TIMING_BUFFER
shift_starts_within_users_notification_timing_preference = _shift_starts_within_range(
timing_window_lower, timing_window_upper, seconds_until_shift_starts
)
shift_starts_within_fifteen_minutes = _shift_starts_within_range(
0, FIFTEEN_MINUTES_IN_SECONDS, seconds_until_shift_starts
)
timing_logging_msg = (
f"seconds_until_shift_starts: {seconds_until_shift_starts}\n"
f"user_notification_timing_preference: {user_notification_timing_preference}\n"
f"timing_window_lower: {timing_window_lower}\n"
f"timing_window_upper: {timing_window_upper}\n"
f"shift_starts_within_users_notification_timing_preference: {shift_starts_within_users_notification_timing_preference}\n"
f"shift_starts_within_fifteen_minutes: {shift_starts_within_fifteen_minutes}"
)
# Temporary remove `shift_starts_within_users_notification_timing_preference` from condition to send notification only 15 minutes before the shift starts
# TODO: Return it once mobile app ready and default value is changed (https://github.com/grafana/oncall/issues/1999)
if shift_starts_within_fifteen_minutes:
logger.info(f"timing is right to send going oncall push notification\n{timing_logging_msg}")
return seconds_until_shift_starts
logger.info(f"timing is not right to send going oncall push notification\n{timing_logging_msg}")
return None
def _generate_cache_key(user_pk: str, schedule_event: ScheduleEvent) -> str:
return f"going_oncall_push_notification:{user_pk}:{schedule_event['shift']['pk']}"
@shared_dedicated_queue_retry_task(autoretry_for=(Exception,), retry_backoff=True, max_retries=MAX_RETRIES)
def conditionally_send_going_oncall_push_notifications_for_schedule(schedule_pk) -> None:
# avoid circular import
from apps.mobile_app.models import FCMDevice, MobileAppUserSettings
PUSH_NOTIFICATION_TRACKING_CACHE_KEY_TTL = 60 * 60 # 60 minutes
user_cache: typing.Dict[str, User] = {}
device_cache: typing.Dict[str, "FCMDevice"] = {}
logger.info(f"Start calculate_going_oncall_push_notifications_for_schedule for schedule {schedule_pk}")
try:
schedule: OnCallSchedule = OnCallSchedule.objects.get(pk=schedule_pk)
except OnCallSchedule.DoesNotExist:
logger.info(f"Tried to notify user about going on-call for non-existing schedule {schedule_pk}")
return
now = timezone.now()
datetime_end = now + datetime.timedelta(days=7)
schedule_final_events = schedule.final_events(now, datetime_end)
relevant_cache_keys = [
_generate_cache_key(user["pk"], schedule_event)
for schedule_event in schedule_final_events
for user in schedule_event["users"]
]
relevant_notifications_already_sent = cache.get_many(relevant_cache_keys)
for schedule_event in schedule_final_events:
users = schedule_event["users"]
for user in users:
user_pk = user["pk"]
user = user_cache.get(user_pk, None)
if user is None:
try:
user = User.objects.get(public_primary_key=user_pk)
user_cache[user_pk] = user
except User.DoesNotExist:
logger.warning(f"User {user_pk} does not exist")
continue
device_to_notify = device_cache.get(user_pk, None)
if device_to_notify is None:
device_to_notify = FCMDevice.get_active_device_for_user(user)
if not device_to_notify:
continue
else:
device_cache[user_pk] = device_to_notify
mobile_app_user_settings, _ = MobileAppUserSettings.objects.get_or_create(user=user)
cache_key = _generate_cache_key(user_pk, schedule_event)
already_sent_this_push_notification = cache_key in relevant_notifications_already_sent
seconds_until_going_oncall = _should_we_send_push_notification(
now, mobile_app_user_settings, schedule_event
)
if seconds_until_going_oncall is not None and not already_sent_this_push_notification:
message = _get_fcm_message(user, schedule, device_to_notify, seconds_until_going_oncall, schedule_event)
send_push_notification(device_to_notify, message)
cache.set(cache_key, True, PUSH_NOTIFICATION_TRACKING_CACHE_KEY_TTL)
else:
logger.info(
f"Skipping sending going oncall push notification for user {user_pk} and shift {schedule_event['shift']['pk']}. "
f"Already sent: {already_sent_this_push_notification}"
)
@shared_dedicated_queue_retry_task()
def conditionally_send_going_oncall_push_notifications_for_all_schedules() -> None:
for schedule in OnCallSchedule.objects.all():
conditionally_send_going_oncall_push_notifications_for_schedule.apply_async((schedule.pk,))

View file

@ -0,0 +1,139 @@
import json
import logging
import typing
from celery.utils.log import get_task_logger
from firebase_admin.messaging import APNSPayload, Aps, ApsAlert, CriticalSound, Message
from apps.alerts.models import AlertGroup
from apps.mobile_app.alert_rendering import get_push_notification_subtitle
from apps.mobile_app.types import FCMMessageData, MessageType, Platform
from apps.mobile_app.utils import MAX_RETRIES, construct_fcm_message, send_push_notification
from apps.user_management.models import User
from common.custom_celery_tasks import shared_dedicated_queue_retry_task
if typing.TYPE_CHECKING:
from apps.mobile_app.models import FCMDevice
logger = get_task_logger(__name__)
logger.setLevel(logging.DEBUG)
def _get_fcm_message(alert_group: AlertGroup, user: User, device_to_notify: "FCMDevice", critical: bool) -> Message:
# avoid circular import
from apps.mobile_app.models import MobileAppUserSettings
thread_id = f"{alert_group.channel.organization.public_primary_key}:{alert_group.public_primary_key}"
alert_title = "New Important Alert" if critical else "New Alert"
alert_subtitle = get_push_notification_subtitle(alert_group)
mobile_app_user_settings, _ = MobileAppUserSettings.objects.get_or_create(user=user)
# critical defines the type of notification.
# we use overrideDND to establish if the notification should sound even if DND is on
overrideDND = critical and mobile_app_user_settings.important_notification_override_dnd
# APNS only allows to specify volume for critical notifications
apns_volume = mobile_app_user_settings.important_notification_volume if critical else None
message_type = MessageType.IMPORTANT if critical else MessageType.DEFAULT
apns_sound_name = mobile_app_user_settings.get_notification_sound_name(message_type, Platform.IOS)
fcm_message_data: FCMMessageData = {
"title": alert_title,
"subtitle": alert_subtitle,
"orgId": alert_group.channel.organization.public_primary_key,
"orgName": alert_group.channel.organization.stack_slug,
"alertGroupId": alert_group.public_primary_key,
# alert_group.status is an int so it must be casted...
"status": str(alert_group.status),
# Pass user settings, so the Android app can use them to play the correct sound and volume
"default_notification_sound_name": mobile_app_user_settings.get_notification_sound_name(
MessageType.DEFAULT, Platform.ANDROID
),
"default_notification_volume_type": mobile_app_user_settings.default_notification_volume_type,
"default_notification_volume": str(mobile_app_user_settings.default_notification_volume),
"default_notification_volume_override": json.dumps(
mobile_app_user_settings.default_notification_volume_override
),
"important_notification_sound_name": mobile_app_user_settings.get_notification_sound_name(
MessageType.IMPORTANT, Platform.ANDROID
),
"important_notification_volume_type": mobile_app_user_settings.important_notification_volume_type,
"important_notification_volume": str(mobile_app_user_settings.important_notification_volume),
"important_notification_volume_override": json.dumps(
mobile_app_user_settings.important_notification_volume_override
),
"important_notification_override_dnd": json.dumps(mobile_app_user_settings.important_notification_override_dnd),
}
number_of_alerts = alert_group.alerts.count()
apns_payload = APNSPayload(
aps=Aps(
thread_id=thread_id,
badge=number_of_alerts,
alert=ApsAlert(title=alert_title, subtitle=alert_subtitle),
sound=CriticalSound(
# The notification shouldn't be critical if the user has disabled "override DND" setting
critical=overrideDND,
name=apns_sound_name,
volume=apns_volume,
),
custom_data={
"interruption-level": "critical" if overrideDND else "time-sensitive",
},
),
)
return construct_fcm_message(message_type, device_to_notify, thread_id, fcm_message_data, apns_payload)
@shared_dedicated_queue_retry_task(autoretry_for=(Exception,), retry_backoff=True, max_retries=MAX_RETRIES)
def notify_user_async(user_pk, alert_group_pk, notification_policy_pk, critical):
# avoid circular import
from apps.base.models import UserNotificationPolicy, UserNotificationPolicyLogRecord
from apps.mobile_app.models import FCMDevice
try:
user = User.objects.get(pk=user_pk)
except User.DoesNotExist:
logger.warning(f"User {user_pk} does not exist")
return
try:
alert_group = AlertGroup.objects.get(pk=alert_group_pk)
except AlertGroup.DoesNotExist:
logger.warning(f"Alert group {alert_group_pk} does not exist")
return
try:
notification_policy = UserNotificationPolicy.objects.get(pk=notification_policy_pk)
except UserNotificationPolicy.DoesNotExist:
logger.warning(f"User notification policy {notification_policy_pk} does not exist")
return
def _create_error_log_record():
"""
Utility method to create a UserNotificationPolicyLogRecord with error
"""
UserNotificationPolicyLogRecord.objects.create(
author=user,
type=UserNotificationPolicyLogRecord.TYPE_PERSONAL_NOTIFICATION_FAILED,
notification_policy=notification_policy,
alert_group=alert_group,
reason="Mobile push notification error",
notification_step=notification_policy.step,
notification_channel=notification_policy.notify_by,
)
device_to_notify = FCMDevice.get_active_device_for_user(user)
# create an error log in case user has no devices set up
if not device_to_notify:
_create_error_log_record()
logger.error(f"Error while sending a mobile push notification: user {user_pk} has no device set up")
return
message = _get_fcm_message(alert_group, user, device_to_notify, critical)
send_push_notification(device_to_notify, message, _create_error_log_record)

View file

@ -0,0 +1,198 @@
import datetime
import json
import logging
import math
import typing
from celery.utils.log import get_task_logger
from django.core.cache import cache
from django.utils import timezone
from firebase_admin.messaging import APNSPayload, Aps, ApsAlert, CriticalSound, Message
from apps.mobile_app.types import FCMMessageData, MessageType, Platform
from apps.mobile_app.utils import MAX_RETRIES, construct_fcm_message, send_push_notification
from apps.schedules.models import ShiftSwapRequest
from apps.user_management.models import User
from common.custom_celery_tasks import shared_dedicated_queue_retry_task
if typing.TYPE_CHECKING:
from apps.mobile_app.models import FCMDevice, MobileAppUserSettings
logger = get_task_logger(__name__)
logger.setLevel(logging.DEBUG)
def _generate_cache_key(shift_swap_request: ShiftSwapRequest, user: User) -> str:
return f"ssr_push:{shift_swap_request.pk}:{user.pk}"
def _mark_shift_swap_request_notified_for_user(shift_swap_request: ShiftSwapRequest, user: User, timeout: int) -> None:
key = _generate_cache_key(shift_swap_request, user)
cache.set(key, True, timeout=timeout)
def _get_shift_swap_requests_to_notify(now: datetime.datetime) -> list[tuple[ShiftSwapRequest, int]]:
"""
Returns shifts swap requests that are open and are in the notification window.
This method can return the same shift swap request multiple times while it's in the notification window,
but users are only notified once per shift swap request (see _mark_shift_swap_request_notified_for_user).
"""
shift_swap_requests_in_notification_window = []
for shift_swap_request in ShiftSwapRequest.objects.get_open_requests(now):
for idx, offset in enumerate(ShiftSwapRequest.FOLLOWUP_OFFSETS):
next_offset = (
ShiftSwapRequest.FOLLOWUP_OFFSETS[idx + 1]
if idx + 1 < len(ShiftSwapRequest.FOLLOWUP_OFFSETS)
else datetime.timedelta(0)
)
window = offset - next_offset - timezone.timedelta(microseconds=1) # check SSRs up to the next offset
notification_window_start = shift_swap_request.swap_start - offset
notification_window_end = notification_window_start + window
if notification_window_start <= now <= notification_window_end:
next_notification_dt = shift_swap_request.swap_start - next_offset
timeout = math.ceil((next_notification_dt - now).total_seconds()) # don't send notifications twice
shift_swap_requests_in_notification_window.append((shift_swap_request, timeout))
break
return shift_swap_requests_in_notification_window
def _has_user_been_notified_for_shift_swap_request(shift_swap_request: ShiftSwapRequest, user: User) -> bool:
key = _generate_cache_key(shift_swap_request, user)
return cache.get(key) is True
def _should_notify_user_about_shift_swap_request(
shift_swap_request: ShiftSwapRequest, user: User, now: datetime.datetime
) -> bool:
# avoid circular import
from apps.mobile_app.models import MobileAppUserSettings
try:
mobile_app_user_settings = MobileAppUserSettings.objects.get(user=user)
except MobileAppUserSettings.DoesNotExist:
return False # don't notify if the app is not configured
return user.is_in_working_hours( # user must be in working hours
now, mobile_app_user_settings.time_zone
) and not _has_user_been_notified_for_shift_swap_request( # don't notify twice
shift_swap_request, user
)
def _get_fcm_message(
shift_swap_request: ShiftSwapRequest,
user: User,
device_to_notify: "FCMDevice",
mobile_app_user_settings: "MobileAppUserSettings",
) -> Message:
thread_id = f"{shift_swap_request.public_primary_key}:{user.public_primary_key}:ssr"
notification_title = "New shift swap request"
beneficiary_name = shift_swap_request.beneficiary.name or shift_swap_request.beneficiary.username
notification_subtitle = f"{beneficiary_name}, {shift_swap_request.schedule.name}"
# The mobile app will use this route to open the shift swap request
route = f"/schedules/{shift_swap_request.schedule.public_primary_key}/ssrs/{shift_swap_request.public_primary_key}"
data: FCMMessageData = {
"title": notification_title,
"subtitle": notification_subtitle,
"route": route,
"info_notification_sound_name": mobile_app_user_settings.get_notification_sound_name(
MessageType.INFO, Platform.ANDROID
),
"info_notification_volume_type": mobile_app_user_settings.info_notification_volume_type,
"info_notification_volume": str(mobile_app_user_settings.info_notification_volume),
"info_notification_volume_override": json.dumps(mobile_app_user_settings.info_notification_volume_override),
}
apns_payload = APNSPayload(
aps=Aps(
thread_id=thread_id,
alert=ApsAlert(title=notification_title, subtitle=notification_subtitle),
sound=CriticalSound(
critical=False,
name=mobile_app_user_settings.get_notification_sound_name(MessageType.INFO, Platform.IOS),
),
custom_data={
"interruption-level": "time-sensitive",
},
),
)
return construct_fcm_message(MessageType.INFO, device_to_notify, thread_id, data, apns_payload)
@shared_dedicated_queue_retry_task(autoretry_for=(Exception,), retry_backoff=True, max_retries=MAX_RETRIES)
def notify_user_about_shift_swap_request(shift_swap_request_pk: int, user_pk: int) -> None:
"""
Send a push notification about a shift swap request to an individual user.
"""
# avoid circular import
from apps.mobile_app.models import FCMDevice, MobileAppUserSettings
try:
shift_swap_request = ShiftSwapRequest.objects.get(pk=shift_swap_request_pk)
except ShiftSwapRequest.DoesNotExist:
logger.info(f"ShiftSwapRequest {shift_swap_request_pk} does not exist")
return
try:
user = User.objects.get(pk=user_pk)
except User.DoesNotExist:
logger.info(f"User {user_pk} does not exist")
return
device_to_notify = FCMDevice.get_active_device_for_user(user)
if not device_to_notify:
logger.info(f"FCMDevice does not exist for user {user_pk}")
return
try:
mobile_app_user_settings = MobileAppUserSettings.objects.get(user=user)
except MobileAppUserSettings.DoesNotExist:
logger.info(f"MobileAppUserSettings does not exist for user {user_pk}")
return
if not mobile_app_user_settings.info_notifications_enabled:
logger.info(f"Info notifications are not enabled for user {user_pk}")
return
if not shift_swap_request.is_open:
logger.info(f"Shift swap request {shift_swap_request_pk} is not open anymore")
return
message = _get_fcm_message(shift_swap_request, user, device_to_notify, mobile_app_user_settings)
send_push_notification(device_to_notify, message)
@shared_dedicated_queue_retry_task(autoretry_for=(Exception,), retry_backoff=True, max_retries=MAX_RETRIES)
def notify_shift_swap_request(shift_swap_request_pk: int, timeout: int) -> None:
"""
Notify relevant users for an individual shift swap request.
"""
try:
shift_swap_request = ShiftSwapRequest.objects.get(pk=shift_swap_request_pk)
except ShiftSwapRequest.DoesNotExist:
logger.info(f"ShiftSwapRequest {shift_swap_request_pk} does not exist")
return
now = timezone.now()
for user in shift_swap_request.possible_benefactors:
if _should_notify_user_about_shift_swap_request(shift_swap_request, user, now):
notify_user_about_shift_swap_request.delay(shift_swap_request.pk, user.pk)
_mark_shift_swap_request_notified_for_user(shift_swap_request, user, timeout)
@shared_dedicated_queue_retry_task()
def notify_shift_swap_requests() -> None:
"""
A periodic task that notifies users about shift swap requests.
"""
for shift_swap_request, timeout in _get_shift_swap_requests_to_notify(timezone.now()):
notify_shift_swap_request.delay(shift_swap_request.pk, timeout)

View file

@ -6,8 +6,17 @@ import pytest
from django.core.cache import cache
from django.utils import timezone
from apps.mobile_app import tasks
from apps.mobile_app.models import FCMDevice, MobileAppUserSettings
from apps.mobile_app.tasks.going_oncall_notification import (
_generate_cache_key,
_get_fcm_message,
_get_notification_subtitle,
_get_notification_title,
_shift_starts_within_range,
_should_we_send_push_notification,
conditionally_send_going_oncall_push_notifications_for_all_schedules,
conditionally_send_going_oncall_push_notifications_for_schedule,
)
from apps.mobile_app.types import MessageType, Platform
from apps.schedules.models import OnCallScheduleCalendar, OnCallScheduleICal, OnCallScheduleWeb
from apps.schedules.models.on_call_schedule import ScheduleEvent
@ -42,14 +51,11 @@ def _create_schedule_event(
)
@pytest.mark.django_db
def test_shift_starts_within_range(timing_window_lower, timing_window_upper, seconds_until_shift_starts, expected):
assert (
tasks._shift_starts_within_range(timing_window_lower, timing_window_upper, seconds_until_shift_starts)
== expected
)
assert _shift_starts_within_range(timing_window_lower, timing_window_upper, seconds_until_shift_starts) == expected
@pytest.mark.django_db
def test_get_youre_going_oncall_notification_title(make_organization_and_user, make_user, make_schedule):
def test_get_notification_title(make_organization_and_user, make_user, make_schedule):
schedule_name = "asdfasdfasdfasdf"
organization, user = make_organization_and_user()
@ -95,11 +101,9 @@ def test_get_youre_going_oncall_notification_title(make_organization_and_user, m
##################
# same day shift
##################
same_day_shift_title = tasks._get_youre_going_oncall_notification_title(seconds_until_going_oncall)
same_day_shift_subtitle = tasks._get_youre_going_oncall_notification_subtitle(schedule, same_day_shift, maus)
same_day_shift_no_locale_subtitle = tasks._get_youre_going_oncall_notification_subtitle(
schedule, same_day_shift, maus_no_locale
)
same_day_shift_title = _get_notification_title(seconds_until_going_oncall)
same_day_shift_subtitle = _get_notification_subtitle(schedule, same_day_shift, maus)
same_day_shift_no_locale_subtitle = _get_notification_subtitle(schedule, same_day_shift, maus_no_locale)
assert same_day_shift_title == f"Your on-call shift starts in {humanized_time_until_going_oncall}"
assert same_day_shift_subtitle == f"09 h 00 - 17 h 00\nSchedule {schedule_name}"
@ -108,13 +112,9 @@ def test_get_youre_going_oncall_notification_title(make_organization_and_user, m
##################
# multiple day shift
##################
multiple_day_shift_title = tasks._get_youre_going_oncall_notification_title(seconds_until_going_oncall)
multiple_day_shift_subtitle = tasks._get_youre_going_oncall_notification_subtitle(
schedule, multiple_day_shift, maus
)
multiple_day_shift_no_locale_subtitle = tasks._get_youre_going_oncall_notification_subtitle(
schedule, multiple_day_shift, maus_no_locale
)
multiple_day_shift_title = _get_notification_title(seconds_until_going_oncall)
multiple_day_shift_subtitle = _get_notification_subtitle(schedule, multiple_day_shift, maus)
multiple_day_shift_no_locale_subtitle = _get_notification_subtitle(schedule, multiple_day_shift, maus_no_locale)
assert multiple_day_shift_title == f"Your on-call shift starts in {humanized_time_until_going_oncall}"
assert multiple_day_shift_subtitle == f"2023-07-08 09 h 00 - 2023-07-12 17 h 00\nSchedule {schedule_name}"
@ -132,7 +132,7 @@ def test_get_youre_going_oncall_notification_title(make_organization_and_user, m
],
)
@pytest.mark.django_db
def test_get_youre_going_oncall_notification_subtitle(
def test_get_notification_subtitle(
make_organization, make_user_for_organization, make_schedule, user_timezone, expected_shift_times
):
schedule_name = "asdfasdfasdfasdf"
@ -158,28 +158,25 @@ def test_get_youre_going_oncall_notification_subtitle(
],
)
assert (
tasks._get_youre_going_oncall_notification_subtitle(schedule, shift, maus)
== f"{expected_shift_times}\nSchedule {schedule_name}"
)
assert _get_notification_subtitle(schedule, shift, maus) == f"{expected_shift_times}\nSchedule {schedule_name}"
@mock.patch("apps.mobile_app.tasks._get_youre_going_oncall_notification_subtitle")
@mock.patch("apps.mobile_app.tasks._get_youre_going_oncall_notification_title")
@mock.patch("apps.mobile_app.tasks._construct_fcm_message")
@mock.patch("apps.mobile_app.tasks.APNSPayload")
@mock.patch("apps.mobile_app.tasks.Aps")
@mock.patch("apps.mobile_app.tasks.ApsAlert")
@mock.patch("apps.mobile_app.tasks.CriticalSound")
@mock.patch("apps.mobile_app.tasks.going_oncall_notification._get_notification_subtitle")
@mock.patch("apps.mobile_app.tasks.going_oncall_notification._get_notification_title")
@mock.patch("apps.mobile_app.tasks.going_oncall_notification.construct_fcm_message")
@mock.patch("apps.mobile_app.tasks.going_oncall_notification.APNSPayload")
@mock.patch("apps.mobile_app.tasks.going_oncall_notification.Aps")
@mock.patch("apps.mobile_app.tasks.going_oncall_notification.ApsAlert")
@mock.patch("apps.mobile_app.tasks.going_oncall_notification.CriticalSound")
@pytest.mark.django_db
def test_get_youre_going_oncall_fcm_message(
def test_get_fcm_message(
mock_critical_sound,
mock_aps_alert,
mock_aps,
mock_apns_payload,
mock_construct_fcm_message,
mock_get_youre_going_oncall_notification_title,
mock_get_youre_going_oncall_notification_subtitle,
mock_get_notification_title,
mock_get_notification_subtitle,
make_organization,
make_user_for_organization,
make_schedule,
@ -191,8 +188,8 @@ def test_get_youre_going_oncall_fcm_message(
seconds_until_going_oncall = 600
mock_construct_fcm_message.return_value = mock_fcm_message
mock_get_youre_going_oncall_notification_title.return_value = mock_notification_title
mock_get_youre_going_oncall_notification_subtitle.return_value = mock_notification_subtitle
mock_get_notification_title.return_value = mock_notification_title
mock_get_notification_subtitle.return_value = mock_notification_subtitle
organization = make_organization()
user_tz = "Europe/Amsterdam"
@ -224,9 +221,7 @@ def test_get_youre_going_oncall_fcm_message(
"info_notification_volume_override": json.dumps(maus.info_notification_volume_override),
}
fcm_message = tasks._get_youre_going_oncall_fcm_message(
user, schedule, device, seconds_until_going_oncall, schedule_event
)
fcm_message = _get_fcm_message(user, schedule, device, seconds_until_going_oncall, schedule_event)
assert fcm_message == mock_fcm_message
@ -244,8 +239,8 @@ def test_get_youre_going_oncall_fcm_message(
)
mock_apns_payload.assert_called_once_with(aps=mock_aps.return_value)
mock_get_youre_going_oncall_notification_subtitle.assert_called_once_with(schedule, schedule_event, maus)
mock_get_youre_going_oncall_notification_title.assert_called_once_with(seconds_until_going_oncall)
mock_get_notification_subtitle.assert_called_once_with(schedule, schedule_event, maus)
mock_get_notification_title.assert_called_once_with(seconds_until_going_oncall)
mock_construct_fcm_message.assert_called_once_with(
MessageType.INFO, device, notification_thread_id, data, mock_apns_payload.return_value
@ -350,7 +345,7 @@ def test_get_youre_going_oncall_fcm_message(
],
)
@pytest.mark.django_db
def test_should_we_send_going_oncall_push_notification(
def test_should_we_send_push_notification(
make_organization_and_user,
info_notifications_enabled,
now,
@ -366,40 +361,40 @@ def test_should_we_send_going_oncall_push_notification(
)
assert (
tasks.should_we_send_going_oncall_push_notification(
_should_we_send_push_notification(
now, user_mobile_settings, _create_schedule_event(schedule_start, schedule_start, "12345", [])
)
== expected
)
def test_generate_going_oncall_push_notification_cache_key() -> None:
def test_generate_cache_key() -> None:
user_pk = "adfad"
schedule_event = {"shift": {"pk": "dfdfdf"}}
assert (
tasks._generate_going_oncall_push_notification_cache_key(user_pk, schedule_event)
_generate_cache_key(user_pk, schedule_event)
== f"going_oncall_push_notification:{user_pk}:{schedule_event['shift']['pk']}"
)
@mock.patch("apps.mobile_app.tasks._send_push_notification")
@mock.patch("apps.mobile_app.tasks.going_oncall_notification.send_push_notification")
@pytest.mark.django_db
def test_conditionally_send_going_oncall_push_notifications_for_schedule_schedule_not_found(
mocked_send_push_notification,
):
tasks.conditionally_send_going_oncall_push_notifications_for_schedule(12345)
conditionally_send_going_oncall_push_notifications_for_schedule(12345)
mocked_send_push_notification.assert_not_called()
@mock.patch("apps.mobile_app.tasks.OnCallSchedule.final_events")
@mock.patch("apps.mobile_app.tasks._send_push_notification")
@mock.patch("apps.mobile_app.tasks.should_we_send_going_oncall_push_notification")
@mock.patch("apps.mobile_app.tasks._get_youre_going_oncall_fcm_message")
@mock.patch("apps.mobile_app.tasks.going_oncall_notification.OnCallSchedule.final_events")
@mock.patch("apps.mobile_app.tasks.going_oncall_notification.send_push_notification")
@mock.patch("apps.mobile_app.tasks.going_oncall_notification._should_we_send_push_notification")
@mock.patch("apps.mobile_app.tasks.going_oncall_notification._get_fcm_message")
@pytest.mark.django_db
def test_conditionally_send_going_oncall_push_notifications_for_schedule(
mock_get_youre_going_oncall_fcm_message,
mock_should_we_send_going_oncall_push_notification,
mock_get_fcm_message,
mock_should_we_send_push_notification,
mock_send_push_notification,
mock_oncall_schedule_final_events,
make_organization_and_user,
@ -424,8 +419,8 @@ def test_conditionally_send_going_oncall_push_notifications_for_schedule(
final_events = [schedule_event]
seconds_until_shift_starts = 58989
mock_get_youre_going_oncall_fcm_message.return_value = mock_fcm_message
mock_should_we_send_going_oncall_push_notification.return_value = seconds_until_shift_starts
mock_get_fcm_message.return_value = mock_fcm_message
mock_should_we_send_push_notification.return_value = seconds_until_shift_starts
mock_oncall_schedule_final_events.return_value = final_events
schedule = make_schedule(organization, schedule_class=OnCallScheduleWeb)
@ -434,35 +429,35 @@ def test_conditionally_send_going_oncall_push_notifications_for_schedule(
assert cache.get(cache_key) is None
# no device available
tasks.conditionally_send_going_oncall_push_notifications_for_schedule(schedule.pk)
conditionally_send_going_oncall_push_notifications_for_schedule(schedule.pk)
mock_send_push_notification.assert_not_called()
# device available
device = FCMDevice.objects.create(user=user, registration_id="test_device_id")
MobileAppUserSettings.objects.create(user=user, going_oncall_notification_timing=ONCALL_TIMING_PREFERENCE)
tasks.conditionally_send_going_oncall_push_notifications_for_schedule(schedule.pk)
conditionally_send_going_oncall_push_notifications_for_schedule(schedule.pk)
mock_get_youre_going_oncall_fcm_message.assert_called_once_with(
user, schedule, device, seconds_until_shift_starts, schedule_event
)
mock_get_fcm_message.assert_called_once_with(user, schedule, device, seconds_until_shift_starts, schedule_event)
mock_send_push_notification.assert_called_once_with(device, mock_fcm_message)
assert cache.get(cache_key) is True
# we shouldn't double send the same push notification for the same user/shift
tasks.conditionally_send_going_oncall_push_notifications_for_schedule(schedule.pk)
conditionally_send_going_oncall_push_notifications_for_schedule(schedule.pk)
assert mock_send_push_notification.call_count == 1
# if the cache key expires we will resend the push notification for the same user/shift
# (in reality we're setting a timeout on the cache key, here we will just delete it to simulate this)
cache.delete(cache_key)
tasks.conditionally_send_going_oncall_push_notifications_for_schedule(schedule.pk)
conditionally_send_going_oncall_push_notifications_for_schedule(schedule.pk)
assert mock_send_push_notification.call_count == 2
assert cache.get(cache_key) is True
@mock.patch("apps.mobile_app.tasks.conditionally_send_going_oncall_push_notifications_for_schedule")
@mock.patch(
"apps.mobile_app.tasks.going_oncall_notification.conditionally_send_going_oncall_push_notifications_for_schedule"
)
@pytest.mark.django_db
def test_conditionally_send_going_oncall_push_notifications_for_all_schedules(
mocked_conditionally_send_going_oncall_push_notifications_for_schedule,
@ -474,7 +469,7 @@ def test_conditionally_send_going_oncall_push_notifications_for_all_schedules(
schedule2 = make_schedule(organization, schedule_class=OnCallScheduleICal)
schedule3 = make_schedule(organization, schedule_class=OnCallScheduleWeb)
tasks.conditionally_send_going_oncall_push_notifications_for_all_schedules()
conditionally_send_going_oncall_push_notifications_for_all_schedules()
mocked_conditionally_send_going_oncall_push_notifications_for_schedule.apply_async.assert_has_calls(
[

View file

@ -1,21 +1,20 @@
from unittest.mock import patch
import pytest
from firebase_admin.exceptions import FirebaseError
from apps.base.models import UserNotificationPolicy, UserNotificationPolicyLogRecord
from apps.mobile_app.models import FCMDevice, MobileAppUserSettings
from apps.mobile_app.tasks import _get_alert_group_escalation_fcm_message, notify_user_async
from apps.oss_installation.models import CloudConnector
from apps.mobile_app.tasks.new_alert_group import _get_fcm_message, notify_user_async
MOBILE_APP_BACKEND_ID = 5
CLOUD_LICENSE_NAME = "Cloud"
OPEN_SOURCE_LICENSE_NAME = "OpenSource"
@patch("apps.mobile_app.tasks.new_alert_group.send_push_notification")
@pytest.mark.django_db
def test_notify_user_async_cloud(
settings,
def test_notify_user_async(
mock_send_push_notification,
make_organization_and_user,
make_user_notification_policy,
make_alert_receive_channel,
@ -38,62 +37,20 @@ def test_notify_user_async_cloud(
alert_group = make_alert_group(alert_receive_channel, channel_filter=channel_filter)
make_alert(alert_group=alert_group, raw_request_data={})
# check FCM is contacted directly when using the cloud license
settings.LICENSE = CLOUD_LICENSE_NAME
settings.IS_OPEN_SOURCE = False
with patch.object(FCMDevice, "send_message", return_value="ok") as mock:
notify_user_async(
user_pk=user.pk,
alert_group_pk=alert_group.pk,
notification_policy_pk=notification_policy.pk,
critical=False,
)
mock.assert_called()
@pytest.mark.django_db
def test_notify_user_async_oss(
settings,
make_organization_and_user,
make_user_notification_policy,
make_alert_receive_channel,
make_channel_filter,
make_alert_group,
make_alert,
):
# create a user and connect a mobile device
organization, user = make_organization_and_user()
FCMDevice.objects.create(user=user, registration_id="test_device_id")
# set up notification policy and alert group
notification_policy = make_user_notification_policy(
user,
UserNotificationPolicy.Step.NOTIFY,
notify_by=MOBILE_APP_BACKEND_ID,
notify_user_async(
user_pk=user.pk,
alert_group_pk=alert_group.pk,
notification_policy_pk=notification_policy.pk,
critical=False,
)
alert_receive_channel = make_alert_receive_channel(organization=organization)
channel_filter = make_channel_filter(alert_receive_channel)
alert_group = make_alert_group(alert_receive_channel, channel_filter=channel_filter)
make_alert(alert_group=alert_group, raw_request_data={})
# create cloud connection
CloudConnector.objects.create(cloud_url="test")
# check FCM relay is contacted when using the OSS license
settings.LICENSE = OPEN_SOURCE_LICENSE_NAME
with patch("apps.mobile_app.tasks.send_push_notification_to_fcm_relay", return_value="ok") as mock:
notify_user_async(
user_pk=user.pk,
alert_group_pk=alert_group.pk,
notification_policy_pk=notification_policy.pk,
critical=False,
)
mock.assert_called()
mock_send_push_notification.assert_called_once()
@patch("apps.mobile_app.tasks.new_alert_group.send_push_notification")
@pytest.mark.django_db
def test_notify_user_async_oss_no_device_connected(
settings,
def test_notify_user_async_no_device_connected(
mock_send_push_notification,
make_organization_and_user,
make_user_notification_policy,
make_alert_receive_channel,
@ -115,102 +72,18 @@ def test_notify_user_async_oss_no_device_connected(
alert_group = make_alert_group(alert_receive_channel, channel_filter=channel_filter)
make_alert(alert_group=alert_group, raw_request_data={})
# create cloud connection
CloudConnector.objects.create(cloud_url="test")
# check FCM relay is contacted when using the OSS license
settings.LICENSE = OPEN_SOURCE_LICENSE_NAME
with patch("apps.mobile_app.tasks.send_push_notification_to_fcm_relay", return_value="ok") as mock:
notify_user_async(
user_pk=user.pk,
alert_group_pk=alert_group.pk,
notification_policy_pk=notification_policy.pk,
critical=False,
)
mock.assert_not_called()
notify_user_async(
user_pk=user.pk,
alert_group_pk=alert_group.pk,
notification_policy_pk=notification_policy.pk,
critical=False,
)
mock_send_push_notification.assert_not_called()
log_record = alert_group.personal_log_records.last()
assert log_record.type == UserNotificationPolicyLogRecord.TYPE_PERSONAL_NOTIFICATION_FAILED
@pytest.mark.django_db
def test_notify_user_async_oss_no_cloud_connection(
settings,
make_organization_and_user,
make_user_notification_policy,
make_alert_receive_channel,
make_channel_filter,
make_alert_group,
make_alert,
):
# create a user and connect a mobile device
organization, user = make_organization_and_user()
FCMDevice.objects.create(user=user, registration_id="test_device_id")
# set up notification policy and alert group
notification_policy = make_user_notification_policy(
user,
UserNotificationPolicy.Step.NOTIFY,
notify_by=MOBILE_APP_BACKEND_ID,
)
alert_receive_channel = make_alert_receive_channel(organization=organization)
channel_filter = make_channel_filter(alert_receive_channel)
alert_group = make_alert_group(alert_receive_channel, channel_filter=channel_filter)
make_alert(alert_group=alert_group, raw_request_data={})
# check FCM relay is contacted when using the OSS license
settings.LICENSE = OPEN_SOURCE_LICENSE_NAME
with patch("apps.mobile_app.tasks.send_push_notification_to_fcm_relay", return_value="ok") as mock:
notify_user_async(
user_pk=user.pk,
alert_group_pk=alert_group.pk,
notification_policy_pk=notification_policy.pk,
critical=False,
)
mock.assert_not_called()
log_record = alert_group.personal_log_records.last()
assert log_record.type == UserNotificationPolicyLogRecord.TYPE_PERSONAL_NOTIFICATION_FAILED
@pytest.mark.django_db
def test_notify_user_retry(
settings,
make_organization_and_user,
make_user_notification_policy,
make_alert_receive_channel,
make_channel_filter,
make_alert_group,
make_alert,
):
organization, user = make_organization_and_user()
FCMDevice.objects.create(user=user, registration_id="test_device_id")
notification_policy = make_user_notification_policy(
user,
UserNotificationPolicy.Step.NOTIFY,
notify_by=MOBILE_APP_BACKEND_ID,
)
alert_receive_channel = make_alert_receive_channel(organization=organization)
channel_filter = make_channel_filter(alert_receive_channel)
alert_group = make_alert_group(alert_receive_channel, channel_filter=channel_filter)
make_alert(alert_group=alert_group, raw_request_data={})
settings.LICENSE = CLOUD_LICENSE_NAME
settings.IS_OPEN_SOURCE = False
# check that FirebaseError is raised when send_message returns it so Celery task can retry
with patch.object(
FCMDevice, "send_message", return_value=FirebaseError(code="test_error_code", message="test_error_message")
):
with pytest.raises(FirebaseError):
notify_user_async(
user_pk=user.pk,
alert_group_pk=alert_group.pk,
notification_policy_pk=notification_policy.pk,
critical=False,
)
@pytest.mark.django_db
def test_fcm_message_user_settings(
make_organization_and_user, make_alert_receive_channel, make_alert_group, make_alert
@ -222,7 +95,7 @@ def test_fcm_message_user_settings(
alert_group = make_alert_group(alert_receive_channel)
make_alert(alert_group=alert_group, raw_request_data={})
message = _get_alert_group_escalation_fcm_message(alert_group, user, device, critical=False)
message = _get_fcm_message(alert_group, user, device, critical=False)
# Check user settings are passed to FCM message
assert message.data["default_notification_sound_name"] == "default_sound.mp3"
@ -254,7 +127,7 @@ def test_fcm_message_user_settings_critical(
alert_group = make_alert_group(alert_receive_channel)
make_alert(alert_group=alert_group, raw_request_data={})
message = _get_alert_group_escalation_fcm_message(alert_group, user, device, critical=True)
message = _get_fcm_message(alert_group, user, device, critical=True)
# Check user settings are passed to FCM message
assert message.data["default_notification_sound_name"] == "default_sound.mp3"
@ -289,7 +162,7 @@ def test_fcm_message_user_settings_critical_override_dnd_disabled(
# Disable important notification override DND
MobileAppUserSettings.objects.create(user=user, important_notification_override_dnd=False)
message = _get_alert_group_escalation_fcm_message(alert_group, user, device, critical=True)
message = _get_fcm_message(alert_group, user, device, critical=True)
# Check user settings are passed to FCM message
assert message.data["important_notification_override_dnd"] == "false"

View file

@ -6,7 +6,7 @@ from django.utils import timezone
from firebase_admin.messaging import Message
from apps.mobile_app.models import FCMDevice, MobileAppUserSettings
from apps.mobile_app.tasks import (
from apps.mobile_app.tasks.new_shift_swap_request import (
_get_shift_swap_requests_to_notify,
_has_user_been_notified_for_shift_swap_request,
_mark_shift_swap_request_notified_for_user,
@ -102,7 +102,7 @@ def test_notify_shift_swap_requests(make_organization, make_user, make_schedule,
with patch.object(notify_shift_swap_request, "delay") as mock_notify_shift_swap_request:
with patch(
"apps.mobile_app.tasks._get_shift_swap_requests_to_notify",
"apps.mobile_app.tasks.new_shift_swap_request._get_shift_swap_requests_to_notify",
return_value=[(ShiftSwapRequest.objects.filter(pk=shift_swap_request.pk).first(), TIMEOUT)],
) as mock_get_shift_swap_requests_to_notify:
notify_shift_swap_requests()
@ -127,7 +127,10 @@ def test_notify_shift_swap_request(make_organization, make_user, make_schedule,
)
with patch.object(notify_user_about_shift_swap_request, "delay") as mock_notify_user_about_shift_swap_request:
with patch("apps.mobile_app.tasks._should_notify_user_about_shift_swap_request", return_value=True):
with patch(
"apps.mobile_app.tasks.new_shift_swap_request._should_notify_user_about_shift_swap_request",
return_value=True,
):
with patch.object(
ShiftSwapRequest,
"possible_benefactors",
@ -156,7 +159,10 @@ def test_notify_shift_swap_request_should_not_notify_user(
)
with patch.object(notify_user_about_shift_swap_request, "delay") as mock_notify_user_about_shift_swap_request:
with patch("apps.mobile_app.tasks._should_notify_user_about_shift_swap_request", return_value=False):
with patch(
"apps.mobile_app.tasks.new_shift_swap_request._should_notify_user_about_shift_swap_request",
return_value=False,
):
with patch.object(
ShiftSwapRequest,
"possible_benefactors",
@ -233,7 +239,7 @@ def test_notify_user_about_shift_swap_request(make_organization, make_user, make
schedule, beneficiary, swap_start=swap_start, swap_end=swap_end, created_at=now
)
with patch("apps.mobile_app.tasks._send_push_notification") as mock_send_push_notification:
with patch("apps.mobile_app.tasks.new_shift_swap_request.send_push_notification") as mock_send_push_notification:
notify_user_about_shift_swap_request(shift_swap_request.pk, benefactor.pk)
mock_send_push_notification.assert_called_once()
@ -270,7 +276,7 @@ def test_notify_user_about_shift_swap_request_info_notifications_disabled(
schedule, beneficiary, swap_start=swap_start, swap_end=swap_end, created_at=now
)
with patch("apps.mobile_app.tasks._send_push_notification") as mock_send_push_notification:
with patch("apps.mobile_app.tasks.new_shift_swap_request.send_push_notification") as mock_send_push_notification:
notify_user_about_shift_swap_request(shift_swap_request.pk, benefactor.pk)
mock_send_push_notification.assert_not_called()
@ -297,22 +303,34 @@ def test_should_notify_user(make_organization, make_user, make_schedule, make_sh
# check _should_notify_user_about_shift_swap_request is True when info notifications are disabled
mobile_app_settings = MobileAppUserSettings.objects.create(user=benefactor, info_notifications_enabled=False)
with patch.object(benefactor, "is_in_working_hours", return_value=True):
with patch("apps.mobile_app.tasks._has_user_been_notified_for_shift_swap_request", return_value=False):
with patch(
"apps.mobile_app.tasks.new_shift_swap_request._has_user_been_notified_for_shift_swap_request",
return_value=False,
):
assert _should_notify_user_about_shift_swap_request(shift_swap_request, benefactor, now) is True
mobile_app_settings.info_notifications_enabled = True
mobile_app_settings.save(update_fields=["info_notifications_enabled"])
with patch.object(benefactor, "is_in_working_hours", return_value=True):
with patch("apps.mobile_app.tasks._has_user_been_notified_for_shift_swap_request", return_value=True):
with patch(
"apps.mobile_app.tasks.new_shift_swap_request._has_user_been_notified_for_shift_swap_request",
return_value=True,
):
assert _should_notify_user_about_shift_swap_request(shift_swap_request, benefactor, now) is False
with patch.object(benefactor, "is_in_working_hours", return_value=False):
with patch("apps.mobile_app.tasks._has_user_been_notified_for_shift_swap_request", return_value=False):
with patch(
"apps.mobile_app.tasks.new_shift_swap_request._has_user_been_notified_for_shift_swap_request",
return_value=False,
):
assert _should_notify_user_about_shift_swap_request(shift_swap_request, benefactor, now) is False
with patch.object(benefactor, "is_in_working_hours", return_value=True):
with patch("apps.mobile_app.tasks._has_user_been_notified_for_shift_swap_request", return_value=False):
with patch(
"apps.mobile_app.tasks.new_shift_swap_request._has_user_been_notified_for_shift_swap_request",
return_value=False,
):
assert _should_notify_user_about_shift_swap_request(shift_swap_request, benefactor, now) is True

View file

@ -9,7 +9,7 @@ from rest_framework.test import APIClient
from apps.mobile_app.fcm_relay import FCMRelayThrottler, _get_message_from_request_data, fcm_relay_async
from apps.mobile_app.models import FCMDevice
from apps.mobile_app.tasks import _get_alert_group_escalation_fcm_message
from apps.mobile_app.tasks.new_alert_group import _get_fcm_message
@pytest.mark.django_db
@ -118,7 +118,7 @@ def test_fcm_relay_serialize_deserialize(
make_alert(alert_group=alert_group, raw_request_data={})
# Imitate sending a message to the FCM relay endpoint
original_message = _get_alert_group_escalation_fcm_message(alert_group, user, device, critical=False)
original_message = _get_fcm_message(alert_group, user, device, critical=False)
request_data = json.loads(str(original_message))
# Imitate receiving a message from the FCM relay endpoint

View file

@ -0,0 +1,161 @@
from unittest.mock import Mock, patch
import pytest
from firebase_admin.exceptions import FirebaseError
from requests import HTTPError
from apps.mobile_app import utils
from apps.mobile_app.models import FCMDevice
from apps.oss_installation.models import CloudConnector
MOBILE_APP_BACKEND_ID = 5
CLOUD_LICENSE_NAME = "Cloud"
OPEN_SOURCE_LICENSE_NAME = "OpenSource"
@patch.object(FCMDevice, "send_message", return_value="ok")
@pytest.mark.django_db
def test_send_push_notification_cloud(
mock_send_message,
settings,
make_organization_and_user,
):
# create a user and connect a mobile device
_, user = make_organization_and_user()
device = FCMDevice.objects.create(user=user, registration_id="test_device_id")
mock_message = {"foo": "bar"}
# check FCM is contacted directly when using the cloud license
settings.LICENSE = CLOUD_LICENSE_NAME
settings.IS_OPEN_SOURCE = False
utils.send_push_notification(device, mock_message)
mock_send_message.assert_called_once_with(mock_message)
@patch.object(FCMDevice, "send_message")
@pytest.mark.django_db
def test_send_push_notification_cloud_firebase_error(
mock_send_message,
settings,
make_organization_and_user,
):
mock_send_message.return_value = FirebaseError(code="test_error_code", message="test_error_message")
# create a user and connect a mobile device
_, user = make_organization_and_user()
device = FCMDevice.objects.create(user=user, registration_id="test_device_id")
mock_message = {"foo": "bar"}
# check FCM is contacted directly when using the cloud license
settings.LICENSE = CLOUD_LICENSE_NAME
settings.IS_OPEN_SOURCE = False
with pytest.raises(FirebaseError):
utils.send_push_notification(device, mock_message)
mock_send_message.assert_called_once_with(mock_message)
@patch("apps.mobile_app.utils._send_push_notification_to_fcm_relay", return_value="ok")
@pytest.mark.django_db
def test_send_push_notification_oss(
mock_send_push_notification_to_fcm_relay,
settings,
make_organization_and_user,
):
settings.LICENSE = OPEN_SOURCE_LICENSE_NAME
mock_error_cb = Mock()
# create cloud connection
CloudConnector.objects.create(cloud_url="test")
# create a user and connect a mobile device
_, user = make_organization_and_user()
device = FCMDevice.objects.create(user=user, registration_id="test_device_id")
mock_message = {"foo": "bar"}
utils.send_push_notification(device, mock_message, mock_error_cb)
mock_error_cb.assert_not_called()
mock_send_push_notification_to_fcm_relay.assert_called_once_with(mock_message)
@patch("apps.mobile_app.utils._send_push_notification_to_fcm_relay")
@pytest.mark.django_db
def test_send_push_notification_oss_no_cloud_connector(
mock_send_push_notification_to_fcm_relay,
settings,
make_organization_and_user,
):
settings.LICENSE = OPEN_SOURCE_LICENSE_NAME
mock_error_cb = Mock()
# create a user and connect a mobile device
_, user = make_organization_and_user()
device = FCMDevice.objects.create(user=user, registration_id="test_device_id")
mock_message = {"foo": "bar"}
utils.send_push_notification(device, mock_message, mock_error_cb)
mock_error_cb.assert_called_once_with()
mock_send_push_notification_to_fcm_relay.assert_not_called()
@patch("apps.mobile_app.utils._send_push_notification_to_fcm_relay")
@pytest.mark.django_db
def test_send_push_notification_oss_fcm_relay_returns_client_error(
mock_send_push_notification_to_fcm_relay,
settings,
make_organization_and_user,
):
settings.LICENSE = OPEN_SOURCE_LICENSE_NAME
class MockResponse:
status_code = 400
mock_error_cb = Mock()
mock_send_push_notification_to_fcm_relay.side_effect = HTTPError(response=MockResponse)
# create cloud connection
CloudConnector.objects.create(cloud_url="test")
# create a user and connect a mobile device
_, user = make_organization_and_user()
device = FCMDevice.objects.create(user=user, registration_id="test_device_id")
mock_message = {"foo": "bar"}
utils.send_push_notification(device, mock_message, mock_error_cb)
mock_send_push_notification_to_fcm_relay.assert_called_once_with(mock_message)
@patch("apps.mobile_app.utils._send_push_notification_to_fcm_relay")
@pytest.mark.django_db
def test_send_push_notification_oss_fcm_relay_returns_server_error(
mock_send_push_notification_to_fcm_relay,
settings,
make_organization_and_user,
):
settings.LICENSE = OPEN_SOURCE_LICENSE_NAME
class MockResponse:
status_code = 500
mock_error_cb = Mock()
mock_send_push_notification_to_fcm_relay.side_effect = HTTPError(response=MockResponse)
# create cloud connection
CloudConnector.objects.create(cloud_url="test")
# create a user and connect a mobile device
_, user = make_organization_and_user()
device = FCMDevice.objects.create(user=user, registration_id="test_device_id")
mock_message = {"foo": "bar"}
with pytest.raises(HTTPError):
utils.send_push_notification(device, mock_message, mock_error_cb)
mock_error_cb.assert_not_called()
mock_send_push_notification_to_fcm_relay.assert_called_once_with(mock_message)

View file

@ -0,0 +1,123 @@
import json
import logging
import typing
import requests
from django.conf import settings
from firebase_admin.exceptions import FirebaseError
from firebase_admin.messaging import AndroidConfig, APNSConfig, APNSPayload, Message
from requests import HTTPError
from rest_framework import status
from apps.base.utils import live_settings
from apps.mobile_app.types import FCMMessageData, MessageType
from common.api_helpers.utils import create_engine_url
if typing.TYPE_CHECKING:
from apps.mobile_app.models import FCMDevice
MAX_RETRIES = 1 if settings.DEBUG else 10
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
def _send_push_notification_to_fcm_relay(message: Message) -> requests.Response:
"""
Send push notification to FCM relay on cloud instance: apps.mobile_app.fcm_relay.FCMRelayView
"""
url = create_engine_url("mobile_app/v1/fcm_relay", override_base=settings.GRAFANA_CLOUD_ONCALL_API_URL)
response = requests.post(
url, headers={"Authorization": live_settings.GRAFANA_CLOUD_ONCALL_TOKEN}, json=json.loads(str(message))
)
response.raise_for_status()
return response
def send_push_notification(
device_to_notify: "FCMDevice", message: Message, error_cb: typing.Optional[typing.Callable[..., None]] = None
) -> None:
logger.debug(f"Sending push notification to device type {device_to_notify.type} with message: {message}")
def _error_cb():
if error_cb:
error_cb()
if settings.IS_OPEN_SOURCE:
# FCM relay uses cloud connection to send push notifications
from apps.oss_installation.models import CloudConnector
if not CloudConnector.objects.exists():
_error_cb()
logger.error("Error while sending a mobile push notification: not connected to cloud")
return
try:
response = _send_push_notification_to_fcm_relay(message)
logger.debug(f"FCM relay response: {response}")
except HTTPError as e:
if status.HTTP_400_BAD_REQUEST <= e.response.status_code < status.HTTP_500_INTERNAL_SERVER_ERROR:
# do not retry on HTTP client errors (4xx errors)
_error_cb()
logger.error(
f"Error while sending a mobile push notification: HTTP client error {e.response.status_code}"
)
return
else:
raise
else:
# https://firebase.google.com/docs/cloud-messaging/http-server-ref#interpret-downstream
response = device_to_notify.send_message(message)
logger.debug(f"FCM response: {response}")
if isinstance(response, FirebaseError):
raise response
def construct_fcm_message(
message_type: MessageType,
device_to_notify: "FCMDevice",
thread_id: str,
data: FCMMessageData,
apns_payload: typing.Optional[APNSPayload] = None,
) -> Message:
apns_config_kwargs = {}
if apns_payload is not None:
apns_config_kwargs["payload"] = apns_payload
return Message(
token=device_to_notify.registration_id,
data={
# from the docs..
# A dictionary of data fields (optional). All keys and values in the dictionary must be strings
**data,
"type": message_type,
"thread_id": thread_id,
},
android=AndroidConfig(
# from the docs
# https://firebase.google.com/docs/cloud-messaging/concept-options#setting-the-priority-of-a-message
#
# Normal priority.
# Normal priority messages are delivered immediately when the app is in the foreground.
# For backgrounded apps, delivery may be delayed. For less time-sensitive messages, such as notifications
# of new email, keeping your UI in sync, or syncing app data in the background, choose normal delivery
# priority.
#
# High priority.
# FCM attempts to deliver high priority messages immediately even if the device is in Doze mode.
# High priority messages are for time-sensitive, user visible content.
priority="high",
),
apns=APNSConfig(
**apns_config_kwargs,
headers={
# From the docs
# https://firebase.google.com/docs/cloud-messaging/concept-options#setting-the-priority-of-a-message
"apns-priority": "10",
},
),
)