oncall-engine/engine/apps/telegram/client.py
Joey Orlando 0c96427cfc
fix apps.telegram.tasks.send_log_and_actions_message retrying tasks (#4851)
# What this PR does

It _appears_ like Telegram may have changed one of the error messages
they return for `telegram.error.BadRequest`. This _may_ be causing us to
infinitely retry some of these tasks.

Previously we were checking for two variants of the same type of error
message:
- "Message to reply not found"
- "Replied message not found"

_However_, if I search for the following [in the
logs](https://ops.grafana-ops.net/goto/hMgBb8CSR?orgId=1):
```logql
{namespace="amixr-prod"} |~ `(Message to be replied not found|Message to reply not found|Replied message not found)`
````
I _only_ see references to "Message to be replied not found". I have
updated references to the former to this new error log message we are
seeing.

Also:
- deduplicate some of the words we check for in
`telegram.error.BadRequest` and `telegram.error.Unauthorized` into
`apps.telegram.client.TelegramClient.BadRequestMessage` and
`apps.telegram.client.TelegramClient.UnauthorizedMessage` respectively
- deduplicate some of the wording we use in the `reason` arg passed to
`TelegramToUserConnector.create_telegram_notification_error` into
`apps.telegram.models.connectors.personal.TelegramToUserConnector.NotificationErrorReason`
- standardize how we check the `message` attribute of
`telegram.error.TelegramError`s into a new `error_message_is` static
method on `apps.telegram.client.TelegramClient`
- previously we would check these error messages in two different ways:
  ```python3
  # style 1
  if "error message to check" in e.message:
    # do something

  # style 2
  if error.message == "error message to check":
    # do something
  ```

## Which issue(s) this PR closes

Closes https://github.com/grafana/oncall-private/issues/2868

## Checklist

- [x] Unit, integration, and e2e (if applicable) tests updated
- [x] Documentation added (or `pr:no public docs` PR label added if not
required)
- [x] Added the relevant release notes label (see labels prefixed w/
`release:`). These labels dictate how your PR will
    show up in the autogenerated release notes.
2024-08-19 14:05:40 -04:00

189 lines
7.3 KiB
Python

import logging
from typing import Optional, Tuple, Union
from django.conf import settings
from telegram import Bot, InlineKeyboardMarkup, Message, ParseMode
from telegram.error import BadRequest, InvalidToken, TelegramError, Unauthorized
from telegram.utils.request import Request
from apps.alerts.models import AlertGroup
from apps.base.utils import live_settings
from apps.telegram.exceptions import AlertGroupTelegramMessageDoesNotExist
from apps.telegram.models import TelegramMessage
from apps.telegram.renderers.keyboard import TelegramKeyboardRenderer
from apps.telegram.renderers.message import TelegramMessageRenderer
from common.api_helpers.utils import create_engine_url
logger = logging.getLogger(__name__)
class TelegramClient:
ALLOWED_UPDATES = ("message", "callback_query")
PARSE_MODE = ParseMode.HTML
def __init__(self, token: Optional[str] = None):
self.token = token or live_settings.TELEGRAM_TOKEN
if self.token is None:
raise InvalidToken()
class BadRequestMessage:
CHAT_NOT_FOUND = "Chat not found"
MESSAGE_IS_NOT_MODIFIED = "Message is not modified"
MESSAGE_TO_EDIT_NOT_FOUND = "Message to edit not found"
NEED_ADMIN_RIGHTS_IN_THE_CHANNEL = "Need administrator rights in the channel chat"
MESSAGE_TO_BE_REPLIED_NOT_FOUND = "Message to be replied not found"
class UnauthorizedMessage:
BOT_WAS_BLOCKED_BY_USER = "Forbidden: bot was blocked by the user"
INVALID_TOKEN = "Invalid token"
USER_IS_DEACTIVATED = "Forbidden: user is deactivated"
@property
def api_client(self) -> Bot:
return Bot(self.token, request=Request(read_timeout=15))
def is_chat_member(self, chat_id: Union[int, str]) -> bool:
try:
self.api_client.get_chat(chat_id=chat_id)
return True
except Unauthorized:
return False
def register_webhook(self, webhook_url: Optional[str] = None) -> None:
if settings.IS_OPEN_SOURCE:
webhook_url = webhook_url or create_engine_url(
"/telegram/", override_base=live_settings.TELEGRAM_WEBHOOK_HOST
)
else:
webhook_url = webhook_url or create_engine_url(
"api/v3/webhook/telegram/", override_base=live_settings.TELEGRAM_WEBHOOK_HOST
)
# avoid unnecessary set_webhook calls to make sure Telegram rate limits are not exceeded
webhook_info = self.api_client.get_webhook_info()
if webhook_info.url == webhook_url:
return
self.api_client.set_webhook(webhook_url, allowed_updates=self.ALLOWED_UPDATES)
def delete_webhook(self):
webhook_info = self.api_client.get_webhook_info()
if webhook_info.url == "":
return
self.api_client.delete_webhook()
def send_message(
self,
chat_id: Union[int, str],
message_type: int,
alert_group: AlertGroup,
reply_to_message_id: Optional[int] = None,
) -> TelegramMessage:
text, keyboard = self._get_message_and_keyboard(message_type=message_type, alert_group=alert_group)
raw_message = self.send_raw_message(
chat_id=chat_id, text=text, keyboard=keyboard, reply_to_message_id=reply_to_message_id
)
message = TelegramMessage.create_from_message(
message=raw_message, alert_group=alert_group, message_type=message_type
)
return message
def send_raw_message(
self,
chat_id: Union[int, str],
text: str,
keyboard: Optional[InlineKeyboardMarkup] = None,
reply_to_message_id: Optional[int] = None,
) -> Message:
try:
message = self.api_client.send_message(
chat_id=chat_id,
text=text,
reply_markup=keyboard,
reply_to_message_id=reply_to_message_id,
parse_mode=self.PARSE_MODE,
disable_web_page_preview=False,
)
except BadRequest as e:
logger.warning(f"Telegram BadRequest: {e.message}")
raise
return message
def edit_message(self, message: TelegramMessage) -> TelegramMessage:
text, keyboard = self._get_message_and_keyboard(
message_type=message.message_type, alert_group=message.alert_group
)
self.edit_raw_message(chat_id=message.chat_id, message_id=message.message_id, text=text, keyboard=keyboard)
return message
def edit_raw_message(
self,
chat_id: Union[int, str],
message_id: Union[int, str],
text: str,
keyboard: Optional[InlineKeyboardMarkup] = None,
) -> Union[Message, bool]:
return self.api_client.edit_message_text(
chat_id=chat_id,
message_id=message_id,
text=text,
reply_markup=keyboard,
parse_mode=self.PARSE_MODE,
disable_web_page_preview=False,
)
@staticmethod
def _get_message_and_keyboard(
message_type: int, alert_group: AlertGroup
) -> Tuple[str, Optional[InlineKeyboardMarkup]]:
message_renderer = TelegramMessageRenderer(alert_group=alert_group)
keyboard_renderer = TelegramKeyboardRenderer(alert_group=alert_group)
if message_type == TelegramMessage.ALERT_GROUP_MESSAGE:
text = message_renderer.render_alert_group_message()
keyboard = None
elif message_type == TelegramMessage.LOG_MESSAGE:
text = message_renderer.render_log_message()
keyboard = None
elif message_type == TelegramMessage.ACTIONS_MESSAGE:
text = message_renderer.render_actions_message()
keyboard = keyboard_renderer.render_actions_keyboard()
elif message_type == TelegramMessage.PERSONAL_MESSAGE:
text = message_renderer.render_personal_message()
keyboard = keyboard_renderer.render_actions_keyboard()
elif message_type == TelegramMessage.FORMATTING_ERROR:
text = message_renderer.render_formatting_error_message()
keyboard = None
elif message_type in (
TelegramMessage.LINK_TO_CHANNEL_MESSAGE,
TelegramMessage.LINK_TO_CHANNEL_MESSAGE_WITHOUT_TITLE,
):
alert_group_message = alert_group.telegram_messages.filter(
chat_id__startswith="-",
message_type__in=[TelegramMessage.ALERT_GROUP_MESSAGE, TelegramMessage.FORMATTING_ERROR],
).first()
if alert_group_message is None:
raise AlertGroupTelegramMessageDoesNotExist(
f"No alert group message found, probably it is not saved to database yet, "
f"alert group: {alert_group.id}"
)
include_title = message_type == TelegramMessage.LINK_TO_CHANNEL_MESSAGE
link = alert_group_message.link
text = message_renderer.render_link_to_channel_message(include_title=include_title)
keyboard = keyboard_renderer.render_link_to_channel_keyboard(link=link)
else:
raise Exception(f"_get_message_and_keyboard with type {message_type} is not implemented")
return text, keyboard
@staticmethod
def error_message_is(error: TelegramError, messages: list[str]) -> bool:
return error.message in messages