Make sync settings configurable (#5002)
# What this PR does

Add settings for how sync jobs get split up, to control the throughput of requests.

- `SYNC_V2_MAX_TASKS` controls how many tasks can run concurrently
- `SYNC_V2_PERIOD_SECONDS` controls the delay added before starting another set of tasks each time `SYNC_V2_MAX_TASKS` is reached
- `SYNC_V2_BATCH_SIZE` controls how many organizations will be synced per task

## Which issue(s) this PR closes

Related to [issue link here]

## Checklist

- [x] Unit, integration, and e2e (if applicable) tests updated
- [x] Documentation added (or `pr:no public docs` PR label added if not required)
- [x] Added the relevant release notes label (see labels prefixed w/ `release:`). These labels dictate how your PR will show up in the autogenerated release notes.
parent c0a84291e6
commit d1cb862125

3 changed files with 46 additions and 13 deletions
Sync task changes:

```diff
@@ -1,7 +1,7 @@
 import logging
 
 from celery.utils.log import get_task_logger
-from django.utils import timezone
+from django.conf import settings
 
 from apps.grafana_plugin.helpers.client import GrafanaAPIClient
 from apps.grafana_plugin.helpers.gcom import get_active_instance_ids
@@ -12,10 +12,6 @@ logger = get_task_logger(__name__)
 logger.setLevel(logging.DEBUG)
 
-SYNC_PERIOD = timezone.timedelta(minutes=4)
-SYNC_BATCH_SIZE = 500
-
-
 
 @shared_dedicated_queue_retry_task(autoretry_for=(Exception,), retry_backoff=True, max_retries=0)
 def start_sync_organizations_v2():
     organization_qs = Organization.objects.all()
@@ -30,20 +26,22 @@ def start_sync_organizations_v2():
 
     logger.info(f"Found {len(organization_qs)} active organizations")
     batch = []
+    batch_index = 0
+    task_countdown_seconds = 0
     for org in organization_qs:
         if GrafanaAPIClient.validate_grafana_token_format(org.api_token):
             batch.append(org.pk)
-            if len(batch) == SYNC_BATCH_SIZE:
-                sync_organizations_v2.apply_async(
-                    (batch,),
-                )
+            if len(batch) == settings.SYNC_V2_BATCH_SIZE:
+                sync_organizations_v2.apply_async((batch,), countdown=task_countdown_seconds)
                 batch = []
+                batch_index += 1
+                if batch_index == settings.SYNC_V2_MAX_TASKS:
+                    batch_index = 0
+                    task_countdown_seconds += settings.SYNC_V2_PERIOD_SECONDS
         else:
             logger.info(f"Skipping stack_slug={org.stack_slug}, api_token format is invalid or not set")
     if batch:
-        sync_organizations_v2.apply_async(
-            (batch,),
-        )
+        sync_organizations_v2.apply_async((batch,), countdown=task_countdown_seconds)
 
 
 @shared_dedicated_queue_retry_task(autoretry_for=(Exception,), retry_backoff=True, max_retries=0)
```
Test changes:

```diff
@@ -1,7 +1,7 @@
 import gzip
 import json
 from dataclasses import asdict
-from unittest.mock import patch
+from unittest.mock import call, patch
 
 import pytest
 from django.urls import reverse
@@ -159,3 +159,34 @@ def test_sync_team_serialization(test_team, validation_pass):
     except ValidationError as e:
         validation_error = e
     assert (validation_error is None) == validation_pass
+
+
+@pytest.mark.django_db
+def test_sync_batch_tasks(make_organization, settings):
+    settings.SYNC_V2_MAX_TASKS = 2
+    settings.SYNC_V2_PERIOD_SECONDS = 10
+    settings.SYNC_V2_BATCH_SIZE = 2
+
+    for _ in range(9):
+        make_organization(api_token="glsa_abcdefghijklmnopqrstuvwxyz")
+
+    expected_calls = [
+        call(size=2, countdown=0),
+        call(size=2, countdown=0),
+        call(size=2, countdown=10),
+        call(size=2, countdown=10),
+        call(size=1, countdown=20),
+    ]
+    with patch("apps.grafana_plugin.tasks.sync_v2.sync_organizations_v2.apply_async", return_value=None) as mock_sync:
+        start_sync_organizations_v2()
+
+        def check_call(actual, expected):
+            return (
+                len(actual.args[0][0]) == expected.kwargs["size"]
+                and actual.kwargs["countdown"] == expected.kwargs["countdown"]
+            )
+
+        for actual_call, expected_call in zip(mock_sync.call_args_list, expected_calls):
+            assert check_call(actual_call, expected_call)
+
+        assert mock_sync.call_count == len(expected_calls)
```
Settings changes:

```diff
@@ -963,3 +963,7 @@ EXOTEL_SMS_DLT_ENTITY_ID = os.getenv("EXOTEL_SMS_DLT_ENTITY_ID", None)
 DETACHED_INTEGRATIONS_SERVER = getenv_boolean("DETACHED_INTEGRATIONS_SERVER", default=False)
 
 ACKNOWLEDGE_REMINDER_TASK_EXPIRY_DAYS = os.environ.get("ACKNOWLEDGE_REMINDER_TASK_EXPIRY_DAYS", default=14)
+
+SYNC_V2_MAX_TASKS = getenv_integer("SYNC_V2_MAX_TASKS", 10)
+SYNC_V2_PERIOD_SECONDS = getenv_integer("SYNC_V2_PERIOD_SECONDS", 300)
+SYNC_V2_BATCH_SIZE = getenv_integer("SYNC_V2_BATCH_SIZE", 500)
```
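For context, a minimal sketch of how these defaults behave, assuming `getenv_integer` works like the other `getenv_*` helpers in this settings module, i.e. it reads the named environment variable and falls back to the given default when unset; the helper body shown here is an assumption, not code from this PR.

```python
import os


def getenv_integer(variable_name: str, default: int) -> int:
    """Hypothetical helper: read an int from the environment, fall back to default."""
    value = os.environ.get(variable_name)
    if value is None:
        return default
    return int(value)


# With no environment overrides, the PR's defaults apply:
# up to 10 concurrent tasks, a new wave every 300 seconds, 500 organizations per task.
SYNC_V2_MAX_TASKS = getenv_integer("SYNC_V2_MAX_TASKS", 10)
SYNC_V2_PERIOD_SECONDS = getenv_integer("SYNC_V2_PERIOD_SECONDS", 300)
SYNC_V2_BATCH_SIZE = getenv_integer("SYNC_V2_BATCH_SIZE", 500)
```

Operators who want to slow the sync down can therefore export smaller `SYNC_V2_MAX_TASKS`/`SYNC_V2_BATCH_SIZE` values or a larger `SYNC_V2_PERIOD_SECONDS` before the service starts.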