diff --git a/CHANGELOG.md b/CHANGELOG.md index 6bbec5f3..4cb0465a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed - Fix duplicate orders on routes and escalation policies by @vadimkerr ([#2568](https://github.com/grafana/oncall/pull/2568)) +- Fixed Slack channels sync by @Ferril ([#2571](https://github.com/grafana/oncall/pull/2571)) - Fixed rendering of slack connection errors ([#2526](https://github.com/grafana/oncall/pull/2526)) ## v1.3.14 (2023-07-17) diff --git a/engine/apps/slack/slack_client/slack_client.py b/engine/apps/slack/slack_client/slack_client.py index 7cac9ab6..e2ca21b5 100644 --- a/engine/apps/slack/slack_client/slack_client.py +++ b/engine/apps/slack/slack_client/slack_client.py @@ -1,4 +1,5 @@ import logging +from typing import Optional, Tuple from django.apps import apps from django.utils import timezone @@ -54,6 +55,39 @@ class SlackClientWithErrorHandling(SlackClient): return cumulative_response + def paginated_api_call_with_ratelimit(self, *args, **kwargs) -> Tuple[dict, Optional[str], bool]: + """ + This method do paginated api call and handle slack rate limit error in order to return collected data and have + ability to continue doing paginated requests from the last successful cursor. Return last successful cursor + instead of next cursor to avoid data loss during delay time + """ + # It's a key from response which is paginated. For example "users" or "channels" + listed_key = kwargs["paginated_key"] + cumulative_response = {} + cursor = kwargs.get("cursor") + rate_limited = False + + try: + response = self.api_call(*args, **kwargs) + cumulative_response = response + cursor = response["response_metadata"]["next_cursor"] + + while ( + "response_metadata" in response + and "next_cursor" in response["response_metadata"] + and response["response_metadata"]["next_cursor"] != "" + ): + next_cursor = response["response_metadata"]["next_cursor"] + kwargs["cursor"] = next_cursor + response = self.api_call(*args, **kwargs) + cumulative_response[listed_key] += response[listed_key] + cursor = next_cursor + + except SlackAPIRateLimitException: + rate_limited = True + + return cumulative_response, cursor, rate_limited + def api_call(self, *args, **kwargs): DynamicSetting = apps.get_model("base", "DynamicSetting") diff --git a/engine/apps/slack/tasks.py b/engine/apps/slack/tasks.py index 1b5d006b..7a934f0f 100644 --- a/engine/apps/slack/tasks.py +++ b/engine/apps/slack/tasks.py @@ -1,6 +1,8 @@ import logging import random +from typing import Optional +from celery import uuid as celery_uuid from celery.utils.log import get_task_logger from django.apps import apps from django.conf import settings @@ -13,7 +15,11 @@ from apps.slack.constants import CACHE_UPDATE_INCIDENT_SLACK_MESSAGE_LIFETIME, S from apps.slack.scenarios.scenario_step import ScenarioStep from apps.slack.slack_client import SlackClientWithErrorHandling from apps.slack.slack_client.exceptions import SlackAPIException, SlackAPITokenException -from apps.slack.utils import get_cache_key_update_incident_slack_message, post_message_to_channel +from apps.slack.utils import ( + get_cache_key_update_incident_slack_message, + get_populate_slack_channel_task_id_key, + post_message_to_channel, +) from common.custom_celery_tasks import shared_dedicated_queue_retry_task from common.utils import batch_queryset @@ -501,22 +507,60 @@ def populate_slack_channels(): # increase delay to prevent slack ratelimit if counter % 8 == 0: delay += 60 - populate_slack_channels_for_team.apply_async((slack_team_identity.pk,), countdown=delay) + start_populate_slack_channels_for_team(slack_team_identity.pk, delay) + + +def start_populate_slack_channels_for_team( + slack_team_identity_id: int, delay: int, cursor: Optional[str] = None +) -> None: + # save active task id in cache to make only one populate task active per team + task_id = celery_uuid() + cache_key = get_populate_slack_channel_task_id_key(slack_team_identity_id) + cache.set(cache_key, task_id) + populate_slack_channels_for_team.apply_async((slack_team_identity_id, cursor), countdown=delay, task_id=task_id) @shared_dedicated_queue_retry_task( autoretry_for=(Exception,), retry_backoff=True, max_retries=1 if settings.DEBUG else None ) -def populate_slack_channels_for_team(slack_team_identity_id): +def populate_slack_channels_for_team(slack_team_identity_id: int, cursor: Optional[str] = None) -> None: + """ + Make paginated request to get slack channels. On ratelimit - update info for got channels, save collected channels + ids in cache and restart the task with the last successful pagination cursor to avoid any data loss during delay + time. + """ SlackTeamIdentity = apps.get_model("slack", "SlackTeamIdentity") SlackChannel = apps.get_model("slack", "SlackChannel") slack_team_identity = SlackTeamIdentity.objects.get(pk=slack_team_identity_id) sc = SlackClientWithErrorHandling(slack_team_identity.bot_access_token) + active_task_id_key = get_populate_slack_channel_task_id_key(slack_team_identity_id) + active_task_id = cache.get(active_task_id_key) + current_task_id = populate_slack_channels_for_team.request.id + if active_task_id and active_task_id != current_task_id: + logger.info( + f"Stop populate_slack_channels_for_team for SlackTeamIdentity pk: {slack_team_identity_id} due to " + f"incorrect active task id" + ) + return + collected_channels_key = f"SLACK_CHANNELS_TEAM_{slack_team_identity_id}" + collected_channels = cache.get(collected_channels_key, set()) + if cursor and not collected_channels: + # means the task was restarted after rate limit exception but collected channels were lost + logger.warning( + f"Restart slack channel sync for SlackTeamIdentity pk: {slack_team_identity_id} due to empty " + f"'collected_channels' after rate limit" + ) + delay = 60 + return start_populate_slack_channels_for_team(slack_team_identity_id, delay) try: - response = sc.paginated_api_call( - "conversations.list", types="public_channel,private_channel", paginated_key="channels", limit=1000 + response, cursor, rate_limited = sc.paginated_api_call_with_ratelimit( + "conversations.list", + types="public_channel,private_channel", + paginated_key="channels", + limit=1000, + cursor=cursor, ) except SlackAPITokenException as e: logger.info(f"token revoked\n{e}") @@ -525,20 +569,11 @@ def populate_slack_channels_for_team(slack_team_identity_id): logger.warning( f"invalid_auth while populating slack channels, SlackTeamIdentity pk: {slack_team_identity.pk}" ) - # in some cases slack rate limit error looks like 'rate_limited', in some - 'ratelimited', be aware - elif e.response["error"] == "rate_limited" or e.response["error"] == "ratelimited": - delay = random.randint(5, 25) * 60 - logger.warning( - f"'conversations.list' slack api error: rate_limited. SlackTeamIdentity pk: {slack_team_identity.pk}." - f"Delay populate_slack_channels_for_team task by {delay//60} min." - ) - return populate_slack_channels_for_team.apply_async((slack_team_identity_id,), countdown=delay) elif e.response["error"] == "missing_scope": logger.warning( f"conversations.list' slack api error: missing_scope. " f"SlackTeamIdentity pk: {slack_team_identity.pk}.\n{e}" ) - return else: logger.error(f"'conversations.list' slack api error. SlackTeamIdentity pk: {slack_team_identity.pk}\n{e}") raise e @@ -546,6 +581,8 @@ def populate_slack_channels_for_team(slack_team_identity_id): today = timezone.now().date() slack_channels = {channel["id"]: channel for channel in response["channels"]} + collected_channels.update(slack_channels.keys()) + existing_channels = slack_team_identity.cached_channels.all() existing_channel_ids = set(existing_channels.values_list("slack_id", flat=True)) @@ -564,12 +601,8 @@ def populate_slack_channels_for_team(slack_team_identity_id): ) SlackChannel.objects.bulk_create(channels_to_create, batch_size=5000) - # delete excess channels - channel_ids_to_delete = existing_channel_ids - slack_channels.keys() - slack_team_identity.cached_channels.filter(slack_id__in=channel_ids_to_delete).delete() - # update existing channels - channels_to_update = existing_channels.exclude(slack_id__in=channel_ids_to_delete) + channels_to_update = existing_channels.filter(slack_id__in=slack_channels.keys()).exclude(last_populated=today) for channel in channels_to_update: slack_channel = slack_channels[channel.slack_id] channel.name = slack_channel["name"] @@ -580,6 +613,21 @@ def populate_slack_channels_for_team(slack_team_identity_id): SlackChannel.objects.bulk_update( channels_to_update, fields=("name", "is_archived", "is_shared", "last_populated"), batch_size=5000 ) + if rate_limited: + # save collected channels ids to cache and restart the task with the current pagination cursor + cache.set(collected_channels_key, collected_channels, timeout=3600) + delay = random.randint(1, 3) * 60 + logger.warning( + f"'conversations.list' slack api error: rate_limited. SlackTeamIdentity pk: {slack_team_identity_id}. " + f"Delay populate_slack_channels_for_team task for {delay//60} min." + ) + start_populate_slack_channels_for_team(slack_team_identity_id, delay, cursor) + else: + # delete excess channels + assert collected_channels + channel_ids_to_delete = existing_channel_ids - collected_channels + slack_team_identity.cached_channels.filter(slack_id__in=channel_ids_to_delete).delete() + cache.delete(collected_channels_key) @shared_dedicated_queue_retry_task(autoretry_for=(Exception,), retry_backoff=True, max_retries=0) diff --git a/engine/apps/slack/tests/test_populate_slack_channels.py b/engine/apps/slack/tests/test_populate_slack_channels.py index 42f031ae..bb7a1df8 100644 --- a/engine/apps/slack/tests/test_populate_slack_channels.py +++ b/engine/apps/slack/tests/test_populate_slack_channels.py @@ -23,14 +23,21 @@ def test_populate_slack_channels_for_team(make_organization_with_slack_team_iden ) ) - response = { - "channels": ( - {"id": "C111111111", "name": "test1", "is_archived": False, "is_shared": False}, - {"id": "C222222222", "name": "test_changed_name", "is_archived": False, "is_shared": True}, - {"id": "C333333333", "name": "test3", "is_archived": False, "is_shared": True}, - ) - } - with patch.object(SlackClientWithErrorHandling, "paginated_api_call", return_value=response): + response, cursor, rate_limited = ( + { + "channels": ( + {"id": "C111111111", "name": "test1", "is_archived": False, "is_shared": False}, + {"id": "C222222222", "name": "test_changed_name", "is_archived": False, "is_shared": True}, + {"id": "C333333333", "name": "test3", "is_archived": False, "is_shared": True}, + ) + }, + None, + False, + ) + + with patch.object( + SlackClientWithErrorHandling, "paginated_api_call_with_ratelimit", return_value=(response, cursor, rate_limited) + ): populate_slack_channels_for_team(slack_team_identity.pk) channels = slack_team_identity.cached_channels.all() @@ -45,3 +52,123 @@ def test_populate_slack_channels_for_team(make_organization_with_slack_team_iden assert second_channel.name == "test_changed_name" assert not channels.filter(last_populated__lte=yesterday).exists() + + +@patch("apps.slack.tasks.start_populate_slack_channels_for_team") +@pytest.mark.django_db +def test_populate_slack_channels_for_team_ratelimit( + mocked_start_populate_slack_channels_for_team, + make_organization_with_slack_team_identity, + make_slack_channel, +): + organization, slack_team_identity = make_organization_with_slack_team_identity() + + yesterday = (timezone.now() - timezone.timedelta(days=1)).date() + today = timezone.now().date() + + _ = tuple( + make_slack_channel( + slack_team_identity=slack_team_identity, slack_id=slack_id, name=name, last_populated=yesterday + ) + for slack_id, name in ( + ("C111111111", "test1"), + ("C222222222", "test2"), + ("C444444444", "test4"), + ) + ) + # first response with rate limit error + response_1, cursor_1, rate_limited_1 = ( + {"channels": ({"id": "C111111111", "name": "test1", "is_archived": False, "is_shared": False},)}, + "TESTCURSOR1", + True, + ) + + # second response with rate limit error + response_2, cursor_2, rate_limited_2 = ( + { + "channels": ( + {"id": "C111111111", "name": "test1", "is_archived": False, "is_shared": False}, + {"id": "C222222222", "name": "test_changed_name", "is_archived": False, "is_shared": True}, + ) + }, + "TESTCURSOR2", + True, + ) + + # third response without rate limit error + response_3, cursor_3, rate_limited_3 = ( + { + "channels": ( + {"id": "C222222222", "name": "test_changed_name", "is_archived": False, "is_shared": True}, + {"id": "C333333333", "name": "test3", "is_archived": False, "is_shared": True}, + ) + }, + "", + False, + ) + # these channels should exist after finishing populate_slack_channels_for_team + expected_channel_ids = {"C111111111", "C222222222", "C333333333"} + + with patch.object( + SlackClientWithErrorHandling, + "paginated_api_call_with_ratelimit", + return_value=(response_1, cursor_1, rate_limited_1), + ): + populate_slack_channels_for_team(slack_team_identity.pk) + + # expected only one channel to update and no channel to delete + # start_populate_slack_channels_for_team should be called + channels = slack_team_identity.cached_channels.all() + channel_1 = channels.get(slack_id="C111111111") + assert channel_1.last_populated == today + + channel_2 = channels.get(slack_id="C222222222") + assert channel_2.last_populated == yesterday + + assert channels.filter(slack_id="C444444444").exists() + + assert mocked_start_populate_slack_channels_for_team.called + assert mocked_start_populate_slack_channels_for_team.call_count == 1 + + with patch.object( + SlackClientWithErrorHandling, + "paginated_api_call_with_ratelimit", + return_value=(response_2, cursor_2, rate_limited_2), + ): + populate_slack_channels_for_team(slack_team_identity.pk, cursor_1) + + # expected another one channel to update and no channel to delete + # start_populate_slack_channels_for_team should be called + channels = slack_team_identity.cached_channels.all() + + channel_2 = channels.get(slack_id="C222222222") + assert channel_2.last_populated == today + assert channel_2.name == "test_changed_name" + + assert not channels.filter(slack_id="C333333333").exists() + assert channels.filter(slack_id="C444444444").exists() + + assert mocked_start_populate_slack_channels_for_team.called + assert mocked_start_populate_slack_channels_for_team.call_count == 2 + + with patch.object( + SlackClientWithErrorHandling, + "paginated_api_call_with_ratelimit", + return_value=(response_3, cursor_3, rate_limited_3), + ): + populate_slack_channels_for_team(slack_team_identity.pk, cursor_1) + + # expected one new channel and one deleted channel. List of channel ids in response and in db should be the same + # start_populate_slack_channels_for_team should NOT be called + channels = slack_team_identity.cached_channels.all() + + actual_channel_ids = set(channels.values_list("slack_id", flat=True)) + assert actual_channel_ids == expected_channel_ids + + assert not channels.filter(slack_id="C444444444").exists() + + channel_2 = channels.get(slack_id="C222222222") + assert channel_2.name == "test_changed_name" + + assert not channels.filter(last_populated__lte=yesterday).exists() + assert mocked_start_populate_slack_channels_for_team.call_count == 2 diff --git a/engine/apps/slack/utils.py b/engine/apps/slack/utils.py index 8e343c8b..fb76e677 100644 --- a/engine/apps/slack/utils.py +++ b/engine/apps/slack/utils.py @@ -65,3 +65,7 @@ def format_datetime_to_slack(timestamp, format="date_short"): def get_cache_key_update_incident_slack_message(alert_group_pk): CACHE_KEY_PREFIX = "update_incident_slack_message" return f"{CACHE_KEY_PREFIX}_{alert_group_pk}" + + +def get_populate_slack_channel_task_id_key(slack_team_identity_id): + return f"SLACK_CHANNELS_TASK_ID_TEAM_{slack_team_identity_id}"