# apps/grafana_plugin/tasks/sync.py
# Celery tasks that periodically sync organizations with Grafana and clean up
# deleted organizations / integrations.
import logging
from celery.utils.log import get_task_logger
from django.conf import settings
from django.utils import timezone
from apps.alerts.models import AlertReceiveChannel
from apps.grafana_plugin.helpers import GcomAPIClient
from apps.grafana_plugin.helpers.client import GrafanaAPIClient
from apps.grafana_plugin.helpers.gcom import get_active_instance_ids, get_deleted_instance_ids, get_stack_regions
from apps.user_management.models import Organization
from apps.user_management.models.region import sync_regions
from apps.user_management.sync import cleanup_organization, sync_organization, sync_team_members
from common.custom_celery_tasks import shared_dedicated_queue_retry_task
# Celery task logger for this module; DEBUG so the periodic sync emits spread/skip details.
logger = get_task_logger(__name__)
logger.setLevel(logging.DEBUG)
# celery beat will schedule start_sync_organizations for every 30 minutes
# to make sure that orgs are synced every 30 minutes, SYNC_PERIOD should be a little lower
SYNC_PERIOD = timezone.timedelta(minutes=25)
# orgs not synced for longer than this are candidates for deleted-org cleanup
INACTIVE_PERIOD = timezone.timedelta(minutes=55)
# look-back window used by start_cleanup_deleted_integrations (deleted_at__gte filter)
CLEANUP_PERIOD = timezone.timedelta(hours=13)
@shared_dedicated_queue_retry_task(autoretry_for=(Exception,), retry_backoff=True, max_retries=0)
def start_sync_organizations():
    """Enqueue a sync task for every organization not synced within SYNC_PERIOD.

    In cloud deployments the candidate set is additionally restricted to
    stacks that grafana.com reports as active.
    """
    threshold = timezone.now() - SYNC_PERIOD
    stale = Organization.objects.filter(last_time_synced__lte=threshold)
    never_synced = Organization.objects.filter(last_time_synced__isnull=True)
    candidates = stale | never_synced

    active_instance_ids, is_cloud_configured = get_active_instance_ids()
    if is_cloud_configured:
        if not active_instance_ids:
            logger.warning("Did not find any active instances!")
            return
        logger.debug(f"Found {len(active_instance_ids)} active instances")
        candidates = candidates.filter(stack_id__in=active_instance_ids)

    max_countdown = SYNC_PERIOD.seconds
    for idx, pk in enumerate(candidates.values_list("pk", flat=True)):
        # Stagger the per-org tasks so the load is spread evenly along SYNC_PERIOD.
        sync_organization_async.apply_async((pk,), countdown=idx % max_countdown)
@shared_dedicated_queue_retry_task(autoretry_for=(Exception,), retry_backoff=True, max_retries=3)
def sync_organization_async(organization_pk):
    """Periodic (beat-scheduled) sync entry point.

    Delegates to run_organization_sync with force_sync=False, so orgs that
    were synced recently are skipped.
    """
    run_organization_sync(organization_pk, False)
@shared_dedicated_queue_retry_task(autoretry_for=(Exception,), max_retries=1)
def plugin_sync_organization_async(organization_pk):
    """Plugin-load sync entry point.

    Delegates to run_organization_sync with force_sync=True, so the
    organization is synced even if it was synced recently.
    """
    run_organization_sync(organization_pk, True)
def run_organization_sync(organization_pk, force_sync):
    """
    Sync a single organization with Grafana.

    :param organization_pk: primary key of the Organization to sync
    :param force_sync: when True, sync even if the org was synced within
        SYNC_PERIOD; when False (the periodic path), recently-synced orgs
        are skipped.
    """
    # NOTE: an unrelated commit-message blob had been pasted into this
    # function body; it was removed to restore valid syntax.
    logger.info(f"Start sync Organization {organization_pk}")
    try:
        organization = Organization.objects.get(pk=organization_pk)
    except Organization.DoesNotExist:
        logger.info(f"Organization {organization_pk} was not found")
        return

    if not force_sync:
        # Periodic path: skip orgs synced within the last SYNC_PERIOD.
        if organization.last_time_synced and timezone.now() - organization.last_time_synced < SYNC_PERIOD:
            logger.debug(f"Canceling sync for Organization {organization_pk}, since it was synced recently.")
            return

    if settings.GRAFANA_COM_API_TOKEN and settings.LICENSE == settings.CLOUD_LICENSE_NAME:
        # Cloud deployments: double-check with grafana.com that the stack is still active.
        client = GcomAPIClient(settings.GRAFANA_COM_API_TOKEN)
        instance_info = client.get_instance_info(organization.stack_id)
        if not instance_info or instance_info["status"] != client.STACK_STATUS_ACTIVE:
            logger.debug(f"Canceling sync for Organization {organization_pk}, as it is no longer active.")
            return

    sync_organization(organization)
    logger.info(f"Finish sync Organization {organization_pk}")
