This is required to support the install v2 endpoint (to be used by backend plugin) which could be pushing null permissions, teams, or team memberships.
433 lines
18 KiB
Python
433 lines
18 KiB
Python
import logging
|
||
import typing
|
||
import uuid
|
||
|
||
from celery.utils.log import get_task_logger
|
||
from django.conf import settings
|
||
from django.utils import timezone
|
||
|
||
from apps.alerts.models import AlertReceiveChannel
|
||
from apps.api.permissions import LegacyAccessControlRole
|
||
from apps.auth_token.exceptions import InvalidToken
|
||
from apps.grafana_plugin.helpers.client import GcomAPIClient, GCOMInstanceInfo, GrafanaAPIClient
|
||
from apps.grafana_plugin.sync_data import SyncData, SyncPermission, SyncSettings, SyncTeam, SyncUser
|
||
from apps.metrics_exporter.helpers import metrics_bulk_update_team_label_cache
|
||
from apps.metrics_exporter.metrics_cache_manager import MetricsCacheManager
|
||
from apps.user_management.models import Organization, Team, User
|
||
from common.utils import task_lock
|
||
from settings.base import CLOUD_LICENSE_NAME, OPEN_SOURCE_LICENSE_NAME
|
||
|
||
# Module-level Celery task logger shared by every sync helper in this file;
# its level is forced to DEBUG regardless of the global logging configuration.
logger = get_task_logger(__name__)
logger.setLevel(logging.DEBUG)
|
||
|
||
|
||
def sync_organization(organization: Organization) -> None:
    """Run a full sync for ``organization`` unless one is already running.

    A per-organization task lock is taken first so that at most one sync runs
    for a given organization at a given time. When the lock cannot be
    acquired the call only logs and returns.
    """
    lock_key = "sync-organization-lock-{}".format(organization.id)
    lock_value = str(uuid.uuid4())
    with task_lock(lock_key, lock_value) as acquired:
        if not acquired:
            # sync already running
            logger.info(f"Sync for Organization {organization.pk} already in progress.")
            return
        _sync_organization(organization)
|
||
|
||
|
||
def _sync_organization(organization: Organization) -> None:
    """Pull the organization's settings from Grafana and apply them.

    Gathers the RBAC status and the incident/labels plugin settings through
    the Grafana API, packs them into a ``SyncSettings`` object, stores them
    via ``_sync_organization_data`` and, if the API token was verified,
    syncs users, teams and team memberships.
    """
    grafana_api_client = GrafanaAPIClient(api_url=organization.grafana_url, api_token=organization.api_token)
    gcom_client = GcomAPIClient(settings.GRAFANA_COM_ADMIN_API_TOKEN)

    # Start from the stored RBAC status; it is only replaced when Grafana gives a trustworthy answer.
    rbac_is_enabled = organization.is_rbac_permissions_enabled
    # Update organization's RBAC status if it's an open-source instance, or it's an active cloud instance.
    # Don't update non-active cloud instances (e.g. paused) as they can return 200 OK but not have RBAC enabled.
    is_open_source = settings.LICENSE == settings.OPEN_SOURCE_LICENSE_NAME
    if is_open_source or gcom_client.is_stack_active(organization.stack_id):
        rbac_from_grafana, had_server_error = grafana_api_client.is_rbac_enabled_for_organization()
        # Only update RBAC status if Grafana didn't return a server error
        if not had_server_error:
            rbac_is_enabled = rbac_from_grafana

    # get incident plugin settings
    incident_settings, _ = grafana_api_client.get_grafana_incident_plugin_settings()
    if incident_settings is None:
        incident_enabled = False
        incident_backend_url = None
    else:
        incident_enabled = incident_settings["enabled"]
        # "jsonData" may be missing or null, hence the fallback to an empty dict
        incident_json_data = incident_settings.get("jsonData") or {}
        incident_backend_url = incident_json_data.get(GrafanaAPIClient.GRAFANA_INCIDENT_PLUGIN_BACKEND_URL_KEY)

    # get labels plugin settings
    labels_settings, _ = grafana_api_client.get_grafana_labels_plugin_settings()
    labels_enabled = False
    if labels_settings is not None:
        labels_enabled = labels_settings["enabled"]

    # cloud installations use a dedicated OnCall API URL instead of BASE_URL
    oncall_api_url = settings.BASE_URL
    if settings.LICENSE == CLOUD_LICENSE_NAME:
        oncall_api_url = settings.GRAFANA_CLOUD_ONCALL_API_URL

    collected_settings = SyncSettings(
        stack_id=organization.stack_id,
        org_id=organization.org_id,
        license=settings.LICENSE,
        oncall_api_url=oncall_api_url,
        oncall_token=organization.gcom_token,
        grafana_url=organization.grafana_url,
        grafana_token=organization.api_token,
        rbac_enabled=rbac_is_enabled,
        incident_enabled=incident_enabled,
        incident_backend_url=incident_backend_url,
        labels_enabled=labels_enabled,
    )
    _sync_organization_data(organization, collected_settings)

    # users/teams are only synced when the token check done above succeeded
    if organization.api_token_status == Organization.API_TOKEN_STATUS_OK:
        sync_users_and_teams(grafana_api_client, organization)
