diff --git a/engine/apps/alerts/escalation_snapshot/escalation_snapshot_mixin.py b/engine/apps/alerts/escalation_snapshot/escalation_snapshot_mixin.py index 5a294a43..b202ceb0 100644 --- a/engine/apps/alerts/escalation_snapshot/escalation_snapshot_mixin.py +++ b/engine/apps/alerts/escalation_snapshot/escalation_snapshot_mixin.py @@ -1,6 +1,6 @@ import datetime import logging -from typing import Optional +import typing import pytz from celery import uuid as celery_uuid @@ -17,6 +17,9 @@ from apps.alerts.escalation_snapshot.snapshot_classes import ( ) from apps.alerts.tasks import escalate_alert_group +if typing.TYPE_CHECKING: + from apps.alerts.models import ChannelFilter + logger = logging.getLogger(__name__) # Is a delay to prevent intermediate activity by system in case user is doing some multi-step action. @@ -29,6 +32,11 @@ class EscalationSnapshotMixin: Mixin for AlertGroup. It contains methods related with alert group escalation """ + # TODO: add stricter typing + # TODO: should this class actually be an AbstractBaseClass instead? + raw_escalation_snapshot: dict | None + channel_filter: typing.Optional["ChannelFilter"] + def build_raw_escalation_snapshot(self) -> dict: """ Builds new escalation chain in a json serializable format (dict). @@ -91,7 +99,7 @@ class EscalationSnapshotMixin: data = {} if self.escalation_chain_exists: - channel_filter = self.channel_filter + channel_filter: "ChannelFilter" = self.channel_filter escalation_chain = channel_filter.escalation_chain escalation_policies = escalation_chain.escalation_policies.all() @@ -116,7 +124,7 @@ class EscalationSnapshotMixin: return self.escalation_chain_snapshot or (self.channel_filter.escalation_chain if self.channel_filter else None) @cached_property - def channel_filter_snapshot(self) -> Optional[ChannelFilterSnapshot]: + def channel_filter_snapshot(self) -> typing.Optional[ChannelFilterSnapshot]: """ in some cases we need only channel filter and don't want to serialize whole escalation """ @@ -132,7 +140,7 @@ class EscalationSnapshotMixin: return ChannelFilterSnapshot(**channel_filter_snapshot) @cached_property - def escalation_chain_snapshot(self) -> Optional[EscalationChainSnapshot]: + def escalation_chain_snapshot(self) -> typing.Optional[EscalationChainSnapshot]: """ in some cases we need only escalation chain and don't want to serialize whole escalation escalation_chain_snapshot_object = None @@ -149,7 +157,7 @@ class EscalationSnapshotMixin: return EscalationChainSnapshot(**escalation_chain_snapshot) @cached_property - def escalation_snapshot(self) -> Optional[EscalationSnapshot]: + def escalation_snapshot(self) -> typing.Optional[EscalationSnapshot]: raw_escalation_snapshot = self.raw_escalation_snapshot if raw_escalation_snapshot: try: @@ -207,7 +215,7 @@ class EscalationSnapshotMixin: return self.raw_escalation_snapshot.get("pause_escalation", False) @property - def next_step_eta(self) -> Optional[datetime.datetime]: + def next_step_eta(self) -> typing.Optional[datetime.datetime]: """ get next_step_eta field directly to avoid serialization overhead """ diff --git a/engine/apps/alerts/escalation_snapshot/snapshot_classes/escalation_policy_snapshot.py b/engine/apps/alerts/escalation_snapshot/snapshot_classes/escalation_policy_snapshot.py index 06b04e45..ef5c202e 100644 --- a/engine/apps/alerts/escalation_snapshot/snapshot_classes/escalation_policy_snapshot.py +++ b/engine/apps/alerts/escalation_snapshot/snapshot_classes/escalation_policy_snapshot.py @@ -117,7 +117,7 @@ class EscalationPolicySnapshot: return next_user def execute(self, alert_group: "AlertGroup", reason) -> StepExecutionResultData: - action_map: typing.Dict[typing.Union[int, None], EscalationPolicySnapshot.StepExecutionFunc] = { + action_map: typing.Dict[typing.Optional[int], EscalationPolicySnapshot.StepExecutionFunc] = { EscalationPolicy.STEP_WAIT: self._escalation_step_wait, EscalationPolicy.STEP_FINAL_NOTIFYALL: self._escalation_step_notify_all, EscalationPolicy.STEP_REPEAT_ESCALATION_N_TIMES: self._escalation_step_repeat_escalation_n_times, diff --git a/engine/apps/alerts/escalation_snapshot/snapshot_classes/escalation_snapshot.py b/engine/apps/alerts/escalation_snapshot/snapshot_classes/escalation_snapshot.py index c338b53d..aac6a5ec 100644 --- a/engine/apps/alerts/escalation_snapshot/snapshot_classes/escalation_snapshot.py +++ b/engine/apps/alerts/escalation_snapshot/snapshot_classes/escalation_snapshot.py @@ -92,7 +92,7 @@ class EscalationSnapshot: return [self.escalation_policies_snapshots[0]] return self.escalation_policies_snapshots[: self.last_active_escalation_policy_order] - def next_step_eta_is_valid(self) -> typing.Union[None, bool]: + def next_step_eta_is_valid(self) -> typing.Optional[bool]: """ `next_step_eta` should never be less than the current time (with a 5 minute buffer provided) as this field should be updated as the escalation policy is executed over time. If it is, this means that @@ -109,7 +109,8 @@ class EscalationSnapshot: self.alert_group.raw_escalation_snapshot = self.convert_to_dict() self.alert_group.save(update_fields=["raw_escalation_snapshot"]) - def convert_to_dict(self) -> dict: + # TODO: update the typing here, be more strict about what this returns + def convert_to_dict(self): return self.serializer(self).data def execute_actual_escalation_step(self) -> None: diff --git a/engine/apps/alerts/grafana_alerting_sync_manager/grafana_alerting_sync.py b/engine/apps/alerts/grafana_alerting_sync_manager/grafana_alerting_sync.py index 030daed5..109b2427 100644 --- a/engine/apps/alerts/grafana_alerting_sync_manager/grafana_alerting_sync.py +++ b/engine/apps/alerts/grafana_alerting_sync_manager/grafana_alerting_sync.py @@ -11,7 +11,8 @@ from apps.grafana_plugin.helpers import GrafanaAPIClient logger = logging.getLogger(__name__) if TYPE_CHECKING: - from apps.alerts.models import GrafanaAlertingContactPoint + from apps.alerts.models import AlertReceiveChannel, GrafanaAlertingContactPoint + from apps.user_management.models import Organization class GrafanaAlertingSyncManager: @@ -24,7 +25,7 @@ class GrafanaAlertingSyncManager: ALERTING_DATASOURCE = "alertmanager" IS_GRAFANA_VERSION_GRE_9 = None - def __init__(self, alert_receive_channel): + def __init__(self, alert_receive_channel: "AlertReceiveChannel") -> None: self.alert_receive_channel = alert_receive_channel self.client = GrafanaAPIClient( api_url=self.alert_receive_channel.organization.grafana_url, @@ -33,7 +34,7 @@ class GrafanaAlertingSyncManager: self.receiver_name = self.alert_receive_channel.emojized_verbal_name @classmethod - def check_for_connection_errors(cls, organization) -> Optional[str]: + def check_for_connection_errors(cls, organization: "Organization") -> Optional[str]: """Check if it possible to connect to alerting, otherwise return error message""" client = GrafanaAPIClient(api_url=organization.grafana_url, api_token=organization.api_token) recipient = cls.GRAFANA_CONTACT_POINT @@ -561,7 +562,7 @@ class GrafanaAlertingSyncManager: break return name_in_alerting - def get_datasource_name(self, contact_point) -> str: + def get_datasource_name(self, contact_point: "GrafanaAlertingContactPoint") -> str: datasource_id = contact_point.datasource_id datasource_uid = contact_point.datasource_uid datasource, response_info = self.client.get_datasource(datasource_uid) diff --git a/engine/apps/alerts/incident_appearance/templaters/alert_templater.py b/engine/apps/alerts/incident_appearance/templaters/alert_templater.py index b5074ef6..0adb0879 100644 --- a/engine/apps/alerts/incident_appearance/templaters/alert_templater.py +++ b/engine/apps/alerts/incident_appearance/templaters/alert_templater.py @@ -65,10 +65,10 @@ class TemplateLoader: @dataclass class TemplatedAlert: - title: str = None - message: str = None - image_url: str = None - source_link: str = None + title: str | None = None + message: str | None = None + image_url: str | None = None + source_link: str | None = None class AlertTemplater(ABC): @@ -160,7 +160,7 @@ class AlertTemplater(ABC): return templated_alert - def _render_attribute_with_template(self, attr, data, channel, templated_alert): + def _render_attribute_with_template(self, attr, data, channel, templated_alert: TemplatedAlert) -> str | None: """ Get attr template and then apply it. If attr template is None or invalid will return None. @@ -212,5 +212,5 @@ class AlertTemplater(ABC): return None @abstractmethod - def _render_for(self): + def _render_for(self) -> str: raise NotImplementedError diff --git a/engine/apps/alerts/models/alert_group.py b/engine/apps/alerts/models/alert_group.py index 3e732c28..a9552849 100644 --- a/engine/apps/alerts/models/alert_group.py +++ b/engine/apps/alerts/models/alert_group.py @@ -1,10 +1,10 @@ import datetime import logging +import typing import urllib from collections import namedtuple -from typing import Optional, TypedDict from urllib.parse import urljoin -from uuid import uuid1 +from uuid import UUID, uuid1 from celery import uuid as celery_uuid from django.apps import apps @@ -33,6 +33,11 @@ from common.utils import clean_markup, str_or_backup from .alert_group_counter import AlertGroupCounter +if typing.TYPE_CHECKING: + from django.db.models.manager import RelatedManager + + from apps.alerts.models import AlertGroupLogRecord + logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) @@ -51,9 +56,9 @@ def generate_public_primary_key_for_alert_group(): return new_public_primary_key -class Permalinks(TypedDict): - slack: Optional[str] - telegram: Optional[str] +class Permalinks(typing.TypedDict): + slack: typing.Optional[str] + telegram: typing.Optional[str] web: str @@ -133,6 +138,8 @@ class AlertGroupSlackRenderingMixin: class AlertGroup(AlertGroupSlackRenderingMixin, EscalationSnapshotMixin, models.Model): + log_records: "RelatedManager['AlertGroupLogRecord']" + all_objects = AlertGroupQuerySet.as_manager() unarchived_objects = UnarchivedAlertGroupQuerySet.as_manager() @@ -324,7 +331,9 @@ class AlertGroup(AlertGroupSlackRenderingMixin, EscalationSnapshotMixin, models. cached_render_for_web = models.JSONField(default=dict) active_cache_for_web_calculation_id = models.CharField(max_length=100, null=True, default=None) - last_unique_unacknowledge_process_id = models.CharField(max_length=100, null=True, default=None) + # NOTE: we should probably migrate this field to models.UUIDField as it's ONLY ever being + # set to the result of uuid.uuid1 + last_unique_unacknowledge_process_id: UUID | None = models.CharField(max_length=100, null=True, default=None) is_archived = models.BooleanField(default=False) wiped_at = models.DateTimeField(null=True, default=None) @@ -457,11 +466,11 @@ class AlertGroup(AlertGroupSlackRenderingMixin, EscalationSnapshotMixin, models. raise NotImplementedError @property - def slack_permalink(self) -> Optional[str]: + def slack_permalink(self) -> typing.Optional[str]: return None if self.slack_message is None else self.slack_message.permalink @property - def telegram_permalink(self) -> Optional[str]: + def telegram_permalink(self) -> typing.Optional[str]: """ This property will attempt to access an attribute, `prefetched_telegram_messages`, representing a list of prefetched telegram messages. If this attribute does not exist, it falls back to performing a query. @@ -529,7 +538,7 @@ class AlertGroup(AlertGroupSlackRenderingMixin, EscalationSnapshotMixin, models. started_at=self.started_at, ) - def acknowledge_by_user(self, user: User, action_source: Optional[str] = None) -> None: + def acknowledge_by_user(self, user: User, action_source: typing.Optional[str] = None) -> None: AlertGroupLogRecord = apps.get_model("alerts", "AlertGroupLogRecord") initial_state = self.state logger.debug(f"Started acknowledge_by_user for alert_group {self.pk}") @@ -611,7 +620,7 @@ class AlertGroup(AlertGroupSlackRenderingMixin, EscalationSnapshotMixin, models. for dependent_alert_group in self.dependent_alert_groups.all(): dependent_alert_group.acknowledge_by_source() - def un_acknowledge_by_user(self, user: User, action_source: Optional[str] = None) -> None: + def un_acknowledge_by_user(self, user: User, action_source: typing.Optional[str] = None) -> None: AlertGroupLogRecord = apps.get_model("alerts", "AlertGroupLogRecord") initial_state = self.state logger.debug(f"Started un_acknowledge_by_user for alert_group {self.pk}") @@ -639,7 +648,7 @@ class AlertGroup(AlertGroupSlackRenderingMixin, EscalationSnapshotMixin, models. dependent_alert_group.un_acknowledge_by_user(user, action_source=action_source) logger.debug(f"Finished un_acknowledge_by_user for alert_group {self.pk}") - def resolve_by_user(self, user: User, action_source: Optional[str] = None) -> None: + def resolve_by_user(self, user: User, action_source: typing.Optional[str] = None) -> None: AlertGroupLogRecord = apps.get_model("alerts", "AlertGroupLogRecord") initial_state = self.state @@ -786,7 +795,7 @@ class AlertGroup(AlertGroupSlackRenderingMixin, EscalationSnapshotMixin, models. for dependent_alert_group in self.dependent_alert_groups.all(): dependent_alert_group.resolve_by_disable_maintenance() - def un_resolve_by_user(self, user: User, action_source: Optional[str] = None) -> None: + def un_resolve_by_user(self, user: User, action_source: typing.Optional[str] = None) -> None: AlertGroupLogRecord = apps.get_model("alerts", "AlertGroupLogRecord") if self.wiped_at is None: @@ -815,7 +824,9 @@ class AlertGroup(AlertGroupSlackRenderingMixin, EscalationSnapshotMixin, models. for dependent_alert_group in self.dependent_alert_groups.all(): dependent_alert_group.un_resolve_by_user(user, action_source=action_source) - def attach_by_user(self, user: User, root_alert_group: "AlertGroup", action_source: Optional[str] = None) -> None: + def attach_by_user( + self, user: User, root_alert_group: "AlertGroup", action_source: typing.Optional[str] = None + ) -> None: AlertGroupLogRecord = apps.get_model("alerts", "AlertGroupLogRecord") if root_alert_group.root_alert_group is None and not root_alert_group.resolved: @@ -891,10 +902,10 @@ class AlertGroup(AlertGroupSlackRenderingMixin, EscalationSnapshotMixin, models. action_source=action_source, ) - def un_attach_by_user(self, user: User, action_source: Optional[str] = None) -> None: + def un_attach_by_user(self, user: User, action_source: typing.Optional[str] = None) -> None: AlertGroupLogRecord = apps.get_model("alerts", "AlertGroupLogRecord") - root_alert_group = self.root_alert_group + root_alert_group: AlertGroup = self.root_alert_group self.root_alert_group = None self.save(update_fields=["root_alert_group"]) @@ -963,7 +974,9 @@ class AlertGroup(AlertGroupSlackRenderingMixin, EscalationSnapshotMixin, models. action_source=None, ) - def silence_by_user(self, user: User, silence_delay: Optional[int], action_source: Optional[str] = None) -> None: + def silence_by_user( + self, user: User, silence_delay: typing.Optional[int], action_source: typing.Optional[str] = None + ) -> None: AlertGroupLogRecord = apps.get_model("alerts", "AlertGroupLogRecord") initial_state = self.state @@ -1020,7 +1033,7 @@ class AlertGroup(AlertGroupSlackRenderingMixin, EscalationSnapshotMixin, models. for dependent_alert_group in self.dependent_alert_groups.all(): dependent_alert_group.silence_by_user(user, silence_delay, action_source) - def un_silence_by_user(self, user: User, action_source: Optional[str] = None) -> None: + def un_silence_by_user(self, user: User, action_source: typing.Optional[str] = None) -> None: AlertGroupLogRecord = apps.get_model("alerts", "AlertGroupLogRecord") initial_state = self.state @@ -1322,7 +1335,10 @@ class AlertGroup(AlertGroupSlackRenderingMixin, EscalationSnapshotMixin, models. if not root_alert_groups_to_resolve.exists(): return - organization = root_alert_groups_to_resolve.first().channel.organization + # we know this is an AlertGroup because of the .exists() check just above + first_alert_group: AlertGroup = root_alert_groups_to_resolve.first() + + organization = first_alert_group.channel.organization if organization.is_resolution_note_required: root_alert_groups_to_resolve = root_alert_groups_to_resolve.filter( Q(resolution_notes__isnull=False, resolution_notes__deleted_at=None) diff --git a/engine/apps/alerts/models/alert_receive_channel.py b/engine/apps/alerts/models/alert_receive_channel.py index 4f3222c4..5d17e86f 100644 --- a/engine/apps/alerts/models/alert_receive_channel.py +++ b/engine/apps/alerts/models/alert_receive_channel.py @@ -1,4 +1,5 @@ import logging +import typing from functools import cached_property from urllib.parse import urljoin @@ -37,6 +38,11 @@ from common.insight_log import EntityEvent, write_resource_insight_log from common.jinja_templater import jinja_template_env from common.public_primary_keys import generate_public_primary_key, increase_public_primary_key_length +if typing.TYPE_CHECKING: + from django.db.models.manager import RelatedManager + + from apps.alerts.models import GrafanaAlertingContactPoint + logger = logging.getLogger(__name__) @@ -108,6 +114,8 @@ class AlertReceiveChannel(IntegrationOptionsMixin, MaintainableObject): Channel generated by user to receive Alerts to. """ + contact_points: "RelatedManager['GrafanaAlertingContactPoint']" + objects = AlertReceiveChannelManager() objects_with_maintenance = AlertReceiveChannelManagerWithMaintenance() objects_with_deleted = models.Manager() @@ -609,7 +617,9 @@ class AlertReceiveChannel(IntegrationOptionsMixin, MaintainableObject): @receiver(post_save, sender=AlertReceiveChannel) -def listen_for_alertreceivechannel_model_save(sender, instance, created, *args, **kwargs): +def listen_for_alertreceivechannel_model_save( + sender: AlertReceiveChannel, instance: AlertReceiveChannel, created: bool, *args, **kwargs +) -> None: ChannelFilter = apps.get_model("alerts", "ChannelFilter") IntegrationHeartBeat = apps.get_model("heartbeat", "IntegrationHeartBeat") diff --git a/engine/apps/alerts/models/escalation_policy.py b/engine/apps/alerts/models/escalation_policy.py index 3ae20958..1bd70f37 100644 --- a/engine/apps/alerts/models/escalation_policy.py +++ b/engine/apps/alerts/models/escalation_policy.py @@ -1,7 +1,8 @@ +import datetime + from django.conf import settings from django.core.validators import MinLengthValidator from django.db import models -from django.utils import timezone from ordered_model.models import OrderedModel from common.public_primary_keys import generate_public_primary_key, increase_public_primary_key_length @@ -271,13 +272,13 @@ class EscalationPolicy(OrderedModel): null=True, ) - ONE_MINUTE = timezone.timedelta(minutes=1) - FIVE_MINUTES = timezone.timedelta(minutes=5) - FIFTEEN_MINUTES = timezone.timedelta(minutes=15) - THIRTY_MINUTES = timezone.timedelta(minutes=30) - HOUR = timezone.timedelta(minutes=60) + ONE_MINUTE = datetime.timedelta(minutes=1) + FIVE_MINUTES = datetime.timedelta(minutes=5) + FIFTEEN_MINUTES = datetime.timedelta(minutes=15) + THIRTY_MINUTES = datetime.timedelta(minutes=30) + HOUR = datetime.timedelta(minutes=60) - DEFAULT_WAIT_DELAY = timezone.timedelta(minutes=5) + DEFAULT_WAIT_DELAY = datetime.timedelta(minutes=5) DURATION_CHOICES = ( (ONE_MINUTE, "1 min"), diff --git a/engine/apps/alerts/models/maintainable_object.py b/engine/apps/alerts/models/maintainable_object.py index 299c59b9..f32f6c35 100644 --- a/engine/apps/alerts/models/maintainable_object.py +++ b/engine/apps/alerts/models/maintainable_object.py @@ -1,3 +1,4 @@ +import datetime from uuid import uuid4 import humanize @@ -14,11 +15,11 @@ class MaintainableObject(models.Model): class Meta: abstract = True - DURATION_ONE_HOUR = timezone.timedelta(hours=1) - DURATION_THREE_HOURS = timezone.timedelta(hours=3) - DURATION_SIX_HOURS = timezone.timedelta(hours=6) - DURATION_TWELVE_HOURS = timezone.timedelta(hours=12) - DURATION_TWENTY_FOUR_HOURS = timezone.timedelta(hours=24) + DURATION_ONE_HOUR = datetime.timedelta(hours=1) + DURATION_THREE_HOURS = datetime.timedelta(hours=3) + DURATION_SIX_HOURS = datetime.timedelta(hours=6) + DURATION_TWELVE_HOURS = datetime.timedelta(hours=12) + DURATION_TWENTY_FOUR_HOURS = datetime.timedelta(hours=24) MAINTENANCE_DURATION_CHOICES = ( (DURATION_ONE_HOUR, "1 hour"), @@ -97,7 +98,7 @@ class MaintainableObject(models.Model): maintenance_uuid = _self.start_disable_maintenance_task(maintenance_duration) - _self.maintenance_duration = timezone.timedelta(seconds=maintenance_duration) + _self.maintenance_duration = datetime.timedelta(seconds=maintenance_duration) _self.maintenance_uuid = maintenance_uuid _self.maintenance_mode = mode _self.maintenance_started_at = timezone.now() diff --git a/engine/apps/alerts/tasks/escalate_alert_group.py b/engine/apps/alerts/tasks/escalate_alert_group.py index 2ac2942f..52caa588 100644 --- a/engine/apps/alerts/tasks/escalate_alert_group.py +++ b/engine/apps/alerts/tasks/escalate_alert_group.py @@ -1,7 +1,7 @@ from django.apps import apps from django.conf import settings from django.db import transaction -from kombu import uuid as celery_uuid +from kombu.utils.uuid import uuid as celery_uuid from common.custom_celery_tasks import shared_dedicated_queue_retry_task diff --git a/engine/apps/alerts/tasks/notify_user.py b/engine/apps/alerts/tasks/notify_user.py index e41a4bcf..58422b9a 100644 --- a/engine/apps/alerts/tasks/notify_user.py +++ b/engine/apps/alerts/tasks/notify_user.py @@ -4,7 +4,7 @@ from django.apps import apps from django.conf import settings from django.db import transaction from django.utils import timezone -from kombu import uuid as celery_uuid +from kombu.utils.uuid import uuid as celery_uuid from apps.alerts.constants import NEXT_ESCALATION_DELAY from apps.alerts.signals import user_notification_action_triggered_signal diff --git a/engine/apps/api/permissions.py b/engine/apps/api/permissions.py index 0d77ec8d..7b87fc30 100644 --- a/engine/apps/api/permissions.py +++ b/engine/apps/api/permissions.py @@ -2,6 +2,7 @@ import enum import typing from django.conf import settings +from django.contrib.auth.models import AbstractUser from rest_framework import permissions from rest_framework.authentication import BasicAuthentication, SessionAuthentication from rest_framework.request import Request @@ -10,6 +11,9 @@ from rest_framework.viewsets import ViewSet, ViewSetMixin from common.utils import getattrd +if typing.TYPE_CHECKING: + from apps.user_management.models import User + ACTION_PREFIX = "grafana-oncall-app" RBAC_PERMISSIONS_ATTR = "rbac_permissions" RBAC_OBJECT_PERMISSIONS_ATTR = "rbac_object_permissions" @@ -17,6 +21,31 @@ RBAC_OBJECT_PERMISSIONS_ATTR = "rbac_object_permissions" ViewSetOrAPIView = typing.Union[ViewSet, APIView] +class AuthenticatedRequest(Request): + """ + Use this for typing, instead of rest_framework.request.Request, when you KNOW that the user is authenticated. + ex. In the RBACPermission class below, we know that the user is authenticated because this is handled by the + `authentication_classes` attribute on views. + + https://github.com/typeddjango/django-stubs#how-can-i-create-a-httprequest-thats-guaranteed-to-have-an-authenticated-user + """ + + # see comment above, this is safe. without the type-ignore comment, mypy complains + # expression has type "User", base class "Request" defined the type as "Union[AbstractBaseUser, AnonymousUser]" + user: "User" # type: ignore[assignment] + + +class AuthenticatedDjangoAdminRequest(Request): + """ + Use this for typing, instead of rest_framework.request.Request, when you KNOW that the user is authenticated via + Django admin user authentication. + + https://github.com/typeddjango/django-stubs#how-can-i-create-a-httprequest-thats-guaranteed-to-have-an-authenticated-user + """ + + user: AbstractUser + + class GrafanaAPIPermission(typing.TypedDict): action: str @@ -62,9 +91,12 @@ class LegacyAccessControlCompatiblePermission: self.fallback_role = fallback_role -def get_most_authorized_role( - permissions: typing.List[LegacyAccessControlCompatiblePermission], -) -> LegacyAccessControlRole: +LegacyAccessControlCompatiblePermissions = typing.List[LegacyAccessControlCompatiblePermission] +RBACPermissionsAttribute = typing.Dict[str, LegacyAccessControlCompatiblePermissions] +RBACObjectPermissionsAttribute = typing.Dict[permissions.BasePermission, typing.List[str]] + + +def get_most_authorized_role(permissions: LegacyAccessControlCompatiblePermissions) -> LegacyAccessControlRole: if not permissions: return LegacyAccessControlRole.VIEWER @@ -72,22 +104,18 @@ def get_most_authorized_role( return min({p.fallback_role for p in permissions}, key=lambda r: r.value) -def user_is_authorized(user, required_permissions: typing.List[LegacyAccessControlCompatiblePermission]) -> bool: +def user_is_authorized(user: "User", required_permissions: LegacyAccessControlCompatiblePermissions) -> bool: """ This function checks whether `user` has all permissions in `required_permissions`. RBAC permissions are used if RBAC is enabled for the organization, otherwise the fallback basic role is checked. - Parameters - ---------- - user : apps.user_management.models.user.User - The user to check permissions for - required_permissions : typing.List[LegacyAccessControlCompatiblePermission] - A list of permissions that a user must have to be considered authorized + user - The user to check permissions for + required_permissions - A list of permissions that a user must have to be considered authorized """ if user.organization.is_rbac_permissions_enabled: user_permissions = [u["action"] for u in user.permissions] - required_permissions = [p.value for p in required_permissions] - return all(permission in user_permissions for permission in required_permissions) + required_permission_values = [p.value for p in required_permissions] + return all(permission in user_permissions for permission in required_permission_values) return user.role <= get_most_authorized_role(required_permissions).value @@ -187,15 +215,18 @@ class RBACPermission(permissions.BasePermission): ) @staticmethod - def _get_view_action(request: Request, view: ViewSetOrAPIView) -> str: + def _get_view_action(request: AuthenticatedRequest, view: ViewSetOrAPIView) -> str: """ For right now this needs to support being used in both a ViewSet as well as APIView, we use both interchangably Note: `request.method` is returned uppercase """ - return view.action if isinstance(view, ViewSetMixin) else request.method.lower() + return view.action if isinstance(view, ViewSetMixin) else (request.method or "").lower() - def has_permission(self, request: Request, view: ViewSetOrAPIView) -> bool: + # mypy complains about "Liskov substitution principle" here because request is `AuthenticatedRequest` object + # and not rest_framework.request.Request + # https://mypy.readthedocs.io/en/stable/common_issues.html#incompatible-overrides + def has_permission(self, request: AuthenticatedRequest, view: ViewSetOrAPIView) -> bool: # type: ignore[override] # the django-debug-toolbar UI makes OPTIONS calls. Without this statement the debug UI can't gather the # necessary info it needs to work properly if settings.DEBUG and request.method == "OPTIONS": @@ -203,14 +234,14 @@ class RBACPermission(permissions.BasePermission): action = self._get_view_action(request, view) - rbac_permissions: RBACPermissionsAttribute = getattr(view, RBAC_PERMISSIONS_ATTR, None) + rbac_permissions: typing.Optional[RBACPermissionsAttribute] = getattr(view, RBAC_PERMISSIONS_ATTR, None) # first check that the rbac_permissions dict attribute is defined assert ( rbac_permissions is not None ), f"Must define a {RBAC_PERMISSIONS_ATTR} dict on the ViewSet that is consuming the RBACPermission class" - action_required_permissions: typing.Union[None, typing.List] = rbac_permissions.get(action, None) + action_required_permissions: typing.Optional[typing.List] = rbac_permissions.get(action, None) # next check that the action in question is defined within the rbac_permissions dict attribute assert ( @@ -220,8 +251,13 @@ class RBACPermission(permissions.BasePermission): return user_is_authorized(request.user, action_required_permissions) - def has_object_permission(self, request: Request, view: ViewSetOrAPIView, obj: typing.Any) -> bool: - rbac_object_permissions: RBACObjectPermissionsAttribute = getattr(view, RBAC_OBJECT_PERMISSIONS_ATTR, None) + # mypy complains about "Liskov substitution principle" here because request is `AuthenticatedRequest` object + # and not rest_framework.request.Request + # https://mypy.readthedocs.io/en/stable/common_issues.html#incompatible-overrides + def has_object_permission(self, request: AuthenticatedRequest, view: ViewSetOrAPIView, obj: typing.Any) -> bool: # type: ignore[override] + rbac_object_permissions: typing.Optional[RBACObjectPermissionsAttribute] = getattr( + view, RBAC_OBJECT_PERMISSIONS_ATTR, None + ) if rbac_object_permissions: action = self._get_view_action(request, view) @@ -250,35 +286,45 @@ def get_permission_from_permission_string(perm: str) -> typing.Optional[LegacyAc for permission_class in ALL_PERMISSION_CLASSES: if permission_class.value == perm: return permission_class + return None class IsOwner(permissions.BasePermission): def __init__(self, ownership_field: typing.Optional[str] = None) -> None: self.ownership_field = ownership_field - def has_object_permission(self, request: Request, _view: ViewSet, obj: typing.Any) -> bool: + # mypy complains about "Liskov substitution principle" here because request is `AuthenticatedRequest` object + # and not rest_framework.request.Request + # https://mypy.readthedocs.io/en/stable/common_issues.html#incompatible-overrides + def has_object_permission(self, request: AuthenticatedRequest, _view: ViewSetOrAPIView, obj: typing.Any) -> bool: # type: ignore[override] owner = obj if self.ownership_field is None else getattrd(obj, self.ownership_field) return owner == request.user class HasRBACPermissions(permissions.BasePermission): - def __init__(self, required_permissions: typing.List[LegacyAccessControlCompatiblePermission]) -> None: + def __init__(self, required_permissions: LegacyAccessControlCompatiblePermissions) -> None: self.required_permissions = required_permissions - def has_object_permission(self, request: Request, _view: ViewSetOrAPIView, _obj: typing.Any) -> bool: + # mypy complains about "Liskov substitution principle" here because request is `AuthenticatedRequest` object + # and not rest_framework.request.Request + # https://mypy.readthedocs.io/en/stable/common_issues.html#incompatible-overrides + def has_object_permission(self, request: AuthenticatedRequest, _view: ViewSetOrAPIView, _obj: typing.Any) -> bool: # type: ignore[override] return user_is_authorized(request.user, self.required_permissions) class IsOwnerOrHasRBACPermissions(permissions.BasePermission): def __init__( self, - required_permissions: typing.List[LegacyAccessControlCompatiblePermission], + required_permissions: LegacyAccessControlCompatiblePermissions, ownership_field: typing.Optional[str] = None, ) -> None: self.IsOwner = IsOwner(ownership_field) self.HasRBACPermissions = HasRBACPermissions(required_permissions) - def has_object_permission(self, request: Request, view: ViewSetOrAPIView, obj: typing.Any) -> bool: + # mypy complains about "Liskov substitution principle" here because request is `AuthenticatedRequest` object + # and not rest_framework.request.Request + # https://mypy.readthedocs.io/en/stable/common_issues.html#incompatible-overrides + def has_object_permission(self, request: AuthenticatedRequest, view: ViewSetOrAPIView, obj: typing.Any) -> bool: # type: ignore[override] return self.IsOwner.has_object_permission(request, view, obj) or self.HasRBACPermissions.has_object_permission( request, view, obj ) @@ -287,14 +333,13 @@ class IsOwnerOrHasRBACPermissions(permissions.BasePermission): class IsStaff(permissions.BasePermission): STAFF_AUTH_CLASSES = [BasicAuthentication, SessionAuthentication] - def has_permission(self, request: Request, _view: ViewSet) -> bool: + # mypy complains about "Liskov substitution principle" here because request is `AuthenticatedRequest` object + # and not rest_framework.request.Request + # https://mypy.readthedocs.io/en/stable/common_issues.html#incompatible-overrides + def has_permission(self, request: AuthenticatedDjangoAdminRequest, _view: ViewSet) -> bool: # type: ignore[override] user = request.user if not any(isinstance(request._authenticator, x) for x in self.STAFF_AUTH_CLASSES): return False if user and user.is_authenticated: return user.is_staff return False - - -RBACPermissionsAttribute = typing.Dict[str, typing.List[LegacyAccessControlCompatiblePermission]] -RBACObjectPermissionsAttribute = typing.Dict[permissions.BasePermission, typing.List[str]] diff --git a/engine/apps/api/serializers/escalation_policy.py b/engine/apps/api/serializers/escalation_policy.py index 0286105a..58f822ae 100644 --- a/engine/apps/api/serializers/escalation_policy.py +++ b/engine/apps/api/serializers/escalation_policy.py @@ -101,7 +101,7 @@ class EscalationPolicySerializer(EagerLoadingMixin, serializers.ModelSerializer) "notify_to_group", "important", ] - read_only_fields = ("order",) + read_only_fields = ["order"] SELECT_RELATED = [ "escalation_chain", @@ -199,7 +199,7 @@ class EscalationPolicySerializer(EagerLoadingMixin, serializers.ModelSerializer) class EscalationPolicyCreateSerializer(EscalationPolicySerializer): class Meta(EscalationPolicySerializer.Meta): - read_only_fields = ("order",) + read_only_fields = ["order"] extra_kwargs = {"escalation_chain": {"required": True, "allow_null": False}} def create(self, validated_data): @@ -212,7 +212,7 @@ class EscalationPolicyUpdateSerializer(EscalationPolicySerializer): escalation_chain = serializers.CharField(read_only=True, source="escalation_chain.public_primary_key") class Meta(EscalationPolicySerializer.Meta): - read_only_fields = ("order", "escalation_chain") + read_only_fields = ["order", "escalation_chain"] def update(self, instance, validated_data): step = validated_data.get("step", instance.step) diff --git a/engine/apps/api/serializers/on_call_shifts.py b/engine/apps/api/serializers/on_call_shifts.py index 197797c4..db411f8c 100644 --- a/engine/apps/api/serializers/on_call_shifts.py +++ b/engine/apps/api/serializers/on_call_shifts.py @@ -213,7 +213,7 @@ class OnCallShiftUpdateSerializer(OnCallShiftSerializer): type = serializers.ReadOnlyField() class Meta(OnCallShiftSerializer.Meta): - read_only_fields = ("schedule", "type") + read_only_fields = ["schedule", "type"] def update(self, instance, validated_data): validated_data = self._correct_validated_data(instance.type, validated_data) diff --git a/engine/apps/api/serializers/team.py b/engine/apps/api/serializers/team.py index 5b42a29f..f62af606 100644 --- a/engine/apps/api/serializers/team.py +++ b/engine/apps/api/serializers/team.py @@ -16,9 +16,9 @@ class TeamSerializer(serializers.ModelSerializer): "is_sharing_resources_to_all", ) - read_only_fields = ( + read_only_fields = [ "id", "name", "email", "avatar_url", - ) + ] diff --git a/engine/apps/api/serializers/user_notification_policy.py b/engine/apps/api/serializers/user_notification_policy.py index ba3e7172..c9acb7f0 100644 --- a/engine/apps/api/serializers/user_notification_policy.py +++ b/engine/apps/api/serializers/user_notification_policy.py @@ -100,7 +100,7 @@ class UserNotificationPolicyUpdateSerializer(UserNotificationPolicyBaseSerialize ) class Meta(UserNotificationPolicyBaseSerializer.Meta): - read_only_fields = ("order", "user", "important") + read_only_fields = ["order", "user", "important"] def update(self, instance, validated_data): self_or_admin = instance.user.self_or_admin( diff --git a/engine/apps/auth_token/models/schedule_export_auth_token.py b/engine/apps/auth_token/models/schedule_export_auth_token.py index e141b39f..ffae910f 100644 --- a/engine/apps/auth_token/models/schedule_export_auth_token.py +++ b/engine/apps/auth_token/models/schedule_export_auth_token.py @@ -1,4 +1,4 @@ -from typing import Tuple +import typing from django.db import models @@ -25,8 +25,8 @@ class ScheduleExportAuthToken(BaseAuthToken): @classmethod def create_auth_token( - cls, user: User, organization: Organization, schedule: OnCallSchedule = None - ) -> Tuple["ScheduleExportAuthToken", str]: + cls, user: User, organization: Organization, schedule: typing.Optional[OnCallSchedule] = None + ) -> typing.Tuple["ScheduleExportAuthToken", str]: token_string = crypto.generate_schedule_token_string() digest = crypto.hash_token_string(token_string) diff --git a/engine/apps/base/models/user_notification_policy.py b/engine/apps/base/models/user_notification_policy.py index 7e93af63..3a39b96f 100644 --- a/engine/apps/base/models/user_notification_policy.py +++ b/engine/apps/base/models/user_notification_policy.py @@ -163,7 +163,7 @@ class UserNotificationPolicy(OrderedModel): return f"{self.pk}: {self.short_verbal}" @classmethod - def get_short_verbals_for_user(cls, user: User) -> Tuple[Tuple[str], Tuple[str]]: + def get_short_verbals_for_user(cls, user: User) -> Tuple[Tuple[str, ...], Tuple[str, ...]]: is_wait_step = Q(step=cls.Step.WAIT) is_wait_step_configured = Q(wait_delay__isnull=False) diff --git a/engine/apps/grafana_plugin/helpers/client.py b/engine/apps/grafana_plugin/helpers/client.py index 628244da..c02cddbc 100644 --- a/engine/apps/grafana_plugin/helpers/client.py +++ b/engine/apps/grafana_plugin/helpers/client.py @@ -1,20 +1,19 @@ import json import logging import time -from typing import Dict, List, Optional, Tuple, TypedDict +import typing from urllib.parse import urljoin import requests from django.conf import settings from rest_framework import status -from rest_framework.response import Response from apps.api.permissions import ACTION_PREFIX, GrafanaAPIPermission logger = logging.getLogger(__name__) -class GrafanaUser(TypedDict): +class GrafanaUser(typing.TypedDict): orgId: int userId: int email: str @@ -27,18 +26,22 @@ class GrafanaUser(TypedDict): class GrafanaUserWithPermissions(GrafanaUser): - permissions: List[GrafanaAPIPermission] + permissions: typing.List[GrafanaAPIPermission] -class GCOMInstanceInfoConfigFeatureToggles(TypedDict): +GrafanaUsersWithPermissions = typing.List[GrafanaUserWithPermissions] +UserPermissionsDict = typing.Dict[str, typing.List[GrafanaAPIPermission]] + + +class GCOMInstanceInfoConfigFeatureToggles(typing.TypedDict): accessControlOnCall: str -class GCOMInstanceInfoConfig(TypedDict): +class GCOMInstanceInfoConfig(typing.TypedDict): feature_toggles: GCOMInstanceInfoConfigFeatureToggles -class GCOMInstanceInfo(TypedDict): +class GCOMInstanceInfo(typing.TypedDict): id: int orgId: int slug: str @@ -47,26 +50,66 @@ class GCOMInstanceInfo(TypedDict): url: str status: str clusterSlug: str - config: Optional[GCOMInstanceInfoConfig] + config: GCOMInstanceInfoConfig | None + + +class ApiClientResponseCallStatus(typing.TypedDict): + url: str + connected: bool + status_code: int + message: str + + +# TODO: come back and make the typing.Dict strongly typed once we switch to Python 3.12 +# which has better support for generics +_APIClientResponse = typing.Optional[typing.Dict | typing.List] +APIClientResponse = typing.Tuple[_APIClientResponse, ApiClientResponseCallStatus] + + +# can't define this using class syntax because one of the keys contains a dash +# https://docs.python.org/3/library/typing.html#typing.TypedDict:~:text=The%20functional%20syntax%20should%20also%20be%20used%20when%20any%20of%20the%20keys%20are%20not%20valid%20identifiers%2C%20for%20example%20because%20they%20are%20keywords%20or%20contain%20hyphens.%20Example%3A +APIRequestHeaders = typing.TypedDict( + "APIRequestHeaders", + { + "User-Agent": str, + "Authorization": str, + }, +) + + +class HttpMethod(typing.Protocol): + """ + TODO: can probably replace this with something from the requests library? + https://github.com/psf/requests/blob/main/requests/api.py#L14 + """ + + @property + def __name__(self) -> str: + ... + + def __call__(self, *args, **kwargs) -> requests.Response: + ... class APIClient: - def __init__(self, api_url: str, api_token: str): + def __init__(self, api_url: str, api_token: str) -> None: self.api_url = api_url self.api_token = api_token - def api_head(self, endpoint: str, body: dict = None, **kwargs) -> Tuple[Optional[Response], dict]: + def api_head(self, endpoint: str, body: typing.Optional[typing.Dict] = None, **kwargs) -> APIClientResponse: return self.call_api(endpoint, requests.head, body, **kwargs) - def api_get(self, endpoint: str, **kwargs) -> Tuple[Optional[Response], dict]: + def api_get(self, endpoint: str, **kwargs) -> APIClientResponse: return self.call_api(endpoint, requests.get, **kwargs) - def api_post(self, endpoint: str, body: dict = None, **kwargs) -> Tuple[Optional[Response], dict]: + def api_post(self, endpoint: str, body: typing.Optional[typing.Dict] = None, **kwargs) -> APIClientResponse: return self.call_api(endpoint, requests.post, body, **kwargs) - def call_api(self, endpoint: str, http_method, body: dict = None, **kwargs) -> Tuple[Optional[Response], dict]: + def call_api( + self, endpoint: str, http_method: HttpMethod, body: typing.Optional[typing.Dict] = None, **kwargs + ) -> APIClientResponse: request_start = time.perf_counter() - call_status = { + call_status: ApiClientResponseCallStatus = { "url": urljoin(self.api_url, endpoint), "connected": False, "status_code": status.HTTP_503_SERVICE_UNAVAILABLE, @@ -108,20 +151,20 @@ class APIClient: return None, call_status @property - def request_headers(self) -> dict: + def request_headers(self) -> APIRequestHeaders: return {"User-Agent": settings.GRAFANA_COM_USER_AGENT, "Authorization": f"Bearer {self.api_token}"} class GrafanaAPIClient(APIClient): USER_PERMISSION_ENDPOINT = f"api/access-control/users/permissions/search?actionPrefix={ACTION_PREFIX}" - def __init__(self, api_url: str, api_token: str): + def __init__(self, api_url: str, api_token: str) -> None: super().__init__(api_url, api_token) - def check_token(self) -> Tuple[Optional[Response], dict]: + def check_token(self) -> APIClientResponse: return self.api_head("api/org") - def get_users_permissions(self, rbac_is_enabled_for_org: bool) -> Dict[str, List[GrafanaAPIPermission]]: + def get_users_permissions(self, rbac_is_enabled_for_org: bool) -> UserPermissionsDict: """ It is possible that this endpoint may not be available for certain Grafana orgs. Ex: for Grafana Cloud orgs whom have pinned their Grafana version to an earlier version @@ -141,11 +184,15 @@ class GrafanaAPIClient(APIClient): """ if not rbac_is_enabled_for_org: return {} - data, _ = self.api_get(self.USER_PERMISSION_ENDPOINT) - if data is None: + response, _ = self.api_get(self.USER_PERMISSION_ENDPOINT) + if response is None: + return {} + elif isinstance(response, list): return {} - all_users_permissions = {} + data: typing.Dict[str, typing.Dict[str, typing.List[str]]] = response + + all_users_permissions: UserPermissionsDict = {} for user_id, user_permissions in data.items(): all_users_permissions[user_id] = [GrafanaAPIPermission(action=key) for key, _ in user_permissions.items()] @@ -155,11 +202,15 @@ class GrafanaAPIClient(APIClient): _, resp_status = self.api_head(self.USER_PERMISSION_ENDPOINT) return resp_status["connected"] - def get_users(self, rbac_is_enabled_for_org: bool, **kwargs) -> List[GrafanaUserWithPermissions]: - users, _ = self.api_get("api/org/users", **kwargs) + def get_users(self, rbac_is_enabled_for_org: bool, **kwargs) -> GrafanaUsersWithPermissions: + users_response, _ = self.api_get("api/org/users", **kwargs) - if not users: + if not users_response: return [] + elif isinstance(users_response, dict): + return [] + + users: GrafanaUsersWithPermissions = users_response user_permissions = self.get_users_permissions(rbac_is_enabled_for_org) @@ -168,32 +219,32 @@ class GrafanaAPIClient(APIClient): user["permissions"] = user_permissions.get(str(user["userId"]), []) return users - def get_teams(self, **kwargs): + def get_teams(self, **kwargs) -> APIClientResponse: return self.api_get("api/teams/search?perpage=1000000", **kwargs) - def get_team_members(self, team_id): + def get_team_members(self, team_id: int) -> APIClientResponse: return self.api_get(f"api/teams/{team_id}/members") - def get_datasources(self): + def get_datasources(self) -> APIClientResponse: return self.api_get("api/datasources") - def get_datasource_by_id(self, datasource_id): + def get_datasource_by_id(self, datasource_id) -> APIClientResponse: # This endpoint is deprecated for Grafana version >= 9. Use get_datasource instead return self.api_get(f"api/datasources/{datasource_id}") - def get_datasource(self, datasource_uid): + def get_datasource(self, datasource_uid) -> APIClientResponse: return self.api_get(f"api/datasources/uid/{datasource_uid}") - def get_alertmanager_status_with_config(self, recipient): + def get_alertmanager_status_with_config(self, recipient) -> APIClientResponse: return self.api_get(f"api/alertmanager/{recipient}/api/v2/status") - def get_alerting_config(self, recipient): + def get_alerting_config(self, recipient: str) -> APIClientResponse: return self.api_get(f"api/alertmanager/{recipient}/config/api/v1/alerts") - def update_alerting_config(self, recipient, config): + def update_alerting_config(self, recipient, config) -> APIClientResponse: return self.api_post(f"api/alertmanager/{recipient}/config/api/v1/alerts", config) - def get_grafana_plugin_settings(self, recipient): + def get_grafana_plugin_settings(self, recipient: str) -> APIClientResponse: return self.api_get(f"api/plugins/{recipient}/settings") @@ -203,10 +254,12 @@ class GcomAPIClient(APIClient): STACK_STATUS_DELETED = "deleted" STACK_STATUS_ACTIVE = "active" - def __init__(self, api_token: str): + def __init__(self, api_token: str) -> None: super().__init__(settings.GRAFANA_COM_API_URL, api_token) - def get_instance_info(self, stack_id: str, include_config_query_param: bool = False) -> Optional[GCOMInstanceInfo]: + def get_instance_info( + self, stack_id: str, include_config_query_param: bool = False + ) -> typing.Optional[GCOMInstanceInfo]: """ NOTE: in order to use ?config=true, an "Admin" GCOM token must be used to make the API call """ @@ -222,7 +275,11 @@ class GcomAPIClient(APIClient): there are two ways that feature toggles can be enabled, this method takes into account both https://grafana.com/docs/grafana/latest/setup-grafana/configure-grafana/#enable """ - instance_feature_toggles = instance_info.get("config", {}).get("feature_toggles", {}) + instance_info_config = instance_info.get("config", {}) + if not instance_info_config: + return False + + instance_feature_toggles = instance_info_config.get("feature_toggles", {}) if not instance_feature_toggles: return False @@ -251,8 +308,8 @@ class GcomAPIClient(APIClient): instance_infos, _ = self.api_get(url) return instance_infos["items"] and instance_infos["items"][0].get("status") == self.STACK_STATUS_DELETED - def post_active_users(self, body): + def post_active_users(self, body) -> APIClientResponse: return self.api_post("app-active-users", body) - def get_stack_regions(self): + def get_stack_regions(self) -> APIClientResponse: return self.api_get("stack-regions") diff --git a/engine/apps/grafana_plugin/views/self_hosted_install.py b/engine/apps/grafana_plugin/views/self_hosted_install.py index 4ad2b580..6784f700 100644 --- a/engine/apps/grafana_plugin/views/self_hosted_install.py +++ b/engine/apps/grafana_plugin/views/self_hosted_install.py @@ -5,7 +5,7 @@ from rest_framework.response import Response from rest_framework.views import APIView from apps.grafana_plugin.helpers import GrafanaAPIClient -from apps.user_management.models.organization import Organization, ProvisionedPlugin +from apps.user_management.models.organization import Organization from apps.user_management.sync import sync_organization from common.api_helpers.mixins import GrafanaHeadersMixin @@ -23,7 +23,7 @@ class SelfHostedInstallView(GrafanaHeadersMixin, APIView): grafana_url = settings.SELF_HOSTED_SETTINGS["GRAFANA_API_URL"] grafana_api_token = self.instance_context["grafana_token"] - provisioning_info: ProvisionedPlugin = {"error": None} + provisioning_info = {"error": None} if settings.LICENSE != settings.OPEN_SOURCE_LICENSE_NAME: provisioning_info["error"] = f"License type not authorized" diff --git a/engine/apps/metrics_exporter/constants.py b/engine/apps/metrics_exporter/constants.py index 5d235987..66619b16 100644 --- a/engine/apps/metrics_exporter/constants.py +++ b/engine/apps/metrics_exporter/constants.py @@ -1,7 +1,6 @@ +import datetime import typing -from django.utils import timezone - class AlertGroupsTotalMetricsDict(typing.TypedDict): integration_name: str @@ -39,7 +38,7 @@ class RecalculateOrgMetricsDict(typing.TypedDict): ALERT_GROUPS_TOTAL = "oncall_alert_groups_total" ALERT_GROUPS_RESPONSE_TIME = "oncall_alert_groups_response_time_seconds" -METRICS_RESPONSE_TIME_CALCULATION_PERIOD = timezone.timedelta(days=7) +METRICS_RESPONSE_TIME_CALCULATION_PERIOD = datetime.timedelta(days=7) METRICS_CACHE_LIFETIME = 93600 # 26 hours. Should be higher than METRICS_RECALCULATE_CACHE_TIMEOUT diff --git a/engine/apps/metrics_exporter/helpers.py b/engine/apps/metrics_exporter/helpers.py index 443dd933..a8ebb066 100644 --- a/engine/apps/metrics_exporter/helpers.py +++ b/engine/apps/metrics_exporter/helpers.py @@ -1,3 +1,4 @@ +import datetime import random import typing @@ -20,6 +21,9 @@ from apps.metrics_exporter.constants import ( AlertGroupsTotalMetricsDict, ) +if typing.TYPE_CHECKING: + from apps.alerts.models import AlertReceiveChannel + def get_organization_ids_from_db(): AlertReceiveChannel = apps.get_model("alerts", "AlertReceiveChannel") @@ -42,12 +46,12 @@ def get_organization_ids(): return organizations_ids -def get_response_time_period(): +def get_response_time_period() -> datetime.datetime: """Returns period for response time calculation""" return timezone.now() - METRICS_RESPONSE_TIME_CALCULATION_PERIOD -def get_metrics_recalculation_timeout(): +def get_metrics_recalculation_timeout() -> int: """ Returns timeout when metrics should be recalculated. Add some dispersion to avoid starting recalculation tasks for all organizations at the same time. @@ -66,7 +70,7 @@ def get_metrics_cache_timeout(organization_id): return metrics_cache_timeout -def get_metrics_cache_timer_key(organization_id): +def get_metrics_cache_timer_key(organization_id) -> str: return f"{METRICS_CACHE_TIMER}_{organization_id}" @@ -75,15 +79,15 @@ def get_metrics_cache_timer_for_organization(organization_id): return cache.get(key) -def get_metric_alert_groups_total_key(organization_id): +def get_metric_alert_groups_total_key(organization_id) -> str: return f"{ALERT_GROUPS_TOTAL}_{organization_id}" -def get_metric_alert_groups_response_time_key(organization_id): +def get_metric_alert_groups_response_time_key(organization_id) -> str: return f"{ALERT_GROUPS_RESPONSE_TIME}_{organization_id}" -def metrics_update_integration_cache(integration): +def metrics_update_integration_cache(integration: "AlertReceiveChannel") -> None: """Update integration data in metrics cache""" metrics_cache_timeout = get_metrics_cache_timeout(integration.organization_id) metric_alert_groups_total_key = get_metric_alert_groups_total_key(integration.organization_id) @@ -105,7 +109,7 @@ def metrics_update_integration_cache(integration): cache.set(metric_key, metric_cache, timeout=metrics_cache_timeout) -def metrics_remove_deleted_integration_from_cache(integration): +def metrics_remove_deleted_integration_from_cache(integration: "AlertReceiveChannel"): """Remove data related to deleted integration from metrics cache""" metrics_cache_timeout = get_metrics_cache_timeout(integration.organization_id) metric_alert_groups_total_key = get_metric_alert_groups_total_key(integration.organization_id) @@ -118,7 +122,7 @@ def metrics_remove_deleted_integration_from_cache(integration): cache.set(metric_key, metric_cache, timeout=metrics_cache_timeout) -def metrics_add_integration_to_cache(integration): +def metrics_add_integration_to_cache(integration: "AlertReceiveChannel"): """Add new integration data to metrics cache""" metrics_cache_timeout = get_metrics_cache_timeout(integration.organization_id) metric_alert_groups_total_key = get_metric_alert_groups_total_key(integration.organization_id) diff --git a/engine/apps/oss_installation/models/cloud_connector.py b/engine/apps/oss_installation/models/cloud_connector.py index 3a828a51..3ea43808 100644 --- a/engine/apps/oss_installation/models/cloud_connector.py +++ b/engine/apps/oss_installation/models/cloud_connector.py @@ -1,4 +1,5 @@ import logging +import typing from urllib.parse import urljoin import requests @@ -51,7 +52,7 @@ class CloudConnector(models.Model): return sync_status, error_msg - def sync_users_with_cloud(self) -> tuple[bool, str]: + def sync_users_with_cloud(self) -> typing.Tuple[bool, typing.Optional[str]]: sync_status = False error_msg = None diff --git a/engine/apps/public_api/serializers/escalation_policies.py b/engine/apps/public_api/serializers/escalation_policies.py index 0193e5d3..ca7e9f14 100644 --- a/engine/apps/public_api/serializers/escalation_policies.py +++ b/engine/apps/public_api/serializers/escalation_policies.py @@ -276,7 +276,7 @@ class EscalationPolicyUpdateSerializer(EscalationPolicySerializer): type = EscalationPolicyTypeField(required=False, source="step", allow_null=True) class Meta(EscalationPolicySerializer.Meta): - read_only_fields = ("route_id",) + read_only_fields = ["route_id"] def update(self, instance, validated_data): if "step" in validated_data: diff --git a/engine/apps/public_api/serializers/routes.py b/engine/apps/public_api/serializers/routes.py index 07d2b398..1125b447 100644 --- a/engine/apps/public_api/serializers/routes.py +++ b/engine/apps/public_api/serializers/routes.py @@ -175,7 +175,7 @@ class ChannelFilterSerializer(BaseChannelFilterSerializer): "telegram", "manual_order", ] - read_only_fields = ("is_the_last_route",) + read_only_fields = ["is_the_last_route"] def create(self, validated_data): validated_data = self._correct_validated_data(validated_data) diff --git a/engine/apps/schedules/ical_utils.py b/engine/apps/schedules/ical_utils.py index 7cd5a627..39d2340d 100644 --- a/engine/apps/schedules/ical_utils.py +++ b/engine/apps/schedules/ical_utils.py @@ -13,6 +13,7 @@ from django.apps import apps from django.db.models import Q from django.utils import timezone from icalendar import Calendar +from icalendar import Event as IcalEvent from apps.api.permissions import RBACPermission from apps.schedules.constants import ( @@ -37,7 +38,8 @@ This is a hack to allow us to load models for type checking without circular dep This module likely needs to refactored to be part of the OnCallSchedule module. """ if TYPE_CHECKING: - from apps.schedules.models import OnCallSchedule + from apps.schedules.models import CustomOnCallShift, OnCallSchedule + from apps.schedules.models.on_call_schedule import OnCallScheduleQuerySet from apps.user_management.models import Organization, User from apps.user_management.models.user import UserQuerySet @@ -45,14 +47,26 @@ logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) +EmptyShift = namedtuple( + "EmptyShift", + ["start", "end", "summary", "description", "attendee", "all_day", "calendar_type", "calendar_tz", "shift_pk"], +) +EmptyShifts = typing.List[EmptyShift] + +DatetimeInterval = namedtuple("DatetimeInterval", ["start", "end"]) +DatetimeIntervals = typing.List[DatetimeInterval] + +IcalEvents = typing.List[IcalEvent] + + def users_in_ical( usernames_from_ical: typing.List[str], - organization: Organization, + organization: "Organization", include_viewers=False, - users_to_filter: typing.Optional[UserQuerySet] = None, -) -> UserQuerySet: + users_to_filter: typing.Optional["UserQuerySet"] = None, +) -> typing.Sequence["User"]: """ - This method returns a `UserQuerySet`, filtered by users whose username, or case-insensitive e-mail, + This method returns a sequence of `User` objects, filtered by users whose username, or case-insensitive e-mail, is present in `usernames_from_ical`. If `include_viewers` is set to `True`, users are further filtered down based on their granted permissions. @@ -95,21 +109,23 @@ def users_in_ical( @timed_lru_cache(timeout=100) -def memoized_users_in_ical(usernames_from_ical: typing.List[str], organization: Organization) -> UserQuerySet: +def memoized_users_in_ical( + usernames_from_ical: typing.List[str], organization: "Organization" +) -> typing.Sequence["User"]: # using in-memory cache instead of redis to avoid pickling python objects return users_in_ical(usernames_from_ical, organization) # used for display schedule events on web def list_of_oncall_shifts_from_ical( - schedule, - date, - user_timezone="UTC", - with_empty_shifts=False, - with_gaps=False, - days=1, - filter_by=None, - from_cached_final=False, + schedule: "OnCallSchedule", + date: datetime.date, + user_timezone: str = "UTC", + with_empty_shifts: bool = False, + with_gaps: bool = False, + days: int = 1, + filter_by: str | None = None, + from_cached_final: bool = False, ): """ Parse the ical file and return list of events with users @@ -130,14 +146,19 @@ def list_of_oncall_shifts_from_ical( # get list of iCalendars from current iCal files. If there is more than one calendar, primary calendar will always # be the first + calendars: typing.Tuple[typing.Optional[Calendar], ...] + if from_cached_final: - calendars = [Calendar.from_ical(schedule.cached_ical_final_schedule)] + calendars = (Calendar.from_ical(schedule.cached_ical_final_schedule),) else: calendars = schedule.get_icalendars() # TODO: Review offset usage pytz_tz = pytz.timezone(user_timezone) - user_timezone_offset = datetime.datetime.now().astimezone(pytz_tz).utcoffset() + + # utcoffset can technically return None, but we're confident it is a timedelta here + user_timezone_offset: datetime.timedelta = datetime.datetime.now().astimezone(pytz_tz).utcoffset() # type: ignore[assignment] + datetime_min = datetime.datetime.combine(date, datetime.time.min) + datetime.timedelta(milliseconds=1) datetime_start = (datetime_min - user_timezone_offset).astimezone(pytz.UTC) datetime_end = datetime_start + datetime.timedelta(days=days - 1, hours=23, minutes=59, seconds=59) @@ -147,6 +168,8 @@ def list_of_oncall_shifts_from_ical( for idx, calendar in enumerate(calendars): if calendar is not None: + calendar_type: str | int + if from_cached_final: calendar_type = CALENDAR_TYPE_FINAL elif idx == 0: @@ -193,7 +216,14 @@ def list_of_oncall_shifts_from_ical( return result or None -def get_shifts_dict(calendar, calendar_type, schedule, datetime_start, datetime_end, with_empty_shifts=False): +def get_shifts_dict( + calendar: Calendar, + calendar_type: str | int, + schedule: "OnCallSchedule", + datetime_start: datetime.datetime, + datetime_end: datetime.datetime, + with_empty_shifts: bool = False, +): events = ical_events.get_events_from_ical_between(calendar, datetime_start, datetime_end) result_datetime = [] result_date = [] @@ -244,22 +274,15 @@ def get_shifts_dict(calendar, calendar_type, schedule, datetime_start, datetime_ return result_datetime, result_date -EmptyShift = namedtuple( - "EmptyShift", - ["start", "end", "summary", "description", "attendee", "all_day", "calendar_type", "calendar_tz", "shift_pk"], -) - - -def list_of_empty_shifts_in_schedule(schedule, start_date, end_date): - """ - Parse the ical file and return list of EmptyShift. - """ +def list_of_empty_shifts_in_schedule( + schedule: "OnCallSchedule", start_date: datetime.date, end_date: datetime.date +) -> EmptyShifts: # Calculate lookup window in schedule's tz # If we can't get tz from ical use UTC OnCallSchedule = apps.get_model("schedules", "OnCallSchedule") calendars = schedule.get_icalendars() - empty_shifts = [] + empty_shifts: EmptyShifts = [] for idx, calendar in enumerate(calendars): if calendar is not None: if idx == 0: @@ -269,7 +292,9 @@ def list_of_empty_shifts_in_schedule(schedule, start_date, end_date): calendar_tz = get_icalendar_tz_or_utc(calendar) - schedule_timezone_offset = datetime.datetime.now().astimezone(calendar_tz).utcoffset() + # utcoffset can technically return None, but we're confident it is a timedelta here + schedule_timezone_offset: datetime.timedelta = datetime.datetime.now().astimezone(calendar_tz).utcoffset() # type: ignore[assignment] + start_datetime = datetime.datetime.combine(start_date, datetime.time.min) + datetime.timedelta( milliseconds=1 ) @@ -322,8 +347,11 @@ def list_of_empty_shifts_in_schedule(schedule, start_date, end_date): def list_users_to_notify_from_ical( - schedule, events_datetime=None, include_viewers=False, users_to_filter=None -) -> UserQuerySet: + schedule: "OnCallSchedule", + events_datetime: typing.Optional[datetime.datetime] = None, + include_viewers: bool = False, + users_to_filter: typing.Optional["UserQuerySet"] = None, +) -> typing.Sequence["User"]: """ Retrieve on-call users for the current time """ @@ -338,24 +366,25 @@ def list_users_to_notify_from_ical( def list_users_to_notify_from_ical_for_period( - schedule, - start_datetime, - end_datetime, + schedule: "OnCallSchedule", + start_datetime: datetime.datetime, + end_datetime: datetime.datetime, include_viewers=False, users_to_filter=None, -) -> UserQuerySet: +) -> typing.Sequence["User"]: # get list of iCalendars from current iCal files. If there is more than one calendar, primary calendar will always # be the first calendars = schedule.get_icalendars() # reverse calendars to make overrides calendar the first, if schedule is iCal calendars = calendars[::-1] - users_found_in_ical = [] + users_found_in_ical: typing.Sequence["User"] = [] # at first check overrides calendar and return users from it if it exists and on-call users are found for calendar in calendars: if calendar is None: continue events = ical_events.get_events_from_ical_between(calendar, start_datetime, end_datetime) - parsed_ical_events = {} # event info where key is event priority and value list of found usernames {0:["alex"]} + + parsed_ical_events: typing.Dict[int, typing.List[str]] = {} for event in events: current_usernames, current_priority = get_usernames_from_ical_event(event) parsed_ical_events.setdefault(current_priority, []).extend(current_usernames) @@ -373,8 +402,8 @@ def list_users_to_notify_from_ical_for_period( def get_oncall_users_for_multiple_schedules( - schedules, events_datetime=None -) -> typing.Dict[OnCallSchedule, typing.List[User]]: + schedules: "OnCallScheduleQuerySet", events_datetime=None +) -> typing.Dict["OnCallSchedule", typing.List[User]]: from apps.user_management.models import User if events_datetime is None: @@ -418,7 +447,7 @@ def get_oncall_users_for_multiple_schedules( return oncall_users -def parse_username_from_string(string): +def parse_username_from_string(string: str) -> str: """ Parse on-call shift user from the given string Example input: @@ -429,7 +458,7 @@ def parse_username_from_string(string): return re.sub(RE_PRIORITY, "", string.strip(), 1).strip() -def parse_priority_from_string(string): +def parse_priority_from_string(string: str) -> int: """ Parse on-call shift priority from the given string Example input: @@ -437,17 +466,16 @@ def parse_priority_from_string(string): Example output: 1 """ - priority = re.findall(RE_PRIORITY, string.strip()) - if len(priority) > 0: - priority = int(priority[0]) + priority = 0 + priority_matches = re.findall(RE_PRIORITY, string.strip()) + if len(priority_matches) > 0: + priority = int(priority_matches[0]) if priority < 1: priority = 0 - else: - priority = 0 return priority -def parse_event_uid(string): +def parse_event_uid(string: str): pk = None source = None source_verbal = None @@ -467,8 +495,8 @@ def parse_event_uid(string): if source is not None: source = int(source) - CustomOnCallShift = apps.get_model("schedules", "CustomOnCallShift") - source_verbal = CustomOnCallShift.SOURCE_CHOICES[source][1] + OnCallShift: "CustomOnCallShift" = apps.get_model("schedules", "CustomOnCallShift") + source_verbal = OnCallShift.SOURCE_CHOICES[source][1] return pk, source_verbal @@ -489,7 +517,7 @@ def get_usernames_from_ical_event(event): return usernames_found, priority -def get_missing_users_from_ical_event(event, organization): +def get_missing_users_from_ical_event(event, organization: "Organization"): all_usernames, _ = get_usernames_from_ical_event(event) users = list(get_users_from_ical_event(event, organization)) found_usernames = [u.username for u in users] @@ -497,7 +525,7 @@ def get_missing_users_from_ical_event(event, organization): return [u for u in all_usernames if u != "" and u not in found_usernames and u.lower() not in found_emails] -def get_users_from_ical_event(event, organization): +def get_users_from_ical_event(event, organization: "Organization") -> typing.Sequence["User"]: usernames_from_ical, _ = get_usernames_from_ical_event(event) users = [] if len(usernames_from_ical) != 0: @@ -587,9 +615,9 @@ def get_icalendar_tz_or_utc(icalendar): return pytz.timezone(converted_timezone) -def fetch_ical_file_or_get_error(ical_url): - cached_ical_file = None - ical_file_error = None +def fetch_ical_file_or_get_error(ical_url: str) -> typing.Tuple[str | None, str | None]: + cached_ical_file: str | None = None + ical_file_error: str | None = None try: new_ical_file = fetch_ical_file(ical_url) Calendar.from_ical(new_ical_file) @@ -602,13 +630,12 @@ def fetch_ical_file_or_get_error(ical_url): return cached_ical_file, ical_file_error -def fetch_ical_file(ical_url): +def fetch_ical_file(ical_url: str) -> str: # without user-agent header google calendar sometimes returns text/html instead of text/calendar headers = {"User-Agent": "Grafana OnCall"} r = requests.get(ical_url, headers=headers, timeout=10) logger.info(f"fetch_ical_file: content-type={r.headers.get('Content-Type')}") - ical_file = r.text - return ical_file + return r.text def create_base_icalendar(name: str) -> Calendar: @@ -624,77 +651,56 @@ def create_base_icalendar(name: str) -> Calendar: return cal -def get_events_from_calendars(ical_obj: Calendar, calendars: tuple) -> None: - for calendar in calendars: - if calendar: - for component in calendar.walk(): - if component.name == "VEVENT": +def get_user_events_from_calendars( + ical_obj: Calendar, calendar: Calendar, user: User, name: typing.Optional[str] = None +) -> None: + if calendar: + for component in calendar.walk(): + if component.name == "VEVENT": + event_user = get_usernames_from_ical_event(component) + event_user_value = event_user[0][0] + if event_user_value == user.username or event_user_value.lower() == user.email.lower(): + if name: + component["SUMMARY"] = "{}: {}".format(name, component["SUMMARY"]) ical_obj.add_component(component) -def get_user_events_from_calendars(ical_obj: Calendar, calendars: tuple, user: User, name: str = None) -> None: - for calendar in calendars: - if calendar: - for component in calendar.walk(): - if component.name == "VEVENT": - event_user = get_usernames_from_ical_event(component) - event_user_value = event_user[0][0] - if event_user_value == user.username or event_user_value.lower() == user.email.lower(): - if name: - component["SUMMARY"] = "{}: {}".format(name, component["SUMMARY"]) - ical_obj.add_component(component) - - -def _is_final_export_enabled(schedule: OnCallSchedule) -> bool: - DynamicSetting = apps.get_model("base", "DynamicSetting") - enabled_final_export = DynamicSetting.objects.get_or_create( - name="enabled_final_schedule_export", - defaults={ - "json_value": { - "schedule_ids": [], - } - }, - )[0] - return schedule.public_primary_key in enabled_final_export.json_value["schedule_ids"] - - -def _get_ical_data_final_schedule(schedule: OnCallSchedule) -> str: +def _get_ical_data_final_schedule(schedule: "OnCallSchedule") -> str | None: ical_data = schedule.cached_ical_final_schedule if ical_data is None: schedule.refresh_ical_final_schedule() - ical_data = schedule.cached_ical_final_schedule + # typing is safe here. cached_ical_final_schedule is updated inside of refresh_ical_final_schedule + ical_data: str = schedule.cached_ical_final_schedule return ical_data -def ical_export_from_schedule(schedule: OnCallSchedule) -> bytes: +def ical_export_from_schedule(schedule: "OnCallSchedule") -> bytes: ical_data = _get_ical_data_final_schedule(schedule) return ical_data.encode() -def user_ical_export(user: User, schedules: list[OnCallSchedule]) -> bytes: +def user_ical_export(user: "User", schedules: "OnCallScheduleQuerySet") -> bytes: schedule_name = "On-Call Schedule for {0}".format(user.username) ical_obj = create_base_icalendar(schedule_name) for schedule in schedules: name = schedule.name ical_data = _get_ical_data_final_schedule(schedule) - calendars = [Calendar.from_ical(ical_data)] - get_user_events_from_calendars(ical_obj, calendars, user, name=name) + get_user_events_from_calendars(ical_obj, Calendar.from_ical(ical_data), user, name=name) return ical_obj.to_ical() -DatetimeInterval = namedtuple("DatetimeInterval", ["start", "end"]) - - -def list_of_gaps_in_schedule(schedule, start_date, end_date): +def list_of_gaps_in_schedule( + schedule: "OnCallSchedule", start_date: datetime.date, end_date: datetime.date +) -> DatetimeIntervals: calendars = schedule.get_icalendars() - intervals = [] + intervals: DatetimeIntervals = [] start_datetime = datetime.datetime.combine(start_date, datetime.time.min) + datetime.timedelta(milliseconds=1) start_datetime = start_datetime.astimezone(pytz.UTC) end_datetime = datetime.datetime.combine(end_date, datetime.time.max).astimezone(pytz.UTC) - for idx, calendar in enumerate(calendars): + for calendar in calendars: if calendar is not None: calendar_tz = get_icalendar_tz_or_utc(calendar) events = ical_events.get_events_from_ical_between( @@ -708,8 +714,8 @@ def list_of_gaps_in_schedule(schedule, start_date, end_date): return detect_gaps(intervals, start_datetime, end_datetime) -def detect_gaps(intervals, start, end): - gaps = [] +def detect_gaps(intervals: DatetimeIntervals, start: datetime.datetime, end: datetime.datetime) -> DatetimeIntervals: + gaps: DatetimeIntervals = [] intervals = sorted(intervals, key=lambda dt: dt.start) if len(intervals) > 0: base_interval = intervals[0] @@ -725,7 +731,7 @@ def detect_gaps(intervals, start, end): return gaps -def merge_if_overlaps(a: DatetimeInterval, b: DatetimeInterval): +def merge_if_overlaps(a: DatetimeInterval, b: DatetimeInterval) -> typing.Tuple[bool, DatetimeInterval]: if a.end >= b.end: return True, DatetimeInterval(a.start, a.end) if b.start - a.end < datetime.timedelta(minutes=1): @@ -734,13 +740,13 @@ def merge_if_overlaps(a: DatetimeInterval, b: DatetimeInterval): return False, DatetimeInterval(b.start, b.end) -def start_end_with_respect_to_all_day(event, calendar_tz): +def start_end_with_respect_to_all_day(event: IcalEvent, calendar_tz): start, _ = ical_date_to_datetime(event[ICAL_DATETIME_START].dt, calendar_tz, start=True) end, _ = ical_date_to_datetime(event[ICAL_DATETIME_END].dt, calendar_tz, start=False) return start, end -def event_start_end_all_day_with_respect_to_type(event, calendar_tz): +def event_start_end_all_day_with_respect_to_type(event: IcalEvent, calendar_tz): all_day = False if type(event[ICAL_DATETIME_START].dt) == datetime.date: start, end = start_end_with_respect_to_all_day(event, calendar_tz) @@ -750,7 +756,7 @@ def event_start_end_all_day_with_respect_to_type(event, calendar_tz): return start, end, all_day -def convert_windows_timezone_to_iana(tz_name): +def convert_windows_timezone_to_iana(tz_name: str) -> str | None: """ Conversion info taken from https://raw.githubusercontent.com/unicode-org/cldr/main/common/supplemental/windowsZones.xml Also see https://gist.github.com/mrled/8d29fde758cfc7dd0b52f3bbf2b8f06e diff --git a/engine/apps/schedules/models/on_call_schedule.py b/engine/apps/schedules/models/on_call_schedule.py index 043a0e21..f759811f 100644 --- a/engine/apps/schedules/models/on_call_schedule.py +++ b/engine/apps/schedules/models/on_call_schedule.py @@ -67,10 +67,14 @@ class QualityReportOverloadedUser(typing.TypedDict): score: int +QualityReportOverloadedUsers = typing.List[QualityReportOverloadedUser] +QualityReportComments = typing.List[QualityReportComment] + + class QualityReport(typing.TypedDict): total_score: int - comments: typing.List[QualityReportComment] - overloaded_users: typing.List[QualityReportOverloadedUser] + comments: QualityReportComments + overloaded_users: QualityReportOverloadedUsers class ScheduleEventUser(typing.TypedDict): @@ -89,9 +93,9 @@ class ScheduleEvent(typing.TypedDict): end: datetime.datetime users: typing.List[ScheduleEventUser] missing_users: typing.List[str] - priority_level: typing.Union[int, None] - source: typing.Union[str, None] - calendar_type: typing.Union[int, None] + priority_level: typing.Optional[int] + source: typing.Optional[str] + calendar_type: typing.Optional[int] is_empty: bool is_gap: bool is_override: bool @@ -109,6 +113,7 @@ class ScheduleFinalShift(typing.TypedDict): ScheduleEvents = typing.List[ScheduleEvent] ScheduleEventIntervals = typing.List[typing.List[datetime.datetime]] ScheduleFinalShifts = typing.List[ScheduleFinalShift] +DurationMap = typing.Dict[str, datetime.timedelta] def generate_public_primary_key_for_oncall_schedule_channel(): @@ -217,14 +222,14 @@ class OnCallSchedule(PolymorphicModel): has_empty_shifts = models.BooleanField(default=False) empty_shifts_report_sent_at = models.DateField(null=True, default=None) - def get_icalendars(self): + def get_icalendars(self) -> typing.Tuple[typing.Optional[icalendar.Calendar], typing.Optional[icalendar.Calendar]]: """Returns list of calendars. Primary calendar should always be the first""" - calendar_primary = None - calendar_overrides = None + calendar_primary: typing.Optional[icalendar.Calendar] = None + calendar_overrides: typing.Optional[icalendar.Calendar] = None # if self._ical_file_(primary|overrides) is None -> no cache, will trigger a refresh # if self._ical_file_(primary|overrides) == "" -> cached value for an empty schedule if self._ical_file_primary: - calendar_primary = icalendar.Calendar.from_ical(self._ical_file_primary) + calendar_primary: icalendar.Calendar = icalendar.Calendar.from_ical(self._ical_file_primary) if self._ical_file_overrides: calendar_overrides = icalendar.Calendar.from_ical(self._ical_file_overrides) return calendar_primary, calendar_overrides @@ -260,9 +265,11 @@ class OnCallSchedule(PolymorphicModel): self._refresh_primary_ical_file() self._refresh_overrides_ical_file() + @property def _ical_file_primary(self): raise NotImplementedError + @property def _ical_file_overrides(self): raise NotImplementedError @@ -468,7 +475,7 @@ class OnCallSchedule(PolymorphicModel): events = self.final_events(user_tz="UTC", starting_date=date, days=days) # an event is “good” if it's not a gap and not empty - good_events = [event for event in events if not event["is_gap"] and not event["is_empty"]] + good_events: ScheduleEvents = [event for event in events if not event["is_gap"] and not event["is_empty"]] if not good_events: return { "total_score": 0, @@ -476,7 +483,7 @@ class OnCallSchedule(PolymorphicModel): "overloaded_users": [], } - def event_duration(ev: dict) -> datetime.timedelta: + def event_duration(ev: ScheduleEvent) -> datetime.timedelta: return ev["end"] - ev["start"] def timedelta_sum(deltas: typing.Iterable[datetime.timedelta]) -> datetime.timedelta: @@ -485,9 +492,9 @@ class OnCallSchedule(PolymorphicModel): def score_to_percent(value: float) -> int: return round(value * 100) - def get_duration_map(evs: list[dict]) -> dict[str, datetime.timedelta]: + def get_duration_map(evs: ScheduleEvents) -> DurationMap: """Return a map of user PKs to total duration of events they are in.""" - result = defaultdict(datetime.timedelta) + result: DurationMap = defaultdict(datetime.timedelta) for ev in evs: for user in ev["users"]: user_pk = user["pk"] @@ -495,7 +502,7 @@ class OnCallSchedule(PolymorphicModel): return result - def get_balance_score_by_duration_map(dur_map: dict[str, datetime.timedelta]) -> float: + def get_balance_score_by_duration_map(dur_map: DurationMap) -> float: """ Return a score between 0 and 1, based on how balanced the durations are in the duration map. The formula is taken from https://github.com/grafana/oncall/issues/118#issuecomment-1161787854. @@ -503,7 +510,7 @@ class OnCallSchedule(PolymorphicModel): if len(dur_map) <= 1: return 1 - result = 0 + result = 0.0 for key_1, key_2 in itertools.combinations(dur_map, 2): duration_1 = dur_map[key_1] duration_2 = dur_map[key_2] @@ -524,9 +531,10 @@ class OnCallSchedule(PolymorphicModel): balance_score = score_to_percent(balance_score) # calculate overloaded users + overloaded_users: QualityReportOverloadedUsers = [] + if balance_score >= 95: # tolerate minor imbalance balance_score = 100 - overloaded_users = [] else: average_duration = timedelta_sum(duration_map.values()) / len(duration_map) overloaded_user_pks = [ @@ -540,7 +548,6 @@ class OnCallSchedule(PolymorphicModel): "public_primary_key", "username" ) } - overloaded_users = [] for user_pk in overloaded_user_pks: score = score_to_percent(duration_map[user_pk] / average_duration) - 100 username = usernames.get(user_pk) or "unknown" # fallback to "unknown" if user is not found @@ -550,7 +557,7 @@ class OnCallSchedule(PolymorphicModel): overloaded_users.sort(key=lambda u: (-u["score"], u["username"])) # generate comments regarding gaps - comments = [] + comments: QualityReportComments = [] if good_event_score == 100: comments.append({"type": QualityReportCommentType.INFO, "text": "Schedule has no gaps"}) else: @@ -628,8 +635,8 @@ class OnCallSchedule(PolymorphicModel): resolved: ScheduleEvents = [] pending: ScheduleEvents = events current_interval_idx = 0 # current scheduled interval being checked - current_type = OnCallSchedule.TYPE_ICAL_OVERRIDES # current calendar type - current_priority = None # current priority level being resolved + current_type: typing.Optional[int] = OnCallSchedule.TYPE_ICAL_OVERRIDES # current calendar type + current_priority: typing.Optional[int] = None # current priority level being resolved while pending: ev = pending.pop(0) diff --git a/engine/apps/slack/constants.py b/engine/apps/slack/constants.py index 34f7a3c6..398acaf8 100644 --- a/engine/apps/slack/constants.py +++ b/engine/apps/slack/constants.py @@ -1,4 +1,4 @@ -from django.utils import timezone +import datetime SLACK_BOT_ID = "USLACKBOT" SLACK_INVALID_AUTH_RESPONSE = "no_enough_permissions_to_retrieve" @@ -6,7 +6,7 @@ PLACEHOLDER = "Placeholder" SLACK_WRONG_TEAM_NAMES = [SLACK_INVALID_AUTH_RESPONSE, PLACEHOLDER] -SLACK_RATE_LIMIT_TIMEOUT = timezone.timedelta(minutes=5) +SLACK_RATE_LIMIT_TIMEOUT = datetime.timedelta(minutes=5) SLACK_RATE_LIMIT_DELAY = 10 CACHE_UPDATE_INCIDENT_SLACK_MESSAGE_LIFETIME = 60 * 10 diff --git a/engine/apps/slack/scenarios/step_mixins.py b/engine/apps/slack/scenarios/step_mixins.py index cf677001..6cd7ca86 100644 --- a/engine/apps/slack/scenarios/step_mixins.py +++ b/engine/apps/slack/scenarios/step_mixins.py @@ -4,6 +4,7 @@ import logging from apps.alerts.models import AlertGroup from apps.api.permissions import user_is_authorized from apps.slack.models import SlackMessage, SlackTeamIdentity +from apps.user_management.models import User logger = logging.getLogger(__name__) @@ -13,6 +14,8 @@ class AlertGroupActionsMixin: Mixin for alert group actions (ack, resolve, etc.). Intended to be used as a mixin along with ScenarioStep. """ + user: User | None + REQUIRED_PERMISSIONS = [] def get_alert_group(self, slack_team_identity: SlackTeamIdentity, payload: dict) -> AlertGroup: diff --git a/engine/apps/slack/slack_formatter.py b/engine/apps/slack/slack_formatter.py index 706e4ee8..c2e3e8b4 100644 --- a/engine/apps/slack/slack_formatter.py +++ b/engine/apps/slack/slack_formatter.py @@ -2,10 +2,10 @@ import re import emoji from django.apps import apps -from slackviewer.formatter import SlackFormatter +from slackviewer.formatter import SlackFormatter as SlackFormatterBase -class SlackFormatter(SlackFormatter): +class SlackFormatter(SlackFormatterBase): _LINK_PAT = re.compile(r"<(https|http|mailto):[A-Za-z0-9_\.\-\/\?\,\=\#\:\@\& ]+\|[^>]+>") def __init__(self, organization): diff --git a/engine/apps/telegram/client.py b/engine/apps/telegram/client.py index b5733bca..33e70072 100644 --- a/engine/apps/telegram/client.py +++ b/engine/apps/telegram/client.py @@ -100,8 +100,8 @@ class TelegramClient: message_id: Union[int, str], text: str, keyboard: Optional[InlineKeyboardMarkup] = None, - ) -> Message: - message = self.api_client.edit_message_text( + ) -> Union[Message, bool]: + return self.api_client.edit_message_text( chat_id=chat_id, message_id=message_id, text=text, @@ -109,7 +109,6 @@ class TelegramClient: parse_mode=self.PARSE_MODE, disable_web_page_preview=False, ) - return message @staticmethod def _get_message_and_keyboard( diff --git a/engine/apps/user_management/models/organization.py b/engine/apps/user_management/models/organization.py index 151bfb34..749d5a5d 100644 --- a/engine/apps/user_management/models/organization.py +++ b/engine/apps/user_management/models/organization.py @@ -18,6 +18,12 @@ from common.insight_log import ChatOpsEvent, ChatOpsTypePlug, write_chatops_insi from common.oncall_gateway import create_oncall_connector, delete_oncall_connector, delete_slack_connector from common.public_primary_keys import generate_public_primary_key, increase_public_primary_key_length +if typing.TYPE_CHECKING: + from django.db.models.manager import RelatedManager + + from apps.schedules.models import OnCallSchedule + from apps.user_management.models import User + logger = logging.getLogger(__name__) @@ -36,7 +42,6 @@ def generate_public_primary_key_for_organization(): class ProvisionedPlugin(typing.TypedDict): - error: typing.Union[str, None] stackId: int orgId: int onCallToken: str @@ -64,6 +69,8 @@ class OrganizationManager(models.Manager): class Organization(MaintainableObject): + users: "RelatedManager['User']" + oncall_schedules: "RelatedManager['OnCallSchedule']" objects = OrganizationManager() objects_with_deleted = models.Manager() diff --git a/engine/apps/user_management/models/user.py b/engine/apps/user_management/models/user.py index fbcdb96c..778291dd 100644 --- a/engine/apps/user_management/models/user.py +++ b/engine/apps/user_management/models/user.py @@ -313,7 +313,7 @@ class User(models.Model): # TODO: check whether this signal can be moved to save method of the model @receiver(post_save, sender=User) -def listen_for_user_model_save(sender, instance, created, *args, **kwargs): +def listen_for_user_model_save(sender: User, instance: User, created: bool, *args, **kwargs) -> None: if created: instance.notification_policies.create_default_policies_for_user(instance) instance.notification_policies.create_important_policies_for_user(instance) diff --git a/engine/common/admin.py b/engine/common/admin.py index 70b1c758..8077a89c 100644 --- a/engine/common/admin.py +++ b/engine/common/admin.py @@ -1,3 +1,5 @@ +import typing + from django.contrib import admin from django.core.exceptions import FieldDoesNotExist from django.db.models import ForeignKey, Model @@ -7,7 +9,7 @@ class RawForeignKeysMixin: model: Model @property - def raw_id_fields(self) -> tuple[str]: + def raw_id_fields(self) -> typing.Tuple[str, ...]: fields = self.model._meta.fields fk_field_names = tuple(str(field.name) for field in fields if isinstance(field, ForeignKey)) @@ -18,13 +20,13 @@ class SearchableByIdsMixin: model: Model @property - def search_fields(self) -> tuple[str]: + def search_fields(self) -> typing.Tuple[str, ...]: search_fields = ( "id", "public_primary_key", ) - existing_fields = [] + existing_fields: typing.List[str] = [] for field in search_fields: try: @@ -39,10 +41,10 @@ class SearchableByIdsMixin: class SelectRelatedMixin: model: Model - list_display: tuple[str] + list_display: typing.Tuple[str, ...] @property - def list_select_related(self) -> tuple[str]: + def list_select_related(self) -> typing.Tuple[str, ...]: fk_field_names = [] for field_name in self.list_display: diff --git a/engine/common/api_helpers/mixins.py b/engine/common/api_helpers/mixins.py index 78055985..0e41111d 100644 --- a/engine/common/api_helpers/mixins.py +++ b/engine/common/api_helpers/mixins.py @@ -1,5 +1,6 @@ import json import math +import typing from django.core.exceptions import ObjectDoesNotExist from django.db.models import Q @@ -7,6 +8,7 @@ from django.utils.functional import cached_property from rest_framework import status from rest_framework.decorators import action from rest_framework.exceptions import NotFound, Throttled +from rest_framework.request import Request from rest_framework.response import Response from apps.alerts.incident_appearance.templaters import ( @@ -377,11 +379,25 @@ class PreviewTemplateMixin: return destination, attr_name +class GrafanaContext(typing.TypedDict): + IsAnonymous: bool + + +class InstanceContext(typing.TypedDict): + stack_id: int + org_id: int + grafana_token: str + + class GrafanaHeadersMixin: - @cached_property - def grafana_context(self) -> dict: - return json.loads(self.request.headers.get("X-Grafana-Context")) + request: Request @cached_property - def instance_context(self) -> dict: - return json.loads(self.request.headers["X-Instance-Context"]) + def grafana_context(self) -> GrafanaContext: + grafana_context: GrafanaContext = json.loads(self.request.headers["X-Grafana-Context"]) + return grafana_context + + @cached_property + def instance_context(self) -> InstanceContext: + instance_context: InstanceContext = json.loads(self.request.headers["X-Instance-Context"]) + return instance_context diff --git a/engine/common/insight_log/resource_insight_logs.py b/engine/common/insight_log/resource_insight_logs.py index 7f5361fa..1af73388 100644 --- a/engine/common/insight_log/resource_insight_logs.py +++ b/engine/common/insight_log/resource_insight_logs.py @@ -19,7 +19,12 @@ class EntityEvent(enum.Enum): class InsightLoggable(ABC): @property @abstractmethod - def public_primary_key(self): + def id(self) -> int: + pass + + @property + @abstractmethod + def public_primary_key(self) -> str: pass @property @@ -65,7 +70,7 @@ def write_resource_insight_log(instance: InsightLoggable, author, event: EntityE author = json.dumps(author.username) entity_type = instance.insight_logs_type_verbal try: - entity_id = instance.public_primary_key + entity_id: str | int = instance.public_primary_key except AttributeError: # Fallback for entities which have no public_primary_key, E.g. public api token, schedule export token entity_id = instance.id diff --git a/engine/engine/management/commands/setup_end_to_end_test.py b/engine/engine/management/commands/setup_end_to_end_test.py deleted file mode 100644 index d1bd0fb8..00000000 --- a/engine/engine/management/commands/setup_end_to_end_test.py +++ /dev/null @@ -1,52 +0,0 @@ -from django.core.management import BaseCommand -from django.db.models.signals import post_save -from django.urls import reverse - -from apps.alerts.models import Alert, AlertGroup, AlertReceiveChannel, listen_for_alertreceivechannel_model_save -from apps.alerts.tests.factories import AlertReceiveChannelFactory -from apps.user_management.tests.factories import OrganizationFactory - - -class Command(BaseCommand): - def add_arguments(self, parser): - group = parser.add_mutually_exclusive_group(required=True) - group.add_argument( - "--bootstrap_integration", - action="store_true", - help="Create random formatted webhook integration", - ) - - group.add_argument( - "--return_results_for_test_id", - type=str, - help="Count alert groups with specific text in the title and their alerts", - ) - - def handle(self, *args, **options): - if options["bootstrap_integration"]: - organization = OrganizationFactory() - - def _make_alert_receive_channel(organization, **kwargs): - if "integration" not in kwargs: - kwargs["integration"] = "formatted_webhook" - post_save.disconnect(listen_for_alertreceivechannel_model_save, sender=AlertReceiveChannel) - alert_receive_channel = AlertReceiveChannelFactory(organization=organization, **kwargs) - post_save.connect(listen_for_alertreceivechannel_model_save, sender=AlertReceiveChannel) - return alert_receive_channel - - integration = _make_alert_receive_channel( - organization, integration=AlertReceiveChannel.INTEGRATION_FORMATTED_WEBHOOK - ) - url = reverse( - "integrations:universal", - kwargs={ - "integration_type": AlertReceiveChannel.INTEGRATION_FORMATTED_WEBHOOK, - "alert_channel_key": integration.token, - }, - ) - return url - elif test_id := options["return_results_for_test_id"]: - alert_groups_pks = list(AlertGroup.all_objects.filter(web_title_cache=test_id).values_list("id", flat=True)) - alert_groups_count = len(alert_groups_pks) - alerts_count = Alert.objects.filter(group_id__in=alert_groups_pks).count() - return f"{alert_groups_count}, {alerts_count}" diff --git a/engine/pyproject.toml b/engine/pyproject.toml index 7d2415ad..12c0cabd 100644 --- a/engine/pyproject.toml +++ b/engine/pyproject.toml @@ -12,6 +12,7 @@ target-version = ["py39"] force-exclude = "migrations" [tool.mypy] +mypy_path = "$MYPY_CONFIG_FILE_DIR/type_stubs" implicit_reexport = true plugins = [ "mypy_django_plugin.main", @@ -39,7 +40,7 @@ module = [ "fcm_django.*", "firebase_admin.*", "humanize.*", - "icalendar.*", + "ipware.*", "markdown2.*", "mirage.*", "ordered_model.*", @@ -50,6 +51,7 @@ module = [ "recurring_ical_events.*", "rest_polymorphic.*", "slackclient.*", + "slackviewer.*", "social_core.*", "social_django.*", "twilio.*", diff --git a/engine/requirements-dev.txt b/engine/requirements-dev.txt index 361b80bc..112438d9 100644 --- a/engine/requirements-dev.txt +++ b/engine/requirements-dev.txt @@ -1,4 +1,4 @@ -celery-types==0.17.0 +celery-types==0.18.0 django-filter-stubs==0.1.3 django-stubs[compatible-mypy]==4.2.1 djangorestframework-stubs[compatible-mypy]==3.14.1 @@ -10,3 +10,4 @@ pytest_factoryboy==2.5.1 types-beautifulsoup4==4.12.0.5 types-PyMySQL==1.0.19.7 types-python-dateutil==2.8.19.13 +types-requests==2.31.0.1 diff --git a/engine/settings/base.py b/engine/settings/base.py index f62acdb4..30a62bf2 100644 --- a/engine/settings/base.py +++ b/engine/settings/base.py @@ -1,6 +1,7 @@ import base64 import json import os +import typing from random import randrange from celery.schedules import crontab @@ -88,8 +89,8 @@ DANGEROUS_WEBHOOKS_ENABLED = getenv_boolean("DANGEROUS_WEBHOOKS_ENABLED", defaul WEBHOOK_RESPONSE_LIMIT = 50000 # Multiregion settings -ONCALL_GATEWAY_URL = os.environ.get("ONCALL_GATEWAY_URL") -ONCALL_GATEWAY_API_TOKEN = os.environ.get("ONCALL_GATEWAY_API_TOKEN") +ONCALL_GATEWAY_URL = os.environ.get("ONCALL_GATEWAY_URL", "") +ONCALL_GATEWAY_API_TOKEN = os.environ.get("ONCALL_GATEWAY_API_TOKEN", "") ONCALL_BACKEND_REGION = os.environ.get("ONCALL_BACKEND_REGION") # Prometheus exporter metrics endpoint auth @@ -125,7 +126,9 @@ assert DATABASE_TYPE in {DatabaseTypes.MYSQL, DatabaseTypes.POSTGRESQL, Database DATABASE_ENGINE = f"django.db.backends.{DATABASE_TYPE}" -DATABASE_CONFIGS = { +DatabaseConfig = typing.Dict[str, typing.Dict[str, typing.Any]] + +DATABASE_CONFIGS: DatabaseConfig = { DatabaseTypes.SQLITE3: { "ENGINE": DATABASE_ENGINE, "NAME": DATABASE_NAME or "/var/lib/oncall/oncall.db", @@ -152,6 +155,7 @@ DATABASE_CONFIGS = { }, } +READONLY_DATABASES: DatabaseConfig = {} DATABASES = { "default": DATABASE_CONFIGS[DATABASE_TYPE], } @@ -570,7 +574,7 @@ SOCIAL_AUTH_PIPELINE = ( "apps.social_auth.pipeline.delete_slack_auth_token", ) -SOCIAL_AUTH_FIELDS_STORED_IN_SESSION = [] +SOCIAL_AUTH_FIELDS_STORED_IN_SESSION: typing.List[str] = [] SOCIAL_AUTH_REDIRECT_IS_HTTPS = getenv_boolean("SOCIAL_AUTH_REDIRECT_IS_HTTPS", default=True) SOCIAL_AUTH_SLUGIFY_USERNAMES = True diff --git a/engine/settings/dev.py b/engine/settings/dev.py index 48a40ba5..087f013f 100644 --- a/engine/settings/dev.py +++ b/engine/settings/dev.py @@ -70,8 +70,12 @@ INTERNAL_IPS = [ "127.0.0.1", ] -# # the below two lines make it possible to use django-debug-toolbar inside of docker locally -# # https://knasmueller.net/fix-djangos-debug-toolbar-not-showing-inside-docker -# # https://stackoverflow.com/questions/10517765/django-debug-toolbar-not-showing-up -hostname, _, ips = socket.gethostbyname_ex(socket.gethostname()) -INTERNAL_IPS += [".".join(ip.split(".")[:-1] + ["1"]) for ip in ips] +try: + # # the below two lines make it possible to use django-debug-toolbar inside of docker locally + # # https://knasmueller.net/fix-djangos-debug-toolbar-not-showing-inside-docker + # # https://stackoverflow.com/questions/10517765/django-debug-toolbar-not-showing-up + hostname, _, ips = socket.gethostbyname_ex(socket.gethostname()) + INTERNAL_IPS += [".".join(ip.split(".")[:-1] + ["1"]) for ip in ips] +except OSError: + # usually raised if this is being run outside of a docker container context + INTERNAL_IPS = [] diff --git a/engine/type_stubs/icalendar/__init__.pyi b/engine/type_stubs/icalendar/__init__.pyi new file mode 100644 index 00000000..a751526e --- /dev/null +++ b/engine/type_stubs/icalendar/__init__.pyi @@ -0,0 +1,34 @@ +from icalendar.cal import Alarm as Alarm +from icalendar.cal import Calendar as Calendar +from icalendar.cal import ComponentFactory as ComponentFactory +from icalendar.cal import Event as Event +from icalendar.cal import FreeBusy as FreeBusy +from icalendar.cal import Journal as Journal +from icalendar.cal import Timezone as Timezone +from icalendar.cal import TimezoneDaylight as TimezoneDaylight +from icalendar.cal import TimezoneStandard as TimezoneStandard +from icalendar.cal import Todo as Todo +from icalendar.parser import Parameters as Parameters +from icalendar.parser import q_join as q_join +from icalendar.parser import q_split as q_split +from icalendar.prop import FixedOffset as FixedOffset +from icalendar.prop import LocalTimezone as LocalTimezone +from icalendar.prop import TypesFactory as TypesFactory +from icalendar.prop import vBinary as vBinary +from icalendar.prop import vBoolean as vBoolean +from icalendar.prop import vCalAddress as vCalAddress +from icalendar.prop import vDate as vDate +from icalendar.prop import vDatetime as vDatetime +from icalendar.prop import vDDDTypes as vDDDTypes +from icalendar.prop import vDuration as vDuration +from icalendar.prop import vFloat as vFloat +from icalendar.prop import vFrequency as vFrequency +from icalendar.prop import vGeo as vGeo +from icalendar.prop import vInt as vInt +from icalendar.prop import vPeriod as vPeriod +from icalendar.prop import vRecur as vRecur +from icalendar.prop import vText as vText +from icalendar.prop import vTime as vTime +from icalendar.prop import vUri as vUri +from icalendar.prop import vUTCOffset as vUTCOffset +from icalendar.prop import vWeekday as vWeekday diff --git a/engine/type_stubs/icalendar/cal.pyi b/engine/type_stubs/icalendar/cal.pyi new file mode 100644 index 00000000..e45173a2 --- /dev/null +++ b/engine/type_stubs/icalendar/cal.pyi @@ -0,0 +1,109 @@ +from _typeshed import Incomplete +from icalendar.caselessdict import CaselessDict as CaselessDict +from icalendar.compat import unicode_type as unicode_type +from icalendar.parser import Contentline as Contentline +from icalendar.parser import Contentlines as Contentlines +from icalendar.parser import Parameters as Parameters +from icalendar.parser import q_join as q_join +from icalendar.parser import q_split as q_split +from icalendar.parser_tools import DEFAULT_ENCODING as DEFAULT_ENCODING +from icalendar.prop import TypesFactory as TypesFactory +from icalendar.prop import vDDDLists as vDDDLists +from icalendar.prop import vText as vText + +class ComponentFactory(CaselessDict): + def __init__(self, *args, **kwargs) -> None: ... + +INLINE: Incomplete + +class Component(CaselessDict): + name: Incomplete + required: Incomplete + singletons: Incomplete + multiple: Incomplete + exclusive: Incomplete + inclusive: Incomplete + ignore_exceptions: bool + subcomponents: Incomplete + errors: Incomplete + def __init__(self, *args, **kwargs) -> None: ... + def __bool__(self) -> bool: ... + __nonzero__ = __bool__ + def is_empty(self): ... + @property + def is_broken(self): ... + def add(self, name, value, parameters: Incomplete | None = ..., encode: int = ...) -> None: ... + def decoded(self, name, default=...): ... + def get_inline(self, name, decode: int = ...): ... + def set_inline(self, name, values, encode: int = ...) -> None: ... + def add_component(self, component) -> None: ... + def walk(self, name: Incomplete | None = ...): ... + def property_items(self, recursive: bool = ..., sorted: bool = ...): ... + @classmethod + def from_ical(cls, st, multiple: bool = ...): ... + def content_line(self, name, value, sorted: bool = ...): ... + def content_lines(self, sorted: bool = ...): ... + def to_ical(self, sorted: bool = ...): ... + +class Event(Component): + name: str + canonical_order: Incomplete + required: Incomplete + singletons: Incomplete + exclusive: Incomplete + multiple: Incomplete + ignore_exceptions: bool + +class Todo(Component): + name: str + required: Incomplete + singletons: Incomplete + exclusive: Incomplete + multiple: Incomplete + +class Journal(Component): + name: str + required: Incomplete + singletons: Incomplete + multiple: Incomplete + +class FreeBusy(Component): + name: str + required: Incomplete + singletons: Incomplete + multiple: Incomplete + +class Timezone(Component): + name: str + canonical_order: Incomplete + required: Incomplete + singletons: Incomplete + def to_tz(self): ... + +class TimezoneStandard(Component): + name: str + required: Incomplete + singletons: Incomplete + multiple: Incomplete + +class TimezoneDaylight(Component): + name: str + required: Incomplete + singletons: Incomplete + multiple: Incomplete + +class Alarm(Component): + name: str + required: Incomplete + singletons: Incomplete + inclusive: Incomplete + multiple: Incomplete + +class Calendar(Component): + name: str + canonical_order: Incomplete + required: Incomplete + singletons: Incomplete + +types_factory: Incomplete +component_factory: Incomplete diff --git a/engine/type_stubs/icalendar/caselessdict.pyi b/engine/type_stubs/icalendar/caselessdict.pyi new file mode 100644 index 00000000..56066922 --- /dev/null +++ b/engine/type_stubs/icalendar/caselessdict.pyi @@ -0,0 +1,26 @@ +from collections import OrderedDict + +from _typeshed import Incomplete +from icalendar.compat import iteritems as iteritems +from icalendar.parser_tools import to_unicode as to_unicode + +def canonsort_keys(keys, canonical_order: Incomplete | None = ...): ... +def canonsort_items(dict1, canonical_order: Incomplete | None = ...): ... + +class CaselessDict(OrderedDict): + def __init__(self, *args, **kwargs) -> None: ... + def __getitem__(self, key): ... + def __setitem__(self, key, value) -> None: ... + def __delitem__(self, key) -> None: ... + def __contains__(self, key) -> bool: ... + def get(self, key, default: Incomplete | None = ...): ... + def setdefault(self, key, value: Incomplete | None = ...): ... + def pop(self, key, default: Incomplete | None = ...): ... + def popitem(self): ... + def has_key(self, key): ... + def update(self, *args, **kwargs) -> None: ... + def copy(self): ... + def __eq__(self, other): ... + canonical_order: Incomplete + def sorted_keys(self): ... + def sorted_items(self): ... diff --git a/engine/type_stubs/icalendar/cli.pyi b/engine/type_stubs/icalendar/cli.pyi new file mode 100644 index 00000000..d1c48003 --- /dev/null +++ b/engine/type_stubs/icalendar/cli.pyi @@ -0,0 +1,4 @@ +from . import Calendar as Calendar + +def view(input_handle, output_handle) -> None: ... +def main() -> None: ... diff --git a/engine/type_stubs/icalendar/compat.pyi b/engine/type_stubs/icalendar/compat.pyi new file mode 100644 index 00000000..767a2ea2 --- /dev/null +++ b/engine/type_stubs/icalendar/compat.pyi @@ -0,0 +1,5 @@ +from _typeshed import Incomplete + +unicode_type = str +bytes_type = bytes +iteritems: Incomplete diff --git a/engine/type_stubs/icalendar/parser.pyi b/engine/type_stubs/icalendar/parser.pyi new file mode 100644 index 00000000..f5453df8 --- /dev/null +++ b/engine/type_stubs/icalendar/parser.pyi @@ -0,0 +1,54 @@ +from _typeshed import Incomplete +from icalendar import compat as compat +from icalendar.caselessdict import CaselessDict as CaselessDict +from icalendar.parser_tools import DEFAULT_ENCODING as DEFAULT_ENCODING +from icalendar.parser_tools import SEQUENCE_TYPES as SEQUENCE_TYPES +from icalendar.parser_tools import to_unicode as to_unicode +from icalendar.prop import vText as vText + +def escape_char(text): ... +def unescape_char(text): ... +def tzid_from_dt(dt): ... +def foldline(line, limit: int = ..., fold_sep: str = ...): ... +def param_value(value): ... + +NAME: Incomplete +UNSAFE_CHAR: Incomplete +QUNSAFE_CHAR: Incomplete +FOLD: Incomplete +uFOLD: Incomplete +NEWLINE: Incomplete + +def validate_token(name) -> None: ... +def validate_param_value(value, quoted: bool = ...) -> None: ... + +QUOTABLE: Incomplete + +def dquote(val): ... +def q_split(st, sep: str = ..., maxsplit: int = ...): ... +def q_join(lst, sep: str = ...): ... + +class Parameters(CaselessDict): + def params(self): ... + def to_ical(self, sorted: bool = ...): ... + @classmethod + def from_ical(cls, st, strict: bool = ...): ... + +def escape_string(val): ... +def unescape_string(val): ... +def unescape_list_or_string(val): ... + +class Contentline(compat.unicode_type): + strict: Incomplete + def __new__(cls, value, strict: bool = ..., encoding=...): ... + @classmethod + def from_parts(cls, name, params, values, sorted: bool = ...): ... + def parts(self): ... + @classmethod + def from_ical(cls, ical, strict: bool = ...): ... + def to_ical(self): ... + +class Contentlines(list): + def to_ical(self): ... + @classmethod + def from_ical(cls, st): ... diff --git a/engine/type_stubs/icalendar/parser_tools.pyi b/engine/type_stubs/icalendar/parser_tools.pyi new file mode 100644 index 00000000..11f710ed --- /dev/null +++ b/engine/type_stubs/icalendar/parser_tools.pyi @@ -0,0 +1,8 @@ +from _typeshed import Incomplete +from icalendar import compat as compat + +SEQUENCE_TYPES: Incomplete +DEFAULT_ENCODING: str + +def to_unicode(value, encoding: str = ...): ... +def data_encode(data, encoding=...): ... diff --git a/engine/type_stubs/icalendar/prop.pyi b/engine/type_stubs/icalendar/prop.pyi new file mode 100644 index 00000000..81ab0538 --- /dev/null +++ b/engine/type_stubs/icalendar/prop.pyi @@ -0,0 +1,219 @@ +from datetime import tzinfo + +from _typeshed import Incomplete +from icalendar import compat as compat +from icalendar.caselessdict import CaselessDict as CaselessDict +from icalendar.parser import Parameters as Parameters +from icalendar.parser import escape_char as escape_char +from icalendar.parser import tzid_from_dt as tzid_from_dt +from icalendar.parser import unescape_char as unescape_char +from icalendar.parser_tools import DEFAULT_ENCODING as DEFAULT_ENCODING +from icalendar.parser_tools import SEQUENCE_TYPES as SEQUENCE_TYPES +from icalendar.parser_tools import to_unicode as to_unicode +from icalendar.windows_to_olson import WINDOWS_TO_OLSON as WINDOWS_TO_OLSON + +DATE_PART: str +TIME_PART: str +DATETIME_PART: Incomplete +WEEKS_PART: str +DURATION_REGEX: Incomplete +WEEKDAY_RULE: Incomplete +ZERO: Incomplete +HOUR: Incomplete +STDOFFSET: Incomplete +DSTOFFSET: Incomplete +DSTOFFSET = STDOFFSET +DSTDIFF: Incomplete + +class FixedOffset(tzinfo): + def __init__(self, offset, name) -> None: ... + def utcoffset(self, dt): ... + def tzname(self, dt): ... + def dst(self, dt): ... + +class LocalTimezone(tzinfo): + def utcoffset(self, dt): ... + def dst(self, dt): ... + def tzname(self, dt): ... + +class vBinary: + obj: Incomplete + params: Incomplete + def __init__(self, obj) -> None: ... + def to_ical(self): ... + @staticmethod + def from_ical(ical): ... + +class vBoolean(int): + BOOL_MAP: Incomplete + params: Incomplete + def __new__(cls, *args, **kwargs): ... + def to_ical(self): ... + @classmethod + def from_ical(cls, ical): ... + +class vCalAddress(compat.unicode_type): + params: Incomplete + def __new__(cls, value, encoding=...): ... + def to_ical(self): ... + @classmethod + def from_ical(cls, ical): ... + +class vFloat(float): + params: Incomplete + def __new__(cls, *args, **kwargs): ... + def to_ical(self): ... + @classmethod + def from_ical(cls, ical): ... + +class vInt(int): + params: Incomplete + def __new__(cls, *args, **kwargs): ... + def to_ical(self): ... + @classmethod + def from_ical(cls, ical): ... + +class vDDDLists: + params: Incomplete + dts: Incomplete + def __init__(self, dt_list) -> None: ... + def to_ical(self): ... + @staticmethod + def from_ical(ical, timezone: Incomplete | None = ...): ... + +class vCategory: + cats: Incomplete + def __init__(self, c_list) -> None: ... + def to_ical(self): ... + @staticmethod + def from_ical(ical, timezone: Incomplete | None = ...): ... + +class vDDDTypes: + params: Incomplete + dt: Incomplete + def __init__(self, dt) -> None: ... + def to_ical(self): ... + @classmethod + def from_ical(cls, ical, timezone: Incomplete | None = ...): ... + +class vDate: + dt: Incomplete + params: Incomplete + def __init__(self, dt) -> None: ... + def to_ical(self): ... + @staticmethod + def from_ical(ical): ... + +class vDatetime: + dt: Incomplete + params: Incomplete + def __init__(self, dt) -> None: ... + def to_ical(self): ... + @staticmethod + def from_ical(ical, timezone: Incomplete | None = ...): ... + +class vDuration: + td: Incomplete + params: Incomplete + def __init__(self, td) -> None: ... + def to_ical(self): ... + @staticmethod + def from_ical(ical): ... + +class vPeriod: + params: Incomplete + start: Incomplete + end: Incomplete + by_duration: Incomplete + duration: Incomplete + def __init__(self, per) -> None: ... + def __cmp__(self, other): ... + def overlaps(self, other): ... + def to_ical(self): ... + @staticmethod + def from_ical(ical): ... + +class vWeekday(compat.unicode_type): + week_days: Incomplete + relative: Incomplete + params: Incomplete + def __new__(cls, value, encoding=...): ... + def to_ical(self): ... + @classmethod + def from_ical(cls, ical): ... + +class vFrequency(compat.unicode_type): + frequencies: Incomplete + params: Incomplete + def __new__(cls, value, encoding=...): ... + def to_ical(self): ... + @classmethod + def from_ical(cls, ical): ... + +class vRecur(CaselessDict): + frequencies: Incomplete + canonical_order: Incomplete + types: Incomplete + params: Incomplete + def __init__(self, *args, **kwargs) -> None: ... + def to_ical(self): ... + @classmethod + def parse_type(cls, key, values): ... + @classmethod + def from_ical(cls, ical): ... + +class vText(compat.unicode_type): + encoding: Incomplete + params: Incomplete + def __new__(cls, value, encoding=...): ... + def to_ical(self): ... + @classmethod + def from_ical(cls, ical): ... + +class vTime: + dt: Incomplete + params: Incomplete + def __init__(self, *args) -> None: ... + def to_ical(self): ... + @staticmethod + def from_ical(ical): ... + +class vUri(compat.unicode_type): + params: Incomplete + def __new__(cls, value, encoding=...): ... + def to_ical(self): ... + @classmethod + def from_ical(cls, ical): ... + +class vGeo: + latitude: Incomplete + longitude: Incomplete + params: Incomplete + def __init__(self, geo) -> None: ... + def to_ical(self): ... + @staticmethod + def from_ical(ical): ... + +class vUTCOffset: + ignore_exceptions: bool + td: Incomplete + params: Incomplete + def __init__(self, td) -> None: ... + def to_ical(self): ... + @classmethod + def from_ical(cls, ical): ... + +class vInline(compat.unicode_type): + params: Incomplete + def __new__(cls, value, encoding=...): ... + def to_ical(self): ... + @classmethod + def from_ical(cls, ical): ... + +class TypesFactory(CaselessDict): + all_types: Incomplete + def __init__(self, *args, **kwargs) -> None: ... + types_map: Incomplete + def for_property(self, name): ... + def to_ical(self, name, value): ... + def from_ical(self, name, value): ... diff --git a/engine/type_stubs/icalendar/timezone_cache.pyi b/engine/type_stubs/icalendar/timezone_cache.pyi new file mode 100644 index 00000000..e69de29b diff --git a/engine/type_stubs/icalendar/tools.pyi b/engine/type_stubs/icalendar/tools.pyi new file mode 100644 index 00000000..b01e5408 --- /dev/null +++ b/engine/type_stubs/icalendar/tools.pyi @@ -0,0 +1,9 @@ +from _typeshed import Incomplete +from icalendar.parser_tools import to_unicode as to_unicode +from icalendar.prop import vDatetime as vDatetime +from icalendar.prop import vText as vText + +class UIDGenerator: + chars: Incomplete + def rnd_string(self, length: int = ...): ... + def uid(self, host_name: str = ..., unique: str = ...): ... diff --git a/engine/type_stubs/icalendar/windows_to_olson.pyi b/engine/type_stubs/icalendar/windows_to_olson.pyi new file mode 100644 index 00000000..3567f9a1 --- /dev/null +++ b/engine/type_stubs/icalendar/windows_to_olson.pyi @@ -0,0 +1,3 @@ +from _typeshed import Incomplete + +WINDOWS_TO_OLSON: Incomplete