diff --git a/engine/apps/metrics_exporter/helpers.py b/engine/apps/metrics_exporter/helpers.py index ec979f5b..b8b48ede 100644 --- a/engine/apps/metrics_exporter/helpers.py +++ b/engine/apps/metrics_exporter/helpers.py @@ -22,6 +22,7 @@ from apps.metrics_exporter.constants import ( RecalculateMetricsTimer, UserWasNotifiedOfAlertGroupsMetricsDict, ) +from common.cache import ensure_cache_key_allocates_to_the_same_hash_slot if typing.TYPE_CHECKING: from apps.alerts.models import AlertReceiveChannel @@ -98,24 +99,27 @@ def get_metrics_cache_timeout(organization_id): def get_metrics_cache_timer_key(organization_id) -> str: - return f"{METRICS_CACHE_TIMER}_{organization_id}" - - -def get_metrics_cache_timer_for_organization(organization_id): - key = get_metrics_cache_timer_key(organization_id) - return cache.get(key) + return ensure_cache_key_allocates_to_the_same_hash_slot( + f"{METRICS_CACHE_TIMER}_{organization_id}", METRICS_CACHE_TIMER + ) def get_metric_alert_groups_total_key(organization_id) -> str: - return f"{ALERT_GROUPS_TOTAL}_{organization_id}" + return ensure_cache_key_allocates_to_the_same_hash_slot( + f"{ALERT_GROUPS_TOTAL}_{organization_id}", ALERT_GROUPS_TOTAL + ) def get_metric_alert_groups_response_time_key(organization_id) -> str: - return f"{ALERT_GROUPS_RESPONSE_TIME}_{organization_id}" + return ensure_cache_key_allocates_to_the_same_hash_slot( + f"{ALERT_GROUPS_RESPONSE_TIME}_{organization_id}", ALERT_GROUPS_RESPONSE_TIME + ) def get_metric_user_was_notified_of_alert_groups_key(organization_id) -> str: - return f"{USER_WAS_NOTIFIED_OF_ALERT_GROUPS}_{organization_id}" + return ensure_cache_key_allocates_to_the_same_hash_slot( + f"{USER_WAS_NOTIFIED_OF_ALERT_GROUPS}_{organization_id}", USER_WAS_NOTIFIED_OF_ALERT_GROUPS + ) def get_metric_calculation_started_key(metric_name) -> str: diff --git a/engine/apps/mobile_app/tasks/going_oncall_notification.py b/engine/apps/mobile_app/tasks/going_oncall_notification.py index 05406e10..9d58a707 100644 --- 
a/engine/apps/mobile_app/tasks/going_oncall_notification.py +++ b/engine/apps/mobile_app/tasks/going_oncall_notification.py @@ -15,6 +15,7 @@ from apps.mobile_app.types import FCMMessageData, MessageType, Platform from apps.mobile_app.utils import MAX_RETRIES, construct_fcm_message, send_push_notification from apps.schedules.models.on_call_schedule import OnCallSchedule, ScheduleEvent from apps.user_management.models import User +from common.cache import ensure_cache_key_allocates_to_the_same_hash_slot from common.custom_celery_tasks import shared_dedicated_queue_retry_task from common.l10n import format_localized_datetime, format_localized_time @@ -164,7 +165,10 @@ def _should_we_send_push_notification( def _generate_cache_key(user_pk: str, schedule_event: ScheduleEvent) -> str: - return f"going_oncall_push_notification:{user_pk}:{schedule_event['shift']['pk']}" + KEY_PREFIX = "going_oncall_push_notification" + return ensure_cache_key_allocates_to_the_same_hash_slot( + f"{KEY_PREFIX}:{user_pk}:{schedule_event['shift']['pk']}", KEY_PREFIX + ) @shared_dedicated_queue_retry_task(autoretry_for=(Exception,), retry_backoff=True, max_retries=MAX_RETRIES) diff --git a/engine/apps/schedules/ical_utils.py b/engine/apps/schedules/ical_utils.py index e39874fd..9dede8fc 100644 --- a/engine/apps/schedules/ical_utils.py +++ b/engine/apps/schedules/ical_utils.py @@ -35,6 +35,7 @@ from apps.schedules.constants import ( RE_PRIORITY, ) from apps.schedules.ical_events import ical_events +from common.cache import ensure_cache_key_allocates_to_the_same_hash_slot from common.timezones import is_valid_timezone from common.utils import timed_lru_cache @@ -403,15 +404,24 @@ def get_cached_oncall_users_for_multiple_schedules(schedules: typing.List["OnCal from apps.schedules.models import OnCallSchedule from apps.user_management.models import User + CACHE_KEY_PREFIX = "schedule_oncall_users_" + def _generate_cache_key_for_schedule_oncall_users(schedule: "OnCallSchedule") -> str: - return 
f"schedule_{schedule.public_primary_key}_oncall_users" + return ensure_cache_key_allocates_to_the_same_hash_slot( + f"{CACHE_KEY_PREFIX}{schedule.public_primary_key}", CACHE_KEY_PREFIX + ) def _get_schedule_public_primary_key_from_schedule_oncall_users_cache_key(cache_key: str) -> str: - return cache_key.replace("schedule_", "").replace("_oncall_users", "") + """ + remove any brackets that might be included in the cache key (when redis cluster is active). + See `_generate_cache_key_for_schedule_oncall_users` just above + """ + cache_key = cache_key.replace("{", "").replace("}", "") + return cache_key.replace(CACHE_KEY_PREFIX, "") CACHE_TTL = 15 * 60 # 15 minutes in seconds - cache_keys: typing.List[str] = [_generate_cache_key_for_schedule_oncall_users(schedule) for schedule in schedules] + cache_keys = [_generate_cache_key_for_schedule_oncall_users(schedule) for schedule in schedules] # get_many returns a dictionary with all the keys we asked for that actually exist # in the cache (and haven’t expired) diff --git a/engine/apps/schedules/tests/test_ical_utils.py b/engine/apps/schedules/tests/test_ical_utils.py index 97e98b40..3b6cf517 100644 --- a/engine/apps/schedules/tests/test_ical_utils.py +++ b/engine/apps/schedules/tests/test_ical_utils.py @@ -584,7 +584,7 @@ def test_get_cached_oncall_users_for_multiple_schedules( return users, (schedule1, schedule2, schedule3) def _generate_cache_key(schedule): - return f"schedule_{schedule.public_primary_key}_oncall_users" + return f"schedule_oncall_users_{schedule.public_primary_key}" # scenario: nothing is cached, need to recalculate everything and cache it users, schedules = _test_setup() diff --git a/engine/common/cache.py b/engine/common/cache.py new file mode 100644 index 00000000..94047a66 --- /dev/null +++ b/engine/common/cache.py @@ -0,0 +1,41 @@ +import typing + +from django.conf import settings + +_RT = typing.TypeVar("_RT", str, typing.List[str], typing.Dict[str, typing.Any]) + + +def 
ensure_cache_key_allocates_to_the_same_hash_slot(cache_keys: _RT, pattern_to_wrap_in_brackets: str) -> _RT: + """ + This method will ensure that when using Redis Cluster, multiple cache keys will be allocated to the same hash slot. + This ensures that multi-key operations (ex `cache.get_many` and `cache.set_many`) will work without raising this + exception: + + ``` + File "/usr/local/lib/python3.11/site-packages/redis/cluster.py", line 1006, in determine_slot + raise RedisClusterException( + redis.exceptions.RedisClusterException: MGET - all keys must map to the same key slot + ``` + + From the Redis Cluster [docs](https://redis.io/docs/reference/cluster-spec/#hash-tags): + + There is an exception for the computation of the hash slot that is used in order to implement hash tags. + Hash tags are a way to ensure that multiple keys are allocated in the same hash slot. + This is used in order to implement multi-key operations in Redis Cluster. + + To implement hash tags, the hash slot for a key is computed in a slightly different way in certain conditions. + If the key contains a "{...}" pattern only the substring between { and } is hashed in order to obtain the hash slot. 
+ However since it is possible that there are multiple occurrences of { or } the algorithm is well specified; see the linked Redis Cluster docs above for the exact hashing rules. + """ + if not settings.USE_REDIS_CLUSTER: + return cache_keys + + def _replace_key(key: str) -> str: + return key.replace(pattern_to_wrap_in_brackets, f"{{{pattern_to_wrap_in_brackets}}}") + + if isinstance(cache_keys, str): + return _replace_key(cache_keys) + elif isinstance(cache_keys, dict): + return {_replace_key(key): value for key, value in cache_keys.items()} + return [_replace_key(key) for key in cache_keys] diff --git a/engine/common/tests/test_cache.py b/engine/common/tests/test_cache.py new file mode 100644 index 00000000..6a4ab193 --- /dev/null +++ b/engine/common/tests/test_cache.py @@ -0,0 +1,51 @@ +from django.test import override_settings + +from common.cache import ensure_cache_key_allocates_to_the_same_hash_slot + +PATTERN = "schedule_oncall_users" +NON_EXISTENT_PATTERN = "nmzxcnvmzxcv" +NUM_CACHE_KEYS = 5 +SINGLE_CACHE_KEY = f"{PATTERN}_0" +CACHE_KEYS = [f"{PATTERN}_{pk}" for pk in range(NUM_CACHE_KEYS)] +SET_MANY_CACHE_KEYS_DICT = {k: "foo" for k in CACHE_KEYS} + + +def test_ensure_cache_key_allocates_to_the_same_hash_slot() -> None: + def _convert_key(key: str) -> str: + return key.replace(PATTERN, f"{{{PATTERN}}}") + + # when USE_REDIS_CLUSTER is False the method should just return the cache keys + with override_settings(USE_REDIS_CLUSTER=False): + assert ensure_cache_key_allocates_to_the_same_hash_slot(SINGLE_CACHE_KEY, PATTERN) == SINGLE_CACHE_KEY + assert ensure_cache_key_allocates_to_the_same_hash_slot(CACHE_KEYS, PATTERN) == CACHE_KEYS + assert ( + ensure_cache_key_allocates_to_the_same_hash_slot(SET_MANY_CACHE_KEYS_DICT, PATTERN) + == SET_MANY_CACHE_KEYS_DICT + ) + + # when USE_REDIS_CLUSTER is True the method should wrap the specified pattern within the cache keys in curly brackets + with override_settings(USE_REDIS_CLUSTER=True): + # works with a single str cache key + assert
ensure_cache_key_allocates_to_the_same_hash_slot(SINGLE_CACHE_KEY, PATTERN) == _convert_key( + SINGLE_CACHE_KEY + ) + + # works with a list (useful for cache.get_many operations) + assert ensure_cache_key_allocates_to_the_same_hash_slot(CACHE_KEYS, PATTERN) == [ + _convert_key(k) for k in CACHE_KEYS + ] + + # works with a dict (useful for cache.set_many operations) + assert ensure_cache_key_allocates_to_the_same_hash_slot(SET_MANY_CACHE_KEYS_DICT, PATTERN) == { + _convert_key(k): v for k, v in SET_MANY_CACHE_KEYS_DICT.items() + } + + # if the pattern doesn't exist, we don't wrap it in brackets + assert ( + ensure_cache_key_allocates_to_the_same_hash_slot(SINGLE_CACHE_KEY, NON_EXISTENT_PATTERN) == SINGLE_CACHE_KEY + ) + assert ensure_cache_key_allocates_to_the_same_hash_slot(CACHE_KEYS, NON_EXISTENT_PATTERN) == CACHE_KEYS + assert ( + ensure_cache_key_allocates_to_the_same_hash_slot(SET_MANY_CACHE_KEYS_DICT, NON_EXISTENT_PATTERN) + == SET_MANY_CACHE_KEYS_DICT + ) diff --git a/engine/settings/base.py b/engine/settings/base.py index 5b6bf177..9dbc684e 100644 --- a/engine/settings/base.py +++ b/engine/settings/base.py @@ -194,6 +194,7 @@ REDIS_URI = os.getenv("REDIS_URI") if not REDIS_URI: REDIS_URI = f"{REDIS_PROTOCOL}://{REDIS_USERNAME}:{REDIS_PASSWORD}@{REDIS_HOST}:{REDIS_PORT}/{REDIS_DATABASE}" +USE_REDIS_CLUSTER = getenv_boolean("USE_REDIS_CLUSTER", default=False) REDIS_USE_SSL = os.getenv("REDIS_USE_SSL") REDIS_SSL_CONFIG = {}