2022-06-03 08:09:47 -06:00
|
|
|
import logging
|
|
|
|
|
|
|
|
|
|
from celery.utils.log import get_task_logger
|
|
|
|
|
from django.conf import settings
|
|
|
|
|
from django.utils import timezone
|
|
|
|
|
|
2024-02-27 10:34:41 -07:00
|
|
|
from apps.alerts.models import AlertReceiveChannel
|
2022-06-03 08:09:47 -06:00
|
|
|
from apps.grafana_plugin.helpers import GcomAPIClient
|
2023-01-24 13:44:07 +08:00
|
|
|
from apps.grafana_plugin.helpers.client import GrafanaAPIClient
|
2022-10-24 21:25:32 -06:00
|
|
|
from apps.grafana_plugin.helpers.gcom import get_active_instance_ids, get_deleted_instance_ids, get_stack_regions
|
2022-06-03 08:09:47 -06:00
|
|
|
from apps.user_management.models import Organization
|
2022-10-24 21:25:32 -06:00
|
|
|
from apps.user_management.models.region import sync_regions
|
2023-01-24 13:44:07 +08:00
|
|
|
from apps.user_management.sync import cleanup_organization, sync_organization, sync_team_members
|
2022-06-03 08:09:47 -06:00
|
|
|
from common.custom_celery_tasks import shared_dedicated_queue_retry_task
|
|
|
|
|
|
|
|
|
|
# Celery task logger for this module; DEBUG level so the periodic sync
# tasks' diagnostic messages are visible in worker logs.
logger = get_task_logger(__name__)
logger.setLevel(logging.DEBUG)

# celery beat will schedule start_sync_organizations for every 30 minutes
# to make sure that orgs are synced every 30 minutes, SYNC_PERIOD should be a little lower
SYNC_PERIOD = timezone.timedelta(minutes=25)
# Organizations not synced for at least this long are considered for
# deleted-organization cleanup (see start_cleanup_deleted_organizations).
INACTIVE_PERIOD = timezone.timedelta(minutes=55)
# Lookback window used by start_cleanup_deleted_integrations to find
# recently soft-deleted integrations.
CLEANUP_PERIOD = timezone.timedelta(hours=13)
|
2022-06-03 08:09:47 -06:00
|
|
|
|
|
|
|
|
|
2023-05-01 20:33:26 -06:00
|
|
|
@shared_dedicated_queue_retry_task(autoretry_for=(Exception,), retry_backoff=True, max_retries=0)
def start_sync_organizations():
    """Fan out a sync task for every organization that is due for a sync.

    Scheduled by celery beat every 30 minutes. An organization is due when it
    has never been synced or when its last sync is older than SYNC_PERIOD.
    On cloud installations the candidates are further narrowed to stacks that
    GCOM reports as active.
    """
    threshold = timezone.now() - SYNC_PERIOD

    # Orgs synced too long ago, unioned with orgs that were never synced at all.
    stale_orgs = Organization.objects.filter(last_time_synced__lte=threshold) | Organization.objects.filter(
        last_time_synced__isnull=True
    )

    active_instance_ids, is_cloud_configured = get_active_instance_ids()
    if is_cloud_configured:
        if not active_instance_ids:
            logger.warning("Did not find any active instances!")
            return
        logger.debug(f"Found {len(active_instance_ids)} active instances")
        stale_orgs = stale_orgs.filter(stack_id__in=active_instance_ids)

    pks = stale_orgs.values_list("pk", flat=True)

    spread = SYNC_PERIOD.seconds
    for position, pk in enumerate(pks):
        # Spread orgs evenly along SYNC_PERIOD
        sync_organization_async.apply_async((pk,), countdown=position % spread)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@shared_dedicated_queue_retry_task(autoretry_for=(Exception,), retry_backoff=True, max_retries=3)
def sync_organization_async(organization_pk):
    """Periodically-scheduled sync of one organization with Grafana.

    Runs synchronization without the force_sync flag, so a recently synced
    organization is skipped.
    """
    run_organization_sync(organization_pk, False)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@shared_dedicated_queue_retry_task(autoretry_for=(Exception,), max_retries=1)
def plugin_sync_organization_async(organization_pk):
    """Sync triggered each time the plugin is loaded.

    Runs synchronization with the force_sync flag, meaning the organization is
    synced even if it was synced recently.
    """
    run_organization_sync(organization_pk, True)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def run_organization_sync(organization_pk, force_sync):
    """Sync a single organization with Grafana.

    Bails out early when the organization does not exist, when it was synced
    within SYNC_PERIOD (unless force_sync is set), or — on cloud
    installations — when GCOM no longer reports its stack as active.
    """
    logger.info(f"Start sync Organization {organization_pk}")

    organization = Organization.objects.filter(pk=organization_pk).first()
    if organization is None:
        logger.info(f"Organization {organization_pk} was not found")
        return

    recently_synced = (
        organization.last_time_synced and timezone.now() - organization.last_time_synced < SYNC_PERIOD
    )
    if not force_sync and recently_synced:
        logger.debug(f"Canceling sync for Organization {organization_pk}, since it was synced recently.")
        return

    running_cloud_license = settings.GRAFANA_COM_API_TOKEN and settings.LICENSE == settings.CLOUD_LICENSE_NAME
    if running_cloud_license:
        gcom = GcomAPIClient(settings.GRAFANA_COM_API_TOKEN)
        instance_info = gcom.get_instance_info(organization.stack_id)
        # Skip stacks GCOM no longer reports as active (e.g. deleted/paused).
        if not instance_info or instance_info["status"] != gcom.STACK_STATUS_ACTIVE:
            logger.debug(f"Canceling sync for Organization {organization_pk}, as it is no longer active.")
            return

    sync_organization(organization)
    logger.info(f"Finish sync Organization {organization_pk}")