|
||
|
||
|
||
def _sync_instance_info(organization: Organization) -> None:
    """Refresh stack/org metadata on ``organization`` from GCOM.

    Does nothing when the organization has no ``gcom_token``, when GCOM
    returns no instance info, or when the returned ``orgId`` differs from
    the organization's ``org_id``. Attributes are only assigned on the
    instance here; this function does not call ``save()``.
    """
    if not organization.gcom_token:
        return

    gcom_client = GcomAPIClient(organization.gcom_token)
    instance_info = gcom_client.get_instance_info(organization.stack_id)
    if not instance_info or instance_info["orgId"] != organization.org_id:
        return

    # (organization attribute, GCOM instance info key)
    attribute_sources = (
        ("stack_slug", "slug"),
        ("org_slug", "orgSlug"),
        ("org_title", "orgName"),
        ("region_slug", "regionSlug"),
        ("grafana_url", "url"),
        ("cluster_slug", "clusterSlug"),
    )
    for attribute, info_key in attribute_sources:
        setattr(organization, attribute, instance_info[info_key])
    organization.gcom_token_org_last_time_synced = timezone.now()
|
||
|
||
|
||
def sync_users_and_teams(client: GrafanaAPIClient, organization: Organization) -> None:
    """Sync users, then teams, then team memberships for ``organization``."""
    # the order is kept as-is: users and teams first, memberships last
    for sync_step in (sync_users, sync_teams, sync_team_members):
        sync_step(client, organization)
|
||
|
||
|
||
def sync_users(client: GrafanaAPIClient, organization: Organization, **kwargs) -> None:
    """Fetch users from the Grafana API and mirror them into ``organization``.

    Extra keyword arguments are forwarded to ``client.get_users``. Users that
    are stored for the organization but absent from the API response are
    deleted (``delete_extra=True``).
    """
    api_users = client.get_users(organization.is_rbac_permissions_enabled, **kwargs)
    # check if api_users are shaped correctly. e.g. for paused instance, the response is not a list.
    if not (api_users and isinstance(api_users, (tuple, list))):
        return

    users_payload = []
    for api_user in api_users:
        users_payload.append(
            SyncUser(
                id=api_user["userId"],
                name=api_user["name"],
                login=api_user["login"],
                email=api_user["email"],
                role=api_user["role"],
                avatar_url=api_user["avatarUrl"],
                # memberships are handled separately by sync_team_members
                teams=None,
                permissions=[SyncPermission(action=entry["action"]) for entry in api_user["permissions"]],
            )
        )
    _sync_users_data(organization, users_payload, delete_extra=True)
|
||
|
||
|
||
def sync_teams(client: GrafanaAPIClient, organization: Organization, **kwargs) -> None:
    """Fetch teams from the Grafana API and mirror them into ``organization``.

    Extra keyword arguments are forwarded to ``client.get_teams``. Nothing is
    changed when the API call yields a falsy result.
    """
    teams_response, _ = client.get_teams(**kwargs)
    if not teams_response:
        return

    teams_payload = []
    for api_team in teams_response["teams"]:
        teams_payload.append(
            SyncTeam(
                team_id=api_team["id"],
                name=api_team["name"],
                email=api_team["email"],
                avatar_url=api_team["avatarUrl"],
            )
        )
    _sync_teams_data(organization, teams_payload)
