From 181d5d5712d57fcce1be7345c72c42d76e0fff34 Mon Sep 17 00:00:00 2001 From: Matias Bordese Date: Thu, 4 Jan 2024 12:34:28 -0300 Subject: [PATCH] Setup one-at-a-time lock for sync_organization tasks (#3612) Related to https://github.com/grafana/support-escalations/issues/8844 Queuing multiple sync_organization tasks for the same org could lead to parallel running of the sync task for the same organization, potentially creating duplicated entries and/or generating multiple unneeded API calls. This prevents running an organization sync while there is a sync for that same org in progress. --- CHANGELOG.md | 1 + engine/apps/user_management/sync.py | 14 +++++ .../apps/user_management/tests/test_sync.py | 52 ++++++++++++++----- engine/common/utils.py | 26 ++++++++++ 4 files changed, 79 insertions(+), 14 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a998ad9f..f7378467 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Handle message to reply to not found in Telegram send log ([#3587](https://github.com/grafana/oncall/pull/3587)) - Upgrade mobx lib to the latest version 6.12.0 ([#3453](https://github.com/grafana/oncall/issues/3453)) +- Add task lock to avoid running multiple sync_organization tasks in parallel for the same org ([#3612](https://github.com/grafana/oncall/pull/3612)) ## v1.3.81 (2023-12-28) diff --git a/engine/apps/user_management/sync.py b/engine/apps/user_management/sync.py index f28a3a75..94be2ebc 100644 --- a/engine/apps/user_management/sync.py +++ b/engine/apps/user_management/sync.py @@ -1,4 +1,5 @@ import logging +import uuid from celery.utils.log import get_task_logger from django.conf import settings @@ -7,12 +8,25 @@ from django.utils import timezone from apps.grafana_plugin.helpers.client import GcomAPIClient, GrafanaAPIClient from apps.user_management.models import Organization, Team, User from apps.user_management.signals import 
org_sync_signal +from common.utils import task_lock logger = get_task_logger(__name__) logger.setLevel(logging.DEBUG) def sync_organization(organization: Organization) -> None: + # ensure one sync task is running at most for a given org at a given time + lock_id = "sync-organization-lock-{}".format(organization.id) + random_value = str(uuid.uuid4()) + with task_lock(lock_id, random_value) as acquired: + if acquired: + _sync_organization(organization) + else: + # sync already running + logger.info(f"Sync for Organization {organization.pk} already in progress.") + + +def _sync_organization(organization: Organization) -> None: grafana_api_client = GrafanaAPIClient(api_url=organization.grafana_url, api_token=organization.api_token) # NOTE: checking whether or not RBAC is enabled depends on whether we are dealing with an open-source or cloud diff --git a/engine/apps/user_management/tests/test_sync.py b/engine/apps/user_management/tests/test_sync.py index d6808815..20013ba9 100644 --- a/engine/apps/user_management/tests/test_sync.py +++ b/engine/apps/user_management/tests/test_sync.py @@ -326,10 +326,13 @@ def test_sync_organization_is_rbac_permissions_enabled_open_source(make_organiza @pytest.mark.parametrize("gcom_api_response", [False, True]) @patch("apps.user_management.sync.GcomAPIClient") +@patch("common.utils.cache") @override_settings(LICENSE=settings.CLOUD_LICENSE_NAME) @override_settings(GRAFANA_COM_ADMIN_API_TOKEN="mockedToken") @pytest.mark.django_db -def test_sync_organization_is_rbac_permissions_enabled_cloud(mocked_gcom_client, make_organization, gcom_api_response): +def test_sync_organization_is_rbac_permissions_enabled_cloud( + mock_cache, mocked_gcom_client, make_organization, gcom_api_response +): stack_id = 5 organization = make_organization(stack_id=stack_id) @@ -369,22 +372,27 @@ def test_sync_organization_is_rbac_permissions_enabled_cloud(mocked_gcom_client, }, ) - with patch.object(GrafanaAPIClient, "check_token", return_value=(None, 
api_check_token_call_status)): - with patch.object(GrafanaAPIClient, "get_users", return_value=api_users_response): - with patch.object(GrafanaAPIClient, "get_teams", return_value=(api_teams_response, None)): - with patch.object(GrafanaAPIClient, "get_team_members", return_value=(api_members_response, None)): - with patch.object( - GrafanaAPIClient, - "get_grafana_incident_plugin_settings", - return_value=( - {"enabled": True, "jsonData": {"backendUrl": MOCK_GRAFANA_INCIDENT_BACKEND_URL}}, - None, - ), - ): - sync_organization(organization) + random_uuid = "random" + with patch("apps.user_management.sync.uuid.uuid4", return_value=random_uuid): + with patch.object(GrafanaAPIClient, "check_token", return_value=(None, api_check_token_call_status)): + with patch.object(GrafanaAPIClient, "get_users", return_value=api_users_response): + with patch.object(GrafanaAPIClient, "get_teams", return_value=(api_teams_response, None)): + with patch.object(GrafanaAPIClient, "get_team_members", return_value=(api_members_response, None)): + with patch.object( + GrafanaAPIClient, + "get_grafana_incident_plugin_settings", + return_value=( + {"enabled": True, "jsonData": {"backendUrl": MOCK_GRAFANA_INCIDENT_BACKEND_URL}}, + None, + ), + ): + sync_organization(organization) organization.refresh_from_db() + # lock is set and released + mock_cache.add.assert_called_once_with(f"sync-organization-lock-{organization.id}", random_uuid, 60 * 10) + mock_cache.delete.assert_called_once_with(f"sync-organization-lock-{organization.id}") assert mocked_gcom_client.return_value.called_once_with("mockedToken") assert mocked_gcom_client.return_value.is_rbac_enabled_for_stack.called_once_with(stack_id) assert organization.is_rbac_permissions_enabled == gcom_api_response @@ -433,3 +441,19 @@ def test_cleanup_organization_deleted(make_organization): organization.refresh_from_db() assert organization.deleted_at is not None + + +@pytest.mark.django_db +def test_sync_organization_lock(make_organization): + 
organization = make_organization() + + random_uuid = "random" + with patch("apps.user_management.sync.GrafanaAPIClient") as mock_client: + with patch("apps.user_management.sync.uuid.uuid4", return_value=random_uuid): + with patch("apps.user_management.sync.task_lock") as mock_task_lock: + # lock couldn't be acquired + mock_task_lock.return_value.__enter__.return_value = False + sync_organization(organization) + + mock_task_lock.assert_called_once_with(f"sync-organization-lock-{organization.id}", random_uuid) + assert not mock_client.called diff --git a/engine/common/utils.py b/engine/common/utils.py index 7b156636..a157bbd7 100644 --- a/engine/common/utils.py +++ b/engine/common/utils.py @@ -5,6 +5,7 @@ import os import random import re import time +from contextlib import contextmanager from functools import reduce import factory @@ -12,6 +13,7 @@ import markdown2 from bs4 import BeautifulSoup from celery.utils.log import get_task_logger from celery.utils.time import get_exponential_backoff_interval +from django.core.cache import cache from django.utils.html import urlize logger = get_task_logger(__name__) @@ -73,6 +75,30 @@ class OkToRetry: ) +LOCK_EXPIRE = 60 * 10 # Lock expires in 10 minutes + + +# Context manager for tasks that are intended to run once at a time +# (i.e. 
no parallel instances of the same task running) +# based on https://docs.celeryq.dev/en/stable/tutorials/task-cookbook.html#ensuring-a-task-is-only-executed-one-at-a-time +@contextmanager +def task_lock(lock_id, oid): + timeout_at = time.monotonic() + LOCK_EXPIRE - 3 + # cache.add returns False if the key already exists + status = cache.add(lock_id, oid, LOCK_EXPIRE) + try: + yield status + finally: + # cache delete may be slow, but we have to use it to take + # advantage of using add() for atomic locking + if time.monotonic() < timeout_at and status: + # don't release the lock if we exceeded the timeout + # to lessen the chance of releasing an expired lock + # owned by someone else + # also don't release the lock if we didn't acquire it + cache.delete(lock_id) + + # lru cache version with addition of timeout. # Timeout added to not to occupy memory with too old values def timed_lru_cache(timeout: int, maxsize: int = 128, typed: bool = False):