apps.get_model -> import (#2619)

# What this PR does

Remove
[`apps.get_model`](https://docs.djangoproject.com/en/3.2/ref/applications/#django.apps.apps.get_model)
invocations and use inline `import` statements in places where models
are imported within functions/methods to avoid circular imports.

I believe `import` statements are more appropriate for most use cases as
they allow for better static code analysis & formatting, and solve the
issue of circular imports without being as unnecessarily dynamic as
`apps.get_model`. With `import` statements, it's possible to:

- Jump to model definitions in most IDEs
- Automatically sort inline imports with `isort`
- Find import errors faster/easier (most IDEs highlight broken imports)
- Have more consistency across regular & inline imports when importing
models

This PR also adds a flake8 rule to ban imports of `django.apps.apps`, so
it's harder to use `apps.get_model` by mistake (it's possible to ignore
this rule by using `# noqa: I251`). The rule is not enforced on
directories with migration files, because `apps.get_model` is often used
to get a historical state of a model, which is useful when writing
migrations ([see this SO answer for more
details](https://stackoverflow.com/a/37769213)). So `apps.get_model` is
considered OK in migrations (even necessary in some cases).

## Checklist

- [x] Unit, integration, and e2e (if applicable) tests updated
- [x] Documentation added (or `pr:no public docs` PR label added if not
required)
- [x] `CHANGELOG.md` updated (or `pr:no changelog` PR label added if not
required)
This commit is contained in:
Vadim Stepanov 2023-07-25 10:43:23 +01:00 committed by GitHub
parent af9d5c935b
commit b2f4ffb98a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
106 changed files with 378 additions and 427 deletions

View file

@ -5,7 +5,6 @@ import typing
import pytz
from celery import uuid as celery_uuid
from dateutil.parser import parse
from django.apps import apps
from django.utils.functional import cached_property
from rest_framework.exceptions import ValidationError
@ -229,7 +228,7 @@ class EscalationSnapshotMixin:
"""
:type self:AlertGroup
"""
AlertGroup = apps.get_model("alerts", "AlertGroup")
from apps.alerts.models import AlertGroup
is_on_maintenace_or_debug_mode = self.channel.maintenance_mode is not None

View file

@ -2,7 +2,6 @@ import copy
import logging
from typing import TYPE_CHECKING, Optional, Tuple
from django.apps import apps
from rest_framework import status
from apps.alerts.tasks import schedule_create_contact_points_for_datasource
@ -357,7 +356,8 @@ class GrafanaAlertingSyncManager:
receiver_config = self._get_receiver_config(is_grafana_datasource, payload)
GrafanaAlertingContactPoint = apps.get_model("alerts", "GrafanaAlertingContactPoint")
from apps.alerts.models import GrafanaAlertingContactPoint
contact_point = GrafanaAlertingContactPoint(
alert_receive_channel=self.alert_receive_channel,
name=receiver_config["name"],

View file

@ -1,6 +1,5 @@
import json
from django.apps import apps
from django.utils.text import Truncator
from apps.alerts.incident_appearance.renderers.base_renderer import AlertBaseRenderer, AlertGroupBaseRenderer
@ -167,7 +166,8 @@ class AlertGroupSlackRenderer(AlertGroupBaseRenderer):
return attachment
def _get_buttons_blocks(self):
AlertGroup = apps.get_model("alerts", "AlertGroup")
from apps.alerts.models import AlertGroup
buttons = []
if not self.alert_group.is_maintenance_incident:
if not self.alert_group.resolved:
@ -319,7 +319,8 @@ class AlertGroupSlackRenderer(AlertGroupBaseRenderer):
return blocks
def _get_invitation_attachment(self):
Invitation = apps.get_model("alerts", "Invitation")
from apps.alerts.models import Invitation
invitations = Invitation.objects.filter(is_active=True, alert_group=self.alert_group).all()
if len(invitations) == 0:
return []

View file

@ -1,5 +1,3 @@
from django.apps import apps
from apps.alerts.incident_appearance.templaters.alert_templater import AlertTemplater
@ -30,7 +28,8 @@ class AlertSlackTemplater(AlertTemplater):
payload = self.alert.raw_request_data
# First check if payload look like payload from manual incident integration and was not modified before.
if "view" in payload and "private_metadata" in payload.get("view", {}) and "oncall" not in payload:
AlertReceiveChannel = apps.get_model("alerts", "AlertReceiveChannel")
from apps.alerts.models import AlertReceiveChannel
# If so - check it with db query.
if self.alert.group.channel.integration == AlertReceiveChannel.INTEGRATION_MANUAL:
metadata = payload.get("view", {}).get("private_metadata", {})

View file

@ -1,4 +1,3 @@
from django.apps import apps
from django.db.models import Q
from django.utils import timezone
@ -32,8 +31,7 @@ class IncidentLogBuilder:
return all_log_records_sorted
def _get_log_records_for_after_resolve_report(self):
EscalationPolicy = apps.get_model("alerts", "EscalationPolicy")
AlertGroupLogRecord = apps.get_model("alerts", "AlertGroupLogRecord")
from apps.alerts.models import AlertGroupLogRecord, EscalationPolicy
excluded_log_types = [
AlertGroupLogRecord.TYPE_ESCALATION_FINISHED,
@ -86,8 +84,7 @@ class IncidentLogBuilder:
)
def _get_user_notification_log_records_for_log_report(self):
UserNotificationPolicyLogRecord = apps.get_model("base", "UserNotificationPolicyLogRecord")
UserNotificationPolicy = apps.get_model("base", "UserNotificationPolicy")
from apps.base.models import UserNotificationPolicy, UserNotificationPolicyLogRecord
# exclude user notification logs with step 'wait' or with status 'finished'
return (
@ -160,7 +157,8 @@ class IncidentLogBuilder:
def _render_escalation_plan_from_escalation_snapshot(
self, escalation_plan_dict, stop_escalation_log_pk, esc_timedelta, escalation_snapshot, for_slack=False
):
EscalationPolicy = apps.get_model("alerts", "EscalationPolicy")
from apps.alerts.models import EscalationPolicy
now = timezone.now()
escalation_eta = None
last_log_timedelta = None
@ -261,7 +259,8 @@ class IncidentLogBuilder:
:param for_slack:
:return: {timedelta: [{"user_id": user.pk, "plan_lines": [#rendered escalation policy line, ]}, ..., ...], ...}
"""
Invitation = apps.get_model("alerts", "Invitation")
from apps.alerts.models import Invitation
now = timezone.now()
for invitation in self.alert_group.invitations.filter(is_active=True):
invitation_timedelta = timezone.timedelta()
@ -377,7 +376,7 @@ class IncidentLogBuilder:
:return: dict with timedelta as a key and list with escalation and notification plan lines as a value
"""
EscalationPolicy = apps.get_model("alerts", "EscalationPolicy")
from apps.alerts.models import EscalationPolicy
escalation_plan_dict = {}
timedelta = timezone.timedelta()
@ -577,7 +576,8 @@ class IncidentLogBuilder:
:param for_slack: (bool) add or not user slack id to user notification plan line
:return: plan line
"""
UserNotificationPolicy = apps.get_model("base", "UserNotificationPolicy")
from apps.base.models import UserNotificationPolicy
result = ""
user_verbal = user_to_notify.get_username_with_slack_verbal() if for_slack else user_to_notify.username
if notification_policy.step == UserNotificationPolicy.Step.NOTIFY:
@ -610,8 +610,7 @@ class IncidentLogBuilder:
:param for_slack: (bool) add or not user slack id to user notification plan line
:return: {timedelta: [{"user_id": user.pk, "plan_lines": [#rendered notification policy line, ]}, ...], ...}
"""
UserNotificationPolicyLogRecord = apps.get_model("base", "UserNotificationPolicyLogRecord")
UserNotificationPolicy = apps.get_model("base", "UserNotificationPolicy")
from apps.base.models import UserNotificationPolicy, UserNotificationPolicyLogRecord
timedelta = timezone.timedelta()
is_the_first_notification_step = future_step # escalation starts with this step or not

View file

@ -2,7 +2,6 @@ import hashlib
import logging
from uuid import uuid4
from django.apps import apps
from django.conf import settings
from django.core.validators import MinLengthValidator
from django.db import models
@ -85,10 +84,7 @@ class Alert(models.Model):
Creates an alert and a group if needed.
"""
# This import is here to avoid circular imports
ChannelFilter = apps.get_model("alerts", "ChannelFilter")
AlertGroup = apps.get_model("alerts", "AlertGroup")
AlertReceiveChannel = apps.get_model("alerts", "AlertReceiveChannel")
AlertGroupLogRecord = apps.get_model("alerts", "AlertGroupLogRecord")
from apps.alerts.models import AlertGroup, AlertGroupLogRecord, AlertReceiveChannel, ChannelFilter
group_data = Alert.render_group_data(alert_receive_channel, raw_request_data, is_demo)
if channel_filter is None:
@ -186,7 +182,7 @@ class Alert(models.Model):
@classmethod
def render_group_data(cls, alert_receive_channel, raw_request_data, is_demo=False):
AlertGroup = apps.get_model("alerts", "AlertGroup")
from apps.alerts.models import AlertGroup
template_manager = TemplateLoader()

View file

@ -7,7 +7,6 @@ from urllib.parse import urljoin
from uuid import UUID, uuid1
from celery import uuid as celery_uuid
from django.apps import apps
from django.conf import settings
from django.core.validators import MinLengthValidator
from django.db import IntegrityError, models, transaction
@ -407,7 +406,7 @@ class AlertGroup(AlertGroupSlackRenderingMixin, EscalationSnapshotMixin, models.
return self.maintenance_uuid is not None
def stop_maintenance(self, user: User) -> None:
AlertReceiveChannel = apps.get_model("alerts", "AlertReceiveChannel")
from apps.alerts.models import AlertReceiveChannel
try:
integration_on_maintenance = AlertReceiveChannel.objects.get(maintenance_uuid=self.maintenance_uuid)
@ -521,7 +520,8 @@ class AlertGroup(AlertGroupSlackRenderingMixin, EscalationSnapshotMixin, models.
)
def acknowledge_by_user(self, user: User, action_source: typing.Optional[str] = None) -> None:
AlertGroupLogRecord = apps.get_model("alerts", "AlertGroupLogRecord")
from apps.alerts.models import AlertGroupLogRecord
initial_state = self.state
logger.debug(f"Started acknowledge_by_user for alert_group {self.pk}")
@ -565,7 +565,8 @@ class AlertGroup(AlertGroupSlackRenderingMixin, EscalationSnapshotMixin, models.
logger.debug(f"Finished acknowledge_by_user for alert_group {self.pk}")
def acknowledge_by_source(self):
AlertGroupLogRecord = apps.get_model("alerts", "AlertGroupLogRecord")
from apps.alerts.models import AlertGroupLogRecord
initial_state = self.state
# if incident was silenced, unsilence it without starting escalation
@ -600,7 +601,8 @@ class AlertGroup(AlertGroupSlackRenderingMixin, EscalationSnapshotMixin, models.
dependent_alert_group.acknowledge_by_source()
def un_acknowledge_by_user(self, user: User, action_source: typing.Optional[str] = None) -> None:
AlertGroupLogRecord = apps.get_model("alerts", "AlertGroupLogRecord")
from apps.alerts.models import AlertGroupLogRecord
initial_state = self.state
logger.debug(f"Started un_acknowledge_by_user for alert_group {self.pk}")
@ -628,7 +630,8 @@ class AlertGroup(AlertGroupSlackRenderingMixin, EscalationSnapshotMixin, models.
logger.debug(f"Finished un_acknowledge_by_user for alert_group {self.pk}")
def resolve_by_user(self, user: User, action_source: typing.Optional[str] = None) -> None:
AlertGroupLogRecord = apps.get_model("alerts", "AlertGroupLogRecord")
from apps.alerts.models import AlertGroupLogRecord
initial_state = self.state
# if incident was silenced, unsilence it without starting escalation
@ -661,7 +664,8 @@ class AlertGroup(AlertGroupSlackRenderingMixin, EscalationSnapshotMixin, models.
dependent_alert_group.resolve_by_user(user, action_source=action_source)
def resolve_by_source(self):
AlertGroupLogRecord = apps.get_model("alerts", "AlertGroupLogRecord")
from apps.alerts.models import AlertGroupLogRecord
initial_state = self.state
# if incident was silenced, unsilence it without starting escalation
@ -695,7 +699,8 @@ class AlertGroup(AlertGroupSlackRenderingMixin, EscalationSnapshotMixin, models.
dependent_alert_group.resolve_by_source()
def resolve_by_last_step(self):
AlertGroupLogRecord = apps.get_model("alerts", "AlertGroupLogRecord")
from apps.alerts.models import AlertGroupLogRecord
initial_state = self.state
self.resolve(resolved_by=AlertGroup.LAST_STEP)
@ -721,7 +726,7 @@ class AlertGroup(AlertGroupSlackRenderingMixin, EscalationSnapshotMixin, models.
dependent_alert_group.resolve_by_last_step()
def resolve_by_disable_maintenance(self):
AlertGroupLogRecord = apps.get_model("alerts", "AlertGroupLogRecord")
from apps.alerts.models import AlertGroupLogRecord
self.resolve(resolved_by=AlertGroup.DISABLE_MAINTENANCE)
self.stop_escalation()
@ -743,7 +748,7 @@ class AlertGroup(AlertGroupSlackRenderingMixin, EscalationSnapshotMixin, models.
dependent_alert_group.resolve_by_disable_maintenance()
def un_resolve_by_user(self, user: User, action_source: typing.Optional[str] = None) -> None:
AlertGroupLogRecord = apps.get_model("alerts", "AlertGroupLogRecord")
from apps.alerts.models import AlertGroupLogRecord
if self.wiped_at is None:
initial_state = self.state
@ -774,7 +779,7 @@ class AlertGroup(AlertGroupSlackRenderingMixin, EscalationSnapshotMixin, models.
def attach_by_user(
self, user: User, root_alert_group: "AlertGroup", action_source: typing.Optional[str] = None
) -> None:
AlertGroupLogRecord = apps.get_model("alerts", "AlertGroupLogRecord")
from apps.alerts.models import AlertGroupLogRecord
if root_alert_group.root_alert_group is None and not root_alert_group.resolved:
self.root_alert_group = root_alert_group
@ -850,7 +855,7 @@ class AlertGroup(AlertGroupSlackRenderingMixin, EscalationSnapshotMixin, models.
)
def un_attach_by_user(self, user: User, action_source: typing.Optional[str] = None) -> None:
AlertGroupLogRecord = apps.get_model("alerts", "AlertGroupLogRecord")
from apps.alerts.models import AlertGroupLogRecord
root_alert_group: AlertGroup = self.root_alert_group
self.root_alert_group = None
@ -897,7 +902,7 @@ class AlertGroup(AlertGroupSlackRenderingMixin, EscalationSnapshotMixin, models.
)
def un_attach_by_delete(self):
AlertGroupLogRecord = apps.get_model("alerts", "AlertGroupLogRecord")
from apps.alerts.models import AlertGroupLogRecord
self.root_alert_group = None
self.save(update_fields=["root_alert_group"])
@ -924,7 +929,8 @@ class AlertGroup(AlertGroupSlackRenderingMixin, EscalationSnapshotMixin, models.
def silence_by_user(
self, user: User, silence_delay: typing.Optional[int], action_source: typing.Optional[str] = None
) -> None:
AlertGroupLogRecord = apps.get_model("alerts", "AlertGroupLogRecord")
from apps.alerts.models import AlertGroupLogRecord
initial_state = self.state
if self.resolved:
@ -981,7 +987,8 @@ class AlertGroup(AlertGroupSlackRenderingMixin, EscalationSnapshotMixin, models.
dependent_alert_group.silence_by_user(user, silence_delay, action_source)
def un_silence_by_user(self, user: User, action_source: typing.Optional[str] = None) -> None:
AlertGroupLogRecord = apps.get_model("alerts", "AlertGroupLogRecord")
from apps.alerts.models import AlertGroupLogRecord
initial_state = self.state
self.un_silence()
@ -1014,7 +1021,8 @@ class AlertGroup(AlertGroupSlackRenderingMixin, EscalationSnapshotMixin, models.
dependent_alert_group.un_silence_by_user(user, action_source=action_source)
def wipe_by_user(self, user: User) -> None:
AlertGroupLogRecord = apps.get_model("alerts", "AlertGroupLogRecord")
from apps.alerts.models import AlertGroupLogRecord
initial_state = self.state
if not self.wiped_at:
@ -1059,7 +1067,8 @@ class AlertGroup(AlertGroupSlackRenderingMixin, EscalationSnapshotMixin, models.
dependent_alert_group.wipe_by_user(user)
def delete_by_user(self, user: User):
AlertGroupLogRecord = apps.get_model("alerts", "AlertGroupLogRecord")
from apps.alerts.models import AlertGroupLogRecord
initial_state = self.state
self.stop_escalation()
@ -1098,7 +1107,7 @@ class AlertGroup(AlertGroupSlackRenderingMixin, EscalationSnapshotMixin, models.
dependent_alert_group.un_attach_by_delete()
def hard_delete(self):
ResolutionNote = apps.get_model("alerts", "ResolutionNote")
from apps.alerts.models import ResolutionNote
alerts = self.alerts.all()
alerts.delete()
@ -1114,7 +1123,7 @@ class AlertGroup(AlertGroupSlackRenderingMixin, EscalationSnapshotMixin, models.
@staticmethod
def _bulk_acknowledge(user: User, alert_groups_to_acknowledge: "QuerySet[AlertGroup]") -> None:
AlertGroupLogRecord = apps.get_model("alerts", "AlertGroupLogRecord")
from apps.alerts.models import AlertGroupLogRecord
# it is needed to unserolve those alert_groups which were resolved to build proper log.
alert_groups_to_unresolve_before_acknowledge = alert_groups_to_acknowledge.filter(resolved=models.Value("1"))
@ -1207,7 +1216,7 @@ class AlertGroup(AlertGroupSlackRenderingMixin, EscalationSnapshotMixin, models.
@staticmethod
def _bulk_resolve(user: User, alert_groups_to_resolve: "QuerySet[AlertGroup]") -> None:
AlertGroupLogRecord = apps.get_model("alerts", "AlertGroupLogRecord")
from apps.alerts.models import AlertGroupLogRecord
# it is needed to unsilence those alert_groups which were silenced to build proper log.
alert_groups_to_unsilence_before_resolve = alert_groups_to_resolve.filter(silenced=models.Value("1"))
@ -1294,7 +1303,7 @@ class AlertGroup(AlertGroupSlackRenderingMixin, EscalationSnapshotMixin, models.
@staticmethod
def _bulk_restart_unack(user: User, alert_groups_to_restart_unack: "QuerySet[AlertGroup]") -> None:
AlertGroupLogRecord = apps.get_model("alerts", "AlertGroupLogRecord")
from apps.alerts.models import AlertGroupLogRecord
# convert current qs to list to prevent changes by update
alert_groups_to_restart_unack_list = list(alert_groups_to_restart_unack)
@ -1337,7 +1346,7 @@ class AlertGroup(AlertGroupSlackRenderingMixin, EscalationSnapshotMixin, models.
@staticmethod
def _bulk_restart_unresolve(user: User, alert_groups_to_restart_unresolve: "QuerySet[AlertGroup]") -> None:
AlertGroupLogRecord = apps.get_model("alerts", "AlertGroupLogRecord")
from apps.alerts.models import AlertGroupLogRecord
# convert current qs to list to prevent changes by update
alert_groups_to_restart_unresolve_list = list(alert_groups_to_restart_unresolve)
@ -1380,7 +1389,7 @@ class AlertGroup(AlertGroupSlackRenderingMixin, EscalationSnapshotMixin, models.
@staticmethod
def _bulk_restart_unsilence(user: User, alert_groups_to_restart_unsilence: "QuerySet[AlertGroup]") -> None:
AlertGroupLogRecord = apps.get_model("alerts", "AlertGroupLogRecord")
from apps.alerts.models import AlertGroupLogRecord
# convert current qs to list to prevent changes by update
alert_groups_to_restart_unsilence_list = list(alert_groups_to_restart_unsilence)
@ -1450,7 +1459,7 @@ class AlertGroup(AlertGroupSlackRenderingMixin, EscalationSnapshotMixin, models.
@staticmethod
def _bulk_silence(user: User, alert_groups_to_silence: "QuerySet[AlertGroup]", silence_delay: int) -> None:
AlertGroupLogRecord = apps.get_model("alerts", "AlertGroupLogRecord")
from apps.alerts.models import AlertGroupLogRecord
now = timezone.now()
silence_for_period = silence_delay is not None and silence_delay > 0
@ -1566,7 +1575,8 @@ class AlertGroup(AlertGroupSlackRenderingMixin, EscalationSnapshotMixin, models.
AlertGroup._bulk_silence(user, dependent_alert_groups_to_silence, silence_delay)
def start_ack_reminder(self, user: User):
Organization = apps.get_model("user_management", "Organization")
from apps.user_management.models import Organization
unique_unacknowledge_process_id = uuid1()
logger.info(
f"AlertGroup acknowledged by user with pk "
@ -1719,9 +1729,8 @@ class AlertGroup(AlertGroupSlackRenderingMixin, EscalationSnapshotMixin, models.
return "Acknowledged"
def render_after_resolve_report_json(self):
AlertGroupLogRecord = apps.get_model("alerts", "AlertGroupLogRecord")
UserNotificationPolicyLogRecord = apps.get_model("base", "UserNotificationPolicyLogRecord")
ResolutionNote = apps.get_model("alerts", "ResolutionNote")
from apps.alerts.models import AlertGroupLogRecord, ResolutionNote
from apps.base.models import UserNotificationPolicyLogRecord
log_builder = IncidentLogBuilder(self)
log_records_list = log_builder.get_log_records_list(with_resolution_notes=True)
@ -1775,7 +1784,8 @@ class AlertGroup(AlertGroupSlackRenderingMixin, EscalationSnapshotMixin, models.
return slack_channel_id
def get_slack_message(self):
SlackMessage = apps.get_model("slack", "SlackMessage")
from apps.slack.models import SlackMessage
if self.slack_message is None:
slack_message = SlackMessage.objects.filter(alert_group=self).order_by("created_at").first()
return slack_message
@ -1783,7 +1793,8 @@ class AlertGroup(AlertGroupSlackRenderingMixin, EscalationSnapshotMixin, models.
@cached_property
def last_stop_escalation_log(self):
AlertGroupLogRecord = apps.get_model("alerts", "AlertGroupLogRecord")
from apps.alerts.models import AlertGroupLogRecord
stop_escalation_log = (
self.log_records.filter(
type__in=[

View file

@ -2,7 +2,6 @@ import json
import logging
import humanize
from django.apps import apps
from django.db import models
from django.db.models import JSONField
from django.db.models.signals import post_save
@ -232,7 +231,7 @@ class AlertGroupLogRecord(models.Model):
return result
def rendered_log_line_action(self, for_slack=False, html=False, substitute_author_with_tag=False):
EscalationPolicy = apps.get_model("alerts", "EscalationPolicy")
from apps.alerts.models import EscalationPolicy
result = ""
author_name = None

View file

@ -5,7 +5,6 @@ from urllib.parse import urljoin
import emoji
from celery import uuid as celery_uuid
from django.apps import apps
from django.conf import settings
from django.core.validators import MinLengthValidator
from django.db import models, transaction
@ -325,7 +324,8 @@ class AlertReceiveChannel(IntegrationOptionsMixin, MaintainableObject):
@property
def alerts_count(self):
Alert = apps.get_model("alerts", "Alert")
from apps.alerts.models import Alert
return Alert.objects.filter(group__channel=self).count()
@property
@ -631,8 +631,8 @@ class AlertReceiveChannel(IntegrationOptionsMixin, MaintainableObject):
def listen_for_alertreceivechannel_model_save(
sender: AlertReceiveChannel, instance: AlertReceiveChannel, created: bool, *args, **kwargs
) -> None:
ChannelFilter = apps.get_model("alerts", "ChannelFilter")
IntegrationHeartBeat = apps.get_model("heartbeat", "IntegrationHeartBeat")
from apps.alerts.models import ChannelFilter
from apps.heartbeat.models import IntegrationHeartBeat
if created:
write_resource_insight_log(instance=instance, author=instance.author, event=EntityEvent.CREATED)

View file

@ -2,7 +2,6 @@ import json
import logging
import re
from django.apps import apps
from django.conf import settings
from django.core.validators import MinLengthValidator
from django.db import models
@ -186,7 +185,8 @@ class ChannelFilter(OrderedModel):
}
if self.slack_channel_id:
if self.slack_channel_id:
SlackChannel = apps.get_model("slack", "SlackChannel")
from apps.slack.models import SlackChannel
sti = self.alert_receive_channel.organization.slack_team_identity
slack_channel = SlackChannel.objects.filter(
slack_team_identity=sti, slack_id=self.slack_channel_id

View file

@ -1,7 +1,6 @@
import datetime
import logging
from django.apps import apps
from django.db import models, transaction
from apps.alerts.tasks import invite_user_to_join_incident, send_alert_group_signal
@ -57,7 +56,7 @@ class Invitation(models.Model):
@staticmethod
def invite_user(invitee_user, alert_group, user):
AlertGroupLogRecord = apps.get_model("alerts", "AlertGroupLogRecord")
from apps.alerts.models import AlertGroupLogRecord
# RFCT - why atomic? without select for update?
with transaction.atomic():
@ -98,7 +97,7 @@ class Invitation(models.Model):
@staticmethod
def stop_invitation(invitation_pk, user):
AlertGroupLogRecord = apps.get_model("alerts", "AlertGroupLogRecord")
from apps.alerts.models import AlertGroupLogRecord
with transaction.atomic():
try:

View file

@ -3,7 +3,6 @@ from uuid import uuid4
import humanize
import pytz
from django.apps import apps
from django.db import models, transaction
from django.utils import timezone
@ -68,9 +67,7 @@ class MaintainableObject(models.Model):
raise NotImplementedError
def start_maintenance(self, mode, maintenance_duration, user):
AlertGroup = apps.get_model("alerts", "AlertGroup")
AlertReceiveChannel = apps.get_model("alerts", "AlertReceiveChannel")
Alert = apps.get_model("alerts", "Alert")
from apps.alerts.models import Alert, AlertGroup, AlertReceiveChannel
with transaction.atomic():
_self = self.__class__.objects.select_for_update().get(pk=self.pk)

View file

@ -1,8 +1,6 @@
import logging
from abc import ABC, abstractmethod
from django.apps import apps
logger = logging.getLogger(__name__)
@ -15,7 +13,8 @@ class AlertGroupAbstractRepresentative(ABC):
@staticmethod
def get_handlers_map():
AlertGroupLogRecord = apps.get_model("alerts", "AlertGroupLogRecord")
from apps.alerts.models import AlertGroupLogRecord
return AlertGroupLogRecord.ACTIONS_TO_HANDLERS_MAP
@classmethod

View file

@ -1,4 +1,3 @@
from django.apps import apps
from django.conf import settings
from django.db import transaction
@ -12,9 +11,8 @@ from .task_logger import task_logger
autoretry_for=(Exception,), retry_backoff=True, max_retries=1 if settings.DEBUG else None
)
def acknowledge_reminder_task(alert_group_pk, unacknowledge_process_id):
Organization = apps.get_model("user_management", "Organization")
AlertGroup = apps.get_model("alerts", "AlertGroup")
AlertGroupLogRecord = apps.get_model("alerts", "AlertGroupLogRecord")
from apps.alerts.models import AlertGroup, AlertGroupLogRecord
from apps.user_management.models import Organization
log_record = None
@ -76,9 +74,8 @@ def acknowledge_reminder_task(alert_group_pk, unacknowledge_process_id):
autoretry_for=(Exception,), retry_backoff=True, max_retries=1 if settings.DEBUG else None
)
def unacknowledge_timeout_task(alert_group_pk, unacknowledge_process_id):
Organization = apps.get_model("user_management", "Organization")
AlertGroup = apps.get_model("alerts", "AlertGroup")
AlertGroupLogRecord = apps.get_model("alerts", "AlertGroupLogRecord")
from apps.alerts.models import AlertGroup, AlertGroupLogRecord
from apps.user_management.models import Organization
log_record = None

View file

View file

@ -3,7 +3,6 @@ import typing
import requests
from celery import shared_task
from django.apps import apps
from django.conf import settings
from django.db.models import Q
from django.utils import timezone
@ -109,7 +108,7 @@ def check_escalation_finished_task() -> None:
"""
don't retry this task, the idea is to be alerted of failures
"""
AlertGroup = apps.get_model("alerts", "AlertGroup")
from apps.alerts.models import AlertGroup
now = timezone.now()
two_days_ago = now - datetime.timedelta(days=2)

View file

@ -1,7 +1,6 @@
import logging
from celery.utils.log import get_task_logger
from django.apps import apps
from django.core.cache import cache
from rest_framework import status
@ -43,7 +42,7 @@ def create_contact_points_for_datasource(alert_receive_channel_id, datasource_li
if cached_task_id is not None and current_task_id != cached_task_id:
return
AlertReceiveChannel = apps.get_model("alerts", "AlertReceiveChannel")
from apps.alerts.models import AlertReceiveChannel
alert_receive_channel = AlertReceiveChannel.objects.filter(pk=alert_receive_channel_id).first()
if not alert_receive_channel:

View file

@ -1,7 +1,6 @@
import json
import logging
from django.apps import apps
from django.conf import settings
from django.db import transaction
from jinja2 import TemplateError
@ -19,11 +18,8 @@ logger = logging.getLogger(__name__)
autoretry_for=(Exception,), retry_backoff=True, max_retries=1 if settings.DEBUG else None
)
def custom_button_result(custom_button_pk, alert_group_pk, user_pk=None, escalation_policy_pk=None):
AlertGroup = apps.get_model("alerts", "AlertGroup")
EscalationPolicy = apps.get_model("alerts", "EscalationPolicy")
AlertGroupLogRecord = apps.get_model("alerts", "AlertGroupLogRecord")
CustomButton = apps.get_model("alerts", "CustomButton")
User = apps.get_model("user_management", "User")
from apps.alerts.models import AlertGroup, AlertGroupLogRecord, CustomButton, EscalationPolicy
from apps.user_management.models import User
task_logger.debug(
f"Start custom_button_result for alert_group {alert_group_pk}, " f"custom_button {custom_button_pk}"

View file

@ -1,5 +1,4 @@
from celery.utils.log import get_task_logger
from django.apps import apps
from django.conf import settings
from common.custom_celery_tasks import shared_dedicated_queue_retry_task
@ -11,8 +10,9 @@ logger = get_task_logger(__name__)
autoretry_for=(Exception,), retry_backoff=True, max_retries=1 if settings.DEBUG else None
)
def delete_alert_group(alert_group_pk, user_pk):
AlertGroup = apps.get_model("alerts", "AlertGroup")
User = apps.get_model("user_management", "User")
from apps.alerts.models import AlertGroup
from apps.user_management.models import User
alert_group = AlertGroup.objects.filter(pk=alert_group_pk).first()
if not alert_group:
logger.debug("Alert group not found, skipping delete_alert_group")

View file

@ -1,4 +1,3 @@
from django.apps import apps
from django.conf import settings
from apps.alerts.constants import TASK_DELAY_SECONDS
@ -15,8 +14,7 @@ def distribute_alert(alert_id):
"""
We need this task to make task processing async and to make sure the task is delivered.
"""
Alert = apps.get_model("alerts", "Alert")
AlertGroup = apps.get_model("alerts", "AlertGroup")
from apps.alerts.models import Alert, AlertGroup
alert = Alert.objects.get(pk=alert_id)
task_logger.debug(f"Start distribute_alert for alert {alert_id} from alert_group {alert.group_id}")
@ -41,8 +39,7 @@ def distribute_alert(alert_id):
autoretry_for=(Exception,), retry_backoff=True, max_retries=1 if settings.DEBUG else None
)
def send_alert_create_signal(alert_id):
Alert = apps.get_model("alerts", "Alert")
AlertReceiveChannel = apps.get_model("alerts", "AlertReceiveChannel")
from apps.alerts.models import Alert, AlertReceiveChannel
task_logger.debug(f"Started send_alert_create_signal task for alert {alert_id}")
alert = Alert.objects.get(pk=alert_id)

View file

@ -1,4 +1,3 @@
from django.apps import apps
from django.conf import settings
from django.db import transaction
from kombu.utils.uuid import uuid as celery_uuid
@ -16,7 +15,7 @@ def escalate_alert_group(alert_group_pk):
"""
This task is on duty to send escalated alerts and schedule further escalation.
"""
AlertGroup = apps.get_model("alerts", "AlertGroup")
from apps.alerts.models import AlertGroup
task_logger.debug(f"Start escalate_alert_group for alert_group {alert_group_pk}")

View file

@ -1,5 +1,4 @@
import humanize
from django.apps import apps
from django.conf import settings
from django.db import transaction
from django.db.models import F
@ -13,8 +12,7 @@ from .notify_user import notify_user_task
autoretry_for=(Exception,), retry_backoff=True, max_retries=1 if settings.DEBUG else None
)
def invite_user_to_join_incident(invitation_pk):
Invitation = apps.get_model("alerts", "Invitation")
AlertGroupLogRecord = apps.get_model("alerts", "AlertGroupLogRecord")
from apps.alerts.models import AlertGroupLogRecord, Invitation
with transaction.atomic():
try:

View file

@ -1,4 +1,3 @@
from django.apps import apps
from django.conf import settings
from django.db import transaction
from django.db.models import ExpressionWrapper, F, fields
@ -14,8 +13,9 @@ from .task_logger import task_logger
autoretry_for=(Exception,), retry_backoff=True, max_retries=1 if settings.DEBUG else None
)
def disable_maintenance(*args, **kwargs):
AlertGroup = apps.get_model("alerts", "AlertGroup")
User = apps.get_model("user_management", "User")
from apps.alerts.models import AlertGroup
from apps.user_management.models import User
user = None
object_under_maintenance = None
user_id = kwargs.get("user_id")
@ -25,7 +25,7 @@ def disable_maintenance(*args, **kwargs):
force = kwargs.get("force", False)
with transaction.atomic():
if "alert_receive_channel_id" in kwargs:
AlertReceiveChannel = apps.get_model("alerts", "AlertReceiveChannel")
from apps.alerts.models import AlertReceiveChannel
alert_receive_channel_id = kwargs["alert_receive_channel_id"]
try:
@ -82,7 +82,8 @@ def disable_maintenance(*args, **kwargs):
autoretry_for=(Exception,), retry_backoff=True, max_retries=1 if settings.DEBUG else None
)
def check_maintenance_finished(*args, **kwargs):
AlertReceiveChannel = apps.get_model("alerts", "AlertReceiveChannel")
from apps.alerts.models import AlertReceiveChannel
now = timezone.now()
maintenance_finish_at = ExpressionWrapper(
(F("maintenance_started_at") + F("maintenance_duration")), output_field=fields.DateTimeField()

View file

@ -1,4 +1,3 @@
from django.apps import apps
from django.conf import settings
from apps.slack.tasks import check_slack_message_exists_before_post_message_to_thread
@ -12,9 +11,7 @@ from .task_logger import task_logger
autoretry_for=(Exception,), retry_backoff=True, max_retries=1 if settings.DEBUG else None
)
def notify_all_task(alert_group_pk, escalation_policy_snapshot_order=None):
AlertGroupLogRecord = apps.get_model("alerts", "AlertGroupLogRecord")
EscalationPolicy = apps.get_model("alerts", "EscalationPolicy")
AlertGroup = apps.get_model("alerts", "AlertGroup")
from apps.alerts.models import AlertGroup, AlertGroupLogRecord, EscalationPolicy
alert_group = AlertGroup.objects.get(pk=alert_group_pk)

View file

@ -1,4 +1,3 @@
from django.apps import apps
from django.conf import settings
from apps.slack.scenarios import scenario_step
@ -13,10 +12,9 @@ from .task_logger import task_logger
autoretry_for=(Exception,), retry_backoff=True, max_retries=1 if settings.DEBUG else None
)
def notify_group_task(alert_group_pk, escalation_policy_snapshot_order=None):
AlertGroupLogRecord = apps.get_model("alerts", "AlertGroupLogRecord")
UserNotificationPolicy = apps.get_model("base", "UserNotificationPolicy")
EscalationPolicy = apps.get_model("alerts", "EscalationPolicy")
AlertGroup = apps.get_model("alerts", "AlertGroup")
from apps.alerts.models import AlertGroup, AlertGroupLogRecord, EscalationPolicy
from apps.base.models import UserNotificationPolicy
EscalationDeliveryStep = scenario_step.ScenarioStep.get_step("escalation_delivery", "EscalationDeliveryStep")
alert_group = AlertGroup.objects.get(pk=alert_group_pk)

View file

@ -2,7 +2,6 @@ import datetime
import json
from copy import copy
from django.apps import apps
from django.utils import timezone
from apps.schedules.ical_events import ical_events
@ -173,7 +172,7 @@ def recalculate_shifts_with_respect_to_priority(shifts, users=None):
@shared_dedicated_queue_retry_task()
def notify_ical_schedule_shift(schedule_pk):
task_logger.info(f"Notify ical schedule shift {schedule_pk}")
OnCallSchedule = apps.get_model("schedules", "OnCallSchedule")
from apps.schedules.models import OnCallSchedule
try:
schedule = OnCallSchedule.objects.get(

View file

@ -1,6 +1,5 @@
import time
from django.apps import apps
from django.conf import settings
from django.db import transaction
from django.utils import timezone
@ -29,11 +28,9 @@ def notify_user_task(
important=False,
notify_anyway=False,
):
UserNotificationPolicy = apps.get_model("base", "UserNotificationPolicy")
UserNotificationPolicyLogRecord = apps.get_model("base", "UserNotificationPolicyLogRecord")
User = apps.get_model("user_management", "User")
AlertGroup = apps.get_model("alerts", "AlertGroup")
UserHasNotification = apps.get_model("alerts", "UserHasNotification")
from apps.alerts.models import AlertGroup, UserHasNotification
from apps.base.models import UserNotificationPolicy, UserNotificationPolicyLogRecord
from apps.user_management.models import User
try:
alert_group = AlertGroup.objects.get(pk=alert_group_pk)
@ -235,9 +232,9 @@ def notify_user_task(
autoretry_for=(Exception,), retry_backoff=True, max_retries=1 if settings.DEBUG else None
)
def perform_notification(log_record_pk):
UserNotificationPolicy = apps.get_model("base", "UserNotificationPolicy")
TelegramToUserConnector = apps.get_model("telegram", "TelegramToUserConnector")
UserNotificationPolicyLogRecord = apps.get_model("base", "UserNotificationPolicyLogRecord")
from apps.base.models import UserNotificationPolicy, UserNotificationPolicyLogRecord
from apps.telegram.models import TelegramToUserConnector
log_record = UserNotificationPolicyLogRecord.objects.get(pk=log_record_pk)
user = log_record.author
@ -376,7 +373,8 @@ def perform_notification(log_record_pk):
def send_user_notification_signal(log_record_pk):
start_time = time.time()
UserNotificationPolicyLogRecord = apps.get_model("base", "UserNotificationPolicyLogRecord")
from apps.base.models import UserNotificationPolicyLogRecord
task_logger.debug(f"LOG RECORD PK: {log_record_pk}")
task_logger.debug(f"LOG RECORD LAST: {UserNotificationPolicyLogRecord.objects.last()}")

View file

@ -1,4 +1,3 @@
from django.apps import apps
from django.conf import settings
from common.custom_celery_tasks import shared_dedicated_queue_retry_task
@ -12,8 +11,7 @@ def resolve_alert_group_by_source_if_needed(alert_group_pk):
The purpose of this task is to avoid computation-heavy check after each alert.
Should be delayed and invoked only for the last one.
"""
AlertGroupForAlertManager = apps.get_model("alerts", "AlertGroupForAlertManager")
AlertForAlertManager = apps.get_model("alerts", "AlertForAlertManager")
from apps.alerts.models import AlertForAlertManager, AlertGroupForAlertManager
alert_group = AlertGroupForAlertManager.objects.get(pk=alert_group_pk)

View file

@ -1,4 +1,3 @@
from django.apps import apps
from django.conf import settings
from common.custom_celery_tasks import shared_dedicated_queue_retry_task
@ -8,6 +7,7 @@ from common.custom_celery_tasks import shared_dedicated_queue_retry_task
autoretry_for=(Exception,), retry_backoff=True, max_retries=0 if settings.DEBUG else None
)
def resolve_by_last_step_task(alert_group_pk):
AlertGroup = apps.get_model("alerts", "AlertGroup")
from apps.alerts.models import AlertGroup
alert_group = AlertGroup.objects.get(pk=alert_group_pk)
alert_group.resolve_by_last_step()

View file

@ -1,4 +1,3 @@
from django.apps import apps
from django.conf import settings
from apps.alerts.signals import alert_group_update_log_report_signal
@ -11,8 +10,7 @@ from .task_logger import task_logger
autoretry_for=(Exception,), retry_backoff=True, max_retries=1 if settings.DEBUG else None
)
def send_update_log_report_signal(log_record_pk=None, alert_group_pk=None):
AlertGroup = apps.get_model("alerts", "AlertGroup")
AlertReceiveChannel = apps.get_model("alerts", "AlertReceiveChannel")
from apps.alerts.models import AlertGroup, AlertReceiveChannel
alert_group = AlertGroup.objects.get(id=alert_group_pk)
if alert_group.is_maintenance_incident:

View file

@ -1,6 +1,5 @@
import logging
from django.apps import apps
from django.conf import settings
from apps.alerts.signals import alert_group_update_resolution_note_signal
@ -14,8 +13,7 @@ logger = logging.getLogger(__name__)
)
def send_update_resolution_note_signal(alert_group_pk, resolution_note_pk):
"""Sends a signal to update messages associated with resolution note"""
AlertGroup = apps.get_model("alerts", "AlertGroup")
ResolutionNote = apps.get_model("alerts", "ResolutionNote")
from apps.alerts.models import AlertGroup, ResolutionNote
alert_group = AlertGroup.objects.filter(pk=alert_group_pk).first()
if alert_group is None:

View file

@ -1,12 +1,10 @@
from django.apps import apps
from apps.alerts.grafana_alerting_sync_manager.grafana_alerting_sync import GrafanaAlertingSyncManager
from common.custom_celery_tasks import shared_dedicated_queue_retry_task
@shared_dedicated_queue_retry_task(autoretry_for=(Exception,), retry_backoff=True, max_retries=10)
def sync_grafana_alerting_contact_points(alert_receive_channel_id):
AlertReceiveChannel = apps.get_model("alerts", "AlertReceiveChannel")
from apps.alerts.models import AlertReceiveChannel
alert_receive_channel = AlertReceiveChannel.objects_with_deleted.get(pk=alert_receive_channel_id)

View file

@ -1,4 +1,3 @@
from django.apps import apps
from django.conf import settings
from django.db import transaction
@ -13,8 +12,8 @@ from .task_logger import task_logger
autoretry_for=(Exception,), retry_backoff=True, max_retries=1 if settings.DEBUG else None
)
def unsilence_task(alert_group_pk):
AlertGroup = apps.get_model("alerts", "AlertGroup")
AlertGroupLogRecord = apps.get_model("alerts", "AlertGroupLogRecord")
from apps.alerts.models import AlertGroup, AlertGroupLogRecord
task_logger.info(f"Start unsilence_task for alert_group {alert_group_pk}")
with transaction.atomic():
try:

View file

@ -1,4 +1,3 @@
from django.apps import apps
from django.conf import settings
from common.custom_celery_tasks import shared_dedicated_queue_retry_task
@ -8,8 +7,9 @@ from common.custom_celery_tasks import shared_dedicated_queue_retry_task
autoretry_for=(Exception,), retry_backoff=True, max_retries=1 if settings.DEBUG else None
)
def wipe(alert_group_pk, user_pk):
AlertGroup = apps.get_model("alerts", "AlertGroup")
User = apps.get_model("user_management", "User")
from apps.alerts.models import AlertGroup
from apps.user_management.models import User
alert_group = AlertGroup.objects.filter(pk=alert_group_pk).first()
user = User.objects.filter(pk=user_pk).first()
alert_group.wipe_by_user(user)

View file

@ -1,6 +1,5 @@
from collections import OrderedDict
from django.apps import apps
from django.conf import settings
from django.core.exceptions import ObjectDoesNotExist
from django.core.exceptions import ValidationError as DjangoValidationError
@ -246,7 +245,8 @@ class AlertReceiveChannelTemplatesSerializer(EagerLoadingMixin, serializers.Mode
extra_kwargs = {"integration": {"required": True}}
def get_payload_example(self, obj):
AlertGroup = apps.get_model("alerts", "AlertGroup")
from apps.alerts.models import AlertGroup
if "alert_group_id" in self.context["request"].query_params:
alert_group_id = self.context["request"].query_params.get("alert_group_id")
try:

View file

@ -1,4 +1,3 @@
from django.apps import apps
from rest_framework import serializers
from apps.alerts.models import AlertReceiveChannel, ChannelFilter, EscalationChain
@ -95,7 +94,7 @@ class ChannelFilterSerializer(EagerLoadingMixin, serializers.ModelSerializer):
return None
def validate_slack_channel(self, slack_channel_id):
SlackChannel = apps.get_model("slack", "SlackChannel")
from apps.slack.models import SlackChannel
if slack_channel_id is not None:
slack_channel_id = slack_channel_id.upper()

View file

@ -1,6 +1,5 @@
from dataclasses import asdict
from django.apps import apps
from rest_framework import serializers
from apps.base.models import LiveSetting
@ -42,7 +41,8 @@ class OrganizationSerializer(EagerLoadingMixin, serializers.ModelSerializer):
]
def get_slack_channel(self, obj):
SlackChannel = apps.get_model("slack", "SlackChannel")
from apps.slack.models import SlackChannel
if obj.general_log_channel_id is None or obj.slack_team_identity is None:
return None
try:
@ -74,7 +74,8 @@ class CurrentOrganizationSerializer(OrganizationSerializer):
]
def get_banner(self, obj):
DynamicSetting = apps.get_model("base", "DynamicSetting")
from apps.base.models import DynamicSetting
banner = DynamicSetting.objects.get_or_create(
name="banner",
defaults={"json_value": {"title": None, "body": None}},

View file

@ -1,4 +1,3 @@
from django.apps import apps
from django.conf import settings
from rest_framework.response import Response
from rest_framework.views import APIView
@ -27,7 +26,8 @@ class FeaturesAPIView(APIView):
return Response(self._get_enabled_features(request))
def _get_enabled_features(self, request):
DynamicSetting = apps.get_model("base", "DynamicSetting")
from apps.base.models import DynamicSetting
enabled_features = []
if settings.FEATURE_SLACK_INTEGRATION_ENABLED:

View file

@ -1,6 +1,5 @@
from contextlib import suppress
from django.apps import apps
from rest_framework import status
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
@ -62,7 +61,8 @@ class GetTelegramVerificationCode(APIView):
def get(self, request):
organization = request.auth.organization
user = request.user
TelegramChannelVerificationCode = apps.get_model("telegram", "TelegramChannelVerificationCode")
from apps.telegram.models import TelegramChannelVerificationCode
with suppress(TelegramChannelVerificationCode.DoesNotExist):
existing_verification_code = organization.telegram_verification_code
existing_verification_code.delete()
@ -103,7 +103,8 @@ class SetGeneralChannel(APIView):
}
def post(self, request):
SlackChannel = apps.get_model("slack", "SlackChannel")
from apps.slack.models import SlackChannel
organization = request.auth.organization
slack_team_identity = organization.slack_team_identity
slack_channel_id = request.data["id"]

View file

@ -1,4 +1,3 @@
from django.apps import apps
from rest_framework import mixins, status, viewsets
from rest_framework.decorators import action
from rest_framework.permissions import IsAuthenticated
@ -32,7 +31,8 @@ class TelegramChannelViewSet(
serializer_class = TelegramToOrganizationConnectorSerializer
def get_queryset(self):
TelegramToOrganizationConnector = apps.get_model("telegram", "TelegramToOrganizationConnector")
from apps.telegram.models import TelegramToOrganizationConnector
return TelegramToOrganizationConnector.objects.filter(organization=self.request.user.organization)
@action(detail=True, methods=["post"])

View file

@ -1,7 +1,6 @@
import logging
import pytz
from django.apps import apps
from django.conf import settings
from django.core.exceptions import ObjectDoesNotExist
from django.db.utils import IntegrityError
@ -528,7 +527,8 @@ class UserView(
@action(detail=True, methods=["post"])
def unlink_telegram(self, request, pk) -> Response:
user = self.get_object()
TelegramToUserConnector = apps.get_model("telegram", "TelegramToUserConnector")
from apps.telegram.models import TelegramToUserConnector
try:
connector = TelegramToUserConnector.objects.get(user=user)
connector.delete()

View file

@ -3,7 +3,6 @@ import re
from urllib.parse import urlparse
import phonenumbers
from django.apps import apps
from phonenumbers import NumberParseException
from telegram import Bot
from twilio.base.exceptions import TwilioException
@ -14,17 +13,19 @@ from common.api_helpers.utils import create_engine_url
class LiveSettingProxy:
def __dir__(self):
LiveSetting = apps.get_model("base", "LiveSetting")
from apps.base.models import LiveSetting
return LiveSetting.AVAILABLE_NAMES
def __getattr__(self, item):
LiveSetting = apps.get_model("base", "LiveSetting")
from apps.base.models import LiveSetting
value = LiveSetting.get_setting(item)
return value
def __setattr__(self, key, value):
LiveSetting = apps.get_model("base", "LiveSetting")
from apps.base.models import LiveSetting
LiveSetting.objects.update_or_create(name=key, defaults={"value": value})

View file

@ -1,7 +1,7 @@
import logging
import sys
from django.apps import AppConfig, apps
from django.apps import AppConfig
from django.conf import settings
from django.db import OperationalError
@ -27,7 +27,8 @@ class GrafanaPluginConfig(AppConfig):
is_not_migration_script = any(startup_command in sys.argv for startup_command in STARTUP_COMMANDS)
if is_not_migration_script and settings.IS_OPEN_SOURCE:
try:
Organization = apps.get_model("user_management", "Organization")
from apps.user_management.models import Organization
has_existing_org = Organization.objects.first() is not None
# only enforce the following for new setups - if no organization exists in the database

View file

@ -1,7 +1,6 @@
import logging
from typing import Optional, Tuple
from django.apps import apps
from django.conf import settings
from django.utils import timezone
@ -45,7 +44,8 @@ def check_gcom_permission(token_string: str, context) -> GcomToken:
raise InvalidToken
if not organization:
DynamicSetting = apps.get_model("base", "DynamicSetting")
from apps.base.models import DynamicSetting
allow_signup = DynamicSetting.objects.get_or_create(
name="allow_plugin_organization_signup", defaults={"boolean_value": True}
)[0].boolean_value

View file

@ -2,7 +2,7 @@ import sys
from unittest.mock import patch
import pytest
from django.apps import apps
from django.apps import apps # noqa: I251
from django.conf import settings
from django.test import override_settings

View file

@ -2,7 +2,6 @@ import json
from unittest.mock import patch
import pytest
from django.apps import apps
from django.conf import settings
from django.test import override_settings
from django.urls import reverse
@ -148,7 +147,8 @@ def test_if_organization_does_not_exist_it_is_created(
url = reverse("grafana-plugin:self-hosted-install")
response = client.post(url, format="json", **make_self_hosted_install_header(GRAFANA_TOKEN))
Organization = apps.get_model("user_management", "Organization")
from apps.user_management.models import Organization
organization = Organization.objects.filter(stack_id=STACK_ID, org_id=ORG_ID).first()
assert mocked_grafana_api_client.called_once_with(api_url=GRAFANA_API_URL, api_token=GRAFANA_TOKEN)

View file

@ -1,4 +1,3 @@
from django.apps import apps
from django.conf import settings
from rest_framework.request import Request
from rest_framework.response import Response
@ -25,7 +24,8 @@ class StatusView(GrafanaHeadersMixin, APIView):
_, resp = GrafanaAPIClient(api_url=organization.grafana_url, api_token=organization.api_token).check_token()
token_ok = resp["connected"]
else:
DynamicSetting = apps.get_model("base", "DynamicSetting")
from apps.base.models import DynamicSetting
allow_signup = DynamicSetting.objects.get_or_create(
name="allow_plugin_organization_signup", defaults={"boolean_value": True}
)[0].boolean_value

View file

@ -1,6 +1,5 @@
import logging
from django.apps import apps
from django.conf import settings
from rest_framework import status
from rest_framework.request import Request
@ -37,7 +36,8 @@ class PluginSyncView(GrafanaHeadersMixin, APIView):
organization.save(update_fields=["api_token_status"])
if not organization:
DynamicSetting = apps.get_model("base", "DynamicSetting")
from apps.base.models import DynamicSetting
allow_signup = DynamicSetting.objects.get_or_create(
name="allow_plugin_organization_signup", defaults={"boolean_value": True}
)[0].boolean_value

View file

@ -1,7 +1,6 @@
from time import perf_counter
from celery.utils.log import get_task_logger
from django.apps import apps
from django.db import transaction
from django.utils import timezone
@ -12,14 +11,16 @@ logger = get_task_logger(__name__)
@shared_dedicated_queue_retry_task()
def integration_heartbeat_checkup(heartbeat_id: int) -> None:
IntegrationHeartBeat = apps.get_model("heartbeat", "IntegrationHeartBeat")
from apps.heartbeat.models import IntegrationHeartBeat
IntegrationHeartBeat.perform_heartbeat_check(heartbeat_id, integration_heartbeat_checkup.request.id)
@shared_dedicated_queue_retry_task()
def process_heartbeat_task(alert_receive_channel_pk):
start = perf_counter()
IntegrationHeartBeat = apps.get_model("heartbeat", "IntegrationHeartBeat")
from apps.heartbeat.models import IntegrationHeartBeat
with transaction.atomic():
heartbeats = IntegrationHeartBeat.objects.filter(
alert_receive_channel__pk=alert_receive_channel_pk,

View file

@ -1,7 +1,6 @@
import logging
from time import perf_counter
from django.apps import apps
from django.core import serializers
from django.core.cache import cache
from django.core.exceptions import PermissionDenied
@ -26,7 +25,8 @@ class AlertChannelDefiningMixin(object):
CACHE_SHORT_TERM_TIMEOUT = 5
def dispatch(self, *args, **kwargs):
AlertReceiveChannel = apps.get_model("alerts", "AlertReceiveChannel")
from apps.alerts.models import AlertReceiveChannel
logger.info("AlertChannelDefiningMixin started")
start = perf_counter()
alert_receive_channel = None
@ -83,7 +83,8 @@ class AlertChannelDefiningMixin(object):
return super(AlertChannelDefiningMixin, self).dispatch(*args, **kwargs)
def update_alert_receive_channel_cache(self):
AlertReceiveChannel = apps.get_model("alerts", "AlertReceiveChannel")
from apps.alerts.models import AlertReceiveChannel
logger.info("Caching alert receive channels from database.")
serialized = serializers.serialize("json", AlertReceiveChannel.objects.all())
# Caching forever, re-caching is managed by "obsolete key"

View file

@ -2,7 +2,6 @@ import logging
from abc import ABC, abstractmethod
from functools import wraps
from django.apps import apps
from django.core.cache import cache
from django.http import HttpRequest, HttpResponse
from django.views import View
@ -86,7 +85,8 @@ def ratelimit(group=None, key=None, rate=None, method=ALL, block=False, reason=N
def is_ratelimit_ignored(alert_receive_channel):
DynamicSetting = apps.get_model("base", "DynamicSetting")
from apps.base.models import DynamicSetting
integration_token_to_ignore_ratelimit = DynamicSetting.objects.get_or_create(
name="integration_tokens_to_ignore_ratelimit",
defaults={

View file

@ -3,7 +3,6 @@ import random
from celery import shared_task
from celery.utils.log import get_task_logger
from django.apps import apps
from django.conf import settings
from django.core.cache import cache
@ -25,8 +24,7 @@ logger.setLevel(logging.DEBUG)
max_retries=1 if settings.DEBUG else None,
)
def create_alertmanager_alerts(alert_receive_channel_pk, alert, is_demo=False, force_route_id=None):
AlertReceiveChannel = apps.get_model("alerts", "AlertReceiveChannel")
Alert = apps.get_model("alerts", "Alert")
from apps.alerts.models import Alert, AlertReceiveChannel
alert_receive_channel = AlertReceiveChannel.objects_with_deleted.get(pk=alert_receive_channel_pk)
if (
@ -84,8 +82,7 @@ def create_alert(
is_demo=False,
force_route_id=None,
):
AlertReceiveChannel = apps.get_model("alerts", "AlertReceiveChannel")
Alert = apps.get_model("alerts", "Alert")
from apps.alerts.models import Alert, AlertReceiveChannel
try:
alert_receive_channel = AlertReceiveChannel.objects.get(pk=alert_receive_channel_pk)
@ -144,7 +141,7 @@ def start_notify_about_integration_ratelimit(team_id, text, **kwargs):
)
def notify_about_integration_ratelimit_in_slack(organization_id, text, **kwargs):
# TODO: Review ratelimits
Organization = apps.get_model("user_management", "Organization")
from apps.user_management.models import Organization
try:
organization = Organization.objects.get(pk=organization_id)

View file

@ -2,7 +2,6 @@ import datetime
import random
import typing
from django.apps import apps
from django.core.cache import cache
from django.utils import timezone
@ -29,7 +28,8 @@ if typing.TYPE_CHECKING:
def get_organization_ids_from_db():
AlertReceiveChannel = apps.get_model("alerts", "AlertReceiveChannel")
from apps.alerts.models import AlertReceiveChannel
# get only not deleted organizations that have integrations
organizations_ids = (
AlertReceiveChannel.objects.filter(organization__deleted_at__isnull=True)

View file

@ -1,6 +1,5 @@
import typing
from django.apps import apps
from django.conf import settings
from django.core.cache import cache
from django.db.models import Count, Q
@ -78,9 +77,8 @@ def calculate_and_cache_metrics(organization_id, force=False):
"""
Calculate integrations metrics for organization.
"""
AlertGroup = apps.get_model("alerts", "AlertGroup")
AlertReceiveChannel = apps.get_model("alerts", "AlertReceiveChannel")
Organization = apps.get_model("user_management", "Organization")
from apps.alerts.models import AlertGroup, AlertReceiveChannel
from apps.user_management.models import Organization
ONE_HOUR = 3600
TWO_HOURS = 7200
@ -165,8 +163,8 @@ def calculate_and_cache_user_was_notified_metric(organization_id):
"""
Calculate metric "user_was_notified_of_alert_groups" for organization.
"""
UserNotificationPolicyLogRecord = apps.get_model("base", "UserNotificationPolicyLogRecord")
Organization = apps.get_model("user_management", "Organization")
from apps.base.models import UserNotificationPolicyLogRecord
from apps.user_management.models import Organization
TWO_HOURS = 7200

View file

@ -3,7 +3,6 @@ import random
from urllib.parse import urljoin
import requests
from django.apps import apps
from django.conf import settings
from rest_framework import status
@ -15,7 +14,7 @@ logger = logging.getLogger(__name__)
def setup_heartbeat_integration(name=None):
"""Setup Grafana Cloud OnCall heartbeat integration."""
CloudHeartbeat = apps.get_model("oss_installation", "CloudHeartbeat")
from apps.oss_installation.models import CloudHeartbeat
cloud_heartbeat = None
api_token = live_settings.GRAFANA_CLOUD_ONCALL_TOKEN
@ -57,8 +56,8 @@ def setup_heartbeat_integration(name=None):
def send_cloud_heartbeat():
CloudHeartbeat = apps.get_model("oss_installation", "CloudHeartbeat")
CloudConnector = apps.get_model("oss_installation", "CloudConnector")
from apps.oss_installation.models import CloudConnector, CloudHeartbeat
"""Send heartbeat to Grafana Cloud OnCall integration."""
if not live_settings.GRAFANA_CLOUD_ONCALL_HEARTBEAT_ENABLED or not live_settings.GRAFANA_CLOUD_ONCALL_TOKEN:
logger.info(

View file

@ -1,5 +1,4 @@
from celery.utils.log import get_task_logger
from django.apps import apps
from django.utils import timezone
from apps.base.utils import live_settings
@ -13,7 +12,7 @@ logger = get_task_logger(__name__)
@shared_dedicated_queue_retry_task()
def send_usage_stats_report():
logger.info("Start send_usage_stats_report")
OssInstallation = apps.get_model("oss_installation", "OssInstallation")
from apps.oss_installation.models import OssInstallation
installation = OssInstallation.objects.get_or_create()[0]
enabled = live_settings.SEND_ANONYMOUS_USAGE_STATS
@ -35,7 +34,8 @@ def send_cloud_heartbeat_task():
@shared_dedicated_queue_retry_task()
def sync_users_with_cloud():
CloudConnector = apps.get_model("oss_installation", "CloudConnector")
from apps.oss_installation.models import CloudConnector
logger.info("Start sync_users_with_cloud")
if live_settings.GRAFANA_CLOUD_NOTIFICATIONS_ENABLED:
connector = CloudConnector.objects.first()

View file

@ -3,7 +3,6 @@ import platform
from dataclasses import asdict, dataclass
import requests
from django.apps import apps
from django.conf import settings
from django.db.models import Sum
@ -27,7 +26,8 @@ class UsageStatsReport:
class UsageStatsService:
def get_usage_stats_report(self):
OssInstallation = apps.get_model("oss_installation", "OssInstallation")
from apps.oss_installation.models import OssInstallation
metrics = {}
metrics["active_users_count"] = active_oss_users_count()
total_alert_groups = AlertGroupCounter.objects.aggregate(Sum("value")).get("value__sum", None)

View file

@ -1,7 +1,6 @@
import logging
from urllib.parse import urljoin
from django.apps import apps
from django.utils import timezone
from apps.oss_installation import constants as oss_constants
@ -14,10 +13,9 @@ def active_oss_users_count():
"""
active_oss_users_count returns count of active users of oss installation.
"""
OnCallSchedule = apps.get_model("schedules", "OnCallSchedule")
AlertGroupLogRecord = apps.get_model("alerts", "AlertGroupLogRecord")
EscalationPolicy = apps.get_model("alerts", "EscalationPolicy")
UserNotificationPolicyLogRecord = apps.get_model("base", "UserNotificationPolicyLogRecord")
from apps.alerts.models import AlertGroupLogRecord, EscalationPolicy
from apps.base.models import UserNotificationPolicyLogRecord
from apps.schedules.models import OnCallSchedule
# Take logs for previous 24 hours
start = timezone.now() - timezone.timedelta(hours=24)

View file

@ -2,7 +2,6 @@ import logging
from typing import Optional
import requests
from django.apps import apps
from django.conf import settings
from apps.alerts.incident_appearance.renderers.phone_call_renderer import AlertGroupPhoneCallRenderer
@ -40,7 +39,8 @@ class PhoneBackend:
notify_by_call makes a notification call to a user using configured phone provider or cloud notifications.
It handles all business logic related to the call.
"""
UserNotificationPolicyLogRecord = apps.get_model("base", "UserNotificationPolicyLogRecord")
from apps.base.models import UserNotificationPolicyLogRecord
log_record_error_code = None
renderer = AlertGroupPhoneCallRenderer(alert_group)
@ -145,7 +145,8 @@ class PhoneBackend:
SMS itself is handled by phone provider.
"""
UserNotificationPolicyLogRecord = apps.get_model("base", "UserNotificationPolicyLogRecord")
from apps.base.models import UserNotificationPolicyLogRecord
log_record_error_code = None
renderer = AlertGroupSmsRenderer(alert_group)

View file

@ -1,4 +1,3 @@
from django.apps import apps
from rest_framework import fields, serializers
from apps.alerts.models import AlertReceiveChannel, ChannelFilter, EscalationChain
@ -81,7 +80,7 @@ class BaseChannelFilterSerializer(OrderedModelSerializer):
return validated_data
def _validate_slack_channel_id(self, slack_channel_id):
SlackChannel = apps.get_model("slack", "SlackChannel")
from apps.slack.models import SlackChannel
if slack_channel_id is not None:
slack_channel_id = slack_channel_id.upper()
@ -94,7 +93,8 @@ class BaseChannelFilterSerializer(OrderedModelSerializer):
return slack_channel_id
def _validate_telegram_channel(self, telegram_channel_id):
TelegramToOrganizationConnector = apps.get_model("telegram", "TelegramToOrganizationConnector")
from apps.telegram.models import TelegramToOrganizationConnector
if telegram_channel_id is not None:
organization = self.context["request"].auth.organization
try:

View file

@ -1,6 +1,5 @@
import datetime
from django.apps import apps
from django.utils import timezone
from rest_framework import serializers
@ -39,7 +38,7 @@ class ScheduleBaseSerializer(serializers.ModelSerializer):
return validated_data
def validate_slack(self, slack_field):
SlackChannel = apps.get_model("slack", "SlackChannel")
from apps.slack.models import SlackChannel
slack_channel_id = slack_field.get("channel_id")
user_group_id = slack_field.get("user_group_id")

View file

@ -3,7 +3,6 @@ import re
import typing
from collections import defaultdict
from django.apps import apps
from icalendar import Calendar, Event
from recurring_ical_events import UnfoldableCalendar, compare_greater, is_event, time_span_contains_event
@ -143,7 +142,7 @@ class AmixrRecurringIcalEventsAdapter(IcalService):
"""
Calculate start and end datetime
"""
CustomOnCallShift = apps.get_model("schedules", "CustomOnCallShift")
from apps.schedules.models import CustomOnCallShift
start = event[ICAL_DATETIME_START].dt
end = event[ICAL_DATETIME_END].dt

View file

@ -9,7 +9,6 @@ from typing import TYPE_CHECKING
import pytz
import requests
from django.apps import apps
from django.db.models import Q
from django.utils import timezone
from icalendar import Calendar
@ -38,7 +37,7 @@ This is a hack to allow us to load models for type checking without circular dep
This module likely needs to refactored to be part of the OnCallSchedule module.
"""
if TYPE_CHECKING:
from apps.schedules.models import CustomOnCallShift, OnCallSchedule
from apps.schedules.models import OnCallSchedule
from apps.schedules.models.on_call_schedule import OnCallScheduleQuerySet
from apps.user_management.models import Organization, User
from apps.user_management.models.user import UserQuerySet
@ -142,7 +141,7 @@ def list_of_oncall_shifts_from_ical(
}
]
"""
OnCallSchedule = apps.get_model("schedules", "OnCallSchedule")
from apps.schedules.models import OnCallSchedule
# get list of iCalendars from current iCal files. If there is more than one calendar, primary calendar will always
# be the first
@ -279,7 +278,7 @@ def list_of_empty_shifts_in_schedule(
) -> EmptyShifts:
# Calculate lookup window in schedule's tz
# If we can't get tz from ical use UTC
OnCallSchedule = apps.get_model("schedules", "OnCallSchedule")
from apps.schedules.models import OnCallSchedule
calendars = schedule.get_icalendars()
empty_shifts: EmptyShifts = []
@ -495,8 +494,9 @@ def parse_event_uid(string: str):
if source is not None:
source = int(source)
OnCallShift: "CustomOnCallShift" = apps.get_model("schedules", "CustomOnCallShift")
source_verbal = OnCallShift.SOURCE_CHOICES[source][1]
from apps.schedules.models import CustomOnCallShift
source_verbal = CustomOnCallShift.SOURCE_CHOICES[source][1]
return pk, source_verbal

View file

@ -8,7 +8,6 @@ from uuid import uuid4
import pytz
from dateutil import relativedelta
from django.apps import apps
from django.conf import settings
from django.core.validators import MinLengthValidator
from django.db import models, transaction
@ -623,7 +622,8 @@ class CustomOnCallShift(models.Model):
return pytz.timezone(time_zone).localize(start_naive, is_dst=None)
def get_rolling_users(self):
User = apps.get_model("user_management", "User")
from apps.user_management.models import User
all_users_pks = set()
users_queue = []
if self.rolling_users is not None:

View file

@ -7,7 +7,6 @@ from enum import Enum
import icalendar
import pytz
from django.apps import apps
from django.conf import settings
from django.core.validators import MinLengthValidator
from django.db import models
@ -819,7 +818,8 @@ class OnCallSchedule(PolymorphicModel):
result["team"] = "General"
if self.organization.slack_team_identity:
if self.channel:
SlackChannel = apps.get_model("slack", "SlackChannel")
from apps.slack.models import SlackChannel
sti = self.organization.slack_team_identity
slack_channel = SlackChannel.objects.filter(slack_team_identity=sti, slack_id=self.channel).first()
if slack_channel:

View file

@ -1,5 +1,4 @@
from celery.utils.log import get_task_logger
from django.apps import apps
from common.custom_celery_tasks import shared_dedicated_queue_retry_task
@ -10,7 +9,7 @@ task_logger = get_task_logger(__name__)
@shared_dedicated_queue_retry_task(autoretry_for=(Exception,), retry_backoff=True, max_retries=1)
def drop_cached_ical_task(schedule_pk):
OnCallSchedule = apps.get_model("schedules", "OnCallSchedule")
from apps.schedules.models import OnCallSchedule
task_logger.info(f"Start drop_cached_ical_task for schedule {schedule_pk}")
try:
@ -26,7 +25,7 @@ def drop_cached_ical_task(schedule_pk):
@shared_dedicated_queue_retry_task(autoretry_for=(Exception,), retry_backoff=True, max_retries=1)
def drop_cached_ical_for_custom_events_for_organization(organization_id):
OnCallScheduleCalendar = apps.get_model("schedules", "OnCallScheduleCalendar")
from apps.schedules.models import OnCallScheduleCalendar
for schedule in OnCallScheduleCalendar.objects.filter(organization_id=organization_id):
drop_cached_ical_task.apply_async(

View file

@ -1,6 +1,5 @@
import pytz
from celery.utils.log import get_task_logger
from django.apps import apps
from django.core.cache import cache
from django.utils import timezone
@ -14,7 +13,7 @@ task_logger = get_task_logger(__name__)
@shared_dedicated_queue_retry_task()
def start_check_empty_shifts_in_schedule():
OnCallSchedule = apps.get_model("schedules", "OnCallSchedule")
from apps.schedules.models import OnCallSchedule
task_logger.info("Start start_notify_about_empty_shifts_in_schedule")
@ -28,7 +27,7 @@ def start_check_empty_shifts_in_schedule():
@shared_dedicated_queue_retry_task()
def check_empty_shifts_in_schedule(schedule_pk):
OnCallSchedule = apps.get_model("schedules", "OnCallSchedule")
from apps.schedules.models import OnCallSchedule
task_logger.info(f"Start check_empty_shifts_in_schedule {schedule_pk}")
@ -46,13 +45,13 @@ def check_empty_shifts_in_schedule(schedule_pk):
@shared_dedicated_queue_retry_task()
def start_notify_about_empty_shifts_in_schedule():
OnCallSchedule = apps.get_model("schedules", "OnCallScheduleICal")
from apps.schedules.models import OnCallScheduleICal
task_logger.info("Start start_notify_about_empty_shifts_in_schedule")
today = timezone.now().date()
week_ago = today - timezone.timedelta(days=7)
schedules = OnCallSchedule.objects.filter(
schedules = OnCallScheduleICal.objects.filter(
empty_shifts_report_sent_at__lte=week_ago,
channel__isnull=False,
)
@ -65,7 +64,7 @@ def start_notify_about_empty_shifts_in_schedule():
@shared_dedicated_queue_retry_task()
def notify_about_empty_shifts_in_schedule(schedule_pk):
OnCallSchedule = apps.get_model("schedules", "OnCallSchedule")
from apps.schedules.models import OnCallSchedule
task_logger.info(f"Start notify_about_empty_shifts_in_schedule {schedule_pk}")

View file

@ -1,6 +1,5 @@
import pytz
from celery.utils.log import get_task_logger
from django.apps import apps
from django.core.cache import cache
from django.utils import timezone
@ -13,7 +12,7 @@ task_logger = get_task_logger(__name__)
@shared_dedicated_queue_retry_task()
def start_check_gaps_in_schedule():
OnCallSchedule = apps.get_model("schedules", "OnCallSchedule")
from apps.schedules.models import OnCallSchedule
task_logger.info("Start start_check_gaps_in_schedule")
@ -27,7 +26,7 @@ def start_check_gaps_in_schedule():
@shared_dedicated_queue_retry_task()
def check_gaps_in_schedule(schedule_pk):
OnCallSchedule = apps.get_model("schedules", "OnCallSchedule")
from apps.schedules.models import OnCallSchedule
task_logger.info(f"Start check_gaps_in_schedule {schedule_pk}")
@ -45,7 +44,7 @@ def check_gaps_in_schedule(schedule_pk):
@shared_dedicated_queue_retry_task()
def start_notify_about_gaps_in_schedule():
OnCallSchedule = apps.get_model("schedules", "OnCallSchedule")
from apps.schedules.models import OnCallSchedule
task_logger.info("Start start_notify_about_gaps_in_schedule")
@ -64,7 +63,7 @@ def start_notify_about_gaps_in_schedule():
@shared_dedicated_queue_retry_task()
def notify_about_gaps_in_schedule(schedule_pk):
OnCallSchedule = apps.get_model("schedules", "OnCallSchedule")
from apps.schedules.models import OnCallSchedule
task_logger.info(f"Start notify_about_gaps_in_schedule {schedule_pk}")

View file

@ -1,5 +1,4 @@
from celery.utils.log import get_task_logger
from django.apps import apps
from apps.alerts.tasks import notify_ical_schedule_shift
from apps.schedules.ical_utils import is_icals_equal
@ -12,7 +11,7 @@ task_logger = get_task_logger(__name__)
@shared_dedicated_queue_retry_task()
def start_refresh_ical_files():
OnCallSchedule = apps.get_model("schedules", "OnCallSchedule")
from apps.schedules.models import OnCallSchedule
task_logger.info("Start refresh ical files")
@ -26,7 +25,7 @@ def start_refresh_ical_files():
@shared_dedicated_queue_retry_task()
def start_refresh_ical_final_schedules():
OnCallSchedule = apps.get_model("schedules", "OnCallSchedule")
from apps.schedules.models import OnCallSchedule
task_logger.info("Start refresh ical final schedules")
@ -37,7 +36,7 @@ def start_refresh_ical_final_schedules():
@shared_dedicated_queue_retry_task()
def refresh_ical_file(schedule_pk):
OnCallSchedule = apps.get_model("schedules", "OnCallSchedule")
from apps.schedules.models import OnCallSchedule
task_logger.info(f"Refresh ical files for schedule {schedule_pk}")
@ -89,7 +88,8 @@ def refresh_ical_file(schedule_pk):
@shared_dedicated_queue_retry_task()
def refresh_ical_final_schedule(schedule_pk):
OnCallSchedule = apps.get_model("schedules", "OnCallSchedule")
from apps.schedules.models import OnCallSchedule
task_logger.info(f"Refresh ical final schedule {schedule_pk}")
try:

View file

@ -1,7 +1,5 @@
import logging
from django.apps import apps
from apps.slack.constants import SLACK_RATE_LIMIT_DELAY
from apps.slack.slack_client import SlackClientWithErrorHandling
from apps.slack.slack_client.exceptions import (
@ -24,8 +22,8 @@ class AlertGroupSlackService:
def update_alert_group_slack_message(self, alert_group):
logger.info(f"Started _update_slack_message for alert_group {alert_group.pk}")
SlackMessage = apps.get_model("slack", "SlackMessage")
AlertReceiveChannel = apps.get_model("alerts", "AlertReceiveChannel")
from apps.alerts.models import AlertReceiveChannel
from apps.slack.models import SlackMessage
slack_message = alert_group.slack_message
attachments = alert_group.render_slack_attachments()
@ -86,7 +84,8 @@ class AlertGroupSlackService:
if alert_group.channel.is_rate_limited_in_slack:
return
SlackMessage = apps.get_model("slack", "SlackMessage")
from apps.slack.models import SlackMessage
slack_message = alert_group.get_slack_message()
channel_id = slack_message.channel_id
try:

View file

@ -2,7 +2,6 @@ import logging
import time
import uuid
from django.apps import apps
from django.db import models
from apps.slack.slack_client import SlackClientWithErrorHandling
@ -111,7 +110,8 @@ class SlackMessage(models.Model):
return self.cached_permalink
def send_slack_notification(self, user, alert_group, notification_policy):
UserNotificationPolicyLogRecord = apps.get_model("base", "UserNotificationPolicyLogRecord")
from apps.base.models import UserNotificationPolicyLogRecord
slack_message = alert_group.get_slack_message()
user_verbal = user.get_username_with_slack_verbal(mention=True)

View file

@ -1,6 +1,5 @@
import logging
from django.apps import apps
from django.db import models
from django.db.models import JSONField
@ -45,7 +44,8 @@ class SlackTeamIdentity(models.Model):
def update_oauth_fields(self, user, organization, reinstall_data):
logger.info(f"updated oauth_fields for sti {self.pk}")
SlackUserIdentity = apps.get_model("slack", "SlackUserIdentity")
from apps.slack.models import SlackUserIdentity
organization.slack_team_identity = self
organization.save(update_fields=["slack_team_identity"])
slack_user_identity, _ = SlackUserIdentity.objects.get_or_create(

View file

@ -1,7 +1,6 @@
import logging
from celery.utils.log import get_task_logger
from django.apps import apps
from django.conf import settings
from apps.alerts.constants import ActionSource
@ -20,7 +19,7 @@ def on_create_alert_slack_representative_async(alert_pk):
"""
It's asynced in order to prevent Slack downtime causing issues with SMS and other destinations.
"""
Alert = apps.get_model("alerts", "Alert")
from apps.alerts.models import Alert
alert = (
Alert.objects.filter(pk=alert_pk)
@ -53,7 +52,7 @@ def on_create_alert_slack_representative_async(alert_pk):
autoretry_for=(Exception,), retry_backoff=True, max_retries=1 if settings.DEBUG else None
)
def on_alert_group_action_triggered_async(log_record_id):
AlertGroupLogRecord = apps.get_model("alerts", "AlertGroupLogRecord")
from apps.alerts.models import AlertGroupLogRecord
logger.debug(f"SLACK representative: get log record {log_record_id}")
@ -84,7 +83,8 @@ def on_alert_group_action_triggered_async(log_record_id):
autoretry_for=(Exception,), retry_backoff=True, max_retries=1 if settings.DEBUG else None
)
def on_alert_group_update_log_report_async(alert_group_id):
AlertGroup = apps.get_model("alerts", "AlertGroup")
from apps.alerts.models import AlertGroup
alert_group = AlertGroup.objects.get(pk=alert_group_id)
logger.debug(f"Start on_alert_group_update_log_report for alert_group {alert_group_id}")
organization = alert_group.channel.organization
@ -113,7 +113,8 @@ class AlertGroupSlackRepresentative(AlertGroupAbstractRepresentative):
@classmethod
def on_create_alert(cls, **kwargs):
Alert = apps.get_model("alerts", "Alert")
from apps.alerts.models import Alert
alert = kwargs["alert"]
if isinstance(alert, Alert):
alert_id = alert.pk
@ -141,7 +142,8 @@ class AlertGroupSlackRepresentative(AlertGroupAbstractRepresentative):
@classmethod
def on_alert_group_action_triggered(cls, **kwargs):
logger.debug("Received alert_group_action_triggered signal in SLACK representative")
AlertGroupLogRecord = apps.get_model("alerts", "AlertGroupLogRecord")
from apps.alerts.models import AlertGroupLogRecord
log_record = kwargs["log_record"]
action_source = kwargs.get("action_source")
force_sync = kwargs.get("force_sync", False)
@ -157,7 +159,8 @@ class AlertGroupSlackRepresentative(AlertGroupAbstractRepresentative):
@classmethod
def on_alert_group_update_log_report(cls, **kwargs):
AlertGroup = apps.get_model("alerts", "AlertGroup")
from apps.alerts.models import AlertGroup
alert_group = kwargs["alert_group"]
if isinstance(alert_group, AlertGroup):

View file

@ -1,7 +1,5 @@
import json
from django.apps import apps
from apps.api.permissions import RBACPermission
from apps.slack.scenarios import scenario_step
@ -59,7 +57,7 @@ class OpenAlertAppearanceDialogStep(AlertGroupActionsMixin, scenario_step.Scenar
class UpdateAppearanceStep(scenario_step.ScenarioStep):
def process_scenario(self, slack_user_identity, slack_team_identity, payload):
AlertGroup = apps.get_model("alerts", "AlertGroup")
from apps.alerts.models import AlertGroup
private_metadata = json.loads(payload["view"]["private_metadata"])
alert_group_pk = private_metadata["alert_group_pk"]

View file

@ -3,7 +3,6 @@ import logging
from contextlib import suppress
from datetime import datetime
from django.apps import apps
from django.core.cache import cache
from django.utils import timezone
from jinja2 import TemplateError
@ -220,7 +219,7 @@ class InviteOtherPersonToIncident(AlertGroupActionsMixin, scenario_step.Scenario
REQUIRED_PERMISSIONS = [RBACPermission.Permissions.CHATOPS_WRITE]
def process_scenario(self, slack_user_identity, slack_team_identity, payload):
User = apps.get_model("user_management", "User")
from apps.user_management.models import User
alert_group = self.get_alert_group(slack_team_identity, payload)
if not self.is_authorized(alert_group):
@ -525,7 +524,8 @@ class CustomButtonProcessStep(AlertGroupActionsMixin, scenario_step.ScenarioStep
REQUIRED_PERMISSIONS = [RBACPermission.Permissions.CHATOPS_WRITE]
def process_scenario(self, slack_user_identity, slack_team_identity, payload):
CustomButtom = apps.get_model("alerts", "CustomButton")
from apps.alerts.models import CustomButton
alert_group = self.get_alert_group(slack_team_identity, payload)
if not self.is_authorized(alert_group):
self.open_unauthorized_warning(payload)
@ -534,8 +534,8 @@ class CustomButtonProcessStep(AlertGroupActionsMixin, scenario_step.ScenarioStep
custom_button_pk = payload["actions"][0]["name"].split("_")[1]
alert_group_pk = payload["actions"][0]["name"].split("_")[2]
try:
CustomButtom.objects.get(pk=custom_button_pk)
except CustomButtom.DoesNotExist:
CustomButton.objects.get(pk=custom_button_pk)
except CustomButton.DoesNotExist:
warning_text = "Oops! This button was deleted"
self.open_warning_window(payload, warning_text=warning_text)
self.alert_group_slack_service.update_alert_group_slack_message(alert_group)
@ -658,7 +658,8 @@ class UnAcknowledgeGroupStep(AlertGroupActionsMixin, scenario_step.ScenarioStep)
alert_group.un_acknowledge_by_user(self.user, action_source=ActionSource.SLACK)
def process_signal(self, log_record):
AlertGroupLogRecord = apps.get_model("alerts", "AlertGroupLogRecord")
from apps.alerts.models import AlertGroupLogRecord
alert_group = log_record.alert_group
logger.debug(f"Started process_signal in UnAcknowledgeGroupStep for alert_group {alert_group.pk}")
@ -711,7 +712,8 @@ class UnAcknowledgeGroupStep(AlertGroupActionsMixin, scenario_step.ScenarioStep)
class AcknowledgeConfirmationStep(AcknowledgeGroupStep):
def process_scenario(self, slack_user_identity, slack_team_identity, payload):
AlertGroup = apps.get_model("alerts", "AlertGroup")
from apps.alerts.models import AlertGroup
alert_group_id = payload["actions"][0]["value"].split("_")[1]
alert_group = AlertGroup.objects.get(pk=alert_group_id)
channel = payload["channel"]["id"]
@ -762,8 +764,8 @@ class AcknowledgeConfirmationStep(AcknowledgeGroupStep):
)
def process_signal(self, log_record):
Organization = apps.get_model("user_management", "Organization")
SlackMessage = apps.get_model("slack", "SlackMessage")
from apps.slack.models import SlackMessage
from apps.user_management.models import Organization
alert_group = log_record.alert_group
channel_id = alert_group.slack_message.channel_id
@ -939,7 +941,7 @@ class UpdateLogReportMessageStep(scenario_step.ScenarioStep):
self.update_log_message(alert_group)
def post_log_message(self, alert_group):
SlackMessage = apps.get_model("slack", "SlackMessage")
from apps.slack.models import SlackMessage
slack_message = alert_group.get_slack_message()

View file

@ -1,5 +1,4 @@
import humanize
from django.apps import apps
from apps.slack.scenarios import scenario_step
@ -10,7 +9,8 @@ class EscalationDeliveryStep(scenario_step.ScenarioStep):
"""
def get_user_notification_message_for_thread_for_usergroup(self, user, notification_policy):
UserNotificationPolicy = apps.get_model("base", "UserNotificationPolicy")
from apps.base.models import UserNotificationPolicy
notification_channel = notification_policy.notify_by
notification_step = notification_policy.step
user_verbal = user.get_username_with_slack_verbal()

View file

@ -1,7 +1,5 @@
import json
from django.apps import apps
from apps.alerts.paging import check_user_availability, direct_paging, unpage_user
from apps.slack.scenarios import scenario_step
from apps.slack.scenarios.paging import (
@ -197,7 +195,7 @@ def render_dialog(alert_group):
def _get_selected_user_from_payload(payload):
User = apps.get_model("user_management", "User")
from apps.user_management.models import User
try:
selected_user_id = payload["actions"][0]["value"] # "remove" button
@ -216,7 +214,7 @@ def _get_selected_user_from_payload(payload):
def _get_selected_schedule_from_payload(payload):
OnCallSchedule = apps.get_model("schedules", "OnCallSchedule")
from apps.schedules.models import OnCallSchedule
input_id_prefix = json.loads(payload["view"]["private_metadata"])["input_id_prefix"]
selected_schedule_id = _get_select_field_value(
@ -227,7 +225,8 @@ def _get_selected_schedule_from_payload(payload):
def _get_alert_group_from_payload(payload):
AlertGroup = apps.get_model("alerts", "AlertGroup")
from apps.alerts.models import AlertGroup
alert_group_pk = json.loads(payload["view"]["private_metadata"])[ALERT_GROUP_DATA_KEY]
return AlertGroup.objects.get(pk=alert_group_pk)

View file

@ -1,7 +1,6 @@
import json
from uuid import uuid4
from django.apps import apps
from django.conf import settings
from apps.alerts.models import AlertReceiveChannel
@ -62,7 +61,7 @@ class FinishCreateIncidentFromSlashCommand(scenario_step.ScenarioStep):
"""
def process_scenario(self, slack_user_identity, slack_team_identity, payload):
Alert = apps.get_model("alerts", "Alert")
from apps.alerts.models import Alert
title = _get_title_from_payload(payload)
message = _get_message_from_payload(payload)
@ -345,7 +344,8 @@ def _get_organization_select(slack_team_identity, slack_user_identity, value, in
def _get_selected_org_from_payload(payload, input_id_prefix):
Organization = apps.get_model("user_management", "Organization")
from apps.user_management.models import Organization
selected_org_id = payload["view"]["state"]["values"][input_id_prefix + MANUAL_INCIDENT_ORG_SELECT_ID][
OnOrgChange.routing_uid()
]["selected_option"]["value"]
@ -401,7 +401,8 @@ def _get_team_select(slack_user_identity, organization, value, input_id_prefix):
def _get_selected_team_from_payload(payload, input_id_prefix):
Team = apps.get_model("user_management", "Team")
from apps.user_management.models import Team
selected_team_id = payload["view"]["state"]["values"][input_id_prefix + MANUAL_INCIDENT_TEAM_SELECT_ID][
OnTeamChange.routing_uid()
]["selected_option"]["value"]
@ -446,7 +447,8 @@ def _get_route_select(integration, value, input_id_prefix):
def _get_selected_route_from_payload(payload, input_id_prefix):
ChannelFilter = apps.get_model("alerts", "ChannelFilter")
from apps.alerts.models import ChannelFilter
selected_org_id = payload["view"]["state"]["values"][input_id_prefix + MANUAL_INCIDENT_ROUTE_SELECT_ID][
OnRouteChange.routing_uid()
]["selected_option"]["value"]

View file

@ -1,13 +1,10 @@
from django.apps import apps
from apps.slack.scenarios import scenario_step
from apps.slack.slack_client.exceptions import SlackAPIException, SlackAPITokenException
class NotificationDeliveryStep(scenario_step.ScenarioStep):
def process_signal(self, log_record):
UserNotificationPolicy = apps.get_model("base", "UserNotificationPolicy")
UserNotificationPolicyLogRecord = apps.get_model("base", "UserNotificationPolicyLogRecord")
from apps.base.models import UserNotificationPolicy, UserNotificationPolicyLogRecord
user = log_record.author
alert_group = log_record.alert_group

View file

@ -1,7 +1,6 @@
import json
from uuid import uuid4
from django.apps import apps
from django.conf import settings
from apps.alerts.models import AlertReceiveChannel, EscalationChain
@ -474,7 +473,8 @@ def _get_select_field_value(payload, prefix_id, routing_uid, field_id):
def _get_selected_org_from_payload(payload, input_id_prefix, slack_team_identity, slack_user_identity):
Organization = apps.get_model("user_management", "Organization")
from apps.user_management.models import Organization
selected_org_id = _get_select_field_value(
payload, input_id_prefix, OnPagingOrgChange.routing_uid(), DIRECT_PAGING_ORG_SELECT_ID
)
@ -823,7 +823,8 @@ def _get_availability_warnings_view(warnings, organization, user, callback_id, p
def _get_selected_team_from_payload(payload, input_id_prefix):
Team = apps.get_model("user_management", "Team")
from apps.user_management.models import Team
selected_team_id = _get_select_field_value(
payload, input_id_prefix, OnPagingTeamChange.routing_uid(), DIRECT_PAGING_TEAM_SELECT_ID
)
@ -850,7 +851,8 @@ def _get_additional_responders_checked_from_payload(payload, input_id_prefix):
def _get_selected_user_from_payload(payload, input_id_prefix):
User = apps.get_model("user_management", "User")
from apps.user_management.models import User
selected_user_id = _get_select_field_value(
payload, input_id_prefix, OnPagingUserChange.routing_uid(), DIRECT_PAGING_USER_SELECT_ID
)
@ -860,7 +862,8 @@ def _get_selected_user_from_payload(payload, input_id_prefix):
def _get_selected_schedule_from_payload(payload, input_id_prefix):
OnCallSchedule = apps.get_model("schedules", "OnCallSchedule")
from apps.schedules.models import OnCallSchedule
selected_schedule_id = _get_select_field_value(
payload, input_id_prefix, OnPagingScheduleChange.routing_uid(), DIRECT_PAGING_SCHEDULE_SELECT_ID
)

View file

@ -2,7 +2,6 @@ import datetime
import json
import logging
from django.apps import apps
from django.db.models import Q
from apps.api.permissions import RBACPermission
@ -25,10 +24,8 @@ class AddToResolutionNoteStep(scenario_step.ScenarioStep):
]
def process_scenario(self, slack_user_identity, slack_team_identity, payload):
SlackMessage = apps.get_model("slack", "SlackMessage")
ResolutionNoteSlackMessage = apps.get_model("alerts", "ResolutionNoteSlackMessage")
ResolutionNote = apps.get_model("alerts", "ResolutionNote")
SlackUserIdentity = apps.get_model("slack", "SlackUserIdentity")
from apps.alerts.models import ResolutionNote, ResolutionNoteSlackMessage
from apps.slack.models import SlackMessage, SlackUserIdentity
try:
channel_id = payload["channel"]["id"]
@ -216,7 +213,8 @@ class UpdateResolutionNoteStep(scenario_step.ScenarioStep):
self.remove_resolution_note_reaction(resolution_note_slack_message)
def post_or_update_resolution_note_in_thread(self, resolution_note):
ResolutionNoteSlackMessage = apps.get_model("alerts", "ResolutionNoteSlackMessage")
from apps.alerts.models import ResolutionNoteSlackMessage
resolution_note_slack_message = resolution_note.resolution_note_slack_message
alert_group = resolution_note.alert_group
alert_group_slack_message = alert_group.slack_message
@ -378,7 +376,8 @@ class ResolutionNoteModalStep(AlertGroupActionsMixin, scenario_step.ScenarioStep
def process_scenario(self, slack_user_identity, slack_team_identity, payload, data=None):
if data:
# Argument "data" is used when step is called from other step, e.g. AddRemoveThreadMessageStep
AlertGroup = apps.get_model("alerts", "AlertGroup")
from apps.alerts.models import AlertGroup
alert_group = AlertGroup.objects.get(pk=data["alert_group_pk"])
else:
# Handle "Add Resolution notes" button click
@ -449,7 +448,8 @@ class ResolutionNoteModalStep(AlertGroupActionsMixin, scenario_step.ScenarioStep
)
def get_resolution_notes_blocks(self, alert_group, resolution_note_window_action, action_resolve):
ResolutionNote = apps.get_model("alerts", "ResolutionNote")
from apps.alerts.models import ResolutionNote
blocks = []
other_resolution_notes = alert_group.resolution_notes.filter(~Q(source=ResolutionNote.Source.SLACK))
@ -675,9 +675,8 @@ class ReadEditPostmortemStep(ResolutionNoteModalStep):
class AddRemoveThreadMessageStep(UpdateResolutionNoteStep, scenario_step.ScenarioStep):
def process_scenario(self, slack_user_identity, slack_team_identity, payload):
AlertGroup = apps.get_model("alerts", "AlertGroup")
ResolutionNoteSlackMessage = apps.get_model("alerts", "ResolutionNoteSlackMessage")
ResolutionNote = apps.get_model("alerts", "ResolutionNote")
from apps.alerts.models import AlertGroup, ResolutionNote, ResolutionNoteSlackMessage
value = json.loads(payload["actions"][0]["value"])
slack_message_pk = value.get("message_pk")
resolution_note_pk = value.get("resolution_note_pk")

View file

@ -1,6 +1,5 @@
from contextlib import suppress
from django.apps import apps
from django.utils import timezone
from apps.slack.scenarios import scenario_step
@ -12,7 +11,7 @@ class SlackChannelCreatedOrRenamedEventStep(scenario_step.ScenarioStep):
"""
Triggered by action: Create or rename channel
"""
SlackChannel = apps.get_model("slack", "SlackChannel")
from apps.slack.models import SlackChannel
slack_id = payload["event"]["channel"]["id"]
channel_name = payload["event"]["channel"]["name"]
@ -32,7 +31,7 @@ class SlackChannelDeletedEventStep(scenario_step.ScenarioStep):
"""
Triggered by action: Delete channel
"""
SlackChannel = apps.get_model("slack", "SlackChannel")
from apps.slack.models import SlackChannel
slack_id = payload["event"]["channel"]
with suppress(SlackChannel.DoesNotExist):
@ -49,7 +48,7 @@ class SlackChannelArchivedEventStep(scenario_step.ScenarioStep):
"""
Triggered by action: Archive channel
"""
SlackChannel = apps.get_model("slack", "SlackChannel")
from apps.slack.models import SlackChannel
slack_id = payload["event"]["channel"]
@ -65,7 +64,7 @@ class SlackChannelUnArchivedEventStep(scenario_step.ScenarioStep):
"""
Triggered by action: UnArchive channel
"""
SlackChannel = apps.get_model("slack", "SlackChannel")
from apps.slack.models import SlackChannel
slack_id = payload["event"]["channel"]

View file

@ -1,7 +1,5 @@
import logging
from django.apps import apps
from apps.slack.scenarios import scenario_step
logger = logging.getLogger(__name__)
@ -30,8 +28,8 @@ class SlackChannelMessageEventStep(scenario_step.ScenarioStep):
self.delete_thread_message_from_resolution_note(slack_user_identity, payload)
def save_thread_message_for_resolution_note(self, slack_user_identity, payload):
ResolutionNoteSlackMessage = apps.get_model("alerts", "ResolutionNoteSlackMessage")
SlackMessage = apps.get_model("slack", "SlackMessage")
from apps.alerts.models import ResolutionNoteSlackMessage
from apps.slack.models import SlackMessage
if slack_user_identity is None:
logger.warning(
@ -114,7 +112,7 @@ class SlackChannelMessageEventStep(scenario_step.ScenarioStep):
slack_thread_message.save()
def delete_thread_message_from_resolution_note(self, slack_user_identity, payload):
ResolutionNoteSlackMessage = apps.get_model("alerts", "ResolutionNoteSlackMessage")
from apps.alerts.models import ResolutionNoteSlackMessage
if slack_user_identity is None:
logger.warning(

View file

@ -1,5 +1,4 @@
import humanize
from django.apps import apps
from apps.alerts.incident_log_builder import IncidentLogBuilder
@ -7,8 +6,8 @@ from apps.alerts.incident_log_builder import IncidentLogBuilder
class AlertGroupLogSlackRenderer:
@staticmethod
def render_incident_log_report_for_slack(alert_group):
AlertGroupLogRecord = apps.get_model("alerts", "AlertGroupLogRecord")
UserNotificationPolicyLogRecord = apps.get_model("base", "UserNotificationPolicyLogRecord")
from apps.alerts.models import AlertGroupLogRecord
from apps.base.models import UserNotificationPolicyLogRecord
log_builder = IncidentLogBuilder(alert_group)
all_log_records = log_builder.get_log_records_list()

View file

@ -1,4 +1,3 @@
from django.apps import apps
from django.utils import timezone
from apps.slack.scenarios import scenario_step
@ -9,7 +8,7 @@ class SlackUserGroupEventStep(scenario_step.ScenarioStep):
"""
Triggered by action: creation user groups or changes in user groups except its members.
"""
SlackUserGroup = apps.get_model("slack", "SlackUserGroup")
from apps.slack.models import SlackUserGroup
slack_id = payload["event"]["subteam"]["id"]
usergroup_name = payload["event"]["subteam"]["name"]
@ -35,7 +34,7 @@ class SlackUserGroupMembersChangedEventStep(scenario_step.ScenarioStep):
"""
Triggered by action: changed members in user group.
"""
SlackUserGroup = apps.get_model("slack", "SlackUserGroup")
from apps.slack.models import SlackUserGroup
slack_id = payload["event"]["subteam_id"]
try:

View file

@ -1,7 +1,6 @@
import logging
from typing import Optional, Tuple
from django.apps import apps
from django.utils import timezone
from slackclient import SlackClient
from slackclient.exceptions import TokenRefreshError
@ -89,7 +88,7 @@ class SlackClientWithErrorHandling(SlackClient):
return cumulative_response, cursor, rate_limited
def api_call(self, *args, **kwargs):
DynamicSetting = apps.get_model("base", "DynamicSetting")
from apps.base.models import DynamicSetting
simulate_slack_downtime = DynamicSetting.objects.get_or_create(
name="simulate_slack_downtime", defaults={"boolean_value": False}

View file

@ -1,7 +1,6 @@
import re
import emoji
from django.apps import apps
from slackviewer.formatter import SlackFormatter as SlackFormatterBase
@ -84,7 +83,7 @@ class SlackFormatter(SlackFormatterBase):
return annotation
def _sub_annotated_mention_slack_user(self, ref_id):
SlackUserIdentity = apps.get_model("slack", "SlackUserIdentity")
from apps.slack.models import SlackUserIdentity
slack_user_identity = SlackUserIdentity.objects.filter(
slack_team_identity=self.__ORGANIZATION.slack_team_identity, slack_id=ref_id

View file

@ -4,7 +4,6 @@ from typing import Optional
from celery import uuid as celery_uuid
from celery.utils.log import get_task_logger
from django.apps import apps
from django.conf import settings
from django.core.cache import cache
from django.utils import timezone
@ -50,8 +49,8 @@ def update_incident_slack_message(slack_team_identity_pk, alert_group_pk):
f" doesn't equal to cached task_id ({cached_task_id}) for alert_group {alert_group_pk}"
)
SlackTeamIdentity = apps.get_model("slack", "SlackTeamIdentity")
AlertGroup = apps.get_model("alerts", "AlertGroup")
from apps.alerts.models import AlertGroup
from apps.slack.models import SlackTeamIdentity
slack_team_identity = SlackTeamIdentity.objects.get(pk=slack_team_identity_pk)
alert_group = AlertGroup.objects.get(pk=alert_group_pk)
@ -75,9 +74,7 @@ def check_slack_message_exists_before_post_message_to_thread(
Check if slack message for current alert group exists before before posting a message to a thread in slack.
If it does not exist - restart task every 10 seconds for 24 hours.
"""
AlertGroup = apps.get_model("alerts", "AlertGroup")
AlertGroupLogRecord = apps.get_model("alerts", "AlertGroupLogRecord")
EscalationPolicy = apps.get_model("alerts", "EscalationPolicy")
from apps.alerts.models import AlertGroup, AlertGroupLogRecord, EscalationPolicy
alert_group = AlertGroup.objects.get(pk=alert_group_pk)
slack_team_identity = alert_group.channel.organization.slack_team_identity
@ -146,8 +143,8 @@ def send_message_to_thread_if_bot_not_in_channel(alert_group_pk, slack_team_iden
Send message to alert group's thread if bot is not in current channel
"""
AlertGroup = apps.get_model("alerts", "AlertGroup")
SlackTeamIdentity = apps.get_model("slack", "SlackTeamIdentity")
from apps.alerts.models import AlertGroup
from apps.slack.models import SlackTeamIdentity
slack_team_identity = SlackTeamIdentity.objects.get(pk=slack_team_identity_pk)
alert_group = AlertGroup.objects.get(pk=alert_group_pk)
@ -163,8 +160,7 @@ def send_message_to_thread_if_bot_not_in_channel(alert_group_pk, slack_team_iden
@shared_dedicated_queue_retry_task(autoretry_for=(Exception,), retry_backoff=True, max_retries=0)
def unpopulate_slack_user_identities(organization_pk, force=False, ts=None):
User = apps.get_model("user_management", "User")
Organization = apps.get_model("user_management", "Organization")
from apps.user_management.models import Organization, User
organization = Organization.objects.get(pk=organization_pk)
@ -183,9 +179,8 @@ def unpopulate_slack_user_identities(organization_pk, force=False, ts=None):
@shared_dedicated_queue_retry_task(autoretry_for=(Exception,), retry_backoff=True, max_retries=0)
def populate_slack_user_identities(organization_pk):
SlackUserIdentity = apps.get_model("slack", "SlackUserIdentity")
Organization = apps.get_model("user_management", "Organization")
from apps.slack.models import SlackUserIdentity
from apps.user_management.models import Organization
organization = Organization.objects.get(pk=organization_pk)
unpopulate_slack_user_identities(organization_pk)
@ -269,8 +264,9 @@ def populate_slack_user_identities(organization_pk):
)
def post_or_update_log_report_message_task(alert_group_pk, slack_team_identity_pk, update=False):
logger.debug(f"Start post_or_update_log_report_message_task for alert_group {alert_group_pk}")
AlertGroup = apps.get_model("alerts", "AlertGroup")
SlackTeamIdentity = apps.get_model("slack", "SlackTeamIdentity")
from apps.alerts.models import AlertGroup
from apps.slack.models import SlackTeamIdentity
UpdateLogReportMessageStep = ScenarioStep.get_step("distribute_alerts", "UpdateLogReportMessageStep")
slack_team_identity = SlackTeamIdentity.objects.get(pk=slack_team_identity_pk)
@ -291,7 +287,7 @@ def post_or_update_log_report_message_task(alert_group_pk, slack_team_identity_p
autoretry_for=(Exception,), retry_backoff=True, max_retries=1 if settings.DEBUG else None
)
def post_slack_rate_limit_message(integration_id):
AlertReceiveChannel = apps.get_model("alerts", "AlertReceiveChannel")
from apps.alerts.models import AlertReceiveChannel
try:
integration = AlertReceiveChannel.objects.get(pk=integration_id)
@ -321,7 +317,7 @@ def post_slack_rate_limit_message(integration_id):
autoretry_for=(Exception,), retry_backoff=True, max_retries=1 if settings.DEBUG else None
)
def populate_slack_usergroups():
SlackTeamIdentity = apps.get_model("slack", "SlackTeamIdentity")
from apps.slack.models import SlackTeamIdentity
slack_team_identities = SlackTeamIdentity.objects.filter(
detected_token_revoked__isnull=True,
@ -343,8 +339,7 @@ def populate_slack_usergroups():
autoretry_for=(Exception,), retry_backoff=True, max_retries=1 if settings.DEBUG else None
)
def populate_slack_usergroups_for_team(slack_team_identity_id):
SlackTeamIdentity = apps.get_model("slack", "SlackTeamIdentity")
SlackUserGroup = apps.get_model("slack", "SlackUserGroup")
from apps.slack.models import SlackTeamIdentity, SlackUserGroup
slack_team_identity = SlackTeamIdentity.objects.get(pk=slack_team_identity_id)
sc = SlackClientWithErrorHandling(slack_team_identity.bot_access_token)
@ -465,7 +460,7 @@ def populate_slack_usergroups_for_team(slack_team_identity_id):
@shared_dedicated_queue_retry_task()
def start_update_slack_user_group_for_schedules():
SlackUserGroup = apps.get_model("slack", "SlackUserGroup")
from apps.slack.models import SlackUserGroup
user_group_pks = (
SlackUserGroup.objects.filter(oncall_schedules__isnull=False).distinct().values_list("pk", flat=True)
@ -477,7 +472,7 @@ def start_update_slack_user_group_for_schedules():
@shared_dedicated_queue_retry_task(autoretry_for=(Exception,), retry_backoff=True, max_retries=3)
def update_slack_user_group_for_schedules(user_group_pk):
SlackUserGroup = apps.get_model("slack", "SlackUserGroup")
from apps.slack.models import SlackUserGroup
try:
user_group = SlackUserGroup.objects.get(pk=user_group_pk)
@ -492,7 +487,7 @@ def update_slack_user_group_for_schedules(user_group_pk):
autoretry_for=(Exception,), retry_backoff=True, max_retries=1 if settings.DEBUG else None
)
def populate_slack_channels():
SlackTeamIdentity = apps.get_model("slack", "SlackTeamIdentity")
from apps.slack.models import SlackTeamIdentity
slack_team_identities = SlackTeamIdentity.objects.filter(
detected_token_revoked__isnull=True,
@ -529,8 +524,7 @@ def populate_slack_channels_for_team(slack_team_identity_id: int, cursor: Option
ids in cache and restart the task with the last successful pagination cursor to avoid any data loss during delay
time.
"""
SlackTeamIdentity = apps.get_model("slack", "SlackTeamIdentity")
SlackChannel = apps.get_model("slack", "SlackChannel")
from apps.slack.models import SlackChannel, SlackTeamIdentity
slack_team_identity = SlackTeamIdentity.objects.get(pk=slack_team_identity_id)
sc = SlackClientWithErrorHandling(slack_team_identity.bot_access_token)
@ -636,8 +630,9 @@ def clean_slack_integration_leftovers(organization_id, *args, **kwargs):
This task removes binding to slack (e.g ChannelFilter's slack channel) for a given organization.
It is used when user changes slack integration.
"""
ChannelFilter = apps.get_model("alerts", "ChannelFilter")
OnCallSchedule = apps.get_model("schedules", "OnCallSchedule")
from apps.alerts.models import ChannelFilter
from apps.schedules.models import OnCallSchedule
logger.info(f"Start clean slack leftovers for organization {organization_id}")
ChannelFilter.objects.filter(alert_receive_channel__organization_id=organization_id).update(slack_channel_id=None)
logger.info(f"Cleaned ChannelFilters slack_channel_id for organization {organization_id}")
@ -651,9 +646,9 @@ def clean_slack_channel_leftovers(slack_team_identity_id, slack_channel_id):
"""
This task removes binding to slack channel after the channel is archived or deleted in slack.
"""
SlackTeamIdentity = apps.get_model("slack", "SlackTeamIdentity")
ChannelFilter = apps.get_model("alerts", "ChannelFilter")
Organization = apps.get_model("user_management", "Organization")
from apps.alerts.models import ChannelFilter
from apps.slack.models import SlackTeamIdentity
from apps.user_management.models import Organization
try:
sti = SlackTeamIdentity.objects.get(id=slack_team_identity_id)

View file

@ -1,7 +1,6 @@
import logging
from urllib.parse import urljoin
from django.apps import apps
from django.conf import settings
from django.contrib.auth import REDIRECT_FIELD_NAME
from django.http import HttpResponse
@ -29,7 +28,7 @@ def set_user_and_organization_from_request(backend, strategy, *args, **kwargs):
def connect_user_to_slack(response, backend, strategy, user, organization, *args, **kwargs):
SlackUserIdentity = apps.get_model("slack", "SlackUserIdentity")
from apps.slack.models import SlackUserIdentity
# Continue pipeline step only if it was installation
if backend.name != "slack-login":
@ -80,7 +79,7 @@ def connect_user_to_slack(response, backend, strategy, user, organization, *args
def populate_slack_identities(response, backend, user, organization, **kwargs):
SlackTeamIdentity = apps.get_model("slack", "SlackTeamIdentity")
from apps.slack.models import SlackTeamIdentity
# Continue pipeline step only if it was installation
if backend.name != "slack-install-free":

View file

@ -1,7 +1,5 @@
import logging
from django.apps import apps
from apps.alerts.models import AlertGroup
from apps.alerts.representative import AlertGroupAbstractRepresentative
from apps.telegram.models import TelegramMessage
@ -16,8 +14,7 @@ class AlertGroupTelegramRepresentative(AlertGroupAbstractRepresentative):
self.log_record = log_record
def is_applicable(self):
TelegramToUserConnector = apps.get_model("telegram", "TelegramToUserConnector")
TelegramToOrganizationConnector = apps.get_model("telegram", "TelegramToOrganizationConnector")
from apps.telegram.models import TelegramToOrganizationConnector, TelegramToUserConnector
organization = self.log_record.alert_group.channel.organization
@ -32,7 +29,7 @@ class AlertGroupTelegramRepresentative(AlertGroupAbstractRepresentative):
@staticmethod
def get_handlers_map():
AlertGroupLogRecord = apps.get_model("alerts", "AlertGroupLogRecord")
from apps.alerts.models import AlertGroupLogRecord
return {
AlertGroupLogRecord.TYPE_ACK: "alert_group_action",
@ -78,7 +75,8 @@ class AlertGroupTelegramRepresentative(AlertGroupAbstractRepresentative):
@classmethod
def on_alert_group_action_triggered(cls, **kwargs):
AlertGroupLogRecord = apps.get_model("alerts", "AlertGroupLogRecord")
from apps.alerts.models import AlertGroupLogRecord
log_record = kwargs["log_record"]
logger.info(f"AlertGroupTelegramRepresentative ACTION SIGNAL, log record {log_record}")

View file

@ -2,7 +2,6 @@ import logging
from celery import uuid as celery_uuid
from celery.utils.log import get_task_logger
from django.apps import apps
from django.conf import settings
from telegram import error
@ -83,7 +82,7 @@ def edit_message(self, message_pk):
def send_link_to_channel_message_or_fallback_to_full_alert_group(
self, alert_group_pk, notification_policy_pk, user_connector_pk
):
TelegramToUserConnector = apps.get_model("telegram", "TelegramToUserConnector")
from apps.telegram.models import TelegramToUserConnector
try:
user_connector = TelegramToUserConnector.objects.get(pk=user_connector_pk)

View file

@ -2,7 +2,6 @@ import logging
import urllib.parse
from string import digits
from django.apps import apps
from django.db.models import F, Q
from phonenumbers import COUNTRY_CODE_TO_REGION_CODE
from twilio.base.exceptions import TwilioRestException
@ -18,7 +17,14 @@ from apps.phone_notifications.exceptions import (
)
from apps.phone_notifications.phone_provider import PhoneProvider, ProviderFlags
from apps.twilioapp.gather import get_gather_message, get_gather_url
from apps.twilioapp.models import TwilioCallStatuses, TwilioPhoneCall, TwilioSMS
from apps.twilioapp.models import (
TwilioCallStatuses,
TwilioPhoneCall,
TwilioPhoneCallSender,
TwilioSMS,
TwilioSmsSender,
TwilioVerificationSender,
)
from apps.twilioapp.status_callback import get_call_status_callback_url, get_sms_status_callback_url
logger = logging.getLogger(__name__)
@ -260,11 +266,10 @@ class TwilioPhoneProvider(PhoneProvider):
def _default_twilio_number(self):
return live_settings.TWILIO_NUMBER
def _twilio_sender(self, sender_type, to):
def _twilio_sender(self, sender_model, to):
_, _, country_code = self._parse_number(to)
TwilioSender = apps.get_model("twilioapp", sender_type)
sender = (
TwilioSender.objects.filter(Q(country_code=country_code) | Q(country_code__isnull=True))
sender_model.objects.filter(Q(country_code=country_code) | Q(country_code__isnull=True))
.order_by(F("country_code").desc(nulls_last=True))
.first()
)
@ -275,15 +280,15 @@ class TwilioPhoneProvider(PhoneProvider):
return self._default_twilio_api_client, None
def _sms_sender(self, to):
client, sender = self._twilio_sender("TwilioSmsSender", to)
client, sender = self._twilio_sender(TwilioSmsSender, to)
return client, sender.sender if sender else self._default_twilio_number
def _phone_sender(self, to):
client, sender = self._twilio_sender("TwilioPhoneCallSender", to)
client, sender = self._twilio_sender(TwilioPhoneCallSender, to)
return client, sender.number if sender else self._default_twilio_number
def _verify_sender(self, to):
client, sender = self._twilio_sender("TwilioVerificationSender", to)
client, sender = self._twilio_sender(TwilioVerificationSender, to)
return client, sender.verify_service_sid if sender else live_settings.TWILIO_VERIFY_SERVICE_SID
def _get_calling_code(self, iso):

View file

@ -1,6 +1,5 @@
import logging
from django.apps import apps
from django.urls import reverse
from apps.alerts.signals import user_notification_action_triggered_signal
@ -21,7 +20,7 @@ def update_twilio_call_status(call_sid, call_status):
Returns:
"""
UserNotificationPolicyLogRecord = apps.get_model("base", "UserNotificationPolicyLogRecord")
from apps.base.models import UserNotificationPolicyLogRecord
if call_sid and call_status:
logger.info(f"twilioapp.update_twilio_call_status: processing sid={call_sid} status={call_status}")
@ -85,7 +84,8 @@ def update_twilio_call_status(call_sid, call_status):
def get_error_code_by_twilio_status(status):
UserNotificationPolicyLogRecord = apps.get_model("base", "UserNotificationPolicyLogRecord")
from apps.base.models import UserNotificationPolicyLogRecord
TWILIO_ERRORS_TO_ERROR_CODES_MAP = {
TwilioCallStatuses.BUSY: UserNotificationPolicyLogRecord.ERROR_NOTIFICATION_PHONE_CALL_LINE_BUSY,
TwilioCallStatuses.FAILED: UserNotificationPolicyLogRecord.ERROR_NOTIFICATION_PHONE_CALL_FAILED,
@ -106,7 +106,7 @@ def update_twilio_sms_status(message_sid, message_status):
Returns:
"""
UserNotificationPolicyLogRecord = apps.get_model("base", "UserNotificationPolicyLogRecord")
from apps.base.models import UserNotificationPolicyLogRecord
if message_sid and message_status:
logger.info(f"twilioapp.update_twilio_message_status: processing sid={message_sid} status={message_status}")
@ -163,7 +163,8 @@ def update_twilio_sms_status(message_sid, message_status):
def get_sms_error_code_by_twilio_status(status):
UserNotificationPolicyLogRecord = apps.get_model("base", "UserNotificationPolicyLogRecord")
from apps.base.models import UserNotificationPolicyLogRecord
TWILIO_ERRORS_TO_ERROR_CODES_MAP = {
TwilioSMSstatuses.UNDELIVERED: UserNotificationPolicyLogRecord.ERROR_NOTIFICATION_SMS_DELIVERY_FAILED,
TwilioSMSstatuses.FAILED: UserNotificationPolicyLogRecord.ERROR_NOTIFICATION_SMS_DELIVERY_FAILED,

View file

@ -1,6 +1,5 @@
import logging
from django.apps import apps
from django.http import HttpResponse
from rest_framework import status
from rest_framework.permissions import BasePermission
@ -25,7 +24,8 @@ class AllowOnlyTwilio(BasePermission):
if not request_account_sid:
return False
TwilioAccount = apps.get_model("twilioapp", "TwilioAccount")
from apps.twilioapp.models import TwilioAccount
account = TwilioAccount.objects.filter(account_sid=request_account_sid).first()
if account:
return self.validate_request(request, account.account_sid, account.auth_token)

View file

@ -3,7 +3,6 @@ import typing
import uuid
from urllib.parse import urljoin
from django.apps import apps
from django.conf import settings
from django.core.validators import MinLengthValidator
from django.db import models
@ -243,7 +242,8 @@ class Organization(MaintainableObject):
unique_together = ("stack_id", "org_id")
def provision_plugin(self) -> ProvisionedPlugin:
PluginAuthToken = apps.get_model("auth_token", "PluginAuthToken")
from apps.auth_token.models import PluginAuthToken
_, token = PluginAuthToken.create_auth_token(organization=self)
return {
"stackId": self.stack_id,
@ -253,8 +253,9 @@ class Organization(MaintainableObject):
}
def revoke_plugin(self):
token_model = apps.get_model("auth_token", "PluginAuthToken")
token_model.objects.filter(organization=self).delete()
from apps.auth_token.models import PluginAuthToken
PluginAuthToken.objects.filter(organization=self).delete()
"""
Following methods:

View file

@ -1,13 +1,13 @@
import logging
from django.apps import apps
from django.db import models
logger = logging.getLogger(__name__)
def sync_regions(regions: list[dict]):
Region = apps.get_model("user_management", "Region")
from apps.user_management.models import Region
gcom_regions = {region["slug"]: region for region in regions}
existing_region_slugs = set(Region.objects.all().values_list("slug", flat=True))

View file

@ -3,7 +3,6 @@ import logging
import typing
from urllib.parse import urljoin
from django.apps import apps
from django.conf import settings
from django.core.validators import MinLengthValidator
from django.db import models
@ -288,7 +287,8 @@ class User(models.Model):
@property
def insight_logs_serialized(self):
UserNotificationPolicy = apps.get_model("base", "UserNotificationPolicy")
from apps.base.models import UserNotificationPolicy
default, important = UserNotificationPolicy.get_short_verbals_for_user(user=self)
notification_policies_verbal = f"default: {' - '.join(default)}, important: {' - '.join(important)}"
notification_policies_verbal = demojize(notification_policies_verbal)

View file

@ -1,4 +1,3 @@
from django.apps import apps
from django.conf import settings
from django.utils import timezone
@ -36,15 +35,15 @@ class FreePublicBetaSubscriptionStrategy(BaseSubscriptionStrategy):
Count sms and calls together and they have common limit.
For FreePublicBetaSubscriptionStrategy notifications are counted per day
"""
PhoneCallRecord = apps.get_model("phone_notifications", "PhoneCallRecord")
SMSMessage = apps.get_model("phone_notifications", "SMSRecord")
from apps.phone_notifications.models import PhoneCallRecord, SMSRecord
now = timezone.now()
day_start = now.replace(hour=0, minute=0, second=0, microsecond=0)
calls_today = PhoneCallRecord.objects.filter(
created_at__gte=day_start,
receiver=user,
).count()
sms_today = SMSMessage.objects.filter(
sms_today = SMSRecord.objects.filter(
created_at__gte=day_start,
receiver=user,
).count()

View file

@ -1,7 +1,6 @@
import logging
from celery.utils.log import get_task_logger
from django.apps import apps
from django.conf import settings
from django.utils import timezone
@ -127,7 +126,8 @@ def check_grafana_incident_is_enabled(client):
def delete_organization_if_needed(organization):
# Organization has a manually set API token, it will not be found within GCOM
# and would need to be deleted manually.
PluginAuthToken = apps.get_model("auth_token", "PluginAuthToken")
from apps.auth_token.models import PluginAuthToken
manually_provisioned_token = PluginAuthToken.objects.filter(organization_id=organization.pk).first()
if manually_provisioned_token:
logger.info(f"Organization {organization.pk} has PluginAuthToken. Probably it's needed to delete org manually.")

View file

@ -1,7 +1,5 @@
from abc import ABC, abstractmethod
from django.apps import apps
class UserAbstractRepresentative(ABC):
HANDLER_PREFIX = "on_"
@ -12,5 +10,6 @@ class UserAbstractRepresentative(ABC):
@staticmethod
def get_handlers_map():
UserNotificationPolicyLogRecord = apps.get_model("base", "UserNotificationPolicyLogRecord")
from apps.base.models import UserNotificationPolicyLogRecord
return UserNotificationPolicyLogRecord.TYPE_TO_HANDLERS_MAP

Some files were not shown because too many files have changed in this diff Show more