|
||
|
||
|
||
def sync_team_members(client: GrafanaAPIClient, organization: Organization) -> None:
    """Fetch the members of every stored team and update team memberships.

    Builds a ``{team_id: [user_id, ...]}`` mapping from the Grafana API and
    hands it to ``_sync_teams_members_data``.
    """
    members_by_team = {}
    for stored_team in organization.teams.all():
        api_members, _ = client.get_team_members(stored_team.team_id)
        # NOTE(review): a falsy result skips the team entirely, so a team whose
        # member list comes back empty keeps its previously stored members --
        # confirm this is intended.
        if api_members:
            members_by_team[stored_team.team_id] = [api_member["userId"] for api_member in api_members]
    _sync_teams_members_data(organization, members_by_team)
|
||
|
||
|
||
def delete_organization_if_needed(organization: Organization) -> bool:
    """Delete ``organization`` when GCOM reports its stack as deleted.

    Returns:
        ``True`` if the organization was deleted, ``False`` otherwise
        (a ``PluginAuthToken`` exists for it, or the stack is not deleted).
    """
    from apps.auth_token.models import PluginAuthToken

    # Organization has a manually set API token, it will not be found within GCOM
    # and would need to be deleted manually.
    if PluginAuthToken.objects.filter(organization_id=organization.pk).first():
        logger.info(f"Organization {organization.pk} has PluginAuthToken. Probably it's needed to delete org manually.")
        return False

    # Use common token as organization.gcom_token could be already revoked
    gcom_client = GcomAPIClient(settings.GRAFANA_COM_ADMIN_API_TOKEN)
    if gcom_client.is_stack_deleted(organization.stack_id):
        organization.delete()
        return True
    return False
|
||
|
||
|
||
def cleanup_organization(organization_pk: int) -> None:
    """Schedule integration cleanup for an organization and delete it if its stack is gone.

    The organization is looked up through ``objects_with_deleted``; a missing
    organization is only logged.
    """
    logger.info(f"Start cleanup Organization {organization_pk}")
    try:
        organization = Organization.objects_with_deleted.get(pk=organization_pk)

        # imported here rather than at module level
        from apps.grafana_plugin.tasks.sync import cleanup_empty_deleted_integrations

        task_args = (organization.pk, False)
        cleanup_empty_deleted_integrations.apply_async(task_args)

        was_deleted = delete_organization_if_needed(organization)
        if not was_deleted:
            logger.info(f"Organization {organization_pk} not deleted in gcom, no action taken")
        else:
            logger.info(
                f"Deleting organization due to stack deletion. "
                f"pk: {organization_pk}, stack_id: {organization.stack_id}, org_id: {organization.org_id}"
            )
    except Organization.DoesNotExist:
        logger.info(f"Organization {organization_pk} was not found")
|
||
|
||
|
||
def _create_cloud_organization(
    org_id: int, stack_id: int, sync_data: SyncData, instance_info: GCOMInstanceInfo
) -> Organization:
    """Create the ``Organization`` row for a cloud stack.

    Args:
        org_id: organization id the caller claims to belong to (int or its
            string form).
        stack_id: stack id used to look the instance up in GCOM when
            ``instance_info`` is not supplied.
        sync_data: sync payload; its settings provide the Grafana token, the
            OnCall (GCOM) token and the RBAC flag.
        instance_info: GCOM instance info, fetched with the OnCall token when
            falsy.

    Returns:
        The newly created organization.

    Raises:
        InvalidToken: no instance info is available, or its ``orgId`` does
            not match ``org_id``.
    """
    client = GcomAPIClient(sync_data.settings.oncall_token)
    if not instance_info:
        instance_info = client.get_instance_info(stack_id)
    # Compare both ids as strings: ``org_id`` is annotated as int, and comparing
    # a str against an int is always unequal, which rejected every valid token.
    if not instance_info or str(instance_info["orgId"]) != str(org_id):
        raise InvalidToken

    return Organization.objects.create(
        stack_id=str(instance_info["id"]),
        stack_slug=instance_info["slug"],
        grafana_url=instance_info["url"],
        org_id=str(instance_info["orgId"]),
        org_slug=instance_info["orgSlug"],
        org_title=instance_info["orgName"],
        region_slug=instance_info["regionSlug"],
        cluster_slug=instance_info["clusterSlug"],
        api_token=sync_data.settings.grafana_token,
        gcom_token=sync_data.settings.oncall_token,
        is_rbac_permissions_enabled=sync_data.settings.rbac_enabled,
        # ``defaults`` is a get_or_create()/update_or_create() argument only;
        # create() forwards every keyword to the model constructor, so the
        # field has to be passed directly.
        gcom_token_org_last_time_synced=timezone.now(),
    )
