# What this PR does

- Stops writing `SlackMessage.organization` and removes references to this field. [As we discussed](https://raintank-corp.slack.com/archives/C083TU81TCH/p1733315887463279?thread_ts=1733311105.095309&cid=C083TU81TCH), we do not need this field on this model/table; `SlackMessage._slack_team_identity` is sufficient (`organization` will be dropped in a separate PR).
- Adds a data migration script which:
  - drops orphaned `SlackMessage` records, i.e. ones which, even after the [`engine/apps/slack/migrations/0007_migrate_slackmessage_channel_id.py`](https://github.com/grafana/oncall/blob/dev/engine/apps/slack/migrations/0007_migrate_slackmessage_channel_id.py) migration, still don't have a `SlackMessage.channel` id filled in (we discussed and agreed on dropping these records [here](https://raintank-corp.slack.com/archives/C083TU81TCH/p1733329914516859?thread_ts=1733311105.095309&cid=C083TU81TCH))
  - fills in empty `SlackMessage.slack_team_identity` values (from `slack_message.channel.slack_team_identity`)

### Other notes

On the `organization` topic: we store records in `SlackMessage` for two purposes (AFAICT), and in both cases we have references back to the `organization`:

- alert groups - `slack_message.alert_group.channel.organization`
- shift swap requests - `shift_swap_request.schedule.organization`

## Checklist

- [x] Unit, integration, and e2e (if applicable) tests updated
- [x] Documentation added (or `pr:no public docs` PR label added if not required)
- [x] Added the relevant release notes label (see labels prefixed w/ `release:`). These labels dictate how your PR will show up in the autogenerated release notes.
664 lines
28 KiB
Python
664 lines
28 KiB
Python
import logging
|
|
import random
|
|
import typing
|
|
|
|
from celery import uuid as celery_uuid
|
|
from celery.exceptions import Retry
|
|
from celery.utils.log import get_task_logger
|
|
from django.conf import settings
|
|
from django.core.cache import cache
|
|
from django.utils import timezone
|
|
|
|
from apps.slack.alert_group_slack_service import AlertGroupSlackService
|
|
from apps.slack.client import SlackClient
|
|
from apps.slack.constants import SLACK_BOT_ID
|
|
from apps.slack.errors import (
|
|
SlackAPICantUpdateMessageError,
|
|
SlackAPIChannelInactiveError,
|
|
SlackAPIChannelNotFoundError,
|
|
SlackAPIInvalidAuthError,
|
|
SlackAPIMessageNotFoundError,
|
|
SlackAPIPlanUpgradeRequiredError,
|
|
SlackAPIRatelimitError,
|
|
SlackAPITokenError,
|
|
SlackAPIUsergroupNotFoundError,
|
|
)
|
|
from apps.slack.utils import get_populate_slack_channel_task_id_key, post_message_to_channel
|
|
from common.custom_celery_tasks import shared_dedicated_queue_retry_task
|
|
from common.utils import batch_queryset
|
|
|
|
logger = get_task_logger(__name__)
|
|
logger.setLevel(logging.DEBUG)
|
|
|
|
|
|
@shared_dedicated_queue_retry_task(autoretry_for=(Exception,), retry_backoff=True)
def update_alert_group_slack_message(slack_message_pk: int) -> None:
    """
    Celery task which re-renders an alert group and pushes the result to its Slack message.

    Flow:
    - Load the `SlackMessage`; exit quietly if it no longer exists.
    - Compare this task's ID with the message's active update task ID. A mismatch means a different task
      owns the update, so this one exits to avoid duplicated updates.
    - Exit when the message has no alert group, when the alert group has `skip_escalation_in_slack` set,
      or when its integration is currently rate-limited in Slack.
    - Call Slack `chat.update` with freshly rendered attachments and blocks.
    - Mark the active update task as complete so that future updates can be scheduled
      (see `SlackMessage.update_alert_groups_message` for the scheduling side).

    Args:
        slack_message_pk (int): The primary key of the `SlackMessage` instance to update.
    """
    from apps.slack.models import SlackMessage

    task_id = update_alert_group_slack_message.request.id
    logger.info(
        f"update_alert_group_slack_message for slack message {slack_message_pk} started with task_id {task_id}"
    )

    try:
        slack_message = SlackMessage.objects.get(pk=slack_message_pk)
    except SlackMessage.DoesNotExist:
        logger.warning(f"SlackMessage {slack_message_pk} doesn't exist")
        return

    # Only the task registered as "active" on the message is allowed to perform the update.
    active_task_id = slack_message.get_active_update_task_id()
    if task_id != active_task_id:
        logger.warning(
            f"update_alert_group_slack_message skipped, because current_task_id ({task_id}) "
            f"does not equal to active_update_task_id ({active_task_id}) "
        )
        return

    alert_group = slack_message.alert_group
    if not alert_group:
        logger.warning(
            f"skipping update_alert_group_slack_message as SlackMessage {slack_message_pk} "
            "doesn't have an alert group associated with it"
        )
        return

    alert_group_pk = alert_group.pk
    integration = alert_group.channel
    # Snapshot taken before the Slack call; the rate-limit handler below reuses this value.
    integration_is_rate_limited = integration.is_rate_limited_in_slack

    if alert_group.skip_escalation_in_slack:
        logger.warning(
            f"skipping update_alert_group_slack_message as AlertGroup {alert_group_pk} "
            "has skip_escalation_in_slack set to True"
        )
        return

    if integration_is_rate_limited:
        logger.warning(
            f"skipping update_alert_group_slack_message as AlertGroup {alert_group.pk}'s "
            f"integration ({integration.pk}) is rate-limited"
        )
        return

    slack_client = SlackClient(slack_message.slack_team_identity)

    try:
        slack_client.chat_update(
            channel=slack_message.channel.slack_id,
            ts=slack_message.slack_id,
            attachments=alert_group.render_slack_attachments(),
            blocks=alert_group.render_slack_blocks(),
        )

        logger.info(f"Message has been updated for alert_group {alert_group_pk}")
    except SlackAPIRatelimitError as e:
        # Maintenance integrations re-raise, which lets the task's autoretry handle the rate limit.
        if integration.is_maintenace_integration:
            raise

        if not integration_is_rate_limited:
            integration.start_send_rate_limit_message_task("Updating", e.retry_after)
            logger.info(f"Message has not been updated for alert_group {alert_group_pk} due to slack rate limit.")
    except (
        SlackAPIMessageNotFoundError,
        SlackAPICantUpdateMessageError,
        SlackAPIChannelInactiveError,
        SlackAPITokenError,
        SlackAPIChannelNotFoundError,
    ):
        # These errors are deliberately ignored: the task still completes and clears the active task marker.
        pass

    slack_message.mark_active_update_task_as_complete()
|
|
|
|
|
|
@shared_dedicated_queue_retry_task(autoretry_for=(Exception,), retry_backoff=True)
def check_slack_message_exists_before_post_message_to_thread(
    alert_group_pk,
    text,
    escalation_policy_pk=None,
    escalation_policy_step=None,
    step_specific_info=None,
):
    """
    Check that the Slack message for the alert group exists before posting a message to its Slack thread.

    If the Slack message does not exist yet, the task reschedules itself every 10 seconds until 24 hours
    have passed since the alert group started. After that it gives up and, when it was triggered by an
    escalation step, writes an "escalation failed" log record.

    Args:
        alert_group_pk: primary key of the `AlertGroup` whose thread should receive the message.
        text: message text to publish to the thread.
        escalation_policy_pk: optional primary key of the `EscalationPolicy`, stored in failure log records.
        escalation_policy_step: optional escalation step, stored in failure log records.
        step_specific_info: optional extra step data, stored in failure log records.
    """
    # `timedelta` is imported from its canonical module instead of relying on `django.utils.timezone`
    # happening to re-export it.
    from datetime import timedelta

    from apps.alerts.models import AlertGroup, AlertGroupLogRecord, EscalationPolicy

    alert_group = AlertGroup.objects.get(pk=alert_group_pk)
    slack_team_identity = alert_group.channel.organization.slack_team_identity
    # get escalation policy object if it exists to save it in log record
    escalation_policy = EscalationPolicy.objects.filter(pk=escalation_policy_pk).first()

    def _record_escalation_failure() -> None:
        # Persist a log record stating that the "notify in Slack" escalation could not be performed.
        AlertGroupLogRecord(
            type=AlertGroupLogRecord.TYPE_ESCALATION_FAILED,
            alert_group=alert_group,
            escalation_policy=escalation_policy,
            escalation_error_code=AlertGroupLogRecord.ERROR_ESCALATION_NOTIFY_IN_SLACK,
            escalation_policy_step=escalation_policy_step,
            step_specific_info=step_specific_info,
        ).save()

    # we cannot post message to thread if team does not have slack team identity
    if not slack_team_identity:
        _record_escalation_failure()
        logger.debug(
            f"Failed to post message to thread in Slack for alert_group {alert_group_pk} because "
            f"slack team identity doesn't exist"
        )
        return

    retry_timeout_hours = 24

    if alert_group.slack_message:
        AlertGroupSlackService(slack_team_identity).publish_message_to_alert_group_thread(alert_group, text=text)

    # check how much time has passed since alert group was created
    # to prevent eternal loop of restarting check_slack_message_exists_before_post_message_to_thread
    elif timezone.now() < alert_group.started_at + timedelta(hours=retry_timeout_hours):
        logger.debug(
            f"check_slack_message_exists_before_post_message_to_thread for alert_group {alert_group.pk} failed "
            f"because slack message does not exist. Restarting check_slack_message_before_post_message_to_thread."
        )
        restart_delay_seconds = 10
        check_slack_message_exists_before_post_message_to_thread.apply_async(
            (
                alert_group_pk,
                text,
                escalation_policy_pk,
                escalation_policy_step,
                step_specific_info,
            ),
            countdown=restart_delay_seconds,
        )
    else:
        logger.debug(
            f"check_slack_message_exists_before_post_message_to_thread for alert_group {alert_group.pk} failed "
            f"because slack message after {retry_timeout_hours} hours still does not exist"
        )
        # create log if it was triggered by escalation step
        if escalation_policy_step:
            _record_escalation_failure()
|
|
|
|
|
|
@shared_dedicated_queue_retry_task(
    autoretry_for=(Exception,),
    dont_autoretry_for=(Retry,),
    retry_backoff=True,
    max_retries=1,
)
def send_message_to_thread_if_bot_not_in_channel(
    alert_group_pk: int, slack_team_identity_pk: int, channel_id: int
) -> None:
    """
    Ask users, in the alert group's Slack thread, to invite the bot when it is not a member of the channel.

    Nothing is posted when the bot is already among the channel members. On a Slack rate-limit error the
    task is retried after the delay given by the error.
    """
    from apps.alerts.models import AlertGroup
    from apps.slack.models import SlackTeamIdentity

    logger.info(
        f"Starting send_message_to_thread_if_bot_not_in_channel alert_group={alert_group_pk} "
        f"slack_team_identity={slack_team_identity_pk} channel_id={channel_id}"
    )

    slack_team_identity = SlackTeamIdentity.objects.get(pk=slack_team_identity_pk)
    alert_group = AlertGroup.objects.get(pk=alert_group_pk)

    slack_client = SlackClient(slack_team_identity, enable_ratelimit_retry=True)

    bot_user_id = slack_team_identity.bot_user_id
    channel_members = slack_team_identity.get_conversation_members(slack_client, channel_id)
    if bot_user_id in channel_members:
        # The bot is already in the channel, so there is nothing to announce.
        return

    text = f"Please invite <@{bot_user_id}> to this channel to make all features available :wink:"
    slack_service = None

    try:
        logger.info("Attempting to send message to thread in Slack")

        slack_service = AlertGroupSlackService(slack_team_identity, slack_client)
        slack_service.publish_message_to_alert_group_thread(alert_group, text=text)
    except SlackAPIRatelimitError as e:
        logger.warning(f"Slack API rate limit error: {e}, retrying task")

        # `Retry` is excluded from autoretry in the decorator, so this explicit retry is not retried twice.
        raise send_message_to_thread_if_bot_not_in_channel.retry(
            (alert_group_pk, slack_team_identity_pk, channel_id), countdown=e.retry_after, exc=e
        )
|
|
|
|
|
|
@shared_dedicated_queue_retry_task(autoretry_for=(Exception,), retry_backoff=True, max_retries=0)
def unpopulate_slack_user_identities(organization_pk, force=False, ts=None):
    """
    Detach Slack user identities from every user of an organization.

    Args:
        organization_pk: primary key of the `Organization` to process.
        force: when truthy, also clears the organization's `slack_team_identity` and `default_slack_channel`.
        ts: accepted but not used by this function.
    """
    from apps.user_management.models import Organization, User

    organization = Organization.objects.get(pk=organization_pk)

    # Deleted users are included on purpose, so that their Slack identity link is reset as well.
    organization_users = User.objects.filter_with_deleted(organization=organization)
    organization_users.update(slack_user_identity=None)

    if not force:
        return

    organization.slack_team_identity = None
    organization.default_slack_channel = None
    organization.save(update_fields=["slack_team_identity", "default_slack_channel"])
|
|
|
|
|
|
@shared_dedicated_queue_retry_task(autoretry_for=(Exception,), retry_backoff=True, max_retries=0)
def populate_slack_user_identities(organization_pk):
    """
    Refresh cached Slack profile data on the Slack user identities of an organization's Slack team.

    The task first resets the users' Slack identity links via `unpopulate_slack_user_identities`, then
    walks over `slack_team_identity.members` and copies profile fields onto the matching, already
    existing `SlackUserIdentity` rows, which are saved with a single `bulk_update`. Members without a
    matching identity are skipped; no identities are created here.

    Args:
        organization_pk: primary key of the `Organization` whose Slack team should be processed.
    """
    from apps.slack.models import SlackUserIdentity
    from apps.user_management.models import Organization

    organization = Organization.objects.get(pk=organization_pk)
    unpopulate_slack_user_identities(organization_pk)
    slack_team_identity = organization.slack_team_identity

    slack_user_identity_installed = slack_team_identity.installed_by
    slack_user_identities_to_update = []

    for member in slack_team_identity.members:
        # A member payload without a (non-null) profile has no email and is skipped by the filter below,
        # instead of crashing on `None.get(...)`.
        profile = member.get("profile") or {}
        email = profile.get("email", None)

        # Don't collect bots, invited users and users from other workspaces
        if (
            member.get("id", None) == SLACK_BOT_ID
            or member.get("is_bot", False)
            or not email
            or member.get("is_invited_user", False)
            or member.get("is_restricted")
            or member.get("is_ultra_restricted")
        ):
            continue

        # For user which installs bot
        if member.get("id", None) == slack_user_identity_installed.slack_id:
            slack_user_identity = slack_user_identity_installed
        else:
            try:
                # `.get()` returns one instance (not a `(obj, created)` pair like `get_or_create`),
                # so it must not be tuple-unpacked.
                slack_user_identity = slack_team_identity.slack_user_identities.get(
                    slack_id=member["id"],
                )
            except SlackUserIdentity.DoesNotExist:
                continue

        slack_user_identity.cached_slack_login = member.get("name", None)
        slack_user_identity.cached_name = member.get("real_name") or profile.get("real_name", None)
        slack_user_identity.cached_slack_email = profile.get("email", "")

        slack_user_identity.profile_real_name = profile.get("real_name", None)
        slack_user_identity.profile_real_name_normalized = profile.get("real_name_normalized", None)
        slack_user_identity.profile_display_name = profile.get("display_name", None)
        slack_user_identity.profile_display_name_normalized = profile.get("display_name_normalized", None)
        slack_user_identity.cached_avatar = profile.get("image_512", None)
        slack_user_identity.cached_timezone = member.get("tz", None)

        slack_user_identity.deleted = member.get("deleted", None)
        slack_user_identity.is_admin = member.get("is_admin", None)
        slack_user_identity.is_owner = member.get("is_owner", None)
        slack_user_identity.is_primary_owner = member.get("is_primary_owner", None)
        slack_user_identity.is_restricted = member.get("is_restricted", None)
        slack_user_identity.is_ultra_restricted = member.get("is_ultra_restricted", None)
        slack_user_identity.cached_is_bot = member.get("is_bot", None)  # This field already existed
        slack_user_identity.is_app_user = member.get("is_app_user", None)

        slack_user_identities_to_update.append(slack_user_identity)

    # Must list every attribute assigned in the loop above, otherwise the change is not persisted.
    fields_to_update = [
        "cached_slack_login",
        "cached_name",
        "cached_slack_email",
        "profile_real_name",
        "profile_real_name_normalized",
        "profile_display_name",
        "profile_display_name_normalized",
        "cached_avatar",
        "cached_timezone",
        "deleted",
        "is_admin",
        "is_owner",
        "is_primary_owner",
        "is_restricted",
        "is_ultra_restricted",
        "cached_is_bot",
        "is_app_user",
    ]
    SlackUserIdentity.objects.bulk_update(slack_user_identities_to_update, fields_to_update, batch_size=5000)
|
|
|
|
|
|
@shared_dedicated_queue_retry_task(
    autoretry_for=(Exception,), retry_backoff=True, max_retries=1 if settings.DEBUG else None
)
def post_slack_rate_limit_message(integration_id: int, error_message_verb: typing.Optional[str] = None) -> None:
    """
    Post a notice to Slack saying that Slack activity for an integration is temporarily rate-limited.

    The notice goes to the Slack channel of the integration's default route (or the organization default),
    and is only sent when this task's ID equals the integration's `rate_limit_message_task_id`.

    NOTE: `error_message_verb` was added to the signature to allow more descriptive messages. It defaults
    to None to stay backwards compatible with tasks which were already queued; the default can likely be
    removed once those tasks have been processed.
    """
    from apps.alerts.models import AlertReceiveChannel

    try:
        integration = AlertReceiveChannel.objects.get(pk=integration_id)
    except AlertReceiveChannel.DoesNotExist:
        logger.warning(f"AlertReceiveChannel {integration_id} doesn't exist")
        return

    # A different task ID is registered on the integration: this task is not the active one.
    if post_slack_rate_limit_message.request.id != integration.rate_limit_message_task_id:
        logger.info(
            f"post_slack_rate_limit_message. integration {integration_id}. ID mismatch. "
            f"Active: {integration.rate_limit_message_task_id}"
        )
        return

    default_route = integration.channel_filters.get(is_default=True)
    slack_channel = default_route.slack_channel_or_org_default
    if slack_channel is None:
        # No channel to post to.
        return

    # Fallback verb for tasks queued without `error_message_verb` (see the docstring).
    verb = "Sending messages for" if error_message_verb is None else error_message_verb

    text = (
        f"{verb} Alert Groups in Slack, for integration {integration.verbal_name}, is "
        f"temporarily rate-limited (due to a Slack rate-limit). Meanwhile, you can still find new Alert Groups "
        f'in the <{integration.new_incidents_web_link}|"Alert Groups" web page>'
    )
    post_message_to_channel(integration.organization, slack_channel.slack_id, text)
|
|
|
|
|
|
@shared_dedicated_queue_retry_task(
    autoretry_for=(Exception,), retry_backoff=True, max_retries=1 if settings.DEBUG else None
)
def populate_slack_usergroups():
    """
    Schedule `populate_slack_usergroups_for_team` for every Slack team whose token is not marked as revoked.

    Tasks are spread over time to prevent hitting the Slack rate limit: each further group of 8 teams is
    scheduled one minute later than the previous one.
    """
    from apps.slack.models import SlackTeamIdentity

    active_identities = SlackTeamIdentity.objects.filter(detected_token_revoked__isnull=True)

    scheduled = 0
    for batch in batch_queryset(active_identities, 5000):
        for identity in batch:
            scheduled += 1
            # 0s for teams 1-7, 60s for teams 8-15, 120s for teams 16-23, ...
            countdown = (scheduled // 8) * 60
            populate_slack_usergroups_for_team.apply_async((identity.pk,), countdown=countdown)
|
|
|
|
|
|
@shared_dedicated_queue_retry_task(
    autoretry_for=(Exception,), retry_backoff=True, max_retries=1 if settings.DEBUG else None
)
def populate_slack_usergroups_for_team(slack_team_identity_id):
    """
    Sync the Slack user groups (and their members) of one Slack team into `SlackUserGroup` rows.

    User groups which were already populated today are skipped, which lets a rescheduled run continue
    where a rate-limited run stopped. On a Slack rate limit the task reschedules itself after the delay
    reported by Slack and returns. Token, auth and plan errors stop the task silently.

    Args:
        slack_team_identity_id: primary key of the `SlackTeamIdentity` to sync.
    """
    from apps.slack.models import SlackTeamIdentity, SlackUserGroup

    slack_team_identity = SlackTeamIdentity.objects.get(pk=slack_team_identity_id)
    sc = SlackClient(slack_team_identity, enable_ratelimit_retry=True)

    try:
        usergroups = sc.usergroups_list()["usergroups"]
    except SlackAPIRatelimitError as e:
        populate_slack_usergroups_for_team.apply_async((slack_team_identity_id,), countdown=e.retry_after)
        return
    except (SlackAPITokenError, SlackAPIInvalidAuthError, SlackAPIPlanUpgradeRequiredError):
        return

    today = timezone.now().date()
    # Materialized once into a set, so each membership check in the loop below is O(1) instead of a
    # linear scan over the evaluated queryset.
    populated_user_groups_ids = set(
        slack_team_identity.usergroups.filter(last_populated=today).values_list("slack_id", flat=True)
    )

    for usergroup in usergroups:
        # skip groups that were recently populated
        if usergroup["id"] in populated_user_groups_ids:
            continue

        try:
            members = sc.usergroups_users_list(usergroup=usergroup["id"])["users"]
        except SlackAPIRatelimitError as e:
            populate_slack_usergroups_for_team.apply_async((slack_team_identity_id,), countdown=e.retry_after)
            return
        except (SlackAPIUsergroupNotFoundError, SlackAPIInvalidAuthError):
            # NOTE(review): this stops processing of all remaining user groups, not only the failing
            # one - confirm this is intended for the "usergroup not found" case.
            return

        SlackUserGroup.objects.update_or_create(
            slack_id=usergroup["id"],
            slack_team_identity=slack_team_identity,
            defaults={
                "name": usergroup["name"],
                "handle": usergroup["handle"],
                "members": members,
                # a `date_delete` of 0 is treated as "the group is not deleted"
                "is_active": usergroup["date_delete"] == 0,
                "last_populated": today,
            },
        )
|
|
|
|
|
|
@shared_dedicated_queue_retry_task()
def start_update_slack_user_group_for_schedules():
    """
    Fan out one `update_slack_user_group_for_schedules` task per Slack user group which is connected to
    at least one on-call schedule of a non-deleted organization.
    """
    from apps.slack.models import SlackUserGroup

    groups_with_schedules = SlackUserGroup.objects.filter(
        oncall_schedules__isnull=False,  # has oncall schedules connected
        oncall_schedules__organization__deleted_at__isnull=True,  # organization is not deleted
    )

    # `distinct()` prevents scheduling the same group several times when it is joined to many schedules.
    for pk in groups_with_schedules.distinct().values_list("pk", flat=True):
        update_slack_user_group_for_schedules.delay(user_group_pk=pk)
|
|
|
|
|
|
@shared_dedicated_queue_retry_task(autoretry_for=(Exception,), retry_backoff=True, max_retries=3)
def update_slack_user_group_for_schedules(user_group_pk):
    """
    Call `update_oncall_members()` on the given Slack user group; log a warning if the group is gone.

    Args:
        user_group_pk: primary key of the `SlackUserGroup` to update.
    """
    from apps.slack.models import SlackUserGroup

    try:
        user_group = SlackUserGroup.objects.get(pk=user_group_pk)
    except SlackUserGroup.DoesNotExist:
        logger.warning(f"Slack user group {user_group_pk} does not exist")
    else:
        user_group.update_oncall_members()
|
|
|
|
|
|
@shared_dedicated_queue_retry_task(
    autoretry_for=(Exception,), retry_backoff=True, max_retries=1 if settings.DEBUG else None
)
def populate_slack_channels():
    """
    Start a Slack channel sync for every Slack team whose token is not marked as revoked.

    Syncs are spread over time to prevent hitting the Slack rate limit: each further group of 8 teams is
    started one minute later than the previous one.
    """
    from apps.slack.models import SlackTeamIdentity

    active_identities = SlackTeamIdentity.objects.filter(detected_token_revoked__isnull=True)

    scheduled = 0
    for batch in batch_queryset(active_identities, 5000):
        for identity in batch:
            scheduled += 1
            # 0s for teams 1-7, 60s for teams 8-15, 120s for teams 16-23, ...
            countdown = (scheduled // 8) * 60
            start_populate_slack_channels_for_team(identity.pk, countdown)
|
|
|
|
|
|
def start_populate_slack_channels_for_team(
    slack_team_identity_id: int, delay: int, cursor: typing.Optional[str] = None
) -> None:
    """
    Schedule `populate_slack_channels_for_team` and register it as the active populate task of the team.

    The generated task ID is stored in the cache before scheduling, so that only one populate task is
    active per team: the task itself compares its own ID with the cached one.

    Args:
        slack_team_identity_id: primary key of the `SlackTeamIdentity` to sync.
        delay: countdown, in seconds, before the task is executed.
        cursor: optional pagination cursor to resume the sync from.
    """
    new_task_id = celery_uuid()
    cache.set(get_populate_slack_channel_task_id_key(slack_team_identity_id), new_task_id)
    populate_slack_channels_for_team.apply_async(
        (slack_team_identity_id, cursor),
        countdown=delay,
        task_id=new_task_id,
    )
|
|
|
|
|
|
@shared_dedicated_queue_retry_task(
    autoretry_for=(Exception,), retry_backoff=True, max_retries=1 if settings.DEBUG else None
)
def populate_slack_channels_for_team(slack_team_identity_id: int, cursor: typing.Optional[str] = None) -> None:
    """
    Sync the channels of one Slack team into `SlackChannel` rows, using a paginated `conversations_list` call.

    Channels returned by Slack are created or updated. When the call gets rate-limited, the IDs collected
    so far are saved in the cache and the task is restarted with the last pagination cursor, so that
    nothing is lost during the delay. Only when the listing finished without rate limit are the cached
    channels which Slack no longer reports deleted.

    Args:
        slack_team_identity_id: primary key of the `SlackTeamIdentity` to sync.
        cursor: pagination cursor to resume from; None starts from the beginning.
    """
    from apps.slack.models import SlackChannel, SlackTeamIdentity

    slack_team_identity = SlackTeamIdentity.objects.get(pk=slack_team_identity_id)
    sc = SlackClient(slack_team_identity, enable_ratelimit_retry=True)

    # Only the task whose ID is registered in the cache may run for this team.
    registered_task_id = cache.get(get_populate_slack_channel_task_id_key(slack_team_identity_id))
    own_task_id = populate_slack_channels_for_team.request.id
    if registered_task_id and registered_task_id != own_task_id:
        logger.info(
            f"Stop populate_slack_channels_for_team for SlackTeamIdentity pk: {slack_team_identity_id} due to "
            f"incorrect active task id"
        )
        return

    collected_key = f"SLACK_CHANNELS_TEAM_{slack_team_identity_id}"
    collected_ids = cache.get(collected_key, set())
    if cursor and not collected_ids:
        # means the task was restarted after rate limit exception but collected channels were lost
        logger.warning(
            f"Restart slack channel sync for SlackTeamIdentity pk: {slack_team_identity_id} due to empty "
            f"'collected_channels' after rate limit"
        )
        return start_populate_slack_channels_for_team(slack_team_identity_id, 60)

    try:
        response, cursor, rate_limited = sc.paginated_api_call_with_ratelimit(
            "conversations_list",
            paginated_key="channels",
            types="public_channel,private_channel",
            limit=1000,
            cursor=cursor,
        )
    except (SlackAPITokenError, SlackAPIInvalidAuthError):
        return

    today = timezone.now().date()

    channels_by_id = {payload["id"]: payload for payload in response["channels"]}
    collected_ids.update(channels_by_id.keys())

    existing_channels = slack_team_identity.cached_channels.all()
    existing_channel_ids = set(existing_channels.values_list("slack_id", flat=True))

    # create missing channels
    new_channels = [
        SlackChannel(
            slack_team_identity=slack_team_identity,
            slack_id=slack_id,
            name=payload["name"],
            is_archived=payload["is_archived"],
            is_shared=payload["is_shared"],
            last_populated=today,
        )
        for slack_id, payload in channels_by_id.items()
        if slack_id not in existing_channel_ids
    ]
    SlackChannel.objects.bulk_create(new_channels, batch_size=5000)

    # update existing channels (the ones already populated today are left alone)
    stale_channels = list(
        existing_channels.filter(slack_id__in=channels_by_id.keys()).exclude(last_populated=today)
    )
    for stale_channel in stale_channels:
        payload = channels_by_id[stale_channel.slack_id]
        stale_channel.name = payload["name"]
        stale_channel.is_archived = payload["is_archived"]
        stale_channel.is_shared = payload["is_shared"]
        stale_channel.last_populated = today

    SlackChannel.objects.bulk_update(
        stale_channels, fields=("name", "is_archived", "is_shared", "last_populated"), batch_size=5000
    )

    if rate_limited:
        # save collected channels ids to cache and restart the task with the current pagination cursor
        cache.set(collected_key, collected_ids, timeout=3600)
        delay = random.randint(1, 3) * 60
        logger.warning(
            f"'conversations.list' slack api error: rate_limited. SlackTeamIdentity pk: {slack_team_identity_id}. "
            f"Delay populate_slack_channels_for_team task for {delay//60} min."
        )
        start_populate_slack_channels_for_team(slack_team_identity_id, delay, cursor)
        return

    # delete excess channels; the assert guards against wiping every cached channel on an empty listing
    assert collected_ids
    channel_ids_to_delete = existing_channel_ids - collected_ids
    slack_team_identity.cached_channels.filter(slack_id__in=channel_ids_to_delete).delete()
    cache.delete(collected_key)
|
|
|
|
|
|
@shared_dedicated_queue_retry_task(autoretry_for=(Exception,), retry_backoff=True, max_retries=0)
def clean_slack_integration_leftovers(organization_id: int, *args, **kwargs) -> None:
    """
    Remove the Slack bindings of an organization; used when the user changes the Slack integration.

    Clears `slack_channel` on the organization's routes (`ChannelFilter`), and `slack_channel` plus
    `user_group` on its on-call schedules. Extra positional and keyword arguments are accepted and ignored.
    """
    from apps.alerts.models import ChannelFilter
    from apps.schedules.models import OnCallSchedule

    logger.info(f"Cleaning up for organization {organization_id}")

    organization_routes = ChannelFilter.objects.filter(alert_receive_channel__organization_id=organization_id)
    organization_routes.update(slack_channel=None)

    organization_schedules = OnCallSchedule.objects.filter(organization_id=organization_id)
    organization_schedules.update(slack_channel=None, user_group=None)
|
|
|
|
|
|
@shared_dedicated_queue_retry_task(autoretry_for=(Exception,), retry_backoff=True, max_retries=10)
def clean_slack_channel_leftovers(slack_team_identity_id: int, slack_channel_id: str) -> None:
    """
    Remove references to a Slack channel after the channel was archived in Slack.

    For every organization of the Slack team, the channel is unset as the organization's default Slack
    channel (if it was the default), and on every route (`ChannelFilter`) and on-call schedule pointing to it.

    **NOTE**: this is only needed for Slack channel archive. If a channel is deleted, references to it
    are simply removed via `on_delete=models.SET_NULL`.

    Args:
        slack_team_identity_id: primary key of the `SlackTeamIdentity` which owns the channel.
        slack_channel_id: Slack ID of the archived channel.
    """
    from apps.alerts.models import ChannelFilter
    from apps.schedules.models import OnCallSchedule
    from apps.slack.models import SlackTeamIdentity
    from apps.user_management.models import Organization

    try:
        sti = SlackTeamIdentity.objects.get(id=slack_team_identity_id)
    except SlackTeamIdentity.DoesNotExist:
        logger.info(
            f"Failed to clean_slack_channel_leftovers slack_channel_id={slack_channel_id} "
            f"slack_team_identity_id={slack_team_identity_id} : Invalid slack_team_identity_id"
        )
        return

    # Organizations whose default channel is reset; they are saved with one bulk update at the end.
    orgs_with_reset_default: typing.List[Organization] = []

    for org in sti.organizations.all():
        if org.default_slack_channel_slack_id == slack_channel_id:
            logger.info(
                f"Set default_slack_channel to None for org_id={org.id} slack_channel_id={slack_channel_id} "
                f"since slack_channel is arcived or deleted"
            )
            org.default_slack_channel = None
            orgs_with_reset_default.append(org)

        # The channel is no longer usable, so detach it from the organization's routes and schedules.
        routes_using_channel = ChannelFilter.objects.filter(
            alert_receive_channel__organization=org,
            slack_channel__slack_id=slack_channel_id,
        )
        routes_using_channel.update(slack_channel=None)

        schedules_using_channel = OnCallSchedule.objects.filter(
            organization_id=org.id,
            slack_channel__slack_id=slack_channel_id,
        )
        schedules_using_channel.update(slack_channel=None)

    Organization.objects.bulk_update(orgs_with_reset_default, ["default_slack_channel"], batch_size=5000)
|