Set up one-at-a-time lock for sync_organization tasks (#3612)

Related to https://github.com/grafana/support-escalations/issues/8844

Queuing multiple sync_organization tasks for the same org could lead to
parallel running of the sync task for the same organization, potentially
creating duplicated entries and/or generating multiple unneeded API
calls. This prevents running an organization sync while there is a sync
for that same org in progress.
This commit is contained in:
Matias Bordese 2024-01-04 12:34:28 -03:00 committed by GitHub
parent 0a39f90979
commit 181d5d5712
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 79 additions and 14 deletions

View file

@@ -15,6 +15,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Handle message to reply to not found in Telegram send log ([#3587](https://github.com/grafana/oncall/pull/3587))
- Upgrade mobx lib to the latest version 6.12.0 ([#3453](https://github.com/grafana/oncall/issues/3453))
- Add task lock to avoid running multiple sync_organization tasks in parallel for the same org ([#3612](https://github.com/grafana/oncall/pull/3612))
## v1.3.81 (2023-12-28)

View file

@ -1,4 +1,5 @@
import logging
import uuid
from celery.utils.log import get_task_logger
from django.conf import settings
@ -7,12 +8,25 @@ from django.utils import timezone
from apps.grafana_plugin.helpers.client import GcomAPIClient, GrafanaAPIClient
from apps.user_management.models import Organization, Team, User
from apps.user_management.signals import org_sync_signal
from common.utils import task_lock
logger = get_task_logger(__name__)
logger.setLevel(logging.DEBUG)
def sync_organization(organization: Organization) -> None:
    """Run a full sync for the given organization, one instance at a time.

    Acquires a per-organization cache lock before delegating to
    _sync_organization; if another sync for the same org currently holds
    the lock, this call logs and does nothing.
    """
    lock_id = "sync-organization-lock-{}".format(organization.id)
    owner_token = str(uuid.uuid4())
    with task_lock(lock_id, owner_token) as got_lock:
        if not got_lock:
            # another sync for this org is in flight; skip this run entirely
            logger.info(f"Sync for Organization {organization.pk} already in progress.")
            return
        _sync_organization(organization)
def _sync_organization(organization: Organization) -> None:
grafana_api_client = GrafanaAPIClient(api_url=organization.grafana_url, api_token=organization.api_token)
# NOTE: checking whether or not RBAC is enabled depends on whether we are dealing with an open-source or cloud

View file

@ -326,10 +326,13 @@ def test_sync_organization_is_rbac_permissions_enabled_open_source(make_organiza
@pytest.mark.parametrize("gcom_api_response", [False, True])
@patch("apps.user_management.sync.GcomAPIClient")
@patch("common.utils.cache")
@override_settings(LICENSE=settings.CLOUD_LICENSE_NAME)
@override_settings(GRAFANA_COM_ADMIN_API_TOKEN="mockedToken")
@pytest.mark.django_db
def test_sync_organization_is_rbac_permissions_enabled_cloud(mocked_gcom_client, make_organization, gcom_api_response):
def test_sync_organization_is_rbac_permissions_enabled_cloud(
mock_cache, mocked_gcom_client, make_organization, gcom_api_response
):
stack_id = 5
organization = make_organization(stack_id=stack_id)
@ -369,22 +372,27 @@ def test_sync_organization_is_rbac_permissions_enabled_cloud(mocked_gcom_client,
},
)
with patch.object(GrafanaAPIClient, "check_token", return_value=(None, api_check_token_call_status)):
with patch.object(GrafanaAPIClient, "get_users", return_value=api_users_response):
with patch.object(GrafanaAPIClient, "get_teams", return_value=(api_teams_response, None)):
with patch.object(GrafanaAPIClient, "get_team_members", return_value=(api_members_response, None)):
with patch.object(
GrafanaAPIClient,
"get_grafana_incident_plugin_settings",
return_value=(
{"enabled": True, "jsonData": {"backendUrl": MOCK_GRAFANA_INCIDENT_BACKEND_URL}},
None,
),
):
sync_organization(organization)
random_uuid = "random"
with patch("apps.user_management.sync.uuid.uuid4", return_value=random_uuid):
with patch.object(GrafanaAPIClient, "check_token", return_value=(None, api_check_token_call_status)):
with patch.object(GrafanaAPIClient, "get_users", return_value=api_users_response):
with patch.object(GrafanaAPIClient, "get_teams", return_value=(api_teams_response, None)):
with patch.object(GrafanaAPIClient, "get_team_members", return_value=(api_members_response, None)):
with patch.object(
GrafanaAPIClient,
"get_grafana_incident_plugin_settings",
return_value=(
{"enabled": True, "jsonData": {"backendUrl": MOCK_GRAFANA_INCIDENT_BACKEND_URL}},
None,
),
):
sync_organization(organization)
organization.refresh_from_db()
# lock is set and released
mock_cache.add.assert_called_once_with(f"sync-organization-lock-{organization.id}", random_uuid, 60 * 10)
mock_cache.delete.assert_called_once_with(f"sync-organization-lock-{organization.id}")
assert mocked_gcom_client.return_value.called_once_with("mockedToken")
assert mocked_gcom_client.return_value.is_rbac_enabled_for_stack.called_once_with(stack_id)
assert organization.is_rbac_permissions_enabled == gcom_api_response
@ -433,3 +441,19 @@ def test_cleanup_organization_deleted(make_organization):
organization.refresh_from_db()
assert organization.deleted_at is not None
@pytest.mark.django_db
def test_sync_organization_lock(make_organization):
    """When the per-org lock is already held, sync_organization must not touch the API."""
    organization = make_organization()
    random_uuid = "random"
    with patch("apps.user_management.sync.GrafanaAPIClient") as mock_client, patch(
        "apps.user_management.sync.uuid.uuid4", return_value=random_uuid
    ), patch("apps.user_management.sync.task_lock") as mock_task_lock:
        # simulate the lock being held by another task: the context manager yields False
        mock_task_lock.return_value.__enter__.return_value = False
        sync_organization(organization)
    # the lock was requested with the expected key and owner token...
    mock_task_lock.assert_called_once_with(f"sync-organization-lock-{organization.id}", random_uuid)
    # ...and the actual sync (API client construction) never happened
    assert not mock_client.called

View file

@ -5,6 +5,7 @@ import os
import random
import re
import time
from contextlib import contextmanager
from functools import reduce
import factory
@ -12,6 +13,7 @@ import markdown2
from bs4 import BeautifulSoup
from celery.utils.log import get_task_logger
from celery.utils.time import get_exponential_backoff_interval
from django.core.cache import cache
from django.utils.html import urlize
logger = get_task_logger(__name__)
@ -73,6 +75,30 @@ class OkToRetry:
)
LOCK_EXPIRE = 60 * 10 # Lock expires in 10 minutes
# Context manager for tasks that are intended to run once at a time
# (ie. no parallel instances of the same task running)
# based on https://docs.celeryq.dev/en/stable/tutorials/task-cookbook.html#ensuring-a-task-is-only-executed-one-at-a-time
@contextmanager
def task_lock(lock_id, oid):
    """Cache-backed mutual-exclusion guard for one-at-a-time tasks.

    Yields True when the lock was acquired (no other holder), False otherwise.
    Based on the Celery cookbook recipe for ensuring a task only runs once
    at a time: https://docs.celeryq.dev/en/stable/tutorials/task-cookbook.html#ensuring-a-task-is-only-executed-one-at-a-time

    :param lock_id: cache key identifying the lock
    :param oid: opaque owner token stored as the lock value
    """
    deadline = time.monotonic() + LOCK_EXPIRE - 3
    # cache.add is atomic: it stores the key and returns True only if the key
    # did not already exist, so False means someone else holds the lock
    acquired = cache.add(lock_id, oid, LOCK_EXPIRE)
    try:
        yield acquired
    finally:
        # cache.delete may be slow, but it pairs with add() for atomic locking.
        # Release only if we actually acquired the lock, and only before the
        # safety deadline — past it the cache entry may have expired and been
        # re-acquired by another worker, and deleting would steal their lock.
        if acquired and time.monotonic() < deadline:
            cache.delete(lock_id)
# lru cache version with addition of timeout.
# Timeout added to not to occupy memory with too old values
def timed_lru_cache(timeout: int, maxsize: int = 128, typed: bool = False):