@shared_dedicated_queue_retry_task(autoretry_for=(Exception,), max_retries=0)
def start_cleanup_deleted_organizations():
    """Enqueue cleanup for organizations that stopped syncing.

    In cloud deployments the candidate set is additionally restricted to
    stacks that grafana.com reports as deleted.
    """
    threshold = timezone.now() - INACTIVE_PERIOD
    candidates = Organization.objects.filter(last_time_synced__lte=threshold)

    deleted_instance_ids, is_cloud_configured = get_deleted_instance_ids()
    if is_cloud_configured:
        if not deleted_instance_ids:
            logger.warning("Did not find any deleted instances!")
            return
        logger.debug(f"Found {len(deleted_instance_ids)} deleted instances")
        candidates = candidates.filter(stack_id__in=deleted_instance_ids)

    pks = candidates.values_list("pk", flat=True)
    logger.debug(f"Found {len(pks)} deleted organizations not synced recently")
    max_countdown = INACTIVE_PERIOD.seconds
    for idx, pk in enumerate(pks):
        # Stagger the per-org tasks so the load is spread evenly.
        cleanup_organization_async.apply_async((pk,), countdown=idx % max_countdown)
@shared_dedicated_queue_retry_task(autoretry_for=(Exception,), max_retries=0)
def cleanup_organization_async(organization_pk):
    """Async wrapper: clean up a single (deleted) organization."""
    cleanup_organization(organization_pk)
@shared_dedicated_queue_retry_task(autoretry_for=(Exception,), max_retries=1)
def start_sync_regions():
    """Fetch stack regions from grafana.com and sync them into the database."""
    regions, is_cloud_configured = get_stack_regions()
    if not is_cloud_configured:
        # Non-cloud installations have no regions to sync.
        return
    if regions:
        sync_regions(regions)
    else:
        logger.warning("Did not find any stack-regions!")
@shared_dedicated_queue_retry_task(autoretry_for=(Exception,), max_retries=1)
def sync_team_members_for_organization_async(organization_pk):
    """Sync Grafana team memberships for the given organization."""
    try:
        org = Organization.objects.get(pk=organization_pk)
    except Organization.DoesNotExist:
        logger.info(f"Organization {organization_pk} was not found")
        return
    api_client = GrafanaAPIClient(api_url=org.grafana_url, api_token=org.api_token)
    sync_team_members(api_client, org)
@shared_dedicated_queue_retry_task(autoretry_for=(Exception,), max_retries=1)
def cleanup_empty_deleted_integrations(organization_pk, dry_run=True):
    """Hard-delete integrations that are soft-deleted and have no alert groups.

    :param organization_pk: primary key of the organization to scan
    :param dry_run: when True (default) only log what would be deleted
    """
    try:
        organization = Organization.objects_with_deleted.get(pk=organization_pk)
    except Organization.DoesNotExist:
        logger.info(f"Organization {organization_pk} was not found")
        return

    candidates = AlertReceiveChannel.objects_with_deleted.filter(
        organization=organization, deleted_at__isnull=False, alert_groups=None
    ).distinct()
    logger.info(
        f"Found count={len(candidates)} integrations in org={organization.public_primary_key} that are both empty and deleted"
    )
    for channel in candidates:
        logger.info(
            f"Deleting integration ppk={channel.public_primary_key} in organization={organization.stack_slug} dry_run={dry_run}"
        )
        if not dry_run:
            channel.hard_delete()
@shared_dedicated_queue_retry_task(autoretry_for=(Exception,), max_retries=0)
def start_cleanup_deleted_integrations():
    """Kick off hard-deletion of empty, recently soft-deleted integrations.

    Finds organizations that have integrations deleted within CLEANUP_PERIOD
    and schedules cleanup_empty_deleted_integrations(org_pk, dry_run=False)
    for each of them.
    """
    cleanup_threshold = timezone.now() - CLEANUP_PERIOD
    # NOTE(review): deleted_at__gte selects integrations deleted *within* the
    # last CLEANUP_PERIOD — confirm older soft-deleted rows are handled elsewhere.
    channels_qs = AlertReceiveChannel.objects_with_deleted.filter(deleted_at__gte=cleanup_threshold)
    organization_pks = set(channels_qs.values_list("organization_id", flat=True))
    logger.debug(f"Found {len(organization_pks)} organizations")
    # Fix: the original used `for _, organization_pk in enumerate(...)` and
    # discarded the index — plain iteration is the idiom.
    for organization_pk in organization_pks:
        # Second positional arg is dry_run=False: actually hard-delete.
        cleanup_empty_deleted_integrations.apply_async((organization_pk, False))