|
||
|
||
|
||
def _create_oss_organization(sync_data: SyncData) -> Organization:
    """Create the ``Organization`` row for a self-hosted (open source) install.

    Stack/org identifiers come from ``settings.SELF_HOSTED_SETTINGS``; the
    Grafana URL, API token and RBAC flag come from ``sync_data.settings``.
    """
    self_hosted = settings.SELF_HOSTED_SETTINGS
    pushed_settings = sync_data.settings
    return Organization.objects.create(
        stack_id=self_hosted["STACK_ID"],
        stack_slug=self_hosted["STACK_SLUG"],
        org_id=self_hosted["ORG_ID"],
        org_slug=self_hosted["ORG_SLUG"],
        org_title=self_hosted["ORG_TITLE"],
        region_slug=self_hosted["REGION_SLUG"],
        cluster_slug=self_hosted["CLUSTER_SLUG"],
        grafana_url=pushed_settings.grafana_url,
        api_token=pushed_settings.grafana_token,
        is_rbac_permissions_enabled=pushed_settings.rbac_enabled,
    )
|
||
|
||
|
||
def _create_organization(
    org_id: int, stack_id: int, sync_data: SyncData, instance_info: GCOMInstanceInfo
) -> typing.Optional[Organization]:
    """Create an organization according to the configured license.

    Returns:
        The created organization, or ``None`` when ``settings.LICENSE`` is
        neither the cloud nor the open source license name.
    """
    configured_license = settings.LICENSE
    if configured_license == CLOUD_LICENSE_NAME:
        return _create_cloud_organization(org_id, stack_id, sync_data, instance_info)
    if configured_license == OPEN_SOURCE_LICENSE_NAME:
        return _create_oss_organization(sync_data)
    return None
|
||
|
||
|
||
def get_or_create_organization(
    org_id: int, stack_id: int, sync_data: SyncData = None, instance_info: GCOMInstanceInfo = None
) -> Organization:
    """Return the organization matching ``org_id``/``stack_id``, creating it if absent.

    ``sync_data`` and ``instance_info`` are only used on the creation path.
    """
    existing = Organization.objects.filter(org_id=org_id, stack_id=stack_id).first()
    if existing:
        return existing
    return _create_organization(org_id, stack_id, sync_data, instance_info)
|
||
|
||
|
||
def get_or_create_user(organization: Organization, sync_user: SyncUser) -> User:
    """Create or update a single user from ``sync_user`` and return it.

    The user is also added to every team listed in ``sync_user.teams`` that
    exists for the organization; unknown team ids are ignored.
    """
    _sync_users_data(organization, [sync_user], delete_extra=False)
    synced_user = organization.users.get(user_id=sync_user.id)

    # update team membership if needed
    # (not removing user from teams, assuming this is called on user creation/first login only;
    # periodic sync will keep teams updated)
    for team_id in sync_user.teams or []:
        matching_team = organization.teams.filter(team_id=team_id).first()
        if not matching_team:
            continue
        synced_user.teams.add(matching_team)

    return synced_user
