# What this PR does Remove [`apps.get_model`](https://docs.djangoproject.com/en/3.2/ref/applications/#django.apps.apps.get_model) invocations and use inline `import` statements in places where models are imported within functions/methods to avoid circular imports. I believe `import` statements are more appropriate for most use cases as they allow for better static code analysis & formatting, and solve the issue of circular imports without being unnecessarily dynamic as `apps.get_model`. With `import` statements, it's possible to: - Jump to model definitions in most IDEs - Automatically sort inline imports with `isort` - Find import errors faster/easier (most IDEs highlight broken imports) - Have more consistency across regular & inline imports when importing models This PR also adds a flake8 rule to ban imports of `django.apps.apps`, so it's harder to use `apps.get_model` by mistake (it's possible to ignore this rule by using `# noqa: I251`). The rule is not enforced on directories with migration files, because `apps.get_model` is often used to get a historical state of a model, which is useful when writing migrations ([see this SO answer for more details](https://stackoverflow.com/a/37769213)). So `apps.get_model` is considered OK in migrations (even necessary in some cases). ## Checklist - [x] Unit, integration, and e2e (if applicable) tests updated - [x] Documentation added (or `pr:no public docs` PR label added if not required) - [x] `CHANGELOG.md` updated (or `pr:no changelog` PR label added if not required)
69 lines
2.3 KiB
Python
69 lines
2.3 KiB
Python
import humanize
|
|
from django.conf import settings
|
|
from django.db import transaction
|
|
from django.db.models import F
|
|
|
|
from common.custom_celery_tasks import shared_dedicated_queue_retry_task
|
|
|
|
from .notify_user import notify_user_task
|
|
|
|
|
|
@shared_dedicated_queue_retry_task(
|
|
autoretry_for=(Exception,), retry_backoff=True, max_retries=1 if settings.DEBUG else None
|
|
)
|
|
def invite_user_to_join_incident(invitation_pk):
|
|
from apps.alerts.models import AlertGroupLogRecord, Invitation
|
|
|
|
with transaction.atomic():
|
|
try:
|
|
invitation = Invitation.objects.filter(pk=invitation_pk).select_for_update()[0]
|
|
except IndexError:
|
|
return f"invite_user_to_join_incident: Invitation with pk {invitation_pk} doesn't exist"
|
|
|
|
if not invitation.is_active:
|
|
return None
|
|
if invitation.attempts_left <= 0 or invitation.alert_group.resolved:
|
|
invitation.is_active = False
|
|
invitation.save(update_fields=["is_active"])
|
|
return None
|
|
|
|
delay = Invitation.get_delay_by_attempt(invitation.attempt)
|
|
|
|
user_verbal = invitation.author.get_username_with_slack_verbal(mention=True)
|
|
reason = "Invitation activated by {}. Will try again in {} (attempt {}/{})".format(
|
|
user_verbal,
|
|
humanize.naturaldelta(delay),
|
|
invitation.attempt + 1,
|
|
Invitation.ATTEMPTS_LIMIT,
|
|
)
|
|
|
|
notify_task = notify_user_task.signature(
|
|
(
|
|
invitation.invitee.pk,
|
|
invitation.alert_group.pk,
|
|
),
|
|
{
|
|
"reason": reason,
|
|
"notify_even_acknowledged": True,
|
|
"notify_anyway": True,
|
|
"important": True,
|
|
},
|
|
immutable=True,
|
|
)
|
|
|
|
log_record = AlertGroupLogRecord(
|
|
type=AlertGroupLogRecord.TYPE_INVITATION_TRIGGERED,
|
|
author=None,
|
|
alert_group=invitation.alert_group,
|
|
invitation=invitation,
|
|
)
|
|
log_record.save()
|
|
|
|
invitation_task = invite_user_to_join_incident.signature(
|
|
(invitation.pk,), countdown=delay.total_seconds(), immutable=True
|
|
)
|
|
notify_task.apply_async()
|
|
invitation_task.apply_async()
|
|
|
|
invitation.attempt = F("attempt") + 1
|
|
invitation.save()
|