oncall-engine/engine/apps/user_management/sync.py

456 lines
19 KiB
Python
Raw Permalink Normal View History

import logging
import typing
import uuid
from celery.utils.log import get_task_logger
from django.conf import settings
from django.utils import timezone
from apps.alerts.models import AlertReceiveChannel
from apps.api.permissions import LegacyAccessControlRole
from apps.auth_token.exceptions import InvalidToken
from apps.grafana_plugin.helpers.client import GcomAPIClient, GCOMInstanceInfo, GrafanaAPIClient
from apps.grafana_plugin.sync_data import SyncData, SyncPermission, SyncSettings, SyncTeam, SyncUser
from apps.metrics_exporter.helpers import metrics_bulk_update_team_label_cache
from apps.metrics_exporter.metrics_cache_manager import MetricsCacheManager
from apps.user_management.models import Organization, Team, User
from common.utils import task_lock
from settings.base import CLOUD_LICENSE_NAME, OPEN_SOURCE_LICENSE_NAME
logger = get_task_logger(__name__)
logger.setLevel(logging.DEBUG)
def sync_organization(organization: Organization) -> None:
# ensure one sync task is running at most for a given org at a given time
lock_id = "sync-organization-lock-{}".format(organization.id)
random_value = str(uuid.uuid4())
with task_lock(lock_id, random_value) as acquired:
if acquired:
_sync_organization(organization)
else:
# sync already running
logger.info(f"Sync for Organization {organization.pk} already in progress.")
def _sync_organization(organization: Organization) -> None:
grafana_api_client = GrafanaAPIClient(api_url=organization.grafana_url, api_token=organization.api_token)
gcom_client = GcomAPIClient(settings.GRAFANA_COM_ADMIN_API_TOKEN)
# check organization API token is valid
_, check_token_call_status = grafana_api_client.check_token()
if not check_token_call_status["connected"]:
organization.api_token_status = Organization.API_TOKEN_STATUS_FAILED
organization.save(update_fields=["api_token_status"])
logger.warning(f"Sync not successful org={organization.pk} token_status=FAILED")
return
rbac_is_enabled = organization.is_rbac_permissions_enabled
# Update organization's RBAC status if it's an open-source instance, or it's an active cloud instance.
# Don't update non-active cloud instances (e.g. paused) as they can return 200 OK but not have RBAC enabled.
if settings.LICENSE == settings.OPEN_SOURCE_LICENSE_NAME or gcom_client.is_stack_active(organization.stack_id):
rbac_enabled_update, server_error = grafana_api_client.is_rbac_enabled_for_organization()
if not server_error: # Only update RBAC status if Grafana didn't return a server error
rbac_is_enabled = rbac_enabled_update
Add RBAC Support (#777) * Modify plugin.json to support RBAC role registration * defines 26 new custom roles in plugin.json. The main roles are: - Admin: read/write access to everything in OnCall - Reader: read access to everything in OnCall - OnCaller : read access to everything in OnCall + edit access to Alert Groups and Schedules - <object-type> Editor: read/write access to everything related to <object-type> - <object-type> Reader: read access for <object-type> - User Settings Admin: read/write access to all user's settings, not just own settings. This is in comparison to User Settings Editor which can only read/write own settings * update changelog and documentation (#686) * implement RBAC for OnCall backend This commit refactors backend authorization. It trys to use RBAC authorization if the org's grafana instance supports it, otherwise it falls back to basic role authorization. * update RBAC backend tests * add tests for RBAC changes - run backend tests as matrix where RBAC is enabled/disabled. When RBAC is enabled, the permissions granted are read from the role grants in the frontend's plugin.json file (instead of relying what we specify in RBACPermission.Permissions) - remove --reuse-db --nomigrations flags from engine/tox.ini - minor autoformatting changes to docker-compose-developer.yml * remove --ds=settings.ci-test from pytest CI command DJANGO_SETTINGS_MODULE is already specified as an env var so this is just unecessary duplication * update gitignore * update github action job name for "test" * RBAC frontend changes * refactors the use of basic roles (ex. Viewer, Editor, Admin) use RBAC permissions (when supported), or falling back to basic roles when RBAC is not supported. - updates the UserAction enum in grafana-plugin/src/state/userAction.ts. Previously this was hardcoded to a list of strings that were being returned by the OnCall API. Now the values here correspond to the permissions in plugin.json (plus a fallback role) * changes per Gabriel's comments: - get rid of group attribute in rbac roles - remove displayName role attribute - remove hidden role attribute - add back role to includes section * don't try to update user timezone if they don't have permission
2022-11-29 09:41:56 +01:00
# get incident plugin settings
grafana_incident_settings, _ = grafana_api_client.get_grafana_incident_plugin_settings()
is_grafana_incident_enabled = False
grafana_incident_backend_url = None
if grafana_incident_settings is not None:
is_grafana_incident_enabled = grafana_incident_settings["enabled"]
grafana_incident_backend_url = (grafana_incident_settings.get("jsonData") or {}).get(
GrafanaAPIClient.GRAFANA_INCIDENT_PLUGIN_BACKEND_URL_KEY
)
# get labels plugin settings
is_grafana_labels_enabled = False
grafana_labels_plugin_settings, _ = grafana_api_client.get_grafana_labels_plugin_settings()
if grafana_labels_plugin_settings is not None:
is_grafana_labels_enabled = grafana_labels_plugin_settings["enabled"]
# get IRM plugin settings
is_grafana_irm_enabled = False
grafana_irm_plugin_settings, _ = grafana_api_client.get_grafana_irm_plugin_settings()
if grafana_irm_plugin_settings is not None:
is_grafana_irm_enabled = grafana_irm_plugin_settings["enabled"]
oncall_api_url = settings.BASE_URL
if settings.LICENSE == CLOUD_LICENSE_NAME:
oncall_api_url = settings.GRAFANA_CLOUD_ONCALL_API_URL
sync_settings = SyncSettings(
stack_id=organization.stack_id,
org_id=organization.org_id,
license=settings.LICENSE,
oncall_api_url=oncall_api_url,
oncall_token=organization.gcom_token,
grafana_url=organization.grafana_url,
grafana_token=organization.api_token,
rbac_enabled=rbac_is_enabled,
incident_enabled=is_grafana_incident_enabled,
incident_backend_url=grafana_incident_backend_url,
labels_enabled=is_grafana_labels_enabled,
irm_enabled=is_grafana_irm_enabled,
)
_sync_organization_data(organization, sync_settings)
if organization.api_token_status == Organization.API_TOKEN_STATUS_OK:
sync_users_and_teams(grafana_api_client, organization)
def _sync_instance_info(organization: Organization) -> None:
if organization.gcom_token:
gcom_client = GcomAPIClient(organization.gcom_token)
instance_info = gcom_client.get_instance_info(organization.stack_id)
if not instance_info or instance_info["orgId"] != organization.org_id:
return
organization.stack_slug = instance_info["slug"]
organization.org_slug = instance_info["orgSlug"]
organization.org_title = instance_info["orgName"]
organization.region_slug = instance_info["regionSlug"]
organization.grafana_url = instance_info["url"]
organization.cluster_slug = instance_info["clusterSlug"]
organization.gcom_token_org_last_time_synced = timezone.now()
def sync_users_and_teams(client: GrafanaAPIClient, organization: Organization) -> None:
sync_users(client, organization)
sync_teams(client, organization)
sync_team_members(client, organization)
def sync_users(client: GrafanaAPIClient, organization: Organization, **kwargs) -> None:
api_users = client.get_users(organization.is_rbac_permissions_enabled, **kwargs)
# check if api_users are shaped correctly. e.g. for paused instance, the response is not a list.
if not api_users or not isinstance(api_users, (tuple, list)):
return
sync_users = [
SyncUser(
id=user["userId"],
name=user["name"],
login=user["login"],
email=user["email"],
role=user["role"],
avatar_url=user["avatarUrl"],
teams=None,
permissions=[SyncPermission(action=permission["action"]) for permission in user["permissions"]],
)
for user in api_users
]
_sync_users_data(organization, sync_users, delete_extra=True)
def sync_teams(client: GrafanaAPIClient, organization: Organization, **kwargs) -> None:
api_teams_result, _ = client.get_teams(**kwargs)
if not api_teams_result:
return
api_teams = api_teams_result["teams"]
sync_teams = [
SyncTeam(
team_id=team["id"],
name=team["name"],
email=team["email"],
avatar_url=team["avatarUrl"],
)
for team in api_teams
]
_sync_teams_data(organization, sync_teams)
def sync_team_members(client: GrafanaAPIClient, organization: Organization) -> None:
team_members = {}
for team in organization.teams.all():
members, _ = client.get_team_members(team.team_id)
if not members:
continue
team_members[team.team_id] = [member["userId"] for member in members]
_sync_teams_members_data(organization, team_members)
def delete_organization_if_needed(organization: Organization) -> bool:
# Organization has a manually set API token, it will not be found within GCOM
# and would need to be deleted manually.
`apps.get_model` -> `import` (#2619) # What this PR does Remove [`apps.get_model`](https://docs.djangoproject.com/en/3.2/ref/applications/#django.apps.apps.get_model) invocations and use inline `import` statements in places where models are imported within functions/methods to avoid circular imports. I believe `import` statements are more appropriate for most use cases as they allow for better static code analysis & formatting, and solve the issue of circular imports without being unnecessarily dynamic as `apps.get_model`. With `import` statements, it's possible to: - Jump to model definitions in most IDEs - Automatically sort inline imports with `isort` - Find import errors faster/easier (most IDEs highlight broken imports) - Have more consistency across regular & inline imports when importing models This PR also adds a flake8 rule to ban imports of `django.apps.apps`, so it's harder to use `apps.get_model` by mistake (it's possible to ignore this rule by using `# noqa: I251`). The rule is not enforced on directories with migration files, because `apps.get_model` is often used to get a historical state of a model, which is useful when writing migrations ([see this SO answer for more details](https://stackoverflow.com/a/37769213)). So `apps.get_model` is considered OK in migrations (even necessary in some cases). ## Checklist - [x] Unit, integration, and e2e (if applicable) tests updated - [x] Documentation added (or `pr:no public docs` PR label added if not required) - [x] `CHANGELOG.md` updated (or `pr:no changelog` PR label added if not required)
2023-07-25 10:43:23 +01:00
from apps.auth_token.models import PluginAuthToken
manually_provisioned_token = PluginAuthToken.objects.filter(organization_id=organization.pk).first()
if manually_provisioned_token:
logger.info(f"Organization {organization.pk} has PluginAuthToken. Probably it's needed to delete org manually.")
return False
# Use common token as organization.gcom_token could be already revoked
client = GcomAPIClient(settings.GRAFANA_COM_ADMIN_API_TOKEN)
is_stack_deleted = client.is_stack_deleted(organization.stack_id)
if not is_stack_deleted:
return False
organization.delete()
return True
def cleanup_organization(organization_pk: int) -> None:
logger.info(f"Start cleanup Organization {organization_pk}")
try:
organization = Organization.objects_with_deleted.get(pk=organization_pk)
from apps.grafana_plugin.tasks.sync import cleanup_empty_deleted_integrations
cleanup_empty_deleted_integrations.apply_async(
(
organization.pk,
False,
),
)
if delete_organization_if_needed(organization):
logger.info(
f"Deleting organization due to stack deletion. "
f"pk: {organization_pk}, stack_id: {organization.stack_id}, org_id: {organization.org_id}"
)
else:
logger.info(f"Organization {organization_pk} not deleted in gcom, no action taken")
except Organization.DoesNotExist:
logger.info(f"Organization {organization_pk} was not found")
def _create_cloud_organization(
org_id: int, stack_id: int, sync_data: SyncData, instance_info: GCOMInstanceInfo
) -> Organization:
client = GcomAPIClient(sync_data.settings.oncall_token)
if not instance_info:
instance_info = client.get_instance_info(stack_id)
if not instance_info or str(instance_info["orgId"]) != org_id:
raise InvalidToken
return Organization.objects.create(
stack_id=str(instance_info["id"]),
stack_slug=instance_info["slug"],
grafana_url=instance_info["url"],
org_id=str(instance_info["orgId"]),
org_slug=instance_info["orgSlug"],
org_title=instance_info["orgName"],
region_slug=instance_info["regionSlug"],
cluster_slug=instance_info["clusterSlug"],
api_token=sync_data.settings.grafana_token,
gcom_token=sync_data.settings.oncall_token,
is_rbac_permissions_enabled=sync_data.settings.rbac_enabled,
defaults={"gcom_token_org_last_time_synced": timezone.now()},
)
def _create_oss_organization(sync_data: SyncData) -> Organization:
return Organization.objects.create(
stack_id=settings.SELF_HOSTED_SETTINGS["STACK_ID"],
stack_slug=settings.SELF_HOSTED_SETTINGS["STACK_SLUG"],
org_id=settings.SELF_HOSTED_SETTINGS["ORG_ID"],
org_slug=settings.SELF_HOSTED_SETTINGS["ORG_SLUG"],
org_title=settings.SELF_HOSTED_SETTINGS["ORG_TITLE"],
region_slug=settings.SELF_HOSTED_SETTINGS["REGION_SLUG"],
cluster_slug=settings.SELF_HOSTED_SETTINGS["CLUSTER_SLUG"],
grafana_url=sync_data.settings.grafana_url,
api_token=sync_data.settings.grafana_token,
is_rbac_permissions_enabled=sync_data.settings.rbac_enabled,
)
def _create_organization(
org_id: int, stack_id: int, sync_data: SyncData, instance_info: GCOMInstanceInfo
) -> typing.Optional[Organization]:
if settings.LICENSE == CLOUD_LICENSE_NAME:
return _create_cloud_organization(org_id, stack_id, sync_data, instance_info)
elif settings.LICENSE == OPEN_SOURCE_LICENSE_NAME:
return _create_oss_organization(sync_data)
return None
def get_or_create_organization(
org_id: int, stack_id: int, sync_data: SyncData = None, instance_info: GCOMInstanceInfo = None
) -> Organization:
organization = Organization.objects.filter(org_id=org_id, stack_id=stack_id).first()
if not organization:
organization = _create_organization(org_id, stack_id, sync_data, instance_info)
return organization
def get_or_create_user(organization: Organization, sync_user: SyncUser) -> User:
_sync_users_data(organization, [sync_user], delete_extra=False)
user = organization.users.get(user_id=sync_user.id)
# update team membership if needed
# (not removing user from teams, assuming this is called on user creation/first login only;
# periodic sync will keep teams updated)
membership = sync_user.teams or []
for team_id in membership:
team = organization.teams.filter(team_id=team_id).first()
if team:
user.teams.add(team)
return user
def _sync_organization_data(organization: Organization, sync_settings: SyncSettings):
organization.is_rbac_permissions_enabled = sync_settings.rbac_enabled
logger.info(f"RBAC status org={organization.pk} rbac_enabled={organization.is_rbac_permissions_enabled}")
organization.is_grafana_irm_enabled = sync_settings.irm_enabled
organization.is_grafana_labels_enabled = sync_settings.labels_enabled
organization.is_grafana_incident_enabled = sync_settings.incident_enabled
organization.grafana_incident_backend_url = sync_settings.incident_backend_url
organization.grafana_url = sync_settings.grafana_url
organization.api_token = sync_settings.grafana_token
organization.last_time_synced = timezone.now()
_sync_instance_info(organization)
grafana_api_client = GrafanaAPIClient(api_url=organization.grafana_url, api_token=organization.api_token)
_, check_token_call_status = grafana_api_client.check_token()
if check_token_call_status["connected"]:
organization.api_token_status = Organization.API_TOKEN_STATUS_OK
organization.last_time_synced = timezone.now()
else:
organization.api_token_status = Organization.API_TOKEN_STATUS_FAILED
logger.warning(f"Sync not successful org={organization.pk} token_status=FAILED")
organization.save(
update_fields=[
"api_token",
"api_token_status",
"cluster_slug",
"stack_slug",
"org_slug",
"org_title",
"region_slug",
"grafana_url",
"last_time_synced",
"gcom_token_org_last_time_synced",
"is_rbac_permissions_enabled",
"is_grafana_incident_enabled",
"is_grafana_labels_enabled",
"is_grafana_irm_enabled",
"grafana_incident_backend_url",
]
)
def _sync_users_data(organization: Organization, sync_users: list[SyncUser], delete_extra=False):
if sync_users is None:
return
users_to_sync = (
User(
organization_id=organization.pk,
user_id=user.id,
email=user.email,
name=user.name,
username=user.login,
role=getattr(LegacyAccessControlRole, user.role.upper(), LegacyAccessControlRole.NONE),
avatar_url=user.avatar_url,
permissions=[{"action": permission.action} for permission in user.permissions] if user.permissions else [],
)
for user in sync_users
)
existing_user_ids = set(organization.users.all().values_list("user_id", flat=True))
kwargs = {}
if settings.DATABASE_TYPE in ("sqlite3", "postgresql"):
# unique_fields is required for sqlite and postgresql setups
kwargs["unique_fields"] = ("organization_id", "user_id", "is_active")
organization.users.bulk_create(
users_to_sync,
update_conflicts=True,
update_fields=("email", "name", "username", "role", "avatar_url", "permissions"),
batch_size=5000,
**kwargs,
)
# Retrieve primary keys for the newly created users
#
# If the models primary key is an AutoField, the primary key attribute can only be retrieved
# on certain databases (currently PostgreSQL, MariaDB 10.5+, and SQLite 3.35+).
# On other databases, it will not be set.
# https://docs.djangoproject.com/en/4.1/ref/models/querysets/#django.db.models.query.QuerySet.bulk_create
created_users = organization.users.exclude(user_id__in=existing_user_ids)
if delete_extra:
# delete removed users
existing_user_ids |= set(u.user_id for u in created_users)
user_ids_to_delete = existing_user_ids - {user.id for user in sync_users}
organization.users.filter(user_id__in=user_ids_to_delete).delete()
def _sync_teams_data(organization: Organization, sync_teams: list[SyncTeam] | None):
if sync_teams is None:
sync_teams = []
# keep existing team names mapping to check for possible metrics cache updates
existing_team_names = {team.team_id: team.name for team in organization.teams.all()}
teams_to_sync = tuple(
Team(
organization_id=organization.pk,
team_id=team.team_id,
name=team.name,
email=team.email,
avatar_url=team.avatar_url,
)
for team in sync_teams
)
# create entries, update if team_id already exists in the organization
kwargs = {}
if settings.DATABASE_TYPE in ("sqlite3", "postgresql"):
# unique_fields is required for sqlite and postgresql setups
kwargs["unique_fields"] = ("organization_id", "team_id")
organization.teams.bulk_create(
teams_to_sync,
batch_size=5000,
update_conflicts=True,
update_fields=("name", "email", "avatar_url"),
**kwargs,
)
# create missing direct paging integrations
AlertReceiveChannel.objects.create_missing_direct_paging_integrations(organization)
# delete removed teams and their direct paging integrations
existing_team_ids = set(organization.teams.all().values_list("team_id", flat=True))
team_ids_to_delete = existing_team_ids - set(t.team_id for t in sync_teams)
organization.alert_receive_channels.filter(
team__team_id__in=team_ids_to_delete, integration=AlertReceiveChannel.INTEGRATION_DIRECT_PAGING
).delete()
organization.teams.filter(team_id__in=team_ids_to_delete).delete()
# collect teams diffs to update metrics cache
metrics_teams_to_update: MetricsCacheManager.TeamsDiffMap = {}
for team_id in team_ids_to_delete:
metrics_teams_to_update = MetricsCacheManager.update_team_diff(metrics_teams_to_update, team_id, deleted=True)
for team in sync_teams:
previous_name = existing_team_names.get(team.team_id)
if previous_name and previous_name != team.name:
metrics_teams_to_update = MetricsCacheManager.update_team_diff(
metrics_teams_to_update, team.team_id, new_name=team.name
)
metrics_bulk_update_team_label_cache(metrics_teams_to_update, organization.id)
def _sync_teams_members_data(organization: Organization, team_members: dict[int, list[int]] | None):
if team_members is None:
return
# set team members
for team_id, members_ids in team_members.items():
team = organization.teams.get(team_id=team_id)
if members_ids:
team.users.set(organization.users.filter(user_id__in=members_ids))
else:
team.users.clear()
def apply_sync_data(organization: Organization, sync_data: SyncData):
# update org + settings
_sync_organization_data(organization, sync_data.settings)
# update or create users
_sync_users_data(organization, sync_data.users, delete_extra=True)
# update or create teams + direct paging integrations
_sync_teams_data(organization, sync_data.teams)
# update team members
_sync_teams_members_data(organization, sync_data.team_members)