|
||
|
||
|
||
def _sync_organization_data(organization: Organization, sync_settings: SyncSettings):
    """Apply ``sync_settings`` to ``organization``, verify its API token and save.

    Besides the pushed settings, instance metadata is refreshed through
    ``_sync_instance_info`` and ``api_token_status`` is set from a token check
    against Grafana. Only the fields listed in ``update_fields`` are saved.
    """
    organization.is_rbac_permissions_enabled = sync_settings.rbac_enabled
    logger.info(f"RBAC status org={organization.pk} rbac_enabled={organization.is_rbac_permissions_enabled}")

    organization.is_grafana_labels_enabled = sync_settings.labels_enabled
    organization.is_grafana_incident_enabled = sync_settings.incident_enabled
    organization.grafana_incident_backend_url = sync_settings.incident_backend_url
    organization.grafana_url = sync_settings.grafana_url
    organization.api_token = sync_settings.grafana_token
    organization.last_time_synced = timezone.now()

    _sync_instance_info(organization)

    # Built after _sync_instance_info, which may have replaced grafana_url.
    token_check_client = GrafanaAPIClient(api_url=organization.grafana_url, api_token=organization.api_token)
    _, token_check_status = token_check_client.check_token()
    if not token_check_status["connected"]:
        organization.api_token_status = Organization.API_TOKEN_STATUS_FAILED
        logger.warning(f"Sync not successful org={organization.pk} token_status=FAILED")
    else:
        organization.api_token_status = Organization.API_TOKEN_STATUS_OK
        organization.last_time_synced = timezone.now()

    fields_to_save = [
        "api_token",
        "api_token_status",
        "cluster_slug",
        "stack_slug",
        "org_slug",
        "org_title",
        "region_slug",
        "grafana_url",
        "last_time_synced",
        "gcom_token_org_last_time_synced",
        "is_rbac_permissions_enabled",
        "is_grafana_incident_enabled",
        "is_grafana_labels_enabled",
        "grafana_incident_backend_url",
    ]
    organization.save(update_fields=fields_to_save)
|
||
|
||
|
||
def _sync_users_data(organization: Organization, sync_users: list[SyncUser], delete_extra=False):
    """Create or update the organization's users from ``sync_users``.

    Users are upserted with a single ``bulk_create(update_conflicts=True)``.
    When ``delete_extra`` is true, stored users whose ``user_id`` is not in
    ``sync_users`` are deleted afterwards.
    """

    def _to_user_row(sync_user: SyncUser) -> User:
        # Unknown roles fall back to LegacyAccessControlRole.NONE; missing permissions become [].
        if sync_user.permissions:
            permissions = [{"action": entry.action} for entry in sync_user.permissions]
        else:
            permissions = []
        return User(
            organization_id=organization.pk,
            user_id=sync_user.id,
            email=sync_user.email,
            name=sync_user.name,
            username=sync_user.login,
            role=getattr(LegacyAccessControlRole, sync_user.role.upper(), LegacyAccessControlRole.NONE),
            avatar_url=sync_user.avatar_url,
            permissions=permissions,
        )

    # kept lazy: rows are only built once bulk_create consumes the generator
    user_rows = (_to_user_row(sync_user) for sync_user in sync_users)

    existing_user_ids = set(organization.users.all().values_list("user_id", flat=True))
    bulk_options = {}
    if settings.DATABASE_TYPE in ("sqlite3", "postgresql"):
        # unique_fields is required for sqlite and postgresql setups
        bulk_options["unique_fields"] = ("organization_id", "user_id", "is_active")
    organization.users.bulk_create(
        user_rows,
        update_conflicts=True,
        update_fields=("email", "name", "username", "role", "avatar_url", "permissions"),
        batch_size=5000,
        **bulk_options,
    )

    # Retrieve primary keys for the newly created users
    #
    # If the model's primary key is an AutoField, the primary key attribute can only be retrieved
    # on certain databases (currently PostgreSQL, MariaDB 10.5+, and SQLite 3.35+).
    # On other databases, it will not be set.
    # https://docs.djangoproject.com/en/4.1/ref/models/querysets/#django.db.models.query.QuerySet.bulk_create
    created_users = organization.users.exclude(user_id__in=existing_user_ids)

    if not delete_extra:
        return

    # delete removed users
    stored_user_ids = existing_user_ids | {created.user_id for created in created_users}
    synced_user_ids = {sync_user.id for sync_user in sync_users}
    organization.users.filter(user_id__in=stored_user_ids - synced_user_ids).delete()