|
2022-09-02 14:06:42 -06:00
|
|
|
|
|
|
|
|
|
2024-07-08 14:04:46 -06:00
|
|
|
@shared_dedicated_queue_retry_task(autoretry_for=(Exception,), max_retries=0)
def start_cleanup_deleted_organizations():
    """Fan out cleanup tasks for organizations that look deleted.

    Candidates are organizations not synced within INACTIVE_PERIOD; on cloud
    installations the set is narrowed to stacks GCOM reports as deleted.
    """
    threshold = timezone.now() - INACTIVE_PERIOD

    candidates = Organization.objects.filter(last_time_synced__lte=threshold)

    deleted_instance_ids, is_cloud_configured = get_deleted_instance_ids()
    if is_cloud_configured:
        if not deleted_instance_ids:
            logger.warning("Did not find any deleted instances!")
            return
        logger.debug(f"Found {len(deleted_instance_ids)} deleted instances")
        candidates = candidates.filter(stack_id__in=deleted_instance_ids)

    pks = candidates.values_list("pk", flat=True)

    logger.debug(f"Found {len(pks)} deleted organizations not synced recently")
    spread = INACTIVE_PERIOD.seconds
    for position, pk in enumerate(pks):
        # Spread orgs evenly
        cleanup_organization_async.apply_async((pk,), countdown=position % spread)
|
|
|
|
|
|
|
|
|
|
|
2024-07-08 14:04:46 -06:00
|
|
|
@shared_dedicated_queue_retry_task(autoretry_for=(Exception,), max_retries=0)
def cleanup_organization_async(organization_pk):
    """Celery entry point that delegates to cleanup_organization."""
    cleanup_organization(organization_pk)
|
2022-10-24 21:25:32 -06:00
|
|
|
|
|
|
|
|
|
|
|
|
|
@shared_dedicated_queue_retry_task(autoretry_for=(Exception,), max_retries=1)
def start_sync_regions():
    """Fetch stack regions from GCOM and mirror them locally.

    No-op on installations where cloud (GCOM) is not configured.
    """
    regions, cloud_configured = get_stack_regions()
    if not cloud_configured:
        return

    if regions:
        sync_regions(regions)
    else:
        logger.warning("Did not find any stack-regions!")
|
2023-01-24 13:44:07 +08:00
|
|
|
|
|
|
|
|
|
|
|
|
|
@shared_dedicated_queue_retry_task(autoretry_for=(Exception,), max_retries=1)
def sync_team_members_for_organization_async(organization_pk):
    """Refresh team membership for one organization via the Grafana API."""
    organization = Organization.objects.filter(pk=organization_pk).first()
    if organization is None:
        logger.info(f"Organization {organization_pk} was not found")
        return

    api_client = GrafanaAPIClient(api_url=organization.grafana_url, api_token=organization.api_token)
    sync_team_members(api_client, organization)
|
2024-02-27 10:34:41 -07:00
|
|
|
|
|
|
|
|
|
|
|
|
|
@shared_dedicated_queue_retry_task(autoretry_for=(Exception,), max_retries=1)
def cleanup_empty_deleted_integrations(organization_pk, dry_run=True):
    """Hard-delete soft-deleted integrations that have no alert groups.

    With dry_run=True (the default) only logs what would be deleted.
    Operates on the organization even if it is itself soft-deleted.
    """
    try:
        # Include soft-deleted orgs — their integrations still need cleanup.
        organization = Organization.objects_with_deleted.get(pk=organization_pk)
    except Organization.DoesNotExist:
        logger.info(f"Organization {organization_pk} was not found")
        return

    candidates = AlertReceiveChannel.objects_with_deleted.filter(
        organization=organization, deleted_at__isnull=False, alert_groups=None
    ).distinct()
    logger.info(
        f"Found count={len(candidates)} integrations in org={organization.public_primary_key} that are both empty and deleted"
    )

    for channel in candidates:
        logger.info(
            f"Deleting integration ppk={channel.public_primary_key} in organization={organization.stack_slug} dry_run={dry_run}"
        )
        if not dry_run:
            channel.hard_delete()
|
2024-03-04 12:45:01 -07:00
|
|
|
|
|
|
|
|
|
2024-07-16 07:59:08 -06:00
|
|
|
@shared_dedicated_queue_retry_task(autoretry_for=(Exception,), max_retries=0)
def start_cleanup_deleted_integrations():
    """Fan out integration cleanup for orgs with recently deleted integrations.

    Finds integrations soft-deleted within the last CLEANUP_PERIOD and
    schedules cleanup_empty_deleted_integrations (with dry_run=False) for each
    distinct owning organization.
    """
    cleanup_threshold = timezone.now() - CLEANUP_PERIOD

    channels_qs = AlertReceiveChannel.objects_with_deleted.filter(deleted_at__gte=cleanup_threshold)
    # set() de-duplicates: several deleted channels may share an organization.
    organization_pks = set(channels_qs.values_list("organization_id", flat=True))
    logger.debug(f"Found {len(organization_pks)} organizations")

    # Fix: the original wrapped this in enumerate() and discarded the index.
    for organization_pk in organization_pks:
        cleanup_empty_deleted_integrations.apply_async((organization_pk, False))
|