oncall-engine/engine/apps/slack/tasks.py
Joey Orlando e115617528
chore: drop usage of SlackMessage.organization + drop orphaned SlackMessages (#5330)
# What this PR does

- Stops writing `SlackMessage.organization` + removes references to this
field. [As we
discussed](https://raintank-corp.slack.com/archives/C083TU81TCH/p1733315887463279?thread_ts=1733311105.095309&cid=C083TU81TCH),
we do not need this field on this model/table,
`SlackMessage._slack_team_identity` is sufficient (`organization` will
be dropped in a separate PR)
- Adds a data migration script which:
- drops orphaned `SlackMessage` records; ie. ones which, even after the
[`engine/apps/slack/migrations/0007_migrate_slackmessage_channel_id.py`](https://github.com/grafana/oncall/blob/dev/engine/apps/slack/migrations/0007_migrate_slackmessage_channel_id.py)
migration, still don't have a `SlackMessage.channel` id filled in (we
discussed + agreed on dropping these records
[here](https://raintank-corp.slack.com/archives/C083TU81TCH/p1733329914516859?thread_ts=1733311105.095309&cid=C083TU81TCH))
- fills in empty `SlackMessage.slack_team_identity` values (from
`slack_message.channel.slack_team_identity`)

### Other notes

On the `organization` topic.

We store records in `SlackMessage` for two purposes (AFAICT), and in
both cases, we have references back to the `organization`:
- alert groups - `slack_message.alert_group.channel.organization`
- shift swap requests - `shift_swap_request.schedule.organization`

## Checklist

- [x] Unit, integration, and e2e (if applicable) tests updated
- [x] Documentation added (or `pr:no public docs` PR label added if not
required)
- [x] Added the relevant release notes label (see labels prefixed w/
`release:`). These labels dictate how your PR will
    show up in the autogenerated release notes.
2024-12-06 11:43:40 -05:00

664 lines
28 KiB
Python

import logging
import random
import typing
from celery import uuid as celery_uuid
from celery.exceptions import Retry
from celery.utils.log import get_task_logger
from django.conf import settings
from django.core.cache import cache
from django.utils import timezone
from apps.slack.alert_group_slack_service import AlertGroupSlackService
from apps.slack.client import SlackClient
from apps.slack.constants import SLACK_BOT_ID
from apps.slack.errors import (
SlackAPICantUpdateMessageError,
SlackAPIChannelInactiveError,
SlackAPIChannelNotFoundError,
SlackAPIInvalidAuthError,
SlackAPIMessageNotFoundError,
SlackAPIPlanUpgradeRequiredError,
SlackAPIRatelimitError,
SlackAPITokenError,
SlackAPIUsergroupNotFoundError,
)
from apps.slack.utils import get_populate_slack_channel_task_id_key, post_message_to_channel
from common.custom_celery_tasks import shared_dedicated_queue_retry_task
from common.utils import batch_queryset
# Module-level Celery task logger. The level is set to DEBUG on this logger so that the `logger.debug(...)` calls
# made by the tasks below are not filtered out at the logger level.
logger = get_task_logger(__name__)
logger.setLevel(logging.DEBUG)
@shared_dedicated_queue_retry_task(autoretry_for=(Exception,), retry_backoff=True)
def update_alert_group_slack_message(slack_message_pk: int) -> None:
    """
    Background task to update the Slack message for an alert group.

    This function is intended to be executed as a Celery task. It performs the following:
    - Compares the current task ID with the active update task ID reported by the `SlackMessage`.
      - If they do not match, it means a newer task has been scheduled, so the current task exits to prevent
        duplicated updates.
    - Skips the update when the message has no alert group, when the alert group has `skip_escalation_in_slack`
      set, or when the alert group's integration is currently rate-limited in Slack.
    - Does the actual update of the Slack message (`chat_update` with freshly rendered attachments and blocks).
    - Afterwards marks the active update task as complete to allow future updates (also note that
      the task ID is set in the cache with a timeout, so it will be automatically cleared after a certain period,
      even if this task fails to clear it. See `SlackMessage.update_alert_groups_message` for more details).

    Args:
        slack_message_pk (int): The primary key of the `SlackMessage` instance to update.
    """
    from apps.slack.models import SlackMessage

    current_task_id = update_alert_group_slack_message.request.id

    logger.info(
        f"update_alert_group_slack_message for slack message {slack_message_pk} started with task_id {current_task_id}"
    )

    try:
        slack_message = SlackMessage.objects.get(pk=slack_message_pk)
    except SlackMessage.DoesNotExist:
        logger.warning(f"SlackMessage {slack_message_pk} doesn't exist")
        return

    # Only the most recently scheduled update task is allowed to touch the message
    active_update_task_id = slack_message.get_active_update_task_id()
    if current_task_id != active_update_task_id:
        logger.warning(
            f"update_alert_group_slack_message skipped, because current_task_id ({current_task_id}) "
            f"does not equal to active_update_task_id ({active_update_task_id}) "
        )
        return

    alert_group = slack_message.alert_group
    if not alert_group:
        logger.warning(
            f"skipping update_alert_group_slack_message as SlackMessage {slack_message_pk} "
            "doesn't have an alert group associated with it"
        )
        return

    alert_group_pk = alert_group.pk
    alert_receive_channel = alert_group.channel
    # NOTE: this flag is read once here; the code below reuses the captured value rather than re-reading it
    alert_receive_channel_is_rate_limited = alert_receive_channel.is_rate_limited_in_slack

    if alert_group.skip_escalation_in_slack:
        logger.warning(
            f"skipping update_alert_group_slack_message as AlertGroup {alert_group_pk} "
            "has skip_escalation_in_slack set to True"
        )
        return
    elif alert_receive_channel_is_rate_limited:
        logger.warning(
            f"skipping update_alert_group_slack_message as AlertGroup {alert_group.pk}'s "
            f"integration ({alert_receive_channel.pk}) is rate-limited"
        )
        return

    slack_client = SlackClient(slack_message.slack_team_identity)

    try:
        slack_client.chat_update(
            channel=slack_message.channel.slack_id,
            ts=slack_message.slack_id,
            attachments=alert_group.render_slack_attachments(),
            blocks=alert_group.render_slack_blocks(),
        )

        logger.info(f"Message has been updated for alert_group {alert_group_pk}")
    except SlackAPIRatelimitError as e:
        # For maintenance integrations the rate-limit error is re-raised (presumably so that the task decorator's
        # `autoretry_for` retries it - confirm). For every other integration the error is absorbed here.
        if not alert_receive_channel.is_maintenace_integration:
            # The captured flag is always falsy at this point, because the function returned above when it was
            # truthy.
            if not alert_receive_channel_is_rate_limited:
                alert_receive_channel.start_send_rate_limit_message_task("Updating", e.retry_after)
                logger.info(f"Message has not been updated for alert_group {alert_group_pk} due to slack rate limit.")
        else:
            raise
    except (
        SlackAPIMessageNotFoundError,
        SlackAPICantUpdateMessageError,
        SlackAPIChannelInactiveError,
        SlackAPITokenError,
        SlackAPIChannelNotFoundError,
    ):
        # These Slack errors are deliberately ignored; the task still finishes normally
        pass

    # Reached after a successful update as well as after any of the absorbed Slack errors above
    slack_message.mark_active_update_task_as_complete()
@shared_dedicated_queue_retry_task(autoretry_for=(Exception,), retry_backoff=True)
def check_slack_message_exists_before_post_message_to_thread(
    alert_group_pk,
    text,
    escalation_policy_pk=None,
    escalation_policy_step=None,
    step_specific_info=None,
):
    """
    Check that the Slack message for the alert group exists before posting a message to its thread in Slack.

    If the Slack message does not exist yet, the task re-schedules itself every 10 seconds until 24 hours have
    passed since `alert_group.started_at`. After that it gives up and, when it was triggered by an escalation
    step, writes an "escalation failed" log record.

    Args:
        alert_group_pk: primary key of the `AlertGroup` whose thread should receive the message.
        text: message text to post to the thread.
        escalation_policy_pk: optional primary key of the `EscalationPolicy` stored on failure log records.
        escalation_policy_step: optional escalation step stored on failure log records.
        step_specific_info: optional extra data stored on failure log records.

    Raises:
        AlertGroup.DoesNotExist: if no alert group with `alert_group_pk` exists.
    """
    # `timedelta` is taken from the standard library rather than via `django.utils.timezone`, which only exposes
    # it as an incidental, undocumented import.
    from datetime import timedelta

    from apps.alerts.models import AlertGroup, AlertGroupLogRecord, EscalationPolicy

    alert_group = AlertGroup.objects.get(pk=alert_group_pk)
    slack_team_identity = alert_group.channel.organization.slack_team_identity
    # get escalation policy object if it exists to save it in log record
    escalation_policy = EscalationPolicy.objects.filter(pk=escalation_policy_pk).first()

    def _record_escalation_failure() -> None:
        # Persist an "escalation failed / notify in Slack" log record for this alert group.
        AlertGroupLogRecord(
            type=AlertGroupLogRecord.TYPE_ESCALATION_FAILED,
            alert_group=alert_group,
            escalation_policy=escalation_policy,
            escalation_error_code=AlertGroupLogRecord.ERROR_ESCALATION_NOTIFY_IN_SLACK,
            escalation_policy_step=escalation_policy_step,
            step_specific_info=step_specific_info,
        ).save()

    # we cannot post message to thread if team does not have slack team identity
    if not slack_team_identity:
        _record_escalation_failure()
        logger.debug(
            f"Failed to post message to thread in Slack for alert_group {alert_group_pk} because "
            f"slack team identity doesn't exist"
        )
        return

    retry_timeout_hours = 24

    if alert_group.slack_message:
        AlertGroupSlackService(slack_team_identity).publish_message_to_alert_group_thread(alert_group, text=text)

    # check how much time has passed since alert group was created
    # to prevent eternal loop of restarting check_slack_message_before_post_message_to_thread
    elif timezone.now() < alert_group.started_at + timedelta(hours=retry_timeout_hours):
        logger.debug(
            f"check_slack_message_exists_before_post_message_to_thread for alert_group {alert_group.pk} failed "
            f"because slack message does not exist. Restarting check_slack_message_before_post_message_to_thread."
        )
        restart_delay_seconds = 10
        check_slack_message_exists_before_post_message_to_thread.apply_async(
            (
                alert_group_pk,
                text,
                escalation_policy_pk,
                escalation_policy_step,
                step_specific_info,
            ),
            countdown=restart_delay_seconds,
        )
    else:
        logger.debug(
            f"check_slack_message_exists_before_post_message_to_thread for alert_group {alert_group.pk} failed "
            f"because slack message after {retry_timeout_hours} hours still does not exist"
        )
        # create log if it was triggered by escalation step
        if escalation_policy_step:
            _record_escalation_failure()
@shared_dedicated_queue_retry_task(
    autoretry_for=(Exception,),
    dont_autoretry_for=(Retry,),
    retry_backoff=True,
    max_retries=1,
)
def send_message_to_thread_if_bot_not_in_channel(
    alert_group_pk: int, slack_team_identity_pk: int, channel_id: int
) -> None:
    """
    Ask users (in the alert group's thread) to invite the bot when it is not a member of the given channel.

    Nothing is posted when the bot user is already among the channel's members. A Slack rate-limit error raised
    while posting re-schedules this task after the `retry_after` delay reported by the error.
    """
    from apps.alerts.models import AlertGroup
    from apps.slack.models import SlackTeamIdentity

    logger.info(
        f"Starting send_message_to_thread_if_bot_not_in_channel alert_group={alert_group_pk} "
        f"slack_team_identity={slack_team_identity_pk} channel_id={channel_id}"
    )

    slack_team_identity = SlackTeamIdentity.objects.get(pk=slack_team_identity_pk)
    alert_group = AlertGroup.objects.get(pk=alert_group_pk)

    slack_client = SlackClient(slack_team_identity, enable_ratelimit_retry=True)
    bot_user_id = slack_team_identity.bot_user_id

    # Guard clause: the bot is already in the channel, so there is nothing to tell the users
    if bot_user_id in slack_team_identity.get_conversation_members(slack_client, channel_id):
        return

    invite_text = f"Please invite <@{bot_user_id}> to this channel to make all features available :wink:"
    thread_service_args = (slack_team_identity, slack_client)
    try:
        logger.info("Attempting to send message to thread in Slack")
        AlertGroupSlackService(*thread_service_args).publish_message_to_alert_group_thread(
            alert_group, text=invite_text
        )
    except SlackAPIRatelimitError as rate_limit_error:
        logger.warning(f"Slack API rate limit error: {rate_limit_error}, retrying task")
        task_args = (alert_group_pk, slack_team_identity_pk, channel_id)
        raise send_message_to_thread_if_bot_not_in_channel.retry(
            task_args, countdown=rate_limit_error.retry_after, exc=rate_limit_error
        )
@shared_dedicated_queue_retry_task(autoretry_for=(Exception,), retry_backoff=True, max_retries=0)
def unpopulate_slack_user_identities(organization_pk, force=False, ts=None):
    """
    Detach Slack user identities from every user of an organization.

    With `force=True` the organization's own Slack bindings (`slack_team_identity` and `default_slack_channel`)
    are cleared as well. The `ts` argument is accepted but not used by this function.
    """
    from apps.user_management.models import Organization, User

    org = Organization.objects.get(pk=organization_pk)

    # Deleted users are included on purpose, so their slack_user_identity gets reset too
    org_users_including_deleted = User.objects.filter_with_deleted(organization=org)
    org_users_including_deleted.update(slack_user_identity=None)

    if not force:
        return

    org.slack_team_identity = None
    org.default_slack_channel = None
    org.save(update_fields=["slack_team_identity", "default_slack_channel"])
@shared_dedicated_queue_retry_task(autoretry_for=(Exception,), retry_backoff=True, max_retries=0)
def populate_slack_user_identities(organization_pk):
    """
    Refresh the cached Slack profile data of the Slack user identities belonging to an organization's workspace.

    The task first resets the `slack_user_identity` of the organization's users (via
    `unpopulate_slack_user_identities`), then walks over `slack_team_identity.members` and copies profile fields
    onto the matching, already existing `SlackUserIdentity` rows, which are saved with a single `bulk_update`.
    Bots, invited users, restricted users and members without an email are skipped, as are members that have no
    `SlackUserIdentity` yet.

    Args:
        organization_pk: primary key of the `Organization` to process.

    Raises:
        Organization.DoesNotExist: if no organization with `organization_pk` exists.
    """
    from apps.slack.models import SlackUserIdentity
    from apps.user_management.models import Organization

    organization = Organization.objects.get(pk=organization_pk)

    unpopulate_slack_user_identities(organization_pk)

    slack_team_identity = organization.slack_team_identity
    if slack_team_identity is None:
        # Without a connected workspace there are no members to read; previously this crashed with AttributeError
        logger.warning(
            f"populate_slack_user_identities skipped: organization {organization_pk} has no slack team identity"
        )
        return

    slack_user_identity_installed = slack_team_identity.installed_by

    slack_user_identities_to_update = []

    for member in slack_team_identity.members:
        # "profile" may be absent or null in the member payload; treat that the same as an empty profile
        profile = member.get("profile") or {}
        email = profile.get("email", None)

        # Don't collect bots, invited users and users from other workspaces
        if (
            member.get("id", None) == SLACK_BOT_ID
            or member.get("is_bot", False)
            or not email
            or member.get("is_invited_user", False)
            or member.get("is_restricted")
            or member.get("is_ultra_restricted")
        ):
            continue

        # For user which installs bot
        if (
            slack_user_identity_installed is not None
            and member.get("id", None) == slack_user_identity_installed.slack_id
        ):
            slack_user_identity = slack_user_identity_installed
        else:
            try:
                # `.get()` returns a single object (unlike `get_or_create`), so it must not be tuple-unpacked
                slack_user_identity = slack_team_identity.slack_user_identities.get(
                    slack_id=member["id"],
                )
            except SlackUserIdentity.DoesNotExist:
                continue

        slack_user_identity.cached_slack_login = member.get("name", None)
        slack_user_identity.cached_name = member.get("real_name") or profile.get("real_name", None)
        slack_user_identity.cached_slack_email = email  # non-empty here, see the filter above
        slack_user_identity.profile_real_name = profile.get("real_name", None)
        slack_user_identity.profile_real_name_normalized = profile.get("real_name_normalized", None)
        slack_user_identity.profile_display_name = profile.get("display_name", None)
        slack_user_identity.profile_display_name_normalized = profile.get("display_name_normalized", None)
        slack_user_identity.cached_avatar = profile.get("image_512", None)
        slack_user_identity.cached_timezone = member.get("tz", None)
        slack_user_identity.deleted = member.get("deleted", None)
        slack_user_identity.is_admin = member.get("is_admin", None)
        slack_user_identity.is_owner = member.get("is_owner", None)
        slack_user_identity.is_primary_owner = member.get("is_primary_owner", None)
        slack_user_identity.is_restricted = member.get("is_restricted", None)
        slack_user_identity.is_ultra_restricted = member.get("is_ultra_restricted", None)
        slack_user_identity.cached_is_bot = member.get("is_bot", None)
        slack_user_identity.is_app_user = member.get("is_app_user", None)

        slack_user_identities_to_update.append(slack_user_identity)

    # Must list every attribute assigned in the loop above, otherwise `bulk_update` will not persist it
    fields_to_update = [
        "cached_slack_login",
        "cached_name",
        "cached_slack_email",
        "profile_real_name",
        "profile_real_name_normalized",
        "profile_display_name",
        "profile_display_name_normalized",
        "cached_avatar",
        "cached_timezone",
        "deleted",
        "is_admin",
        "is_owner",
        "is_primary_owner",
        "is_restricted",
        "is_ultra_restricted",
        "cached_is_bot",
        "is_app_user",
    ]
    SlackUserIdentity.objects.bulk_update(slack_user_identities_to_update, fields_to_update, batch_size=5000)
@shared_dedicated_queue_retry_task(
    autoretry_for=(Exception,), retry_backoff=True, max_retries=1 if settings.DEBUG else None
)
def post_slack_rate_limit_message(integration_id: int, error_message_verb: typing.Optional[str] = None) -> None:
    """
    Post a "temporarily rate-limited" notice for an integration to the Slack channel of its default route.

    The task does nothing when the integration no longer exists, when this task is not the one recorded in
    `integration.rate_limit_message_task_id`, or when the default route resolves to no Slack channel.

    NOTE: error_message_verb was added to the function signature to allow for more descriptive error messages.
    It defaults to None to maintain backwards compatibility with tasks that were queued before the argument
    existed; None is replaced by "Sending messages for". The default can likely be removed in the near future
    (once existing tasks on the queue have been processed).
    """
    from apps.alerts.models import AlertReceiveChannel

    try:
        integration = AlertReceiveChannel.objects.get(pk=integration_id)
    except AlertReceiveChannel.DoesNotExist:
        logger.warning(f"AlertReceiveChannel {integration_id} doesn't exist")
        return

    if post_slack_rate_limit_message.request.id != integration.rate_limit_message_task_id:
        logger.info(
            f"post_slack_rate_limit_message. integration {integration_id}. ID mismatch. "
            f"Active: {integration.rate_limit_message_task_id}"
        )
        return

    default_route = integration.channel_filters.get(is_default=True)
    slack_channel = default_route.slack_channel_or_org_default
    if slack_channel is None:
        return

    # NOTE: see function docstring above for why the verb may be missing
    verb = "Sending messages for" if error_message_verb is None else error_message_verb

    text = (
        f"{verb} Alert Groups in Slack, for integration {integration.verbal_name}, is "
        f"temporarily rate-limited (due to a Slack rate-limit). Meanwhile, you can still find new Alert Groups "
        f'in the <{integration.new_incidents_web_link}|"Alert Groups" web page>'
    )
    post_message_to_channel(integration.organization, slack_channel.slack_id, text)
@shared_dedicated_queue_retry_task(
    autoretry_for=(Exception,), retry_backoff=True, max_retries=1 if settings.DEBUG else None
)
def populate_slack_usergroups():
    """
    Fan out one `populate_slack_usergroups_for_team` task per Slack team identity whose token is not revoked.

    The tasks are staggered to prevent hitting Slack rate limits: every 8th scheduled team pushes the countdown
    of that and all later tasks back by another 60 seconds.
    """
    from apps.slack.models import SlackTeamIdentity

    active_identities = SlackTeamIdentity.objects.filter(detected_token_revoked__isnull=True)

    # Flatten the batches lazily so the queryset is still consumed 5000 rows at a time
    identities = (identity for batch in batch_queryset(active_identities, 5000) for identity in batch)

    for position, identity in enumerate(identities, start=1):
        # 0s for teams 1-7, 60s for teams 8-15, 120s for teams 16-23, ...
        countdown = (position // 8) * 60
        populate_slack_usergroups_for_team.apply_async((identity.pk,), countdown=countdown)
@shared_dedicated_queue_retry_task(
    autoretry_for=(Exception,), retry_backoff=True, max_retries=1 if settings.DEBUG else None
)
def populate_slack_usergroups_for_team(slack_team_identity_id):
    """
    Sync the Slack user groups (and their members) of one Slack workspace into `SlackUserGroup` rows.

    User groups whose `last_populated` date is already today are skipped. On a Slack rate-limit error the task
    re-schedules itself after the `retry_after` delay reported by the error and stops; thanks to the
    `last_populated` check the restarted task does not redo groups that were already synced today. Token, auth,
    plan-upgrade and usergroup-not-found errors stop the task without a retry.

    Args:
        slack_team_identity_id: primary key of the `SlackTeamIdentity` to sync.

    Raises:
        SlackTeamIdentity.DoesNotExist: if no team identity with that primary key exists.
    """
    from apps.slack.models import SlackTeamIdentity, SlackUserGroup

    slack_team_identity = SlackTeamIdentity.objects.get(pk=slack_team_identity_id)
    sc = SlackClient(slack_team_identity, enable_ratelimit_retry=True)

    try:
        usergroups = sc.usergroups_list()["usergroups"]
    except SlackAPIRatelimitError as e:
        populate_slack_usergroups_for_team.apply_async((slack_team_identity_id,), countdown=e.retry_after)
        return
    except (SlackAPITokenError, SlackAPIInvalidAuthError, SlackAPIPlanUpgradeRequiredError):
        return

    today = timezone.now().date()
    # Materialise the ids into a set once: the membership test below runs for every usergroup, and testing
    # against the queryset itself is a linear scan of its results each time.
    populated_user_groups_ids = set(
        slack_team_identity.usergroups.filter(last_populated=today).values_list("slack_id", flat=True)
    )

    for usergroup in usergroups:
        # skip groups that were recently populated
        if usergroup["id"] in populated_user_groups_ids:
            continue

        try:
            members = sc.usergroups_users_list(usergroup=usergroup["id"])["users"]
        except SlackAPIRatelimitError as e:
            populate_slack_usergroups_for_team.apply_async((slack_team_identity_id,), countdown=e.retry_after)
            return
        except (SlackAPIUsergroupNotFoundError, SlackAPIInvalidAuthError):
            # NOTE(review): a single "usergroup not found" aborts the remaining groups as well - confirm that
            # `return` (rather than `continue`) is intended for that error.
            return

        SlackUserGroup.objects.update_or_create(
            slack_id=usergroup["id"],
            slack_team_identity=slack_team_identity,
            defaults={
                "name": usergroup["name"],
                "handle": usergroup["handle"],
                "members": members,
                "is_active": usergroup["date_delete"] == 0,
                "last_populated": today,
            },
        )
@shared_dedicated_queue_retry_task()
def start_update_slack_user_group_for_schedules():
    """
    Queue an `update_slack_user_group_for_schedules` task for every Slack user group that is connected to at
    least one on-call schedule whose organization is not deleted.
    """
    from apps.slack.models import SlackUserGroup

    groups_in_use = SlackUserGroup.objects.filter(
        # has oncall schedules connected
        oncall_schedules__isnull=False,
        # organization is not deleted
        oncall_schedules__organization__deleted_at__isnull=True,
    )

    # distinct(): filtering across the schedules relation can yield the same group more than once
    for pk in groups_in_use.distinct().values_list("pk", flat=True):
        update_slack_user_group_for_schedules.delay(user_group_pk=pk)
@shared_dedicated_queue_retry_task(autoretry_for=(Exception,), retry_backoff=True, max_retries=3)
def update_slack_user_group_for_schedules(user_group_pk):
    """
    Call `update_oncall_members()` on the given Slack user group; only logs a warning if the group is gone.
    """
    from apps.slack.models import SlackUserGroup

    try:
        slack_user_group = SlackUserGroup.objects.get(pk=user_group_pk)
    except SlackUserGroup.DoesNotExist:
        logger.warning(f"Slack user group {user_group_pk} does not exist")
    else:
        slack_user_group.update_oncall_members()
@shared_dedicated_queue_retry_task(
    autoretry_for=(Exception,), retry_backoff=True, max_retries=1 if settings.DEBUG else None
)
def populate_slack_channels():
    """
    Start a channel sync (`start_populate_slack_channels_for_team`) for every Slack team identity whose token
    is not revoked.

    The syncs are staggered to prevent hitting Slack rate limits: every 8th team pushes the delay of that and
    all later syncs back by another 60 seconds.
    """
    from apps.slack.models import SlackTeamIdentity

    active_identities = SlackTeamIdentity.objects.filter(detected_token_revoked__isnull=True)

    # Flatten the batches lazily so the queryset is still consumed 5000 rows at a time
    identities = (identity for batch in batch_queryset(active_identities, 5000) for identity in batch)

    for position, identity in enumerate(identities, start=1):
        # 0s for teams 1-7, 60s for teams 8-15, 120s for teams 16-23, ...
        start_populate_slack_channels_for_team(identity.pk, (position // 8) * 60)
def start_populate_slack_channels_for_team(
    slack_team_identity_id: int, delay: int, cursor: typing.Optional[str] = None
) -> None:
    """
    Schedule `populate_slack_channels_for_team` for one team and register it as that team's active sync task.

    A fresh task id is generated up front and stored in the cache under the team's key before the task is
    queued with that same id, so that only one populate task is active per team.

    Args:
        slack_team_identity_id: primary key of the `SlackTeamIdentity` to sync.
        delay: countdown, in seconds, before the task may run.
        cursor: optional pagination cursor to resume the sync from.
    """
    new_task_id = celery_uuid()
    cache.set(get_populate_slack_channel_task_id_key(slack_team_identity_id), new_task_id)

    task_args = (slack_team_identity_id, cursor)
    populate_slack_channels_for_team.apply_async(task_args, countdown=delay, task_id=new_task_id)
@shared_dedicated_queue_retry_task(
    autoretry_for=(Exception,), retry_backoff=True, max_retries=1 if settings.DEBUG else None
)
def populate_slack_channels_for_team(slack_team_identity_id: int, cursor: typing.Optional[str] = None) -> None:
    """
    Make paginated request to get slack channels and mirror them into the team's cached `SlackChannel` rows.

    On ratelimit - update info for the channels received so far, save the collected channel ids in cache and
    restart the task with the last successful pagination cursor to avoid any data loss during delay time.
    When the listing completes without a rate limit, cached channels that were not seen during the sync are
    deleted and the collected ids are removed from the cache.

    The task exits early when another task is registered in the cache as the team's active populate task, and
    restarts the whole sync from scratch when it was resumed with a cursor but the collected ids are gone.

    Args:
        slack_team_identity_id: primary key of the `SlackTeamIdentity` to sync.
        cursor: pagination cursor to resume from, or None to start from the beginning.

    Raises:
        SlackTeamIdentity.DoesNotExist: if no team identity with that primary key exists.
        AssertionError: if a completed sync collected no channels at all (see the guard before the delete).
    """
    from apps.slack.models import SlackChannel, SlackTeamIdentity

    slack_team_identity = SlackTeamIdentity.objects.get(pk=slack_team_identity_id)
    sc = SlackClient(slack_team_identity, enable_ratelimit_retry=True)

    # Only the task registered by `start_populate_slack_channels_for_team` may proceed
    active_task_id_key = get_populate_slack_channel_task_id_key(slack_team_identity_id)
    active_task_id = cache.get(active_task_id_key)
    current_task_id = populate_slack_channels_for_team.request.id
    if active_task_id and active_task_id != current_task_id:
        logger.info(
            f"Stop populate_slack_channels_for_team for SlackTeamIdentity pk: {slack_team_identity_id} due to "
            f"incorrect active task id"
        )
        return

    collected_channels_key = f"SLACK_CHANNELS_TEAM_{slack_team_identity_id}"
    collected_channels = cache.get(collected_channels_key, set())
    if cursor and not collected_channels:
        # means the task was restarted after rate limit exception but collected channels were lost
        logger.warning(
            f"Restart slack channel sync for SlackTeamIdentity pk: {slack_team_identity_id} due to empty "
            f"'collected_channels' after rate limit"
        )
        delay = 60
        return start_populate_slack_channels_for_team(slack_team_identity_id, delay)

    try:
        response, cursor, rate_limited = sc.paginated_api_call_with_ratelimit(
            "conversations_list",
            paginated_key="channels",
            types="public_channel,private_channel",
            limit=1000,
            cursor=cursor,
        )
    except (SlackAPITokenError, SlackAPIInvalidAuthError):
        return
    else:
        today = timezone.now().date()

        slack_channels = {channel["id"]: channel for channel in response["channels"]}
        collected_channels.update(slack_channels.keys())

        existing_channels = slack_team_identity.cached_channels.all()
        existing_channel_ids = set(existing_channels.values_list("slack_id", flat=True))

        # create missing channels
        channels_to_create = tuple(
            SlackChannel(
                slack_team_identity=slack_team_identity,
                slack_id=channel["id"],
                name=channel["name"],
                is_archived=channel["is_archived"],
                is_shared=channel["is_shared"],
                last_populated=today,
            )
            for channel in slack_channels.values()
            if channel["id"] not in existing_channel_ids
        )
        SlackChannel.objects.bulk_create(channels_to_create, batch_size=5000)

        # update existing channels
        channels_to_update = existing_channels.filter(slack_id__in=slack_channels.keys()).exclude(last_populated=today)
        for channel in channels_to_update:
            slack_channel = slack_channels[channel.slack_id]
            channel.name = slack_channel["name"]
            channel.is_archived = slack_channel["is_archived"]
            channel.is_shared = slack_channel["is_shared"]
            channel.last_populated = today

        SlackChannel.objects.bulk_update(
            channels_to_update, fields=("name", "is_archived", "is_shared", "last_populated"), batch_size=5000
        )

        if rate_limited:
            # save collected channels ids to cache and restart the task with the current pagination cursor
            cache.set(collected_channels_key, collected_channels, timeout=3600)
            delay = random.randint(1, 3) * 60
            logger.warning(
                f"'conversations.list' slack api error: rate_limited. SlackTeamIdentity pk: {slack_team_identity_id}. "
                f"Delay populate_slack_channels_for_team task for {delay//60} min."
            )
            start_populate_slack_channels_for_team(slack_team_identity_id, delay, cursor)
        else:
            # delete excess channels
            # Safety guard: with an empty `collected_channels` the set difference below would select EVERY
            # cached channel for deletion. This is an explicit raise instead of a bare `assert`, because assert
            # statements are removed when Python runs with -O; the exception type is unchanged.
            if not collected_channels:
                raise AssertionError(
                    f"No channels collected for SlackTeamIdentity pk: {slack_team_identity_id}, "
                    f"refusing to delete cached channels"
                )
            channel_ids_to_delete = existing_channel_ids - collected_channels
            slack_team_identity.cached_channels.filter(slack_id__in=channel_ids_to_delete).delete()
            cache.delete(collected_channels_key)
@shared_dedicated_queue_retry_task(autoretry_for=(Exception,), retry_backoff=True, max_retries=0)
def clean_slack_integration_leftovers(organization_id: int, *args, **kwargs) -> None:
    """
    Remove the Slack bindings of a given organization: the Slack channel of its integration routes
    (ChannelFilter) and the Slack channel and user group of its on-call schedules.

    It is used when user changes slack integration. Extra positional/keyword arguments are accepted and ignored.
    """
    from apps.alerts.models import ChannelFilter
    from apps.schedules.models import OnCallSchedule

    logger.info(f"Cleaning up for organization {organization_id}")

    org_routes = ChannelFilter.objects.filter(alert_receive_channel__organization_id=organization_id)
    org_routes.update(slack_channel=None)

    org_schedules = OnCallSchedule.objects.filter(organization_id=organization_id)
    org_schedules.update(slack_channel=None, user_group=None)
@shared_dedicated_queue_retry_task(autoretry_for=(Exception,), retry_backoff=True, max_retries=10)
def clean_slack_channel_leftovers(slack_team_identity_id: int, slack_channel_id: str) -> None:
    """
    This task removes binding to slack channel after a channel is archived in Slack.

    For every organization of the given Slack team identity it clears:
    - the organization's default Slack channel, when that is the given channel;
    - the Slack channel of integration routes (`ChannelFilter`) that point at the given channel;
    - the Slack channel of on-call schedules that point at the given channel.

    **NOTE**: this is only needed for Slack Channel archive. If a channel is deleted, we simply remove references
    to that channel via `on_delete=models.SET_NULL`.

    Args:
        slack_team_identity_id: primary key of the `SlackTeamIdentity` the channel belongs to.
        slack_channel_id: Slack's own id of the channel (compared against `slack_id` values, not a database pk).
    """
    from apps.alerts.models import ChannelFilter
    from apps.schedules.models import OnCallSchedule
    from apps.slack.models import SlackTeamIdentity
    from apps.user_management.models import Organization

    # Organizations whose default channel was reset in memory; they are saved in one bulk_update at the end
    orgs_to_clean_default_slack_channel: typing.List[Organization] = []

    try:
        sti = SlackTeamIdentity.objects.get(id=slack_team_identity_id)
    except SlackTeamIdentity.DoesNotExist:
        logger.info(
            f"Failed to clean_slack_channel_leftovers slack_channel_id={slack_channel_id} slack_team_identity_id={slack_team_identity_id} : Invalid slack_team_identity_id"
        )
        return

    for org in sti.organizations.all():
        org_id = org.id

        if org.default_slack_channel_slack_id == slack_channel_id:
            logger.info(
                f"Set default_slack_channel to None for org_id={org_id} slack_channel_id={slack_channel_id} since slack_channel is arcived or deleted"
            )
            org.default_slack_channel = None
            orgs_to_clean_default_slack_channel.append(org)

        # The channel no longer exists, so update any integration routes (ie. ChannelFilter) or schedules
        # that reference it
        ChannelFilter.objects.filter(
            alert_receive_channel__organization=org,
            slack_channel__slack_id=slack_channel_id,
        ).update(slack_channel=None)

        OnCallSchedule.objects.filter(
            organization_id=org_id,
            slack_channel__slack_id=slack_channel_id,
        ).update(slack_channel=None)

    # Persist the default-channel resets collected in the loop above
    Organization.objects.bulk_update(orgs_to_clean_default_slack_channel, ["default_slack_channel"], batch_size=5000)