The query for checking a user permission (used to get users from a Slack usergroup, for example) is timing out (and generating retries, besides affecting some use cases: [logs](https://ops.grafana-ops.net/explore?panes=%7B%22FCQ%22:%7B%22datasource%22:%22c-R8UWvVk%22,%22queries%22:%5B%7B%22refId%22:%22A%22,%22expr%22:%22%7Bnamespace%3D%5C%22amixr-prod%5C%22,%20cluster%3D%5C%22prod-us-central-0%5C%22%7D%20%7C%3D%20%5C%22Timeout%20exceeded%20in%20regular%20expression%20match.%5C%22%22,%22queryType%22:%22range%22,%22datasource%22:%7B%22type%22:%22loki%22,%22uid%22:%22c-R8UWvVk%22%7D,%22editorMode%22:%22code%22%7D%5D,%22range%22:%7B%22from%22:%22now-6h%22,%22to%22:%22now%22%7D%7D%7D&schemaVersion=1&orgId=1)): `django.db.utils.OperationalError: (3699, 'Timeout exceeded in regular expression match.')` Change to a `contains` query except for SQLite (not supported), where a simplified version of the original regex query is used.
461 lines
18 KiB
Python
461 lines
18 KiB
Python
import datetime
|
||
import json
|
||
import logging
|
||
import re
|
||
import typing
|
||
from urllib.parse import urljoin
|
||
|
||
import pytz
|
||
from django.conf import settings
|
||
from django.core.validators import MinLengthValidator
|
||
from django.db import models, transaction
|
||
from django.db.models import Q
|
||
from django.db.models.signals import post_save
|
||
from django.dispatch import receiver
|
||
from emoji import demojize
|
||
|
||
from apps.api.permissions import (
|
||
LegacyAccessControlCompatiblePermission,
|
||
LegacyAccessControlRole,
|
||
RBACPermission,
|
||
user_is_authorized,
|
||
)
|
||
from apps.schedules.tasks import drop_cached_ical_for_custom_events_for_organization
|
||
from common.public_primary_keys import generate_public_primary_key, increase_public_primary_key_length
|
||
|
||
if typing.TYPE_CHECKING:
|
||
from django.db.models.manager import RelatedManager
|
||
|
||
from apps.alerts.models import AlertGroup, EscalationPolicy
|
||
from apps.auth_token.models import ApiAuthToken, ScheduleExportAuthToken, UserScheduleExportAuthToken
|
||
from apps.base.models import UserNotificationPolicy
|
||
from apps.slack.models import SlackUserIdentity
|
||
from apps.user_management.models import Organization, Team
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
class PermissionsQuery(typing.TypedDict):
|
||
permissions__contains: typing.Dict
|
||
|
||
|
||
class PermissionsRegexQuery(typing.TypedDict):
|
||
permissions__regex: str
|
||
|
||
|
||
class RoleInQuery(typing.TypedDict):
|
||
role__in: typing.List[int]
|
||
|
||
|
||
def generate_public_primary_key_for_user():
|
||
prefix = "U"
|
||
new_public_primary_key = generate_public_primary_key(prefix)
|
||
|
||
failure_counter = 0
|
||
while User.objects.filter(public_primary_key=new_public_primary_key).exists():
|
||
new_public_primary_key = increase_public_primary_key_length(
|
||
failure_counter=failure_counter, prefix=prefix, model_name="User"
|
||
)
|
||
failure_counter += 1
|
||
|
||
return new_public_primary_key
|
||
|
||
|
||
def default_working_hours():
|
||
weekdays = ["monday", "tuesday", "wednesday", "thursday", "friday"]
|
||
weekends = ["saturday", "sunday"]
|
||
|
||
working_hours = {day: [{"start": "09:00:00", "end": "17:00:00"}] for day in weekdays}
|
||
working_hours |= {day: [] for day in weekends}
|
||
|
||
return working_hours
|
||
|
||
|
||
class UserManager(models.Manager["User"]):
|
||
@staticmethod
|
||
def sync_for_team(team, api_members: list[dict]):
|
||
user_ids = tuple(member["userId"] for member in api_members)
|
||
users = team.organization.users.filter(user_id__in=user_ids)
|
||
team.users.set(users)
|
||
|
||
@staticmethod
|
||
def sync_for_organization(organization, api_users: list[dict]):
|
||
from apps.base.models import UserNotificationPolicy
|
||
|
||
grafana_users = {user["userId"]: user for user in api_users}
|
||
existing_user_ids = set(organization.users.all().values_list("user_id", flat=True))
|
||
|
||
# create missing users
|
||
users_to_create = tuple(
|
||
User(
|
||
organization_id=organization.pk,
|
||
user_id=user["userId"],
|
||
email=user["email"],
|
||
name=user["name"],
|
||
username=user["login"],
|
||
role=getattr(LegacyAccessControlRole, user["role"].upper(), LegacyAccessControlRole.NONE),
|
||
avatar_url=user["avatarUrl"],
|
||
permissions=user["permissions"],
|
||
)
|
||
for user in grafana_users.values()
|
||
if user["userId"] not in existing_user_ids
|
||
)
|
||
|
||
with transaction.atomic():
|
||
organization.users.bulk_create(users_to_create, batch_size=5000)
|
||
# Retrieve primary keys for the newly created users
|
||
#
|
||
# If the model’s primary key is an AutoField, the primary key attribute can only be retrieved
|
||
# on certain databases (currently PostgreSQL, MariaDB 10.5+, and SQLite 3.35+).
|
||
# On other databases, it will not be set.
|
||
# https://docs.djangoproject.com/en/4.1/ref/models/querysets/#django.db.models.query.QuerySet.bulk_create
|
||
created_users = organization.users.exclude(user_id__in=existing_user_ids)
|
||
|
||
policies_to_create = ()
|
||
for user in created_users:
|
||
policies_to_create = policies_to_create + user.default_notification_policies_defaults
|
||
policies_to_create = policies_to_create + user.important_notification_policies_defaults
|
||
UserNotificationPolicy.objects.bulk_create(policies_to_create, batch_size=5000)
|
||
|
||
# delete excess users
|
||
user_ids_to_delete = existing_user_ids - grafana_users.keys()
|
||
organization.users.filter(user_id__in=user_ids_to_delete).delete()
|
||
|
||
# update existing users if any fields have changed
|
||
users_to_update = []
|
||
for user in organization.users.filter(user_id__in=existing_user_ids):
|
||
grafana_user = grafana_users[user.user_id]
|
||
g_user_role = getattr(LegacyAccessControlRole, grafana_user["role"].upper(), LegacyAccessControlRole.NONE)
|
||
|
||
if (
|
||
user.email != grafana_user["email"]
|
||
or user.name != grafana_user["name"]
|
||
or user.username != grafana_user["login"]
|
||
or user.role != g_user_role
|
||
or user.avatar_url != grafana_user["avatarUrl"]
|
||
# instead of looping through the array of permission objects, simply take the hash
|
||
# of the string representation of the data structures and compare.
|
||
# Need to first convert the lists of objects to strings because lists/dicts are not hashable
|
||
# (because lists and dicts are not hashable.. as they are mutable)
|
||
# https://stackoverflow.com/a/22003440
|
||
or hash(json.dumps(user.permissions)) != hash(json.dumps(grafana_user["permissions"]))
|
||
):
|
||
user.email = grafana_user["email"]
|
||
user.name = grafana_user["name"]
|
||
user.username = grafana_user["login"]
|
||
user.role = g_user_role
|
||
user.avatar_url = grafana_user["avatarUrl"]
|
||
user.permissions = grafana_user["permissions"]
|
||
users_to_update.append(user)
|
||
|
||
organization.users.bulk_update(
|
||
users_to_update, ["email", "name", "username", "role", "avatar_url", "permissions"], batch_size=5000
|
||
)
|
||
|
||
|
||
class UserQuerySet(models.QuerySet):
|
||
def filter(self, *args, **kwargs):
|
||
return super().filter(*args, **kwargs, is_active=True)
|
||
|
||
def filter_with_deleted(self, *args, **kwargs):
|
||
return super().filter(*args, **kwargs)
|
||
|
||
def delete(self):
|
||
# is_active = None is used to be able to have multiple deleted users with the same user_id
|
||
return super().update(is_active=None)
|
||
|
||
def hard_delete(self):
|
||
return super().delete()
|
||
|
||
|
||
class User(models.Model):
|
||
acknowledged_alert_groups: "RelatedManager['AlertGroup']"
|
||
auth_tokens: "RelatedManager['ApiAuthToken']"
|
||
current_team: typing.Optional["Team"]
|
||
escalation_policy_notify_queues: "RelatedManager['EscalationPolicy']"
|
||
last_notified_in_escalation_policies: "RelatedManager['EscalationPolicy']"
|
||
notification_policies: "RelatedManager['UserNotificationPolicy']"
|
||
organization: "Organization"
|
||
resolved_alert_groups: "RelatedManager['AlertGroup']"
|
||
schedule_export_token: "RelatedManager['ScheduleExportAuthToken']"
|
||
silenced_alert_groups: "RelatedManager['AlertGroup']"
|
||
slack_user_identity: typing.Optional["SlackUserIdentity"]
|
||
teams: "RelatedManager['Team']"
|
||
user_schedule_export_token: "RelatedManager['UserScheduleExportAuthToken']"
|
||
wiped_alert_groups: "RelatedManager['AlertGroup']"
|
||
|
||
objects = UserManager.from_queryset(UserQuerySet)()
|
||
|
||
class Meta:
|
||
# For some reason there are cases when Grafana user gets deleted,
|
||
# and then new Grafana user is created with the same user_id
|
||
# Including is_active to unique_together and setting is_active to None allows to
|
||
# have multiple deleted users with the same user_id, but user_id is unique among active users
|
||
unique_together = ("user_id", "organization", "is_active")
|
||
indexes = [
|
||
models.Index(fields=["is_active", "organization", "username"]),
|
||
models.Index(fields=["is_active", "organization", "email"]),
|
||
]
|
||
|
||
public_primary_key = models.CharField(
|
||
max_length=20,
|
||
validators=[MinLengthValidator(settings.PUBLIC_PRIMARY_KEY_MIN_LENGTH + 1)],
|
||
unique=True,
|
||
default=generate_public_primary_key_for_user,
|
||
)
|
||
|
||
user_id = models.PositiveIntegerField()
|
||
organization = models.ForeignKey(to="user_management.Organization", on_delete=models.CASCADE, related_name="users")
|
||
current_team = models.ForeignKey(
|
||
to="user_management.Team", null=True, default=None, on_delete=models.SET_NULL, related_name="current_team_users"
|
||
)
|
||
|
||
email = models.EmailField()
|
||
name = models.CharField(max_length=300)
|
||
username = models.CharField(max_length=300)
|
||
role = models.PositiveSmallIntegerField(choices=LegacyAccessControlRole.choices())
|
||
avatar_url = models.URLField()
|
||
|
||
# don't use "_timezone" directly, use the "timezone" property since it can be populated via slack user identity
|
||
_timezone = models.CharField(max_length=50, null=True, default=None)
|
||
working_hours = models.JSONField(null=True, default=default_working_hours)
|
||
|
||
notification = models.ManyToManyField(
|
||
"alerts.AlertGroup", through="alerts.UserHasNotification", related_name="users"
|
||
)
|
||
|
||
unverified_phone_number = models.CharField(max_length=20, null=True, default=None)
|
||
_verified_phone_number = models.CharField(max_length=20, null=True, default=None)
|
||
hide_phone_number = models.BooleanField(default=False)
|
||
|
||
slack_user_identity = models.ForeignKey(
|
||
"slack.SlackUserIdentity", on_delete=models.PROTECT, null=True, default=None, related_name="users"
|
||
)
|
||
|
||
# is_active = None is used to be able to have multiple deleted users with the same user_id
|
||
is_active = models.BooleanField(null=True, default=True)
|
||
permissions = models.JSONField(null=False, default=list)
|
||
|
||
def __str__(self):
|
||
return f"{self.pk}: {self.username}"
|
||
|
||
@property
|
||
def available_teams(self):
|
||
if self.role == LegacyAccessControlRole.ADMIN:
|
||
return self.organization.teams.all()
|
||
return self.organization.teams.filter(Q(is_sharing_resources_to_all=True) | Q(users=self)).distinct()
|
||
|
||
@property
|
||
def is_authenticated(self):
|
||
return True
|
||
|
||
@property
|
||
def avatar_full_url(self):
|
||
return urljoin(self.organization.grafana_url, self.avatar_url)
|
||
|
||
@property
|
||
def verified_phone_number(self):
|
||
"""
|
||
Use property to highlight that _verified_phone_number should not be modified directly
|
||
"""
|
||
return self._verified_phone_number
|
||
|
||
def save_verified_phone_number(self, phone_number: str) -> None:
|
||
self._verified_phone_number = phone_number
|
||
self.save(update_fields=["_verified_phone_number"])
|
||
|
||
def clear_phone_numbers(self) -> None:
|
||
self.unverified_phone_number = None
|
||
self._verified_phone_number = None
|
||
self.save(update_fields=["unverified_phone_number", "_verified_phone_number"])
|
||
|
||
# TODO: move to telegram app
|
||
def is_telegram_connected(self):
|
||
return hasattr(self, "telegram_connection")
|
||
|
||
def self_or_admin(self, user_to_check, organization) -> bool:
|
||
has_admin_permission = user_is_authorized(user_to_check, [RBACPermission.Permissions.USER_SETTINGS_ADMIN])
|
||
return user_to_check.pk == self.pk or (
|
||
has_admin_permission and organization.pk == user_to_check.organization_id
|
||
)
|
||
|
||
@property
|
||
def is_notification_allowed(self):
|
||
return user_is_authorized(self, [RBACPermission.Permissions.NOTIFICATIONS_READ])
|
||
|
||
def get_username_with_slack_verbal(self, mention=False) -> str:
|
||
slack_verbal = None
|
||
|
||
if self.slack_user_identity:
|
||
slack_verbal = (
|
||
f"<@{self.slack_user_identity.slack_id}>"
|
||
if mention
|
||
else f"@{self.slack_user_identity.profile_display_name or self.slack_user_identity.slack_verbal}"
|
||
)
|
||
|
||
if slack_verbal:
|
||
return f"{self.username} ({slack_verbal})"
|
||
|
||
return self.username
|
||
|
||
@property
|
||
def timezone(self) -> typing.Optional[str]:
|
||
if self._timezone:
|
||
return self._timezone
|
||
|
||
if self.slack_user_identity:
|
||
return self.slack_user_identity.timezone
|
||
|
||
return None
|
||
|
||
@timezone.setter
|
||
def timezone(self, value):
|
||
self._timezone = value
|
||
|
||
def is_in_working_hours(self, dt: datetime.datetime, tz: typing.Optional[str] = None) -> bool:
|
||
assert dt.tzinfo == datetime.timezone.utc, "dt must be in UTC"
|
||
|
||
# Default to user's timezone
|
||
if not tz:
|
||
tz = self.timezone
|
||
|
||
# If user has no timezone set, any time is considered non-working hours
|
||
if not tz:
|
||
return False
|
||
|
||
# Convert to user's timezone and get day name (e.g. monday)
|
||
dt = dt.astimezone(pytz.timezone(tz))
|
||
day_name = dt.date().strftime("%A").lower()
|
||
|
||
# If no working hours for the day, return False
|
||
if day_name not in self.working_hours or not self.working_hours[day_name]:
|
||
return False
|
||
|
||
# Extract start and end time for the day from working hours
|
||
day_start_time_str = self.working_hours[day_name][0]["start"]
|
||
day_start_time = datetime.time.fromisoformat(day_start_time_str)
|
||
|
||
day_end_time_str = self.working_hours[day_name][0]["end"]
|
||
day_end_time = datetime.time.fromisoformat(day_end_time_str)
|
||
|
||
# Calculate day start and end datetime
|
||
day_start = dt.replace(
|
||
hour=day_start_time.hour, minute=day_start_time.minute, second=day_start_time.second, microsecond=0
|
||
)
|
||
day_end = dt.replace(
|
||
hour=day_end_time.hour, minute=day_end_time.minute, second=day_end_time.second, microsecond=0
|
||
)
|
||
|
||
return day_start <= dt <= day_end
|
||
|
||
def short(self):
|
||
return {
|
||
"username": self.username,
|
||
"pk": self.public_primary_key,
|
||
"avatar": self.avatar_url,
|
||
"avatar_full": self.avatar_full_url,
|
||
}
|
||
|
||
# Insight logs
|
||
@property
|
||
def insight_logs_type_verbal(self):
|
||
return "user"
|
||
|
||
@property
|
||
def insight_logs_verbal(self):
|
||
return self.username
|
||
|
||
@property
|
||
def insight_logs_serialized(self):
|
||
from apps.base.models import UserNotificationPolicy
|
||
|
||
default, important = UserNotificationPolicy.get_short_verbals_for_user(user=self)
|
||
notification_policies_verbal = f"default: {' - '.join(default)}, important: {' - '.join(important)}"
|
||
notification_policies_verbal = demojize(notification_policies_verbal)
|
||
|
||
result = {
|
||
"username": self.username,
|
||
# LEGACY.. role should get removed eventually.. it's probably safe to remove it now?
|
||
"role": self.get_role_display(),
|
||
"notification_policies": notification_policies_verbal,
|
||
}
|
||
if self.verified_phone_number:
|
||
result["verified_phone_number"] = self.unverified_phone_number
|
||
if self.unverified_phone_number:
|
||
result["unverified_phone_number"] = self.unverified_phone_number
|
||
return result
|
||
|
||
@property
|
||
def insight_logs_metadata(self):
|
||
return {}
|
||
|
||
@staticmethod
|
||
def build_permissions_query(
|
||
permission: LegacyAccessControlCompatiblePermission, organization
|
||
) -> typing.Union[PermissionsQuery, PermissionsRegexQuery, RoleInQuery]:
|
||
"""
|
||
This method returns a django query filter that is compatible with RBAC
|
||
as well as legacy "basic" role based authorization. If a permission is provided we simply do
|
||
a regex search where the permission column contains the permission value (need to use regex because
|
||
the JSON contains method is not supported by sqlite)
|
||
|
||
If RBAC is not supported for the org, we make the assumption that we are looking for any users with AT LEAST
|
||
the fallback role. Ex: if the fallback role were editor than we would get editors and admins.
|
||
"""
|
||
if organization.is_rbac_permissions_enabled:
|
||
# https://stackoverflow.com/a/50251879
|
||
if settings.DATABASE_TYPE == settings.DATABASE_TYPES.SQLITE3:
|
||
# https://docs.djangoproject.com/en/4.2/topics/db/queries/#contains
|
||
return PermissionsRegexQuery(permissions__regex=re.escape(permission.value))
|
||
required_permission = {"action": permission.value}
|
||
return PermissionsQuery(permissions__contains=[required_permission])
|
||
return RoleInQuery(role__lte=permission.fallback_role.value)
|
||
|
||
def get_or_create_notification_policies(self, important=False):
|
||
if not self.notification_policies.filter(important=important).exists():
|
||
if important:
|
||
self.notification_policies.create_important_policies_for_user(self)
|
||
else:
|
||
self.notification_policies.create_default_policies_for_user(self)
|
||
notification_policies = self.notification_policies.filter(important=important)
|
||
return notification_policies
|
||
|
||
@property
|
||
def default_notification_policies_defaults(self):
|
||
from apps.base.models import UserNotificationPolicy
|
||
|
||
print(self)
|
||
|
||
return (
|
||
UserNotificationPolicy(
|
||
user=self,
|
||
step=UserNotificationPolicy.Step.NOTIFY,
|
||
notify_by=settings.EMAIL_BACKEND_INTERNAL_ID,
|
||
order=0,
|
||
),
|
||
)
|
||
|
||
@property
|
||
def important_notification_policies_defaults(self):
|
||
from apps.base.models import UserNotificationPolicy
|
||
|
||
return (
|
||
UserNotificationPolicy(
|
||
user=self,
|
||
step=UserNotificationPolicy.Step.NOTIFY,
|
||
notify_by=settings.EMAIL_BACKEND_INTERNAL_ID,
|
||
important=True,
|
||
order=0,
|
||
),
|
||
)
|
||
|
||
|
||
# TODO: check whether this signal can be moved to save method of the model
|
||
@receiver(post_save, sender=User)
|
||
def listen_for_user_model_save(sender: User, instance: User, created: bool, *args, **kwargs) -> None:
|
||
if created:
|
||
instance.notification_policies.create_default_policies_for_user(instance)
|
||
instance.notification_policies.create_important_policies_for_user(instance)
|
||
drop_cached_ical_for_custom_events_for_organization.apply_async(
|
||
(instance.organization_id,),
|
||
)
|