continue addressing mypy violations (#2170)

# What this PR does

See #2173 

Also, closes #2187 . All of the new files under `type_stubs/icalendar`
were autogenerated by running:

```bash
stubgen -p icalendar -o type_stubs
```

## Checklist

- [ ] Unit, integration, and e2e (if applicable) tests updated
- [ ] Documentation added (or `pr:no public docs` PR label added if not
required)
- [ ] `CHANGELOG.md` updated (or `pr:no changelog` PR label added if not
required)
This commit is contained in:
Joey Orlando 2023-06-27 12:23:08 +02:00 committed by GitHub
parent 951a2a5d45
commit 75028d0427
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
52 changed files with 976 additions and 357 deletions

View file

@ -1,6 +1,6 @@
import datetime
import logging
from typing import Optional
import typing
import pytz
from celery import uuid as celery_uuid
@ -17,6 +17,9 @@ from apps.alerts.escalation_snapshot.snapshot_classes import (
)
from apps.alerts.tasks import escalate_alert_group
if typing.TYPE_CHECKING:
from apps.alerts.models import ChannelFilter
logger = logging.getLogger(__name__)
# Is a delay to prevent intermediate activity by system in case user is doing some multi-step action.
@ -29,6 +32,11 @@ class EscalationSnapshotMixin:
Mixin for AlertGroup. It contains methods related with alert group escalation
"""
# TODO: add stricter typing
# TODO: should this class actually be an AbstractBaseClass instead?
raw_escalation_snapshot: dict | None
channel_filter: typing.Optional["ChannelFilter"]
def build_raw_escalation_snapshot(self) -> dict:
"""
Builds new escalation chain in a json serializable format (dict).
@ -91,7 +99,7 @@ class EscalationSnapshotMixin:
data = {}
if self.escalation_chain_exists:
channel_filter = self.channel_filter
channel_filter: "ChannelFilter" = self.channel_filter
escalation_chain = channel_filter.escalation_chain
escalation_policies = escalation_chain.escalation_policies.all()
@ -116,7 +124,7 @@ class EscalationSnapshotMixin:
return self.escalation_chain_snapshot or (self.channel_filter.escalation_chain if self.channel_filter else None)
@cached_property
def channel_filter_snapshot(self) -> Optional[ChannelFilterSnapshot]:
def channel_filter_snapshot(self) -> typing.Optional[ChannelFilterSnapshot]:
"""
in some cases we need only channel filter and don't want to serialize whole escalation
"""
@ -132,7 +140,7 @@ class EscalationSnapshotMixin:
return ChannelFilterSnapshot(**channel_filter_snapshot)
@cached_property
def escalation_chain_snapshot(self) -> Optional[EscalationChainSnapshot]:
def escalation_chain_snapshot(self) -> typing.Optional[EscalationChainSnapshot]:
"""
in some cases we need only escalation chain and don't want to serialize whole escalation
escalation_chain_snapshot_object = None
@ -149,7 +157,7 @@ class EscalationSnapshotMixin:
return EscalationChainSnapshot(**escalation_chain_snapshot)
@cached_property
def escalation_snapshot(self) -> Optional[EscalationSnapshot]:
def escalation_snapshot(self) -> typing.Optional[EscalationSnapshot]:
raw_escalation_snapshot = self.raw_escalation_snapshot
if raw_escalation_snapshot:
try:
@ -207,7 +215,7 @@ class EscalationSnapshotMixin:
return self.raw_escalation_snapshot.get("pause_escalation", False)
@property
def next_step_eta(self) -> Optional[datetime.datetime]:
def next_step_eta(self) -> typing.Optional[datetime.datetime]:
"""
get next_step_eta field directly to avoid serialization overhead
"""

View file

@ -117,7 +117,7 @@ class EscalationPolicySnapshot:
return next_user
def execute(self, alert_group: "AlertGroup", reason) -> StepExecutionResultData:
action_map: typing.Dict[typing.Union[int, None], EscalationPolicySnapshot.StepExecutionFunc] = {
action_map: typing.Dict[typing.Optional[int], EscalationPolicySnapshot.StepExecutionFunc] = {
EscalationPolicy.STEP_WAIT: self._escalation_step_wait,
EscalationPolicy.STEP_FINAL_NOTIFYALL: self._escalation_step_notify_all,
EscalationPolicy.STEP_REPEAT_ESCALATION_N_TIMES: self._escalation_step_repeat_escalation_n_times,

View file

@ -92,7 +92,7 @@ class EscalationSnapshot:
return [self.escalation_policies_snapshots[0]]
return self.escalation_policies_snapshots[: self.last_active_escalation_policy_order]
def next_step_eta_is_valid(self) -> typing.Union[None, bool]:
def next_step_eta_is_valid(self) -> typing.Optional[bool]:
"""
`next_step_eta` should never be less than the current time (with a 5 minute buffer provided)
as this field should be updated as the escalation policy is executed over time. If it is, this means that
@ -109,7 +109,8 @@ class EscalationSnapshot:
self.alert_group.raw_escalation_snapshot = self.convert_to_dict()
self.alert_group.save(update_fields=["raw_escalation_snapshot"])
def convert_to_dict(self) -> dict:
# TODO: update the typing here, be more strict about what this returns
def convert_to_dict(self):
return self.serializer(self).data
def execute_actual_escalation_step(self) -> None:

View file

@ -11,7 +11,8 @@ from apps.grafana_plugin.helpers import GrafanaAPIClient
logger = logging.getLogger(__name__)
if TYPE_CHECKING:
from apps.alerts.models import GrafanaAlertingContactPoint
from apps.alerts.models import AlertReceiveChannel, GrafanaAlertingContactPoint
from apps.user_management.models import Organization
class GrafanaAlertingSyncManager:
@ -24,7 +25,7 @@ class GrafanaAlertingSyncManager:
ALERTING_DATASOURCE = "alertmanager"
IS_GRAFANA_VERSION_GRE_9 = None
def __init__(self, alert_receive_channel):
def __init__(self, alert_receive_channel: "AlertReceiveChannel") -> None:
self.alert_receive_channel = alert_receive_channel
self.client = GrafanaAPIClient(
api_url=self.alert_receive_channel.organization.grafana_url,
@ -33,7 +34,7 @@ class GrafanaAlertingSyncManager:
self.receiver_name = self.alert_receive_channel.emojized_verbal_name
@classmethod
def check_for_connection_errors(cls, organization) -> Optional[str]:
def check_for_connection_errors(cls, organization: "Organization") -> Optional[str]:
"""Check if it possible to connect to alerting, otherwise return error message"""
client = GrafanaAPIClient(api_url=organization.grafana_url, api_token=organization.api_token)
recipient = cls.GRAFANA_CONTACT_POINT
@ -561,7 +562,7 @@ class GrafanaAlertingSyncManager:
break
return name_in_alerting
def get_datasource_name(self, contact_point) -> str:
def get_datasource_name(self, contact_point: "GrafanaAlertingContactPoint") -> str:
datasource_id = contact_point.datasource_id
datasource_uid = contact_point.datasource_uid
datasource, response_info = self.client.get_datasource(datasource_uid)

View file

@ -65,10 +65,10 @@ class TemplateLoader:
@dataclass
class TemplatedAlert:
title: str = None
message: str = None
image_url: str = None
source_link: str = None
title: str | None = None
message: str | None = None
image_url: str | None = None
source_link: str | None = None
class AlertTemplater(ABC):
@ -160,7 +160,7 @@ class AlertTemplater(ABC):
return templated_alert
def _render_attribute_with_template(self, attr, data, channel, templated_alert):
def _render_attribute_with_template(self, attr, data, channel, templated_alert: TemplatedAlert) -> str | None:
"""
Get attr template and then apply it.
If attr template is None or invalid will return None.
@ -212,5 +212,5 @@ class AlertTemplater(ABC):
return None
@abstractmethod
def _render_for(self):
def _render_for(self) -> str:
raise NotImplementedError

View file

@ -1,10 +1,10 @@
import datetime
import logging
import typing
import urllib
from collections import namedtuple
from typing import Optional, TypedDict
from urllib.parse import urljoin
from uuid import uuid1
from uuid import UUID, uuid1
from celery import uuid as celery_uuid
from django.apps import apps
@ -33,6 +33,11 @@ from common.utils import clean_markup, str_or_backup
from .alert_group_counter import AlertGroupCounter
if typing.TYPE_CHECKING:
from django.db.models.manager import RelatedManager
from apps.alerts.models import AlertGroupLogRecord
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
@ -51,9 +56,9 @@ def generate_public_primary_key_for_alert_group():
return new_public_primary_key
class Permalinks(TypedDict):
slack: Optional[str]
telegram: Optional[str]
class Permalinks(typing.TypedDict):
slack: typing.Optional[str]
telegram: typing.Optional[str]
web: str
@ -133,6 +138,8 @@ class AlertGroupSlackRenderingMixin:
class AlertGroup(AlertGroupSlackRenderingMixin, EscalationSnapshotMixin, models.Model):
log_records: "RelatedManager['AlertGroupLogRecord']"
all_objects = AlertGroupQuerySet.as_manager()
unarchived_objects = UnarchivedAlertGroupQuerySet.as_manager()
@ -324,7 +331,9 @@ class AlertGroup(AlertGroupSlackRenderingMixin, EscalationSnapshotMixin, models.
cached_render_for_web = models.JSONField(default=dict)
active_cache_for_web_calculation_id = models.CharField(max_length=100, null=True, default=None)
last_unique_unacknowledge_process_id = models.CharField(max_length=100, null=True, default=None)
# NOTE: we should probably migrate this field to models.UUIDField as it's ONLY ever being
# set to the result of uuid.uuid1
last_unique_unacknowledge_process_id: UUID | None = models.CharField(max_length=100, null=True, default=None)
is_archived = models.BooleanField(default=False)
wiped_at = models.DateTimeField(null=True, default=None)
@ -457,11 +466,11 @@ class AlertGroup(AlertGroupSlackRenderingMixin, EscalationSnapshotMixin, models.
raise NotImplementedError
@property
def slack_permalink(self) -> Optional[str]:
def slack_permalink(self) -> typing.Optional[str]:
return None if self.slack_message is None else self.slack_message.permalink
@property
def telegram_permalink(self) -> Optional[str]:
def telegram_permalink(self) -> typing.Optional[str]:
"""
This property will attempt to access an attribute, `prefetched_telegram_messages`, representing a list of
prefetched telegram messages. If this attribute does not exist, it falls back to performing a query.
@ -529,7 +538,7 @@ class AlertGroup(AlertGroupSlackRenderingMixin, EscalationSnapshotMixin, models.
started_at=self.started_at,
)
def acknowledge_by_user(self, user: User, action_source: Optional[str] = None) -> None:
def acknowledge_by_user(self, user: User, action_source: typing.Optional[str] = None) -> None:
AlertGroupLogRecord = apps.get_model("alerts", "AlertGroupLogRecord")
initial_state = self.state
logger.debug(f"Started acknowledge_by_user for alert_group {self.pk}")
@ -611,7 +620,7 @@ class AlertGroup(AlertGroupSlackRenderingMixin, EscalationSnapshotMixin, models.
for dependent_alert_group in self.dependent_alert_groups.all():
dependent_alert_group.acknowledge_by_source()
def un_acknowledge_by_user(self, user: User, action_source: Optional[str] = None) -> None:
def un_acknowledge_by_user(self, user: User, action_source: typing.Optional[str] = None) -> None:
AlertGroupLogRecord = apps.get_model("alerts", "AlertGroupLogRecord")
initial_state = self.state
logger.debug(f"Started un_acknowledge_by_user for alert_group {self.pk}")
@ -639,7 +648,7 @@ class AlertGroup(AlertGroupSlackRenderingMixin, EscalationSnapshotMixin, models.
dependent_alert_group.un_acknowledge_by_user(user, action_source=action_source)
logger.debug(f"Finished un_acknowledge_by_user for alert_group {self.pk}")
def resolve_by_user(self, user: User, action_source: Optional[str] = None) -> None:
def resolve_by_user(self, user: User, action_source: typing.Optional[str] = None) -> None:
AlertGroupLogRecord = apps.get_model("alerts", "AlertGroupLogRecord")
initial_state = self.state
@ -786,7 +795,7 @@ class AlertGroup(AlertGroupSlackRenderingMixin, EscalationSnapshotMixin, models.
for dependent_alert_group in self.dependent_alert_groups.all():
dependent_alert_group.resolve_by_disable_maintenance()
def un_resolve_by_user(self, user: User, action_source: Optional[str] = None) -> None:
def un_resolve_by_user(self, user: User, action_source: typing.Optional[str] = None) -> None:
AlertGroupLogRecord = apps.get_model("alerts", "AlertGroupLogRecord")
if self.wiped_at is None:
@ -815,7 +824,9 @@ class AlertGroup(AlertGroupSlackRenderingMixin, EscalationSnapshotMixin, models.
for dependent_alert_group in self.dependent_alert_groups.all():
dependent_alert_group.un_resolve_by_user(user, action_source=action_source)
def attach_by_user(self, user: User, root_alert_group: "AlertGroup", action_source: Optional[str] = None) -> None:
def attach_by_user(
self, user: User, root_alert_group: "AlertGroup", action_source: typing.Optional[str] = None
) -> None:
AlertGroupLogRecord = apps.get_model("alerts", "AlertGroupLogRecord")
if root_alert_group.root_alert_group is None and not root_alert_group.resolved:
@ -891,10 +902,10 @@ class AlertGroup(AlertGroupSlackRenderingMixin, EscalationSnapshotMixin, models.
action_source=action_source,
)
def un_attach_by_user(self, user: User, action_source: Optional[str] = None) -> None:
def un_attach_by_user(self, user: User, action_source: typing.Optional[str] = None) -> None:
AlertGroupLogRecord = apps.get_model("alerts", "AlertGroupLogRecord")
root_alert_group = self.root_alert_group
root_alert_group: AlertGroup = self.root_alert_group
self.root_alert_group = None
self.save(update_fields=["root_alert_group"])
@ -963,7 +974,9 @@ class AlertGroup(AlertGroupSlackRenderingMixin, EscalationSnapshotMixin, models.
action_source=None,
)
def silence_by_user(self, user: User, silence_delay: Optional[int], action_source: Optional[str] = None) -> None:
def silence_by_user(
self, user: User, silence_delay: typing.Optional[int], action_source: typing.Optional[str] = None
) -> None:
AlertGroupLogRecord = apps.get_model("alerts", "AlertGroupLogRecord")
initial_state = self.state
@ -1020,7 +1033,7 @@ class AlertGroup(AlertGroupSlackRenderingMixin, EscalationSnapshotMixin, models.
for dependent_alert_group in self.dependent_alert_groups.all():
dependent_alert_group.silence_by_user(user, silence_delay, action_source)
def un_silence_by_user(self, user: User, action_source: Optional[str] = None) -> None:
def un_silence_by_user(self, user: User, action_source: typing.Optional[str] = None) -> None:
AlertGroupLogRecord = apps.get_model("alerts", "AlertGroupLogRecord")
initial_state = self.state
@ -1322,7 +1335,10 @@ class AlertGroup(AlertGroupSlackRenderingMixin, EscalationSnapshotMixin, models.
if not root_alert_groups_to_resolve.exists():
return
organization = root_alert_groups_to_resolve.first().channel.organization
# we know this is an AlertGroup because of the .exists() check just above
first_alert_group: AlertGroup = root_alert_groups_to_resolve.first()
organization = first_alert_group.channel.organization
if organization.is_resolution_note_required:
root_alert_groups_to_resolve = root_alert_groups_to_resolve.filter(
Q(resolution_notes__isnull=False, resolution_notes__deleted_at=None)

View file

@ -1,4 +1,5 @@
import logging
import typing
from functools import cached_property
from urllib.parse import urljoin
@ -37,6 +38,11 @@ from common.insight_log import EntityEvent, write_resource_insight_log
from common.jinja_templater import jinja_template_env
from common.public_primary_keys import generate_public_primary_key, increase_public_primary_key_length
if typing.TYPE_CHECKING:
from django.db.models.manager import RelatedManager
from apps.alerts.models import GrafanaAlertingContactPoint
logger = logging.getLogger(__name__)
@ -108,6 +114,8 @@ class AlertReceiveChannel(IntegrationOptionsMixin, MaintainableObject):
Channel generated by user to receive Alerts to.
"""
contact_points: "RelatedManager['GrafanaAlertingContactPoint']"
objects = AlertReceiveChannelManager()
objects_with_maintenance = AlertReceiveChannelManagerWithMaintenance()
objects_with_deleted = models.Manager()
@ -609,7 +617,9 @@ class AlertReceiveChannel(IntegrationOptionsMixin, MaintainableObject):
@receiver(post_save, sender=AlertReceiveChannel)
def listen_for_alertreceivechannel_model_save(sender, instance, created, *args, **kwargs):
def listen_for_alertreceivechannel_model_save(
sender: AlertReceiveChannel, instance: AlertReceiveChannel, created: bool, *args, **kwargs
) -> None:
ChannelFilter = apps.get_model("alerts", "ChannelFilter")
IntegrationHeartBeat = apps.get_model("heartbeat", "IntegrationHeartBeat")

View file

@ -1,7 +1,8 @@
import datetime
from django.conf import settings
from django.core.validators import MinLengthValidator
from django.db import models
from django.utils import timezone
from ordered_model.models import OrderedModel
from common.public_primary_keys import generate_public_primary_key, increase_public_primary_key_length
@ -271,13 +272,13 @@ class EscalationPolicy(OrderedModel):
null=True,
)
ONE_MINUTE = timezone.timedelta(minutes=1)
FIVE_MINUTES = timezone.timedelta(minutes=5)
FIFTEEN_MINUTES = timezone.timedelta(minutes=15)
THIRTY_MINUTES = timezone.timedelta(minutes=30)
HOUR = timezone.timedelta(minutes=60)
ONE_MINUTE = datetime.timedelta(minutes=1)
FIVE_MINUTES = datetime.timedelta(minutes=5)
FIFTEEN_MINUTES = datetime.timedelta(minutes=15)
THIRTY_MINUTES = datetime.timedelta(minutes=30)
HOUR = datetime.timedelta(minutes=60)
DEFAULT_WAIT_DELAY = timezone.timedelta(minutes=5)
DEFAULT_WAIT_DELAY = datetime.timedelta(minutes=5)
DURATION_CHOICES = (
(ONE_MINUTE, "1 min"),

View file

@ -1,3 +1,4 @@
import datetime
from uuid import uuid4
import humanize
@ -14,11 +15,11 @@ class MaintainableObject(models.Model):
class Meta:
abstract = True
DURATION_ONE_HOUR = timezone.timedelta(hours=1)
DURATION_THREE_HOURS = timezone.timedelta(hours=3)
DURATION_SIX_HOURS = timezone.timedelta(hours=6)
DURATION_TWELVE_HOURS = timezone.timedelta(hours=12)
DURATION_TWENTY_FOUR_HOURS = timezone.timedelta(hours=24)
DURATION_ONE_HOUR = datetime.timedelta(hours=1)
DURATION_THREE_HOURS = datetime.timedelta(hours=3)
DURATION_SIX_HOURS = datetime.timedelta(hours=6)
DURATION_TWELVE_HOURS = datetime.timedelta(hours=12)
DURATION_TWENTY_FOUR_HOURS = datetime.timedelta(hours=24)
MAINTENANCE_DURATION_CHOICES = (
(DURATION_ONE_HOUR, "1 hour"),
@ -97,7 +98,7 @@ class MaintainableObject(models.Model):
maintenance_uuid = _self.start_disable_maintenance_task(maintenance_duration)
_self.maintenance_duration = timezone.timedelta(seconds=maintenance_duration)
_self.maintenance_duration = datetime.timedelta(seconds=maintenance_duration)
_self.maintenance_uuid = maintenance_uuid
_self.maintenance_mode = mode
_self.maintenance_started_at = timezone.now()

View file

@ -1,7 +1,7 @@
from django.apps import apps
from django.conf import settings
from django.db import transaction
from kombu import uuid as celery_uuid
from kombu.utils.uuid import uuid as celery_uuid
from common.custom_celery_tasks import shared_dedicated_queue_retry_task

View file

@ -4,7 +4,7 @@ from django.apps import apps
from django.conf import settings
from django.db import transaction
from django.utils import timezone
from kombu import uuid as celery_uuid
from kombu.utils.uuid import uuid as celery_uuid
from apps.alerts.constants import NEXT_ESCALATION_DELAY
from apps.alerts.signals import user_notification_action_triggered_signal

View file

@ -2,6 +2,7 @@ import enum
import typing
from django.conf import settings
from django.contrib.auth.models import AbstractUser
from rest_framework import permissions
from rest_framework.authentication import BasicAuthentication, SessionAuthentication
from rest_framework.request import Request
@ -10,6 +11,9 @@ from rest_framework.viewsets import ViewSet, ViewSetMixin
from common.utils import getattrd
if typing.TYPE_CHECKING:
from apps.user_management.models import User
ACTION_PREFIX = "grafana-oncall-app"
RBAC_PERMISSIONS_ATTR = "rbac_permissions"
RBAC_OBJECT_PERMISSIONS_ATTR = "rbac_object_permissions"
@ -17,6 +21,31 @@ RBAC_OBJECT_PERMISSIONS_ATTR = "rbac_object_permissions"
ViewSetOrAPIView = typing.Union[ViewSet, APIView]
class AuthenticatedRequest(Request):
"""
Use this for typing, instead of rest_framework.request.Request, when you KNOW that the user is authenticated.
ex. In the RBACPermission class below, we know that the user is authenticated because this is handled by the
`authentication_classes` attribute on views.
https://github.com/typeddjango/django-stubs#how-can-i-create-a-httprequest-thats-guaranteed-to-have-an-authenticated-user
"""
# see comment above, this is safe. without the type-ignore comment, mypy complains
# expression has type "User", base class "Request" defined the type as "Union[AbstractBaseUser, AnonymousUser]"
user: "User" # type: ignore[assignment]
class AuthenticatedDjangoAdminRequest(Request):
"""
Use this for typing, instead of rest_framework.request.Request, when you KNOW that the user is authenticated via
Django admin user authentication.
https://github.com/typeddjango/django-stubs#how-can-i-create-a-httprequest-thats-guaranteed-to-have-an-authenticated-user
"""
user: AbstractUser
class GrafanaAPIPermission(typing.TypedDict):
action: str
@ -62,9 +91,12 @@ class LegacyAccessControlCompatiblePermission:
self.fallback_role = fallback_role
def get_most_authorized_role(
permissions: typing.List[LegacyAccessControlCompatiblePermission],
) -> LegacyAccessControlRole:
LegacyAccessControlCompatiblePermissions = typing.List[LegacyAccessControlCompatiblePermission]
RBACPermissionsAttribute = typing.Dict[str, LegacyAccessControlCompatiblePermissions]
RBACObjectPermissionsAttribute = typing.Dict[permissions.BasePermission, typing.List[str]]
def get_most_authorized_role(permissions: LegacyAccessControlCompatiblePermissions) -> LegacyAccessControlRole:
if not permissions:
return LegacyAccessControlRole.VIEWER
@ -72,22 +104,18 @@ def get_most_authorized_role(
return min({p.fallback_role for p in permissions}, key=lambda r: r.value)
def user_is_authorized(user, required_permissions: typing.List[LegacyAccessControlCompatiblePermission]) -> bool:
def user_is_authorized(user: "User", required_permissions: LegacyAccessControlCompatiblePermissions) -> bool:
"""
This function checks whether `user` has all permissions in `required_permissions`. RBAC permissions are used
if RBAC is enabled for the organization, otherwise the fallback basic role is checked.
Parameters
----------
user : apps.user_management.models.user.User
The user to check permissions for
required_permissions : typing.List[LegacyAccessControlCompatiblePermission]
A list of permissions that a user must have to be considered authorized
user - The user to check permissions for
required_permissions - A list of permissions that a user must have to be considered authorized
"""
if user.organization.is_rbac_permissions_enabled:
user_permissions = [u["action"] for u in user.permissions]
required_permissions = [p.value for p in required_permissions]
return all(permission in user_permissions for permission in required_permissions)
required_permission_values = [p.value for p in required_permissions]
return all(permission in user_permissions for permission in required_permission_values)
return user.role <= get_most_authorized_role(required_permissions).value
@ -187,15 +215,18 @@ class RBACPermission(permissions.BasePermission):
)
@staticmethod
def _get_view_action(request: Request, view: ViewSetOrAPIView) -> str:
def _get_view_action(request: AuthenticatedRequest, view: ViewSetOrAPIView) -> str:
"""
For right now this needs to support being used in both a ViewSet as well as APIView, we use both interchangably
Note: `request.method` is returned uppercase
"""
return view.action if isinstance(view, ViewSetMixin) else request.method.lower()
return view.action if isinstance(view, ViewSetMixin) else (request.method or "").lower()
def has_permission(self, request: Request, view: ViewSetOrAPIView) -> bool:
# mypy complains about "Liskov substitution principle" here because request is `AuthenticatedRequest` object
# and not rest_framework.request.Request
# https://mypy.readthedocs.io/en/stable/common_issues.html#incompatible-overrides
def has_permission(self, request: AuthenticatedRequest, view: ViewSetOrAPIView) -> bool: # type: ignore[override]
# the django-debug-toolbar UI makes OPTIONS calls. Without this statement the debug UI can't gather the
# necessary info it needs to work properly
if settings.DEBUG and request.method == "OPTIONS":
@ -203,14 +234,14 @@ class RBACPermission(permissions.BasePermission):
action = self._get_view_action(request, view)
rbac_permissions: RBACPermissionsAttribute = getattr(view, RBAC_PERMISSIONS_ATTR, None)
rbac_permissions: typing.Optional[RBACPermissionsAttribute] = getattr(view, RBAC_PERMISSIONS_ATTR, None)
# first check that the rbac_permissions dict attribute is defined
assert (
rbac_permissions is not None
), f"Must define a {RBAC_PERMISSIONS_ATTR} dict on the ViewSet that is consuming the RBACPermission class"
action_required_permissions: typing.Union[None, typing.List] = rbac_permissions.get(action, None)
action_required_permissions: typing.Optional[typing.List] = rbac_permissions.get(action, None)
# next check that the action in question is defined within the rbac_permissions dict attribute
assert (
@ -220,8 +251,13 @@ class RBACPermission(permissions.BasePermission):
return user_is_authorized(request.user, action_required_permissions)
def has_object_permission(self, request: Request, view: ViewSetOrAPIView, obj: typing.Any) -> bool:
rbac_object_permissions: RBACObjectPermissionsAttribute = getattr(view, RBAC_OBJECT_PERMISSIONS_ATTR, None)
# mypy complains about "Liskov substitution principle" here because request is `AuthenticatedRequest` object
# and not rest_framework.request.Request
# https://mypy.readthedocs.io/en/stable/common_issues.html#incompatible-overrides
def has_object_permission(self, request: AuthenticatedRequest, view: ViewSetOrAPIView, obj: typing.Any) -> bool: # type: ignore[override]
rbac_object_permissions: typing.Optional[RBACObjectPermissionsAttribute] = getattr(
view, RBAC_OBJECT_PERMISSIONS_ATTR, None
)
if rbac_object_permissions:
action = self._get_view_action(request, view)
@ -250,35 +286,45 @@ def get_permission_from_permission_string(perm: str) -> typing.Optional[LegacyAc
for permission_class in ALL_PERMISSION_CLASSES:
if permission_class.value == perm:
return permission_class
return None
class IsOwner(permissions.BasePermission):
def __init__(self, ownership_field: typing.Optional[str] = None) -> None:
self.ownership_field = ownership_field
def has_object_permission(self, request: Request, _view: ViewSet, obj: typing.Any) -> bool:
# mypy complains about "Liskov substitution principle" here because request is `AuthenticatedRequest` object
# and not rest_framework.request.Request
# https://mypy.readthedocs.io/en/stable/common_issues.html#incompatible-overrides
def has_object_permission(self, request: AuthenticatedRequest, _view: ViewSetOrAPIView, obj: typing.Any) -> bool: # type: ignore[override]
owner = obj if self.ownership_field is None else getattrd(obj, self.ownership_field)
return owner == request.user
class HasRBACPermissions(permissions.BasePermission):
def __init__(self, required_permissions: typing.List[LegacyAccessControlCompatiblePermission]) -> None:
def __init__(self, required_permissions: LegacyAccessControlCompatiblePermissions) -> None:
self.required_permissions = required_permissions
def has_object_permission(self, request: Request, _view: ViewSetOrAPIView, _obj: typing.Any) -> bool:
# mypy complains about "Liskov substitution principle" here because request is `AuthenticatedRequest` object
# and not rest_framework.request.Request
# https://mypy.readthedocs.io/en/stable/common_issues.html#incompatible-overrides
def has_object_permission(self, request: AuthenticatedRequest, _view: ViewSetOrAPIView, _obj: typing.Any) -> bool: # type: ignore[override]
return user_is_authorized(request.user, self.required_permissions)
class IsOwnerOrHasRBACPermissions(permissions.BasePermission):
def __init__(
self,
required_permissions: typing.List[LegacyAccessControlCompatiblePermission],
required_permissions: LegacyAccessControlCompatiblePermissions,
ownership_field: typing.Optional[str] = None,
) -> None:
self.IsOwner = IsOwner(ownership_field)
self.HasRBACPermissions = HasRBACPermissions(required_permissions)
def has_object_permission(self, request: Request, view: ViewSetOrAPIView, obj: typing.Any) -> bool:
# mypy complains about "Liskov substitution principle" here because request is `AuthenticatedRequest` object
# and not rest_framework.request.Request
# https://mypy.readthedocs.io/en/stable/common_issues.html#incompatible-overrides
def has_object_permission(self, request: AuthenticatedRequest, view: ViewSetOrAPIView, obj: typing.Any) -> bool: # type: ignore[override]
return self.IsOwner.has_object_permission(request, view, obj) or self.HasRBACPermissions.has_object_permission(
request, view, obj
)
@ -287,14 +333,13 @@ class IsOwnerOrHasRBACPermissions(permissions.BasePermission):
class IsStaff(permissions.BasePermission):
STAFF_AUTH_CLASSES = [BasicAuthentication, SessionAuthentication]
def has_permission(self, request: Request, _view: ViewSet) -> bool:
# mypy complains about "Liskov substitution principle" here because request is `AuthenticatedRequest` object
# and not rest_framework.request.Request
# https://mypy.readthedocs.io/en/stable/common_issues.html#incompatible-overrides
def has_permission(self, request: AuthenticatedDjangoAdminRequest, _view: ViewSet) -> bool: # type: ignore[override]
user = request.user
if not any(isinstance(request._authenticator, x) for x in self.STAFF_AUTH_CLASSES):
return False
if user and user.is_authenticated:
return user.is_staff
return False
RBACPermissionsAttribute = typing.Dict[str, typing.List[LegacyAccessControlCompatiblePermission]]
RBACObjectPermissionsAttribute = typing.Dict[permissions.BasePermission, typing.List[str]]

View file

@ -101,7 +101,7 @@ class EscalationPolicySerializer(EagerLoadingMixin, serializers.ModelSerializer)
"notify_to_group",
"important",
]
read_only_fields = ("order",)
read_only_fields = ["order"]
SELECT_RELATED = [
"escalation_chain",
@ -199,7 +199,7 @@ class EscalationPolicySerializer(EagerLoadingMixin, serializers.ModelSerializer)
class EscalationPolicyCreateSerializer(EscalationPolicySerializer):
class Meta(EscalationPolicySerializer.Meta):
read_only_fields = ("order",)
read_only_fields = ["order"]
extra_kwargs = {"escalation_chain": {"required": True, "allow_null": False}}
def create(self, validated_data):
@ -212,7 +212,7 @@ class EscalationPolicyUpdateSerializer(EscalationPolicySerializer):
escalation_chain = serializers.CharField(read_only=True, source="escalation_chain.public_primary_key")
class Meta(EscalationPolicySerializer.Meta):
read_only_fields = ("order", "escalation_chain")
read_only_fields = ["order", "escalation_chain"]
def update(self, instance, validated_data):
step = validated_data.get("step", instance.step)

View file

@ -213,7 +213,7 @@ class OnCallShiftUpdateSerializer(OnCallShiftSerializer):
type = serializers.ReadOnlyField()
class Meta(OnCallShiftSerializer.Meta):
read_only_fields = ("schedule", "type")
read_only_fields = ["schedule", "type"]
def update(self, instance, validated_data):
validated_data = self._correct_validated_data(instance.type, validated_data)

View file

@ -16,9 +16,9 @@ class TeamSerializer(serializers.ModelSerializer):
"is_sharing_resources_to_all",
)
read_only_fields = (
read_only_fields = [
"id",
"name",
"email",
"avatar_url",
)
]

View file

@ -100,7 +100,7 @@ class UserNotificationPolicyUpdateSerializer(UserNotificationPolicyBaseSerialize
)
class Meta(UserNotificationPolicyBaseSerializer.Meta):
read_only_fields = ("order", "user", "important")
read_only_fields = ["order", "user", "important"]
def update(self, instance, validated_data):
self_or_admin = instance.user.self_or_admin(

View file

@ -1,4 +1,4 @@
from typing import Tuple
import typing
from django.db import models
@ -25,8 +25,8 @@ class ScheduleExportAuthToken(BaseAuthToken):
@classmethod
def create_auth_token(
cls, user: User, organization: Organization, schedule: OnCallSchedule = None
) -> Tuple["ScheduleExportAuthToken", str]:
cls, user: User, organization: Organization, schedule: typing.Optional[OnCallSchedule] = None
) -> typing.Tuple["ScheduleExportAuthToken", str]:
token_string = crypto.generate_schedule_token_string()
digest = crypto.hash_token_string(token_string)

View file

@ -163,7 +163,7 @@ class UserNotificationPolicy(OrderedModel):
return f"{self.pk}: {self.short_verbal}"
@classmethod
def get_short_verbals_for_user(cls, user: User) -> Tuple[Tuple[str], Tuple[str]]:
def get_short_verbals_for_user(cls, user: User) -> Tuple[Tuple[str, ...], Tuple[str, ...]]:
is_wait_step = Q(step=cls.Step.WAIT)
is_wait_step_configured = Q(wait_delay__isnull=False)

View file

@ -1,20 +1,19 @@
import json
import logging
import time
from typing import Dict, List, Optional, Tuple, TypedDict
import typing
from urllib.parse import urljoin
import requests
from django.conf import settings
from rest_framework import status
from rest_framework.response import Response
from apps.api.permissions import ACTION_PREFIX, GrafanaAPIPermission
logger = logging.getLogger(__name__)
class GrafanaUser(TypedDict):
class GrafanaUser(typing.TypedDict):
orgId: int
userId: int
email: str
@ -27,18 +26,22 @@ class GrafanaUser(TypedDict):
class GrafanaUserWithPermissions(GrafanaUser):
permissions: List[GrafanaAPIPermission]
permissions: typing.List[GrafanaAPIPermission]
class GCOMInstanceInfoConfigFeatureToggles(TypedDict):
GrafanaUsersWithPermissions = typing.List[GrafanaUserWithPermissions]
UserPermissionsDict = typing.Dict[str, typing.List[GrafanaAPIPermission]]
class GCOMInstanceInfoConfigFeatureToggles(typing.TypedDict):
accessControlOnCall: str
class GCOMInstanceInfoConfig(TypedDict):
class GCOMInstanceInfoConfig(typing.TypedDict):
feature_toggles: GCOMInstanceInfoConfigFeatureToggles
class GCOMInstanceInfo(TypedDict):
class GCOMInstanceInfo(typing.TypedDict):
id: int
orgId: int
slug: str
@ -47,26 +50,66 @@ class GCOMInstanceInfo(TypedDict):
url: str
status: str
clusterSlug: str
config: Optional[GCOMInstanceInfoConfig]
config: GCOMInstanceInfoConfig | None
class ApiClientResponseCallStatus(typing.TypedDict):
url: str
connected: bool
status_code: int
message: str
# TODO: come back and make the typing.Dict strongly typed once we switch to Python 3.12
# which has better support for generics
_APIClientResponse = typing.Optional[typing.Dict | typing.List]
APIClientResponse = typing.Tuple[_APIClientResponse, ApiClientResponseCallStatus]
# can't define this using class syntax because one of the keys contains a dash
# https://docs.python.org/3/library/typing.html#typing.TypedDict:~:text=The%20functional%20syntax%20should%20also%20be%20used%20when%20any%20of%20the%20keys%20are%20not%20valid%20identifiers%2C%20for%20example%20because%20they%20are%20keywords%20or%20contain%20hyphens.%20Example%3A
APIRequestHeaders = typing.TypedDict(
"APIRequestHeaders",
{
"User-Agent": str,
"Authorization": str,
},
)
class HttpMethod(typing.Protocol):
"""
TODO: can probably replace this with something from the requests library?
https://github.com/psf/requests/blob/main/requests/api.py#L14
"""
@property
def __name__(self) -> str:
...
def __call__(self, *args, **kwargs) -> requests.Response:
...
class APIClient:
def __init__(self, api_url: str, api_token: str):
def __init__(self, api_url: str, api_token: str) -> None:
self.api_url = api_url
self.api_token = api_token
def api_head(self, endpoint: str, body: dict = None, **kwargs) -> Tuple[Optional[Response], dict]:
def api_head(self, endpoint: str, body: typing.Optional[typing.Dict] = None, **kwargs) -> APIClientResponse:
return self.call_api(endpoint, requests.head, body, **kwargs)
def api_get(self, endpoint: str, **kwargs) -> Tuple[Optional[Response], dict]:
def api_get(self, endpoint: str, **kwargs) -> APIClientResponse:
return self.call_api(endpoint, requests.get, **kwargs)
def api_post(self, endpoint: str, body: dict = None, **kwargs) -> Tuple[Optional[Response], dict]:
def api_post(self, endpoint: str, body: typing.Optional[typing.Dict] = None, **kwargs) -> APIClientResponse:
return self.call_api(endpoint, requests.post, body, **kwargs)
def call_api(self, endpoint: str, http_method, body: dict = None, **kwargs) -> Tuple[Optional[Response], dict]:
def call_api(
self, endpoint: str, http_method: HttpMethod, body: typing.Optional[typing.Dict] = None, **kwargs
) -> APIClientResponse:
request_start = time.perf_counter()
call_status = {
call_status: ApiClientResponseCallStatus = {
"url": urljoin(self.api_url, endpoint),
"connected": False,
"status_code": status.HTTP_503_SERVICE_UNAVAILABLE,
@ -108,20 +151,20 @@ class APIClient:
return None, call_status
@property
def request_headers(self) -> dict:
def request_headers(self) -> APIRequestHeaders:
return {"User-Agent": settings.GRAFANA_COM_USER_AGENT, "Authorization": f"Bearer {self.api_token}"}
class GrafanaAPIClient(APIClient):
USER_PERMISSION_ENDPOINT = f"api/access-control/users/permissions/search?actionPrefix={ACTION_PREFIX}"
def __init__(self, api_url: str, api_token: str):
def __init__(self, api_url: str, api_token: str) -> None:
super().__init__(api_url, api_token)
def check_token(self) -> Tuple[Optional[Response], dict]:
def check_token(self) -> APIClientResponse:
return self.api_head("api/org")
def get_users_permissions(self, rbac_is_enabled_for_org: bool) -> Dict[str, List[GrafanaAPIPermission]]:
def get_users_permissions(self, rbac_is_enabled_for_org: bool) -> UserPermissionsDict:
"""
It is possible that this endpoint may not be available for certain Grafana orgs.
Ex: for Grafana Cloud orgs whom have pinned their Grafana version to an earlier version
@ -141,11 +184,15 @@ class GrafanaAPIClient(APIClient):
"""
if not rbac_is_enabled_for_org:
return {}
data, _ = self.api_get(self.USER_PERMISSION_ENDPOINT)
if data is None:
response, _ = self.api_get(self.USER_PERMISSION_ENDPOINT)
if response is None:
return {}
elif isinstance(response, list):
return {}
all_users_permissions = {}
data: typing.Dict[str, typing.Dict[str, typing.List[str]]] = response
all_users_permissions: UserPermissionsDict = {}
for user_id, user_permissions in data.items():
all_users_permissions[user_id] = [GrafanaAPIPermission(action=key) for key, _ in user_permissions.items()]
@ -155,11 +202,15 @@ class GrafanaAPIClient(APIClient):
_, resp_status = self.api_head(self.USER_PERMISSION_ENDPOINT)
return resp_status["connected"]
def get_users(self, rbac_is_enabled_for_org: bool, **kwargs) -> List[GrafanaUserWithPermissions]:
users, _ = self.api_get("api/org/users", **kwargs)
def get_users(self, rbac_is_enabled_for_org: bool, **kwargs) -> GrafanaUsersWithPermissions:
users_response, _ = self.api_get("api/org/users", **kwargs)
if not users:
if not users_response:
return []
elif isinstance(users_response, dict):
return []
users: GrafanaUsersWithPermissions = users_response
user_permissions = self.get_users_permissions(rbac_is_enabled_for_org)
@ -168,32 +219,32 @@ class GrafanaAPIClient(APIClient):
user["permissions"] = user_permissions.get(str(user["userId"]), [])
return users
def get_teams(self, **kwargs):
def get_teams(self, **kwargs) -> APIClientResponse:
return self.api_get("api/teams/search?perpage=1000000", **kwargs)
def get_team_members(self, team_id):
def get_team_members(self, team_id: int) -> APIClientResponse:
return self.api_get(f"api/teams/{team_id}/members")
def get_datasources(self):
def get_datasources(self) -> APIClientResponse:
return self.api_get("api/datasources")
def get_datasource_by_id(self, datasource_id):
def get_datasource_by_id(self, datasource_id) -> APIClientResponse:
# This endpoint is deprecated for Grafana version >= 9. Use get_datasource instead
return self.api_get(f"api/datasources/{datasource_id}")
def get_datasource(self, datasource_uid):
def get_datasource(self, datasource_uid) -> APIClientResponse:
return self.api_get(f"api/datasources/uid/{datasource_uid}")
def get_alertmanager_status_with_config(self, recipient):
def get_alertmanager_status_with_config(self, recipient) -> APIClientResponse:
return self.api_get(f"api/alertmanager/{recipient}/api/v2/status")
def get_alerting_config(self, recipient):
def get_alerting_config(self, recipient: str) -> APIClientResponse:
return self.api_get(f"api/alertmanager/{recipient}/config/api/v1/alerts")
def update_alerting_config(self, recipient, config):
def update_alerting_config(self, recipient, config) -> APIClientResponse:
return self.api_post(f"api/alertmanager/{recipient}/config/api/v1/alerts", config)
def get_grafana_plugin_settings(self, recipient):
def get_grafana_plugin_settings(self, recipient: str) -> APIClientResponse:
return self.api_get(f"api/plugins/{recipient}/settings")
@ -203,10 +254,12 @@ class GcomAPIClient(APIClient):
STACK_STATUS_DELETED = "deleted"
STACK_STATUS_ACTIVE = "active"
def __init__(self, api_token: str):
def __init__(self, api_token: str) -> None:
super().__init__(settings.GRAFANA_COM_API_URL, api_token)
def get_instance_info(self, stack_id: str, include_config_query_param: bool = False) -> Optional[GCOMInstanceInfo]:
def get_instance_info(
self, stack_id: str, include_config_query_param: bool = False
) -> typing.Optional[GCOMInstanceInfo]:
"""
NOTE: in order to use ?config=true, an "Admin" GCOM token must be used to make the API call
"""
@ -222,7 +275,11 @@ class GcomAPIClient(APIClient):
there are two ways that feature toggles can be enabled, this method takes into account both
https://grafana.com/docs/grafana/latest/setup-grafana/configure-grafana/#enable
"""
instance_feature_toggles = instance_info.get("config", {}).get("feature_toggles", {})
instance_info_config = instance_info.get("config", {})
if not instance_info_config:
return False
instance_feature_toggles = instance_info_config.get("feature_toggles", {})
if not instance_feature_toggles:
return False
@ -251,8 +308,8 @@ class GcomAPIClient(APIClient):
instance_infos, _ = self.api_get(url)
return instance_infos["items"] and instance_infos["items"][0].get("status") == self.STACK_STATUS_DELETED
def post_active_users(self, body):
def post_active_users(self, body) -> APIClientResponse:
return self.api_post("app-active-users", body)
def get_stack_regions(self):
def get_stack_regions(self) -> APIClientResponse:
return self.api_get("stack-regions")

View file

@ -5,7 +5,7 @@ from rest_framework.response import Response
from rest_framework.views import APIView
from apps.grafana_plugin.helpers import GrafanaAPIClient
from apps.user_management.models.organization import Organization, ProvisionedPlugin
from apps.user_management.models.organization import Organization
from apps.user_management.sync import sync_organization
from common.api_helpers.mixins import GrafanaHeadersMixin
@ -23,7 +23,7 @@ class SelfHostedInstallView(GrafanaHeadersMixin, APIView):
grafana_url = settings.SELF_HOSTED_SETTINGS["GRAFANA_API_URL"]
grafana_api_token = self.instance_context["grafana_token"]
provisioning_info: ProvisionedPlugin = {"error": None}
provisioning_info = {"error": None}
if settings.LICENSE != settings.OPEN_SOURCE_LICENSE_NAME:
provisioning_info["error"] = f"License type not authorized"

View file

@ -1,7 +1,6 @@
import datetime
import typing
from django.utils import timezone
class AlertGroupsTotalMetricsDict(typing.TypedDict):
integration_name: str
@ -39,7 +38,7 @@ class RecalculateOrgMetricsDict(typing.TypedDict):
ALERT_GROUPS_TOTAL = "oncall_alert_groups_total"
ALERT_GROUPS_RESPONSE_TIME = "oncall_alert_groups_response_time_seconds"
METRICS_RESPONSE_TIME_CALCULATION_PERIOD = timezone.timedelta(days=7)
METRICS_RESPONSE_TIME_CALCULATION_PERIOD = datetime.timedelta(days=7)
METRICS_CACHE_LIFETIME = 93600 # 26 hours. Should be higher than METRICS_RECALCULATE_CACHE_TIMEOUT

View file

@ -1,3 +1,4 @@
import datetime
import random
import typing
@ -20,6 +21,9 @@ from apps.metrics_exporter.constants import (
AlertGroupsTotalMetricsDict,
)
if typing.TYPE_CHECKING:
from apps.alerts.models import AlertReceiveChannel
def get_organization_ids_from_db():
AlertReceiveChannel = apps.get_model("alerts", "AlertReceiveChannel")
@ -42,12 +46,12 @@ def get_organization_ids():
return organizations_ids
def get_response_time_period():
def get_response_time_period() -> datetime.datetime:
"""Returns period for response time calculation"""
return timezone.now() - METRICS_RESPONSE_TIME_CALCULATION_PERIOD
def get_metrics_recalculation_timeout():
def get_metrics_recalculation_timeout() -> int:
"""
Returns timeout when metrics should be recalculated.
Add some dispersion to avoid starting recalculation tasks for all organizations at the same time.
@ -66,7 +70,7 @@ def get_metrics_cache_timeout(organization_id):
return metrics_cache_timeout
def get_metrics_cache_timer_key(organization_id):
def get_metrics_cache_timer_key(organization_id) -> str:
return f"{METRICS_CACHE_TIMER}_{organization_id}"
@ -75,15 +79,15 @@ def get_metrics_cache_timer_for_organization(organization_id):
return cache.get(key)
def get_metric_alert_groups_total_key(organization_id):
def get_metric_alert_groups_total_key(organization_id) -> str:
return f"{ALERT_GROUPS_TOTAL}_{organization_id}"
def get_metric_alert_groups_response_time_key(organization_id):
def get_metric_alert_groups_response_time_key(organization_id) -> str:
return f"{ALERT_GROUPS_RESPONSE_TIME}_{organization_id}"
def metrics_update_integration_cache(integration):
def metrics_update_integration_cache(integration: "AlertReceiveChannel") -> None:
"""Update integration data in metrics cache"""
metrics_cache_timeout = get_metrics_cache_timeout(integration.organization_id)
metric_alert_groups_total_key = get_metric_alert_groups_total_key(integration.organization_id)
@ -105,7 +109,7 @@ def metrics_update_integration_cache(integration):
cache.set(metric_key, metric_cache, timeout=metrics_cache_timeout)
def metrics_remove_deleted_integration_from_cache(integration):
def metrics_remove_deleted_integration_from_cache(integration: "AlertReceiveChannel"):
"""Remove data related to deleted integration from metrics cache"""
metrics_cache_timeout = get_metrics_cache_timeout(integration.organization_id)
metric_alert_groups_total_key = get_metric_alert_groups_total_key(integration.organization_id)
@ -118,7 +122,7 @@ def metrics_remove_deleted_integration_from_cache(integration):
cache.set(metric_key, metric_cache, timeout=metrics_cache_timeout)
def metrics_add_integration_to_cache(integration):
def metrics_add_integration_to_cache(integration: "AlertReceiveChannel"):
"""Add new integration data to metrics cache"""
metrics_cache_timeout = get_metrics_cache_timeout(integration.organization_id)
metric_alert_groups_total_key = get_metric_alert_groups_total_key(integration.organization_id)

View file

@ -1,4 +1,5 @@
import logging
import typing
from urllib.parse import urljoin
import requests
@ -51,7 +52,7 @@ class CloudConnector(models.Model):
return sync_status, error_msg
def sync_users_with_cloud(self) -> tuple[bool, str]:
def sync_users_with_cloud(self) -> typing.Tuple[bool, typing.Optional[str]]:
sync_status = False
error_msg = None

View file

@ -276,7 +276,7 @@ class EscalationPolicyUpdateSerializer(EscalationPolicySerializer):
type = EscalationPolicyTypeField(required=False, source="step", allow_null=True)
class Meta(EscalationPolicySerializer.Meta):
read_only_fields = ("route_id",)
read_only_fields = ["route_id"]
def update(self, instance, validated_data):
if "step" in validated_data:

View file

@ -175,7 +175,7 @@ class ChannelFilterSerializer(BaseChannelFilterSerializer):
"telegram",
"manual_order",
]
read_only_fields = ("is_the_last_route",)
read_only_fields = ["is_the_last_route"]
def create(self, validated_data):
validated_data = self._correct_validated_data(validated_data)

View file

@ -13,6 +13,7 @@ from django.apps import apps
from django.db.models import Q
from django.utils import timezone
from icalendar import Calendar
from icalendar import Event as IcalEvent
from apps.api.permissions import RBACPermission
from apps.schedules.constants import (
@ -37,7 +38,8 @@ This is a hack to allow us to load models for type checking without circular dep
This module likely needs to refactored to be part of the OnCallSchedule module.
"""
if TYPE_CHECKING:
from apps.schedules.models import OnCallSchedule
from apps.schedules.models import CustomOnCallShift, OnCallSchedule
from apps.schedules.models.on_call_schedule import OnCallScheduleQuerySet
from apps.user_management.models import Organization, User
from apps.user_management.models.user import UserQuerySet
@ -45,14 +47,26 @@ logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
EmptyShift = namedtuple(
"EmptyShift",
["start", "end", "summary", "description", "attendee", "all_day", "calendar_type", "calendar_tz", "shift_pk"],
)
EmptyShifts = typing.List[EmptyShift]
DatetimeInterval = namedtuple("DatetimeInterval", ["start", "end"])
DatetimeIntervals = typing.List[DatetimeInterval]
IcalEvents = typing.List[IcalEvent]
def users_in_ical(
usernames_from_ical: typing.List[str],
organization: Organization,
organization: "Organization",
include_viewers=False,
users_to_filter: typing.Optional[UserQuerySet] = None,
) -> UserQuerySet:
users_to_filter: typing.Optional["UserQuerySet"] = None,
) -> typing.Sequence["User"]:
"""
This method returns a `UserQuerySet`, filtered by users whose username, or case-insensitive e-mail,
This method returns a sequence of `User` objects, filtered by users whose username, or case-insensitive e-mail,
is present in `usernames_from_ical`. If `include_viewers` is set to `True`, users are further filtered down
based on their granted permissions.
@ -95,21 +109,23 @@ def users_in_ical(
@timed_lru_cache(timeout=100)
def memoized_users_in_ical(usernames_from_ical: typing.List[str], organization: Organization) -> UserQuerySet:
def memoized_users_in_ical(
usernames_from_ical: typing.List[str], organization: "Organization"
) -> typing.Sequence["User"]:
# using in-memory cache instead of redis to avoid pickling python objects
return users_in_ical(usernames_from_ical, organization)
# used for display schedule events on web
def list_of_oncall_shifts_from_ical(
schedule,
date,
user_timezone="UTC",
with_empty_shifts=False,
with_gaps=False,
days=1,
filter_by=None,
from_cached_final=False,
schedule: "OnCallSchedule",
date: datetime.date,
user_timezone: str = "UTC",
with_empty_shifts: bool = False,
with_gaps: bool = False,
days: int = 1,
filter_by: str | None = None,
from_cached_final: bool = False,
):
"""
Parse the ical file and return list of events with users
@ -130,14 +146,19 @@ def list_of_oncall_shifts_from_ical(
# get list of iCalendars from current iCal files. If there is more than one calendar, primary calendar will always
# be the first
calendars: typing.Tuple[typing.Optional[Calendar], ...]
if from_cached_final:
calendars = [Calendar.from_ical(schedule.cached_ical_final_schedule)]
calendars = (Calendar.from_ical(schedule.cached_ical_final_schedule),)
else:
calendars = schedule.get_icalendars()
# TODO: Review offset usage
pytz_tz = pytz.timezone(user_timezone)
user_timezone_offset = datetime.datetime.now().astimezone(pytz_tz).utcoffset()
# utcoffset can technically return None, but we're confident it is a timedelta here
user_timezone_offset: datetime.timedelta = datetime.datetime.now().astimezone(pytz_tz).utcoffset() # type: ignore[assignment]
datetime_min = datetime.datetime.combine(date, datetime.time.min) + datetime.timedelta(milliseconds=1)
datetime_start = (datetime_min - user_timezone_offset).astimezone(pytz.UTC)
datetime_end = datetime_start + datetime.timedelta(days=days - 1, hours=23, minutes=59, seconds=59)
@ -147,6 +168,8 @@ def list_of_oncall_shifts_from_ical(
for idx, calendar in enumerate(calendars):
if calendar is not None:
calendar_type: str | int
if from_cached_final:
calendar_type = CALENDAR_TYPE_FINAL
elif idx == 0:
@ -193,7 +216,14 @@ def list_of_oncall_shifts_from_ical(
return result or None
def get_shifts_dict(calendar, calendar_type, schedule, datetime_start, datetime_end, with_empty_shifts=False):
def get_shifts_dict(
calendar: Calendar,
calendar_type: str | int,
schedule: "OnCallSchedule",
datetime_start: datetime.datetime,
datetime_end: datetime.datetime,
with_empty_shifts: bool = False,
):
events = ical_events.get_events_from_ical_between(calendar, datetime_start, datetime_end)
result_datetime = []
result_date = []
@ -244,22 +274,15 @@ def get_shifts_dict(calendar, calendar_type, schedule, datetime_start, datetime_
return result_datetime, result_date
EmptyShift = namedtuple(
"EmptyShift",
["start", "end", "summary", "description", "attendee", "all_day", "calendar_type", "calendar_tz", "shift_pk"],
)
def list_of_empty_shifts_in_schedule(schedule, start_date, end_date):
"""
Parse the ical file and return list of EmptyShift.
"""
def list_of_empty_shifts_in_schedule(
schedule: "OnCallSchedule", start_date: datetime.date, end_date: datetime.date
) -> EmptyShifts:
# Calculate lookup window in schedule's tz
# If we can't get tz from ical use UTC
OnCallSchedule = apps.get_model("schedules", "OnCallSchedule")
calendars = schedule.get_icalendars()
empty_shifts = []
empty_shifts: EmptyShifts = []
for idx, calendar in enumerate(calendars):
if calendar is not None:
if idx == 0:
@ -269,7 +292,9 @@ def list_of_empty_shifts_in_schedule(schedule, start_date, end_date):
calendar_tz = get_icalendar_tz_or_utc(calendar)
schedule_timezone_offset = datetime.datetime.now().astimezone(calendar_tz).utcoffset()
# utcoffset can technically return None, but we're confident it is a timedelta here
schedule_timezone_offset: datetime.timedelta = datetime.datetime.now().astimezone(calendar_tz).utcoffset() # type: ignore[assignment]
start_datetime = datetime.datetime.combine(start_date, datetime.time.min) + datetime.timedelta(
milliseconds=1
)
@ -322,8 +347,11 @@ def list_of_empty_shifts_in_schedule(schedule, start_date, end_date):
def list_users_to_notify_from_ical(
schedule, events_datetime=None, include_viewers=False, users_to_filter=None
) -> UserQuerySet:
schedule: "OnCallSchedule",
events_datetime: typing.Optional[datetime.datetime] = None,
include_viewers: bool = False,
users_to_filter: typing.Optional["UserQuerySet"] = None,
) -> typing.Sequence["User"]:
"""
Retrieve on-call users for the current time
"""
@ -338,24 +366,25 @@ def list_users_to_notify_from_ical(
def list_users_to_notify_from_ical_for_period(
schedule,
start_datetime,
end_datetime,
schedule: "OnCallSchedule",
start_datetime: datetime.datetime,
end_datetime: datetime.datetime,
include_viewers=False,
users_to_filter=None,
) -> UserQuerySet:
) -> typing.Sequence["User"]:
# get list of iCalendars from current iCal files. If there is more than one calendar, primary calendar will always
# be the first
calendars = schedule.get_icalendars()
# reverse calendars to make overrides calendar the first, if schedule is iCal
calendars = calendars[::-1]
users_found_in_ical = []
users_found_in_ical: typing.Sequence["User"] = []
# at first check overrides calendar and return users from it if it exists and on-call users are found
for calendar in calendars:
if calendar is None:
continue
events = ical_events.get_events_from_ical_between(calendar, start_datetime, end_datetime)
parsed_ical_events = {} # event info where key is event priority and value list of found usernames {0:["alex"]}
parsed_ical_events: typing.Dict[int, typing.List[str]] = {}
for event in events:
current_usernames, current_priority = get_usernames_from_ical_event(event)
parsed_ical_events.setdefault(current_priority, []).extend(current_usernames)
@ -373,8 +402,8 @@ def list_users_to_notify_from_ical_for_period(
def get_oncall_users_for_multiple_schedules(
schedules, events_datetime=None
) -> typing.Dict[OnCallSchedule, typing.List[User]]:
schedules: "OnCallScheduleQuerySet", events_datetime=None
) -> typing.Dict["OnCallSchedule", typing.List[User]]:
from apps.user_management.models import User
if events_datetime is None:
@ -418,7 +447,7 @@ def get_oncall_users_for_multiple_schedules(
return oncall_users
def parse_username_from_string(string):
def parse_username_from_string(string: str) -> str:
"""
Parse on-call shift user from the given string
Example input:
@ -429,7 +458,7 @@ def parse_username_from_string(string):
return re.sub(RE_PRIORITY, "", string.strip(), 1).strip()
def parse_priority_from_string(string):
def parse_priority_from_string(string: str) -> int:
"""
Parse on-call shift priority from the given string
Example input:
@ -437,17 +466,16 @@ def parse_priority_from_string(string):
Example output:
1
"""
priority = re.findall(RE_PRIORITY, string.strip())
if len(priority) > 0:
priority = int(priority[0])
priority = 0
priority_matches = re.findall(RE_PRIORITY, string.strip())
if len(priority_matches) > 0:
priority = int(priority_matches[0])
if priority < 1:
priority = 0
else:
priority = 0
return priority
def parse_event_uid(string):
def parse_event_uid(string: str):
pk = None
source = None
source_verbal = None
@ -467,8 +495,8 @@ def parse_event_uid(string):
if source is not None:
source = int(source)
CustomOnCallShift = apps.get_model("schedules", "CustomOnCallShift")
source_verbal = CustomOnCallShift.SOURCE_CHOICES[source][1]
OnCallShift: "CustomOnCallShift" = apps.get_model("schedules", "CustomOnCallShift")
source_verbal = OnCallShift.SOURCE_CHOICES[source][1]
return pk, source_verbal
@ -489,7 +517,7 @@ def get_usernames_from_ical_event(event):
return usernames_found, priority
def get_missing_users_from_ical_event(event, organization):
def get_missing_users_from_ical_event(event, organization: "Organization"):
all_usernames, _ = get_usernames_from_ical_event(event)
users = list(get_users_from_ical_event(event, organization))
found_usernames = [u.username for u in users]
@ -497,7 +525,7 @@ def get_missing_users_from_ical_event(event, organization):
return [u for u in all_usernames if u != "" and u not in found_usernames and u.lower() not in found_emails]
def get_users_from_ical_event(event, organization):
def get_users_from_ical_event(event, organization: "Organization") -> typing.Sequence["User"]:
usernames_from_ical, _ = get_usernames_from_ical_event(event)
users = []
if len(usernames_from_ical) != 0:
@ -587,9 +615,9 @@ def get_icalendar_tz_or_utc(icalendar):
return pytz.timezone(converted_timezone)
def fetch_ical_file_or_get_error(ical_url):
cached_ical_file = None
ical_file_error = None
def fetch_ical_file_or_get_error(ical_url: str) -> typing.Tuple[str | None, str | None]:
cached_ical_file: str | None = None
ical_file_error: str | None = None
try:
new_ical_file = fetch_ical_file(ical_url)
Calendar.from_ical(new_ical_file)
@ -602,13 +630,12 @@ def fetch_ical_file_or_get_error(ical_url):
return cached_ical_file, ical_file_error
def fetch_ical_file(ical_url):
def fetch_ical_file(ical_url: str) -> str:
# without user-agent header google calendar sometimes returns text/html instead of text/calendar
headers = {"User-Agent": "Grafana OnCall"}
r = requests.get(ical_url, headers=headers, timeout=10)
logger.info(f"fetch_ical_file: content-type={r.headers.get('Content-Type')}")
ical_file = r.text
return ical_file
return r.text
def create_base_icalendar(name: str) -> Calendar:
@ -624,77 +651,56 @@ def create_base_icalendar(name: str) -> Calendar:
return cal
def get_events_from_calendars(ical_obj: Calendar, calendars: tuple) -> None:
for calendar in calendars:
if calendar:
for component in calendar.walk():
if component.name == "VEVENT":
def get_user_events_from_calendars(
ical_obj: Calendar, calendar: Calendar, user: User, name: typing.Optional[str] = None
) -> None:
if calendar:
for component in calendar.walk():
if component.name == "VEVENT":
event_user = get_usernames_from_ical_event(component)
event_user_value = event_user[0][0]
if event_user_value == user.username or event_user_value.lower() == user.email.lower():
if name:
component["SUMMARY"] = "{}: {}".format(name, component["SUMMARY"])
ical_obj.add_component(component)
def get_user_events_from_calendars(ical_obj: Calendar, calendars: tuple, user: User, name: str = None) -> None:
for calendar in calendars:
if calendar:
for component in calendar.walk():
if component.name == "VEVENT":
event_user = get_usernames_from_ical_event(component)
event_user_value = event_user[0][0]
if event_user_value == user.username or event_user_value.lower() == user.email.lower():
if name:
component["SUMMARY"] = "{}: {}".format(name, component["SUMMARY"])
ical_obj.add_component(component)
def _is_final_export_enabled(schedule: OnCallSchedule) -> bool:
DynamicSetting = apps.get_model("base", "DynamicSetting")
enabled_final_export = DynamicSetting.objects.get_or_create(
name="enabled_final_schedule_export",
defaults={
"json_value": {
"schedule_ids": [],
}
},
)[0]
return schedule.public_primary_key in enabled_final_export.json_value["schedule_ids"]
def _get_ical_data_final_schedule(schedule: OnCallSchedule) -> str:
def _get_ical_data_final_schedule(schedule: "OnCallSchedule") -> str | None:
ical_data = schedule.cached_ical_final_schedule
if ical_data is None:
schedule.refresh_ical_final_schedule()
ical_data = schedule.cached_ical_final_schedule
# typing is safe here. cached_ical_final_schedule is updated inside of refresh_ical_final_schedule
ical_data: str = schedule.cached_ical_final_schedule
return ical_data
def ical_export_from_schedule(schedule: OnCallSchedule) -> bytes:
def ical_export_from_schedule(schedule: "OnCallSchedule") -> bytes:
ical_data = _get_ical_data_final_schedule(schedule)
return ical_data.encode()
def user_ical_export(user: User, schedules: list[OnCallSchedule]) -> bytes:
def user_ical_export(user: "User", schedules: "OnCallScheduleQuerySet") -> bytes:
schedule_name = "On-Call Schedule for {0}".format(user.username)
ical_obj = create_base_icalendar(schedule_name)
for schedule in schedules:
name = schedule.name
ical_data = _get_ical_data_final_schedule(schedule)
calendars = [Calendar.from_ical(ical_data)]
get_user_events_from_calendars(ical_obj, calendars, user, name=name)
get_user_events_from_calendars(ical_obj, Calendar.from_ical(ical_data), user, name=name)
return ical_obj.to_ical()
DatetimeInterval = namedtuple("DatetimeInterval", ["start", "end"])
def list_of_gaps_in_schedule(schedule, start_date, end_date):
def list_of_gaps_in_schedule(
schedule: "OnCallSchedule", start_date: datetime.date, end_date: datetime.date
) -> DatetimeIntervals:
calendars = schedule.get_icalendars()
intervals = []
intervals: DatetimeIntervals = []
start_datetime = datetime.datetime.combine(start_date, datetime.time.min) + datetime.timedelta(milliseconds=1)
start_datetime = start_datetime.astimezone(pytz.UTC)
end_datetime = datetime.datetime.combine(end_date, datetime.time.max).astimezone(pytz.UTC)
for idx, calendar in enumerate(calendars):
for calendar in calendars:
if calendar is not None:
calendar_tz = get_icalendar_tz_or_utc(calendar)
events = ical_events.get_events_from_ical_between(
@ -708,8 +714,8 @@ def list_of_gaps_in_schedule(schedule, start_date, end_date):
return detect_gaps(intervals, start_datetime, end_datetime)
def detect_gaps(intervals, start, end):
gaps = []
def detect_gaps(intervals: DatetimeIntervals, start: datetime.datetime, end: datetime.datetime) -> DatetimeIntervals:
gaps: DatetimeIntervals = []
intervals = sorted(intervals, key=lambda dt: dt.start)
if len(intervals) > 0:
base_interval = intervals[0]
@ -725,7 +731,7 @@ def detect_gaps(intervals, start, end):
return gaps
def merge_if_overlaps(a: DatetimeInterval, b: DatetimeInterval):
def merge_if_overlaps(a: DatetimeInterval, b: DatetimeInterval) -> typing.Tuple[bool, DatetimeInterval]:
if a.end >= b.end:
return True, DatetimeInterval(a.start, a.end)
if b.start - a.end < datetime.timedelta(minutes=1):
@ -734,13 +740,13 @@ def merge_if_overlaps(a: DatetimeInterval, b: DatetimeInterval):
return False, DatetimeInterval(b.start, b.end)
def start_end_with_respect_to_all_day(event, calendar_tz):
def start_end_with_respect_to_all_day(event: IcalEvent, calendar_tz):
start, _ = ical_date_to_datetime(event[ICAL_DATETIME_START].dt, calendar_tz, start=True)
end, _ = ical_date_to_datetime(event[ICAL_DATETIME_END].dt, calendar_tz, start=False)
return start, end
def event_start_end_all_day_with_respect_to_type(event, calendar_tz):
def event_start_end_all_day_with_respect_to_type(event: IcalEvent, calendar_tz):
all_day = False
if type(event[ICAL_DATETIME_START].dt) == datetime.date:
start, end = start_end_with_respect_to_all_day(event, calendar_tz)
@ -750,7 +756,7 @@ def event_start_end_all_day_with_respect_to_type(event, calendar_tz):
return start, end, all_day
def convert_windows_timezone_to_iana(tz_name):
def convert_windows_timezone_to_iana(tz_name: str) -> str | None:
"""
Conversion info taken from https://raw.githubusercontent.com/unicode-org/cldr/main/common/supplemental/windowsZones.xml
Also see https://gist.github.com/mrled/8d29fde758cfc7dd0b52f3bbf2b8f06e

View file

@ -67,10 +67,14 @@ class QualityReportOverloadedUser(typing.TypedDict):
score: int
QualityReportOverloadedUsers = typing.List[QualityReportOverloadedUser]
QualityReportComments = typing.List[QualityReportComment]
class QualityReport(typing.TypedDict):
total_score: int
comments: typing.List[QualityReportComment]
overloaded_users: typing.List[QualityReportOverloadedUser]
comments: QualityReportComments
overloaded_users: QualityReportOverloadedUsers
class ScheduleEventUser(typing.TypedDict):
@ -89,9 +93,9 @@ class ScheduleEvent(typing.TypedDict):
end: datetime.datetime
users: typing.List[ScheduleEventUser]
missing_users: typing.List[str]
priority_level: typing.Union[int, None]
source: typing.Union[str, None]
calendar_type: typing.Union[int, None]
priority_level: typing.Optional[int]
source: typing.Optional[str]
calendar_type: typing.Optional[int]
is_empty: bool
is_gap: bool
is_override: bool
@ -109,6 +113,7 @@ class ScheduleFinalShift(typing.TypedDict):
ScheduleEvents = typing.List[ScheduleEvent]
ScheduleEventIntervals = typing.List[typing.List[datetime.datetime]]
ScheduleFinalShifts = typing.List[ScheduleFinalShift]
DurationMap = typing.Dict[str, datetime.timedelta]
def generate_public_primary_key_for_oncall_schedule_channel():
@ -217,14 +222,14 @@ class OnCallSchedule(PolymorphicModel):
has_empty_shifts = models.BooleanField(default=False)
empty_shifts_report_sent_at = models.DateField(null=True, default=None)
def get_icalendars(self):
def get_icalendars(self) -> typing.Tuple[typing.Optional[icalendar.Calendar], typing.Optional[icalendar.Calendar]]:
"""Returns list of calendars. Primary calendar should always be the first"""
calendar_primary = None
calendar_overrides = None
calendar_primary: typing.Optional[icalendar.Calendar] = None
calendar_overrides: typing.Optional[icalendar.Calendar] = None
# if self._ical_file_(primary|overrides) is None -> no cache, will trigger a refresh
# if self._ical_file_(primary|overrides) == "" -> cached value for an empty schedule
if self._ical_file_primary:
calendar_primary = icalendar.Calendar.from_ical(self._ical_file_primary)
calendar_primary: icalendar.Calendar = icalendar.Calendar.from_ical(self._ical_file_primary)
if self._ical_file_overrides:
calendar_overrides = icalendar.Calendar.from_ical(self._ical_file_overrides)
return calendar_primary, calendar_overrides
@ -260,9 +265,11 @@ class OnCallSchedule(PolymorphicModel):
self._refresh_primary_ical_file()
self._refresh_overrides_ical_file()
@property
def _ical_file_primary(self):
raise NotImplementedError
@property
def _ical_file_overrides(self):
raise NotImplementedError
@ -468,7 +475,7 @@ class OnCallSchedule(PolymorphicModel):
events = self.final_events(user_tz="UTC", starting_date=date, days=days)
# an event is “good” if it's not a gap and not empty
good_events = [event for event in events if not event["is_gap"] and not event["is_empty"]]
good_events: ScheduleEvents = [event for event in events if not event["is_gap"] and not event["is_empty"]]
if not good_events:
return {
"total_score": 0,
@ -476,7 +483,7 @@ class OnCallSchedule(PolymorphicModel):
"overloaded_users": [],
}
def event_duration(ev: dict) -> datetime.timedelta:
def event_duration(ev: ScheduleEvent) -> datetime.timedelta:
return ev["end"] - ev["start"]
def timedelta_sum(deltas: typing.Iterable[datetime.timedelta]) -> datetime.timedelta:
@ -485,9 +492,9 @@ class OnCallSchedule(PolymorphicModel):
def score_to_percent(value: float) -> int:
return round(value * 100)
def get_duration_map(evs: list[dict]) -> dict[str, datetime.timedelta]:
def get_duration_map(evs: ScheduleEvents) -> DurationMap:
"""Return a map of user PKs to total duration of events they are in."""
result = defaultdict(datetime.timedelta)
result: DurationMap = defaultdict(datetime.timedelta)
for ev in evs:
for user in ev["users"]:
user_pk = user["pk"]
@ -495,7 +502,7 @@ class OnCallSchedule(PolymorphicModel):
return result
def get_balance_score_by_duration_map(dur_map: dict[str, datetime.timedelta]) -> float:
def get_balance_score_by_duration_map(dur_map: DurationMap) -> float:
"""
Return a score between 0 and 1, based on how balanced the durations are in the duration map.
The formula is taken from https://github.com/grafana/oncall/issues/118#issuecomment-1161787854.
@ -503,7 +510,7 @@ class OnCallSchedule(PolymorphicModel):
if len(dur_map) <= 1:
return 1
result = 0
result = 0.0
for key_1, key_2 in itertools.combinations(dur_map, 2):
duration_1 = dur_map[key_1]
duration_2 = dur_map[key_2]
@ -524,9 +531,10 @@ class OnCallSchedule(PolymorphicModel):
balance_score = score_to_percent(balance_score)
# calculate overloaded users
overloaded_users: QualityReportOverloadedUsers = []
if balance_score >= 95: # tolerate minor imbalance
balance_score = 100
overloaded_users = []
else:
average_duration = timedelta_sum(duration_map.values()) / len(duration_map)
overloaded_user_pks = [
@ -540,7 +548,6 @@ class OnCallSchedule(PolymorphicModel):
"public_primary_key", "username"
)
}
overloaded_users = []
for user_pk in overloaded_user_pks:
score = score_to_percent(duration_map[user_pk] / average_duration) - 100
username = usernames.get(user_pk) or "unknown" # fallback to "unknown" if user is not found
@ -550,7 +557,7 @@ class OnCallSchedule(PolymorphicModel):
overloaded_users.sort(key=lambda u: (-u["score"], u["username"]))
# generate comments regarding gaps
comments = []
comments: QualityReportComments = []
if good_event_score == 100:
comments.append({"type": QualityReportCommentType.INFO, "text": "Schedule has no gaps"})
else:
@ -628,8 +635,8 @@ class OnCallSchedule(PolymorphicModel):
resolved: ScheduleEvents = []
pending: ScheduleEvents = events
current_interval_idx = 0 # current scheduled interval being checked
current_type = OnCallSchedule.TYPE_ICAL_OVERRIDES # current calendar type
current_priority = None # current priority level being resolved
current_type: typing.Optional[int] = OnCallSchedule.TYPE_ICAL_OVERRIDES # current calendar type
current_priority: typing.Optional[int] = None # current priority level being resolved
while pending:
ev = pending.pop(0)

View file

@ -1,4 +1,4 @@
from django.utils import timezone
import datetime
SLACK_BOT_ID = "USLACKBOT"
SLACK_INVALID_AUTH_RESPONSE = "no_enough_permissions_to_retrieve"
@ -6,7 +6,7 @@ PLACEHOLDER = "Placeholder"
SLACK_WRONG_TEAM_NAMES = [SLACK_INVALID_AUTH_RESPONSE, PLACEHOLDER]
SLACK_RATE_LIMIT_TIMEOUT = timezone.timedelta(minutes=5)
SLACK_RATE_LIMIT_TIMEOUT = datetime.timedelta(minutes=5)
SLACK_RATE_LIMIT_DELAY = 10
CACHE_UPDATE_INCIDENT_SLACK_MESSAGE_LIFETIME = 60 * 10

View file

@ -4,6 +4,7 @@ import logging
from apps.alerts.models import AlertGroup
from apps.api.permissions import user_is_authorized
from apps.slack.models import SlackMessage, SlackTeamIdentity
from apps.user_management.models import User
logger = logging.getLogger(__name__)
@ -13,6 +14,8 @@ class AlertGroupActionsMixin:
Mixin for alert group actions (ack, resolve, etc.). Intended to be used as a mixin along with ScenarioStep.
"""
user: User | None
REQUIRED_PERMISSIONS = []
def get_alert_group(self, slack_team_identity: SlackTeamIdentity, payload: dict) -> AlertGroup:

View file

@ -2,10 +2,10 @@ import re
import emoji
from django.apps import apps
from slackviewer.formatter import SlackFormatter
from slackviewer.formatter import SlackFormatter as SlackFormatterBase
class SlackFormatter(SlackFormatter):
class SlackFormatter(SlackFormatterBase):
_LINK_PAT = re.compile(r"<(https|http|mailto):[A-Za-z0-9_\.\-\/\?\,\=\#\:\@\& ]+\|[^>]+>")
def __init__(self, organization):

View file

@ -100,8 +100,8 @@ class TelegramClient:
message_id: Union[int, str],
text: str,
keyboard: Optional[InlineKeyboardMarkup] = None,
) -> Message:
message = self.api_client.edit_message_text(
) -> Union[Message, bool]:
return self.api_client.edit_message_text(
chat_id=chat_id,
message_id=message_id,
text=text,
@ -109,7 +109,6 @@ class TelegramClient:
parse_mode=self.PARSE_MODE,
disable_web_page_preview=False,
)
return message
@staticmethod
def _get_message_and_keyboard(

View file

@ -18,6 +18,12 @@ from common.insight_log import ChatOpsEvent, ChatOpsTypePlug, write_chatops_insi
from common.oncall_gateway import create_oncall_connector, delete_oncall_connector, delete_slack_connector
from common.public_primary_keys import generate_public_primary_key, increase_public_primary_key_length
if typing.TYPE_CHECKING:
from django.db.models.manager import RelatedManager
from apps.schedules.models import OnCallSchedule
from apps.user_management.models import User
logger = logging.getLogger(__name__)
@ -36,7 +42,6 @@ def generate_public_primary_key_for_organization():
class ProvisionedPlugin(typing.TypedDict):
error: typing.Union[str, None]
stackId: int
orgId: int
onCallToken: str
@ -64,6 +69,8 @@ class OrganizationManager(models.Manager):
class Organization(MaintainableObject):
users: "RelatedManager['User']"
oncall_schedules: "RelatedManager['OnCallSchedule']"
objects = OrganizationManager()
objects_with_deleted = models.Manager()

View file

@ -313,7 +313,7 @@ class User(models.Model):
# TODO: check whether this signal can be moved to save method of the model
@receiver(post_save, sender=User)
def listen_for_user_model_save(sender, instance, created, *args, **kwargs):
def listen_for_user_model_save(sender: User, instance: User, created: bool, *args, **kwargs) -> None:
if created:
instance.notification_policies.create_default_policies_for_user(instance)
instance.notification_policies.create_important_policies_for_user(instance)

View file

@ -1,3 +1,5 @@
import typing
from django.contrib import admin
from django.core.exceptions import FieldDoesNotExist
from django.db.models import ForeignKey, Model
@ -7,7 +9,7 @@ class RawForeignKeysMixin:
model: Model
@property
def raw_id_fields(self) -> tuple[str]:
def raw_id_fields(self) -> typing.Tuple[str, ...]:
fields = self.model._meta.fields
fk_field_names = tuple(str(field.name) for field in fields if isinstance(field, ForeignKey))
@ -18,13 +20,13 @@ class SearchableByIdsMixin:
model: Model
@property
def search_fields(self) -> tuple[str]:
def search_fields(self) -> typing.Tuple[str, ...]:
search_fields = (
"id",
"public_primary_key",
)
existing_fields = []
existing_fields: typing.List[str] = []
for field in search_fields:
try:
@ -39,10 +41,10 @@ class SearchableByIdsMixin:
class SelectRelatedMixin:
model: Model
list_display: tuple[str]
list_display: typing.Tuple[str, ...]
@property
def list_select_related(self) -> tuple[str]:
def list_select_related(self) -> typing.Tuple[str, ...]:
fk_field_names = []
for field_name in self.list_display:

View file

@ -1,5 +1,6 @@
import json
import math
import typing
from django.core.exceptions import ObjectDoesNotExist
from django.db.models import Q
@ -7,6 +8,7 @@ from django.utils.functional import cached_property
from rest_framework import status
from rest_framework.decorators import action
from rest_framework.exceptions import NotFound, Throttled
from rest_framework.request import Request
from rest_framework.response import Response
from apps.alerts.incident_appearance.templaters import (
@ -377,11 +379,25 @@ class PreviewTemplateMixin:
return destination, attr_name
class GrafanaContext(typing.TypedDict):
IsAnonymous: bool
class InstanceContext(typing.TypedDict):
stack_id: int
org_id: int
grafana_token: str
class GrafanaHeadersMixin:
@cached_property
def grafana_context(self) -> dict:
return json.loads(self.request.headers.get("X-Grafana-Context"))
request: Request
@cached_property
def instance_context(self) -> dict:
return json.loads(self.request.headers["X-Instance-Context"])
def grafana_context(self) -> GrafanaContext:
grafana_context: GrafanaContext = json.loads(self.request.headers["X-Grafana-Context"])
return grafana_context
@cached_property
def instance_context(self) -> InstanceContext:
instance_context: InstanceContext = json.loads(self.request.headers["X-Instance-Context"])
return instance_context

View file

@ -19,7 +19,12 @@ class EntityEvent(enum.Enum):
class InsightLoggable(ABC):
@property
@abstractmethod
def public_primary_key(self):
def id(self) -> int:
pass
@property
@abstractmethod
def public_primary_key(self) -> str:
pass
@property
@ -65,7 +70,7 @@ def write_resource_insight_log(instance: InsightLoggable, author, event: EntityE
author = json.dumps(author.username)
entity_type = instance.insight_logs_type_verbal
try:
entity_id = instance.public_primary_key
entity_id: str | int = instance.public_primary_key
except AttributeError:
# Fallback for entities which have no public_primary_key, E.g. public api token, schedule export token
entity_id = instance.id

View file

@ -1,52 +0,0 @@
from django.core.management import BaseCommand
from django.db.models.signals import post_save
from django.urls import reverse
from apps.alerts.models import Alert, AlertGroup, AlertReceiveChannel, listen_for_alertreceivechannel_model_save
from apps.alerts.tests.factories import AlertReceiveChannelFactory
from apps.user_management.tests.factories import OrganizationFactory
class Command(BaseCommand):
def add_arguments(self, parser):
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument(
"--bootstrap_integration",
action="store_true",
help="Create random formatted webhook integration",
)
group.add_argument(
"--return_results_for_test_id",
type=str,
help="Count alert groups with specific text in the title and their alerts",
)
def handle(self, *args, **options):
if options["bootstrap_integration"]:
organization = OrganizationFactory()
def _make_alert_receive_channel(organization, **kwargs):
if "integration" not in kwargs:
kwargs["integration"] = "formatted_webhook"
post_save.disconnect(listen_for_alertreceivechannel_model_save, sender=AlertReceiveChannel)
alert_receive_channel = AlertReceiveChannelFactory(organization=organization, **kwargs)
post_save.connect(listen_for_alertreceivechannel_model_save, sender=AlertReceiveChannel)
return alert_receive_channel
integration = _make_alert_receive_channel(
organization, integration=AlertReceiveChannel.INTEGRATION_FORMATTED_WEBHOOK
)
url = reverse(
"integrations:universal",
kwargs={
"integration_type": AlertReceiveChannel.INTEGRATION_FORMATTED_WEBHOOK,
"alert_channel_key": integration.token,
},
)
return url
elif test_id := options["return_results_for_test_id"]:
alert_groups_pks = list(AlertGroup.all_objects.filter(web_title_cache=test_id).values_list("id", flat=True))
alert_groups_count = len(alert_groups_pks)
alerts_count = Alert.objects.filter(group_id__in=alert_groups_pks).count()
return f"{alert_groups_count}, {alerts_count}"

View file

@ -12,6 +12,7 @@ target-version = ["py39"]
force-exclude = "migrations"
[tool.mypy]
mypy_path = "$MYPY_CONFIG_FILE_DIR/type_stubs"
implicit_reexport = true
plugins = [
"mypy_django_plugin.main",
@ -39,7 +40,7 @@ module = [
"fcm_django.*",
"firebase_admin.*",
"humanize.*",
"icalendar.*",
"ipware.*",
"markdown2.*",
"mirage.*",
"ordered_model.*",
@ -50,6 +51,7 @@ module = [
"recurring_ical_events.*",
"rest_polymorphic.*",
"slackclient.*",
"slackviewer.*",
"social_core.*",
"social_django.*",
"twilio.*",

View file

@ -1,4 +1,4 @@
celery-types==0.17.0
celery-types==0.18.0
django-filter-stubs==0.1.3
django-stubs[compatible-mypy]==4.2.1
djangorestframework-stubs[compatible-mypy]==3.14.1
@ -10,3 +10,4 @@ pytest_factoryboy==2.5.1
types-beautifulsoup4==4.12.0.5
types-PyMySQL==1.0.19.7
types-python-dateutil==2.8.19.13
types-requests==2.31.0.1

View file

@ -1,6 +1,7 @@
import base64
import json
import os
import typing
from random import randrange
from celery.schedules import crontab
@ -88,8 +89,8 @@ DANGEROUS_WEBHOOKS_ENABLED = getenv_boolean("DANGEROUS_WEBHOOKS_ENABLED", defaul
WEBHOOK_RESPONSE_LIMIT = 50000
# Multiregion settings
ONCALL_GATEWAY_URL = os.environ.get("ONCALL_GATEWAY_URL")
ONCALL_GATEWAY_API_TOKEN = os.environ.get("ONCALL_GATEWAY_API_TOKEN")
ONCALL_GATEWAY_URL = os.environ.get("ONCALL_GATEWAY_URL", "")
ONCALL_GATEWAY_API_TOKEN = os.environ.get("ONCALL_GATEWAY_API_TOKEN", "")
ONCALL_BACKEND_REGION = os.environ.get("ONCALL_BACKEND_REGION")
# Prometheus exporter metrics endpoint auth
@ -125,7 +126,9 @@ assert DATABASE_TYPE in {DatabaseTypes.MYSQL, DatabaseTypes.POSTGRESQL, Database
DATABASE_ENGINE = f"django.db.backends.{DATABASE_TYPE}"
DATABASE_CONFIGS = {
DatabaseConfig = typing.Dict[str, typing.Dict[str, typing.Any]]
DATABASE_CONFIGS: DatabaseConfig = {
DatabaseTypes.SQLITE3: {
"ENGINE": DATABASE_ENGINE,
"NAME": DATABASE_NAME or "/var/lib/oncall/oncall.db",
@ -152,6 +155,7 @@ DATABASE_CONFIGS = {
},
}
READONLY_DATABASES: DatabaseConfig = {}
DATABASES = {
"default": DATABASE_CONFIGS[DATABASE_TYPE],
}
@ -570,7 +574,7 @@ SOCIAL_AUTH_PIPELINE = (
"apps.social_auth.pipeline.delete_slack_auth_token",
)
SOCIAL_AUTH_FIELDS_STORED_IN_SESSION = []
SOCIAL_AUTH_FIELDS_STORED_IN_SESSION: typing.List[str] = []
SOCIAL_AUTH_REDIRECT_IS_HTTPS = getenv_boolean("SOCIAL_AUTH_REDIRECT_IS_HTTPS", default=True)
SOCIAL_AUTH_SLUGIFY_USERNAMES = True

View file

@ -70,8 +70,12 @@ INTERNAL_IPS = [
"127.0.0.1",
]
# # the below two lines make it possible to use django-debug-toolbar inside of docker locally
# # https://knasmueller.net/fix-djangos-debug-toolbar-not-showing-inside-docker
# # https://stackoverflow.com/questions/10517765/django-debug-toolbar-not-showing-up
hostname, _, ips = socket.gethostbyname_ex(socket.gethostname())
INTERNAL_IPS += [".".join(ip.split(".")[:-1] + ["1"]) for ip in ips]
try:
# # the below two lines make it possible to use django-debug-toolbar inside of docker locally
# # https://knasmueller.net/fix-djangos-debug-toolbar-not-showing-inside-docker
# # https://stackoverflow.com/questions/10517765/django-debug-toolbar-not-showing-up
hostname, _, ips = socket.gethostbyname_ex(socket.gethostname())
INTERNAL_IPS += [".".join(ip.split(".")[:-1] + ["1"]) for ip in ips]
except OSError:
# usually raised if this is being run outside of a docker container context
INTERNAL_IPS = []

View file

@ -0,0 +1,34 @@
from icalendar.cal import Alarm as Alarm
from icalendar.cal import Calendar as Calendar
from icalendar.cal import ComponentFactory as ComponentFactory
from icalendar.cal import Event as Event
from icalendar.cal import FreeBusy as FreeBusy
from icalendar.cal import Journal as Journal
from icalendar.cal import Timezone as Timezone
from icalendar.cal import TimezoneDaylight as TimezoneDaylight
from icalendar.cal import TimezoneStandard as TimezoneStandard
from icalendar.cal import Todo as Todo
from icalendar.parser import Parameters as Parameters
from icalendar.parser import q_join as q_join
from icalendar.parser import q_split as q_split
from icalendar.prop import FixedOffset as FixedOffset
from icalendar.prop import LocalTimezone as LocalTimezone
from icalendar.prop import TypesFactory as TypesFactory
from icalendar.prop import vBinary as vBinary
from icalendar.prop import vBoolean as vBoolean
from icalendar.prop import vCalAddress as vCalAddress
from icalendar.prop import vDate as vDate
from icalendar.prop import vDatetime as vDatetime
from icalendar.prop import vDDDTypes as vDDDTypes
from icalendar.prop import vDuration as vDuration
from icalendar.prop import vFloat as vFloat
from icalendar.prop import vFrequency as vFrequency
from icalendar.prop import vGeo as vGeo
from icalendar.prop import vInt as vInt
from icalendar.prop import vPeriod as vPeriod
from icalendar.prop import vRecur as vRecur
from icalendar.prop import vText as vText
from icalendar.prop import vTime as vTime
from icalendar.prop import vUri as vUri
from icalendar.prop import vUTCOffset as vUTCOffset
from icalendar.prop import vWeekday as vWeekday

View file

@ -0,0 +1,109 @@
from _typeshed import Incomplete
from icalendar.caselessdict import CaselessDict as CaselessDict
from icalendar.compat import unicode_type as unicode_type
from icalendar.parser import Contentline as Contentline
from icalendar.parser import Contentlines as Contentlines
from icalendar.parser import Parameters as Parameters
from icalendar.parser import q_join as q_join
from icalendar.parser import q_split as q_split
from icalendar.parser_tools import DEFAULT_ENCODING as DEFAULT_ENCODING
from icalendar.prop import TypesFactory as TypesFactory
from icalendar.prop import vDDDLists as vDDDLists
from icalendar.prop import vText as vText
class ComponentFactory(CaselessDict):
def __init__(self, *args, **kwargs) -> None: ...
INLINE: Incomplete
class Component(CaselessDict):
name: Incomplete
required: Incomplete
singletons: Incomplete
multiple: Incomplete
exclusive: Incomplete
inclusive: Incomplete
ignore_exceptions: bool
subcomponents: Incomplete
errors: Incomplete
def __init__(self, *args, **kwargs) -> None: ...
def __bool__(self) -> bool: ...
__nonzero__ = __bool__
def is_empty(self): ...
@property
def is_broken(self): ...
def add(self, name, value, parameters: Incomplete | None = ..., encode: int = ...) -> None: ...
def decoded(self, name, default=...): ...
def get_inline(self, name, decode: int = ...): ...
def set_inline(self, name, values, encode: int = ...) -> None: ...
def add_component(self, component) -> None: ...
def walk(self, name: Incomplete | None = ...): ...
def property_items(self, recursive: bool = ..., sorted: bool = ...): ...
@classmethod
def from_ical(cls, st, multiple: bool = ...): ...
def content_line(self, name, value, sorted: bool = ...): ...
def content_lines(self, sorted: bool = ...): ...
def to_ical(self, sorted: bool = ...): ...
class Event(Component):
name: str
canonical_order: Incomplete
required: Incomplete
singletons: Incomplete
exclusive: Incomplete
multiple: Incomplete
ignore_exceptions: bool
class Todo(Component):
name: str
required: Incomplete
singletons: Incomplete
exclusive: Incomplete
multiple: Incomplete
class Journal(Component):
name: str
required: Incomplete
singletons: Incomplete
multiple: Incomplete
class FreeBusy(Component):
name: str
required: Incomplete
singletons: Incomplete
multiple: Incomplete
class Timezone(Component):
name: str
canonical_order: Incomplete
required: Incomplete
singletons: Incomplete
def to_tz(self): ...
class TimezoneStandard(Component):
name: str
required: Incomplete
singletons: Incomplete
multiple: Incomplete
class TimezoneDaylight(Component):
name: str
required: Incomplete
singletons: Incomplete
multiple: Incomplete
class Alarm(Component):
name: str
required: Incomplete
singletons: Incomplete
inclusive: Incomplete
multiple: Incomplete
class Calendar(Component):
name: str
canonical_order: Incomplete
required: Incomplete
singletons: Incomplete
types_factory: Incomplete
component_factory: Incomplete

View file

@ -0,0 +1,26 @@
from collections import OrderedDict
from _typeshed import Incomplete
from icalendar.compat import iteritems as iteritems
from icalendar.parser_tools import to_unicode as to_unicode
def canonsort_keys(keys, canonical_order: Incomplete | None = ...): ...
def canonsort_items(dict1, canonical_order: Incomplete | None = ...): ...
class CaselessDict(OrderedDict):
def __init__(self, *args, **kwargs) -> None: ...
def __getitem__(self, key): ...
def __setitem__(self, key, value) -> None: ...
def __delitem__(self, key) -> None: ...
def __contains__(self, key) -> bool: ...
def get(self, key, default: Incomplete | None = ...): ...
def setdefault(self, key, value: Incomplete | None = ...): ...
def pop(self, key, default: Incomplete | None = ...): ...
def popitem(self): ...
def has_key(self, key): ...
def update(self, *args, **kwargs) -> None: ...
def copy(self): ...
def __eq__(self, other): ...
canonical_order: Incomplete
def sorted_keys(self): ...
def sorted_items(self): ...

View file

@ -0,0 +1,4 @@
from . import Calendar as Calendar
def view(input_handle, output_handle) -> None: ...
def main() -> None: ...

View file

@ -0,0 +1,5 @@
from _typeshed import Incomplete
unicode_type = str
bytes_type = bytes
iteritems: Incomplete

View file

@ -0,0 +1,54 @@
from _typeshed import Incomplete
from icalendar import compat as compat
from icalendar.caselessdict import CaselessDict as CaselessDict
from icalendar.parser_tools import DEFAULT_ENCODING as DEFAULT_ENCODING
from icalendar.parser_tools import SEQUENCE_TYPES as SEQUENCE_TYPES
from icalendar.parser_tools import to_unicode as to_unicode
from icalendar.prop import vText as vText
def escape_char(text): ...
def unescape_char(text): ...
def tzid_from_dt(dt): ...
def foldline(line, limit: int = ..., fold_sep: str = ...): ...
def param_value(value): ...
NAME: Incomplete
UNSAFE_CHAR: Incomplete
QUNSAFE_CHAR: Incomplete
FOLD: Incomplete
uFOLD: Incomplete
NEWLINE: Incomplete
def validate_token(name) -> None: ...
def validate_param_value(value, quoted: bool = ...) -> None: ...
QUOTABLE: Incomplete
def dquote(val): ...
def q_split(st, sep: str = ..., maxsplit: int = ...): ...
def q_join(lst, sep: str = ...): ...
class Parameters(CaselessDict):
def params(self): ...
def to_ical(self, sorted: bool = ...): ...
@classmethod
def from_ical(cls, st, strict: bool = ...): ...
def escape_string(val): ...
def unescape_string(val): ...
def unescape_list_or_string(val): ...
class Contentline(compat.unicode_type):
strict: Incomplete
def __new__(cls, value, strict: bool = ..., encoding=...): ...
@classmethod
def from_parts(cls, name, params, values, sorted: bool = ...): ...
def parts(self): ...
@classmethod
def from_ical(cls, ical, strict: bool = ...): ...
def to_ical(self): ...
class Contentlines(list):
def to_ical(self): ...
@classmethod
def from_ical(cls, st): ...

View file

@ -0,0 +1,8 @@
from _typeshed import Incomplete
from icalendar import compat as compat
SEQUENCE_TYPES: Incomplete
DEFAULT_ENCODING: str
def to_unicode(value, encoding: str = ...): ...
def data_encode(data, encoding=...): ...

View file

@ -0,0 +1,219 @@
from datetime import tzinfo
from _typeshed import Incomplete
from icalendar import compat as compat
from icalendar.caselessdict import CaselessDict as CaselessDict
from icalendar.parser import Parameters as Parameters
from icalendar.parser import escape_char as escape_char
from icalendar.parser import tzid_from_dt as tzid_from_dt
from icalendar.parser import unescape_char as unescape_char
from icalendar.parser_tools import DEFAULT_ENCODING as DEFAULT_ENCODING
from icalendar.parser_tools import SEQUENCE_TYPES as SEQUENCE_TYPES
from icalendar.parser_tools import to_unicode as to_unicode
from icalendar.windows_to_olson import WINDOWS_TO_OLSON as WINDOWS_TO_OLSON
DATE_PART: str
TIME_PART: str
DATETIME_PART: Incomplete
WEEKS_PART: str
DURATION_REGEX: Incomplete
WEEKDAY_RULE: Incomplete
ZERO: Incomplete
HOUR: Incomplete
STDOFFSET: Incomplete
DSTOFFSET: Incomplete
DSTOFFSET = STDOFFSET
DSTDIFF: Incomplete
class FixedOffset(tzinfo):
def __init__(self, offset, name) -> None: ...
def utcoffset(self, dt): ...
def tzname(self, dt): ...
def dst(self, dt): ...
class LocalTimezone(tzinfo):
def utcoffset(self, dt): ...
def dst(self, dt): ...
def tzname(self, dt): ...
class vBinary:
obj: Incomplete
params: Incomplete
def __init__(self, obj) -> None: ...
def to_ical(self): ...
@staticmethod
def from_ical(ical): ...
class vBoolean(int):
BOOL_MAP: Incomplete
params: Incomplete
def __new__(cls, *args, **kwargs): ...
def to_ical(self): ...
@classmethod
def from_ical(cls, ical): ...
class vCalAddress(compat.unicode_type):
params: Incomplete
def __new__(cls, value, encoding=...): ...
def to_ical(self): ...
@classmethod
def from_ical(cls, ical): ...
class vFloat(float):
params: Incomplete
def __new__(cls, *args, **kwargs): ...
def to_ical(self): ...
@classmethod
def from_ical(cls, ical): ...
class vInt(int):
params: Incomplete
def __new__(cls, *args, **kwargs): ...
def to_ical(self): ...
@classmethod
def from_ical(cls, ical): ...
class vDDDLists:
params: Incomplete
dts: Incomplete
def __init__(self, dt_list) -> None: ...
def to_ical(self): ...
@staticmethod
def from_ical(ical, timezone: Incomplete | None = ...): ...
class vCategory:
cats: Incomplete
def __init__(self, c_list) -> None: ...
def to_ical(self): ...
@staticmethod
def from_ical(ical, timezone: Incomplete | None = ...): ...
class vDDDTypes:
params: Incomplete
dt: Incomplete
def __init__(self, dt) -> None: ...
def to_ical(self): ...
@classmethod
def from_ical(cls, ical, timezone: Incomplete | None = ...): ...
class vDate:
dt: Incomplete
params: Incomplete
def __init__(self, dt) -> None: ...
def to_ical(self): ...
@staticmethod
def from_ical(ical): ...
class vDatetime:
dt: Incomplete
params: Incomplete
def __init__(self, dt) -> None: ...
def to_ical(self): ...
@staticmethod
def from_ical(ical, timezone: Incomplete | None = ...): ...
class vDuration:
td: Incomplete
params: Incomplete
def __init__(self, td) -> None: ...
def to_ical(self): ...
@staticmethod
def from_ical(ical): ...
class vPeriod:
params: Incomplete
start: Incomplete
end: Incomplete
by_duration: Incomplete
duration: Incomplete
def __init__(self, per) -> None: ...
def __cmp__(self, other): ...
def overlaps(self, other): ...
def to_ical(self): ...
@staticmethod
def from_ical(ical): ...
class vWeekday(compat.unicode_type):
week_days: Incomplete
relative: Incomplete
params: Incomplete
def __new__(cls, value, encoding=...): ...
def to_ical(self): ...
@classmethod
def from_ical(cls, ical): ...
class vFrequency(compat.unicode_type):
frequencies: Incomplete
params: Incomplete
def __new__(cls, value, encoding=...): ...
def to_ical(self): ...
@classmethod
def from_ical(cls, ical): ...
class vRecur(CaselessDict):
frequencies: Incomplete
canonical_order: Incomplete
types: Incomplete
params: Incomplete
def __init__(self, *args, **kwargs) -> None: ...
def to_ical(self): ...
@classmethod
def parse_type(cls, key, values): ...
@classmethod
def from_ical(cls, ical): ...
class vText(compat.unicode_type):
encoding: Incomplete
params: Incomplete
def __new__(cls, value, encoding=...): ...
def to_ical(self): ...
@classmethod
def from_ical(cls, ical): ...
class vTime:
dt: Incomplete
params: Incomplete
def __init__(self, *args) -> None: ...
def to_ical(self): ...
@staticmethod
def from_ical(ical): ...
class vUri(compat.unicode_type):
params: Incomplete
def __new__(cls, value, encoding=...): ...
def to_ical(self): ...
@classmethod
def from_ical(cls, ical): ...
class vGeo:
latitude: Incomplete
longitude: Incomplete
params: Incomplete
def __init__(self, geo) -> None: ...
def to_ical(self): ...
@staticmethod
def from_ical(ical): ...
class vUTCOffset:
ignore_exceptions: bool
td: Incomplete
params: Incomplete
def __init__(self, td) -> None: ...
def to_ical(self): ...
@classmethod
def from_ical(cls, ical): ...
class vInline(compat.unicode_type):
params: Incomplete
def __new__(cls, value, encoding=...): ...
def to_ical(self): ...
@classmethod
def from_ical(cls, ical): ...
class TypesFactory(CaselessDict):
all_types: Incomplete
def __init__(self, *args, **kwargs) -> None: ...
types_map: Incomplete
def for_property(self, name): ...
def to_ical(self, name, value): ...
def from_ical(self, name, value): ...

View file

@ -0,0 +1,9 @@
from _typeshed import Incomplete
from icalendar.parser_tools import to_unicode as to_unicode
from icalendar.prop import vDatetime as vDatetime
from icalendar.prop import vText as vText
class UIDGenerator:
chars: Incomplete
def rnd_string(self, length: int = ...): ...
def uid(self, host_name: str = ..., unique: str = ...): ...

View file

@ -0,0 +1,3 @@
from _typeshed import Incomplete
WINDOWS_TO_OLSON: Incomplete