|
||
|
||
|
||
def _sync_teams_data(organization: Organization, sync_teams: list[SyncTeam] | None):
    """Create, update and delete the organization's teams to match ``sync_teams``.

    ``None`` is treated as an empty list. After the upsert, missing direct
    paging integrations are created, teams absent from ``sync_teams`` are
    deleted together with their direct paging integrations, and deletions
    and renames are pushed to the metrics team label cache.
    """
    incoming_teams = [] if sync_teams is None else sync_teams

    # keep existing team names mapping to check for possible metrics cache updates
    names_before_sync = {stored.team_id: stored.name for stored in organization.teams.all()}

    team_rows = tuple(
        Team(
            organization_id=organization.pk,
            team_id=incoming.team_id,
            name=incoming.name,
            email=incoming.email,
            avatar_url=incoming.avatar_url,
        )
        for incoming in incoming_teams
    )

    # create entries, update if team_id already exists in the organization
    bulk_options = {}
    if settings.DATABASE_TYPE in ("sqlite3", "postgresql"):
        # unique_fields is required for sqlite and postgresql setups
        bulk_options["unique_fields"] = ("organization_id", "team_id")
    organization.teams.bulk_create(
        team_rows,
        batch_size=5000,
        update_conflicts=True,
        update_fields=("name", "email", "avatar_url"),
        **bulk_options,
    )

    # create missing direct paging integrations
    AlertReceiveChannel.objects.create_missing_direct_paging_integrations(organization)

    # delete removed teams and their direct paging integrations
    stored_team_ids = set(organization.teams.all().values_list("team_id", flat=True))
    incoming_team_ids = {incoming.team_id for incoming in incoming_teams}
    stale_team_ids = stored_team_ids - incoming_team_ids
    organization.alert_receive_channels.filter(
        team__team_id__in=stale_team_ids, integration=AlertReceiveChannel.INTEGRATION_DIRECT_PAGING
    ).delete()
    organization.teams.filter(team_id__in=stale_team_ids).delete()

    # collect teams diffs to update metrics cache
    teams_diff: MetricsCacheManager.TeamsDiffMap = {}
    for stale_team_id in stale_team_ids:
        teams_diff = MetricsCacheManager.update_team_diff(teams_diff, stale_team_id, deleted=True)
    for incoming in incoming_teams:
        old_name = names_before_sync.get(incoming.team_id)
        # only teams that already existed (with a non-empty name) and changed name are reported
        if not old_name or old_name == incoming.name:
            continue
        teams_diff = MetricsCacheManager.update_team_diff(teams_diff, incoming.team_id, new_name=incoming.name)
    metrics_bulk_update_team_label_cache(teams_diff, organization.id)
|
||
|
||
|
||
def _sync_teams_members_data(organization: Organization, team_members: dict[int, list[int]] | None):
    """Replace each listed team's members with the given users.

    Args:
        organization: organization owning the teams and users.
        team_members: mapping of ``team_id`` to the ``user_id`` values that
            should make up the team. ``None`` leaves all memberships
            untouched. A ``None`` member list for a single team is treated as
            an empty membership, since pushed payloads may carry null there.

    Raises:
        Whatever ``organization.teams.get`` raises when a ``team_id`` is not
        found for the organization.
    """
    if team_members is None:
        return
    # set team members
    for team_id, members_ids in team_members.items():
        team = organization.teams.get(team_id=team_id)
        # ``user_id__in`` needs an iterable; fall back to [] for a null list
        team.users.set(organization.users.filter(user_id__in=members_ids or []))
|
||
|
||
|
||
def apply_sync_data(organization: Organization, sync_data: SyncData):
    """Apply a complete sync payload to ``organization``.

    Steps run in a fixed order: organization settings, users (stored users
    missing from the payload are deleted), teams with their direct paging
    integrations, and finally team memberships.
    """
    # update org + settings
    _sync_organization_data(organization, sync_settings=sync_data.settings)
    # update or create users
    _sync_users_data(organization, sync_users=sync_data.users, delete_extra=True)
    # update or create teams + direct paging integrations
    _sync_teams_data(organization, sync_teams=sync_data.teams)
    # update team members
    _sync_teams_members_data(organization, team_members=sync_data.team_members)
|