Fix slack channels sync (#2571)

# What this PR does
- Fixes issue with slack channels sync periodic tasks when we get slack
rate limit exception.
- Adds check for active task id to avoid starting multiple tasks for one
slack team.

Collecting channels for slack for some teams causes rate limit
exception, which causes the task to restart and start collecting slack
channels from the beginning. This PR adds new paginated api call and
refactors the slack channel sync task to continue collect data after
rate limit from the step before it was raised using `cursor` value from
the slack response.


## Checklist

- [x] Unit, integration, and e2e (if applicable) tests updated
- [ ] Documentation added (or `pr:no public docs` PR label added if not
required)
- [x] `CHANGELOG.md` updated (or `pr:no changelog` PR label added if not
required)

---------

Co-authored-by: Joey Orlando <joey.orlando@grafana.com>
This commit is contained in:
Yulya Artyukhina 2023-07-19 09:17:21 +02:00 committed by GitHub
parent 239729d654
commit adfb496a81
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 241 additions and 27 deletions

View file

@ -18,6 +18,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Fixed
- Fix duplicate orders on routes and escalation policies by @vadimkerr ([#2568](https://github.com/grafana/oncall/pull/2568))
- Fixed Slack channels sync by @Ferril ([#2571](https://github.com/grafana/oncall/pull/2571))
- Fixed rendering of slack connection errors ([#2526](https://github.com/grafana/oncall/pull/2526))
## v1.3.14 (2023-07-17)

View file

@ -1,4 +1,5 @@
import logging
from typing import Optional, Tuple
from django.apps import apps
from django.utils import timezone
@ -54,6 +55,39 @@ class SlackClientWithErrorHandling(SlackClient):
return cumulative_response
def paginated_api_call_with_ratelimit(self, *args, **kwargs) -> Tuple[dict, Optional[str], bool]:
"""
This method do paginated api call and handle slack rate limit error in order to return collected data and have
ability to continue doing paginated requests from the last successful cursor. Return last successful cursor
instead of next cursor to avoid data loss during delay time
"""
# It's a key from response which is paginated. For example "users" or "channels"
listed_key = kwargs["paginated_key"]
cumulative_response = {}
cursor = kwargs.get("cursor")
rate_limited = False
try:
response = self.api_call(*args, **kwargs)
cumulative_response = response
cursor = response["response_metadata"]["next_cursor"]
while (
"response_metadata" in response
and "next_cursor" in response["response_metadata"]
and response["response_metadata"]["next_cursor"] != ""
):
next_cursor = response["response_metadata"]["next_cursor"]
kwargs["cursor"] = next_cursor
response = self.api_call(*args, **kwargs)
cumulative_response[listed_key] += response[listed_key]
cursor = next_cursor
except SlackAPIRateLimitException:
rate_limited = True
return cumulative_response, cursor, rate_limited
def api_call(self, *args, **kwargs):
DynamicSetting = apps.get_model("base", "DynamicSetting")

View file

@ -1,6 +1,8 @@
import logging
import random
from typing import Optional
from celery import uuid as celery_uuid
from celery.utils.log import get_task_logger
from django.apps import apps
from django.conf import settings
@ -13,7 +15,11 @@ from apps.slack.constants import CACHE_UPDATE_INCIDENT_SLACK_MESSAGE_LIFETIME, S
from apps.slack.scenarios.scenario_step import ScenarioStep
from apps.slack.slack_client import SlackClientWithErrorHandling
from apps.slack.slack_client.exceptions import SlackAPIException, SlackAPITokenException
from apps.slack.utils import get_cache_key_update_incident_slack_message, post_message_to_channel
from apps.slack.utils import (
get_cache_key_update_incident_slack_message,
get_populate_slack_channel_task_id_key,
post_message_to_channel,
)
from common.custom_celery_tasks import shared_dedicated_queue_retry_task
from common.utils import batch_queryset
@ -501,22 +507,60 @@ def populate_slack_channels():
# increase delay to prevent slack ratelimit
if counter % 8 == 0:
delay += 60
populate_slack_channels_for_team.apply_async((slack_team_identity.pk,), countdown=delay)
start_populate_slack_channels_for_team(slack_team_identity.pk, delay)
def start_populate_slack_channels_for_team(
slack_team_identity_id: int, delay: int, cursor: Optional[str] = None
) -> None:
# save active task id in cache to make only one populate task active per team
task_id = celery_uuid()
cache_key = get_populate_slack_channel_task_id_key(slack_team_identity_id)
cache.set(cache_key, task_id)
populate_slack_channels_for_team.apply_async((slack_team_identity_id, cursor), countdown=delay, task_id=task_id)
@shared_dedicated_queue_retry_task(
autoretry_for=(Exception,), retry_backoff=True, max_retries=1 if settings.DEBUG else None
)
def populate_slack_channels_for_team(slack_team_identity_id):
def populate_slack_channels_for_team(slack_team_identity_id: int, cursor: Optional[str] = None) -> None:
"""
Make paginated request to get slack channels. On ratelimit - update info for got channels, save collected channels
ids in cache and restart the task with the last successful pagination cursor to avoid any data loss during delay
time.
"""
SlackTeamIdentity = apps.get_model("slack", "SlackTeamIdentity")
SlackChannel = apps.get_model("slack", "SlackChannel")
slack_team_identity = SlackTeamIdentity.objects.get(pk=slack_team_identity_id)
sc = SlackClientWithErrorHandling(slack_team_identity.bot_access_token)
active_task_id_key = get_populate_slack_channel_task_id_key(slack_team_identity_id)
active_task_id = cache.get(active_task_id_key)
current_task_id = populate_slack_channels_for_team.request.id
if active_task_id and active_task_id != current_task_id:
logger.info(
f"Stop populate_slack_channels_for_team for SlackTeamIdentity pk: {slack_team_identity_id} due to "
f"incorrect active task id"
)
return
collected_channels_key = f"SLACK_CHANNELS_TEAM_{slack_team_identity_id}"
collected_channels = cache.get(collected_channels_key, set())
if cursor and not collected_channels:
# means the task was restarted after rate limit exception but collected channels were lost
logger.warning(
f"Restart slack channel sync for SlackTeamIdentity pk: {slack_team_identity_id} due to empty "
f"'collected_channels' after rate limit"
)
delay = 60
return start_populate_slack_channels_for_team(slack_team_identity_id, delay)
try:
response = sc.paginated_api_call(
"conversations.list", types="public_channel,private_channel", paginated_key="channels", limit=1000
response, cursor, rate_limited = sc.paginated_api_call_with_ratelimit(
"conversations.list",
types="public_channel,private_channel",
paginated_key="channels",
limit=1000,
cursor=cursor,
)
except SlackAPITokenException as e:
logger.info(f"token revoked\n{e}")
@ -525,20 +569,11 @@ def populate_slack_channels_for_team(slack_team_identity_id):
logger.warning(
f"invalid_auth while populating slack channels, SlackTeamIdentity pk: {slack_team_identity.pk}"
)
# in some cases slack rate limit error looks like 'rate_limited', in some - 'ratelimited', be aware
elif e.response["error"] == "rate_limited" or e.response["error"] == "ratelimited":
delay = random.randint(5, 25) * 60
logger.warning(
f"'conversations.list' slack api error: rate_limited. SlackTeamIdentity pk: {slack_team_identity.pk}."
f"Delay populate_slack_channels_for_team task by {delay//60} min."
)
return populate_slack_channels_for_team.apply_async((slack_team_identity_id,), countdown=delay)
elif e.response["error"] == "missing_scope":
logger.warning(
f"conversations.list' slack api error: missing_scope. "
f"SlackTeamIdentity pk: {slack_team_identity.pk}.\n{e}"
)
return
else:
logger.error(f"'conversations.list' slack api error. SlackTeamIdentity pk: {slack_team_identity.pk}\n{e}")
raise e
@ -546,6 +581,8 @@ def populate_slack_channels_for_team(slack_team_identity_id):
today = timezone.now().date()
slack_channels = {channel["id"]: channel for channel in response["channels"]}
collected_channels.update(slack_channels.keys())
existing_channels = slack_team_identity.cached_channels.all()
existing_channel_ids = set(existing_channels.values_list("slack_id", flat=True))
@ -564,12 +601,8 @@ def populate_slack_channels_for_team(slack_team_identity_id):
)
SlackChannel.objects.bulk_create(channels_to_create, batch_size=5000)
# delete excess channels
channel_ids_to_delete = existing_channel_ids - slack_channels.keys()
slack_team_identity.cached_channels.filter(slack_id__in=channel_ids_to_delete).delete()
# update existing channels
channels_to_update = existing_channels.exclude(slack_id__in=channel_ids_to_delete)
channels_to_update = existing_channels.filter(slack_id__in=slack_channels.keys()).exclude(last_populated=today)
for channel in channels_to_update:
slack_channel = slack_channels[channel.slack_id]
channel.name = slack_channel["name"]
@ -580,6 +613,21 @@ def populate_slack_channels_for_team(slack_team_identity_id):
SlackChannel.objects.bulk_update(
channels_to_update, fields=("name", "is_archived", "is_shared", "last_populated"), batch_size=5000
)
if rate_limited:
# save collected channels ids to cache and restart the task with the current pagination cursor
cache.set(collected_channels_key, collected_channels, timeout=3600)
delay = random.randint(1, 3) * 60
logger.warning(
f"'conversations.list' slack api error: rate_limited. SlackTeamIdentity pk: {slack_team_identity_id}. "
f"Delay populate_slack_channels_for_team task for {delay//60} min."
)
start_populate_slack_channels_for_team(slack_team_identity_id, delay, cursor)
else:
# delete excess channels
assert collected_channels
channel_ids_to_delete = existing_channel_ids - collected_channels
slack_team_identity.cached_channels.filter(slack_id__in=channel_ids_to_delete).delete()
cache.delete(collected_channels_key)
@shared_dedicated_queue_retry_task(autoretry_for=(Exception,), retry_backoff=True, max_retries=0)

View file

@ -23,14 +23,21 @@ def test_populate_slack_channels_for_team(make_organization_with_slack_team_iden
)
)
response = {
"channels": (
{"id": "C111111111", "name": "test1", "is_archived": False, "is_shared": False},
{"id": "C222222222", "name": "test_changed_name", "is_archived": False, "is_shared": True},
{"id": "C333333333", "name": "test3", "is_archived": False, "is_shared": True},
)
}
with patch.object(SlackClientWithErrorHandling, "paginated_api_call", return_value=response):
response, cursor, rate_limited = (
{
"channels": (
{"id": "C111111111", "name": "test1", "is_archived": False, "is_shared": False},
{"id": "C222222222", "name": "test_changed_name", "is_archived": False, "is_shared": True},
{"id": "C333333333", "name": "test3", "is_archived": False, "is_shared": True},
)
},
None,
False,
)
with patch.object(
SlackClientWithErrorHandling, "paginated_api_call_with_ratelimit", return_value=(response, cursor, rate_limited)
):
populate_slack_channels_for_team(slack_team_identity.pk)
channels = slack_team_identity.cached_channels.all()
@ -45,3 +52,123 @@ def test_populate_slack_channels_for_team(make_organization_with_slack_team_iden
assert second_channel.name == "test_changed_name"
assert not channels.filter(last_populated__lte=yesterday).exists()
@patch("apps.slack.tasks.start_populate_slack_channels_for_team")
@pytest.mark.django_db
def test_populate_slack_channels_for_team_ratelimit(
mocked_start_populate_slack_channels_for_team,
make_organization_with_slack_team_identity,
make_slack_channel,
):
organization, slack_team_identity = make_organization_with_slack_team_identity()
yesterday = (timezone.now() - timezone.timedelta(days=1)).date()
today = timezone.now().date()
_ = tuple(
make_slack_channel(
slack_team_identity=slack_team_identity, slack_id=slack_id, name=name, last_populated=yesterday
)
for slack_id, name in (
("C111111111", "test1"),
("C222222222", "test2"),
("C444444444", "test4"),
)
)
# first response with rate limit error
response_1, cursor_1, rate_limited_1 = (
{"channels": ({"id": "C111111111", "name": "test1", "is_archived": False, "is_shared": False},)},
"TESTCURSOR1",
True,
)
# second response with rate limit error
response_2, cursor_2, rate_limited_2 = (
{
"channels": (
{"id": "C111111111", "name": "test1", "is_archived": False, "is_shared": False},
{"id": "C222222222", "name": "test_changed_name", "is_archived": False, "is_shared": True},
)
},
"TESTCURSOR2",
True,
)
# third response without rate limit error
response_3, cursor_3, rate_limited_3 = (
{
"channels": (
{"id": "C222222222", "name": "test_changed_name", "is_archived": False, "is_shared": True},
{"id": "C333333333", "name": "test3", "is_archived": False, "is_shared": True},
)
},
"",
False,
)
# these channels should exist after finishing populate_slack_channels_for_team
expected_channel_ids = {"C111111111", "C222222222", "C333333333"}
with patch.object(
SlackClientWithErrorHandling,
"paginated_api_call_with_ratelimit",
return_value=(response_1, cursor_1, rate_limited_1),
):
populate_slack_channels_for_team(slack_team_identity.pk)
# expected only one channel to update and no channel to delete
# start_populate_slack_channels_for_team should be called
channels = slack_team_identity.cached_channels.all()
channel_1 = channels.get(slack_id="C111111111")
assert channel_1.last_populated == today
channel_2 = channels.get(slack_id="C222222222")
assert channel_2.last_populated == yesterday
assert channels.filter(slack_id="C444444444").exists()
assert mocked_start_populate_slack_channels_for_team.called
assert mocked_start_populate_slack_channels_for_team.call_count == 1
with patch.object(
SlackClientWithErrorHandling,
"paginated_api_call_with_ratelimit",
return_value=(response_2, cursor_2, rate_limited_2),
):
populate_slack_channels_for_team(slack_team_identity.pk, cursor_1)
# expected another one channel to update and no channel to delete
# start_populate_slack_channels_for_team should be called
channels = slack_team_identity.cached_channels.all()
channel_2 = channels.get(slack_id="C222222222")
assert channel_2.last_populated == today
assert channel_2.name == "test_changed_name"
assert not channels.filter(slack_id="C333333333").exists()
assert channels.filter(slack_id="C444444444").exists()
assert mocked_start_populate_slack_channels_for_team.called
assert mocked_start_populate_slack_channels_for_team.call_count == 2
with patch.object(
SlackClientWithErrorHandling,
"paginated_api_call_with_ratelimit",
return_value=(response_3, cursor_3, rate_limited_3),
):
populate_slack_channels_for_team(slack_team_identity.pk, cursor_1)
# expected one new channel and one deleted channel. List of channel ids in response and in db should be the same
# start_populate_slack_channels_for_team should NOT be called
channels = slack_team_identity.cached_channels.all()
actual_channel_ids = set(channels.values_list("slack_id", flat=True))
assert actual_channel_ids == expected_channel_ids
assert not channels.filter(slack_id="C444444444").exists()
channel_2 = channels.get(slack_id="C222222222")
assert channel_2.name == "test_changed_name"
assert not channels.filter(last_populated__lte=yesterday).exists()
assert mocked_start_populate_slack_channels_for_team.call_count == 2

View file

@ -65,3 +65,7 @@ def format_datetime_to_slack(timestamp, format="date_short"):
def get_cache_key_update_incident_slack_message(alert_group_pk):
CACHE_KEY_PREFIX = "update_incident_slack_message"
return f"{CACHE_KEY_PREFIX}_{alert_group_pk}"
def get_populate_slack_channel_task_id_key(slack_team_identity_id):
return f"SLACK_CHANNELS_TASK_ID_TEAM_{slack_team_identity_id}"