oncall-engine/engine/apps/telegram/client.py
Alexander Cherepanov ec028eb9d9
Telegram long polling (#2250)
# What this PR does

Runs Telegram long polling to get updates. 
It's enabled by setting `FEATURE_TELEGRAM_LONG_POLLING_ENABLED=True`.
That will disable webhook and run separate deployment for telegram long
polling.

Telegram long polling is not very HA mode, but it does not need to
expose webhook url to internet and simplifies telegram integration.

## Which issue(s) this PR fixes

closes #561 

## Checklist

- [x] Unit, integration, and e2e (if applicable) tests updated
- [x] Documentation added (or `pr:no public docs` PR label added if not
required)
- [x] `CHANGELOG.md` updated (or `pr:no changelog` PR label added if not
required)
2023-08-24 09:12:24 +02:00

162 lines
6.1 KiB
Python

import logging
from typing import Optional, Tuple, Union
from telegram import Bot, InlineKeyboardMarkup, Message, ParseMode
from telegram.error import BadRequest, InvalidToken, Unauthorized
from telegram.utils.request import Request
from apps.alerts.models import AlertGroup
from apps.base.utils import live_settings
from apps.telegram.models import TelegramMessage
from apps.telegram.renderers.keyboard import TelegramKeyboardRenderer
from apps.telegram.renderers.message import TelegramMessageRenderer
from common.api_helpers.utils import create_engine_url
logger = logging.getLogger(__name__)
class TelegramClient:
ALLOWED_UPDATES = ("message", "callback_query")
PARSE_MODE = ParseMode.HTML
def __init__(self, token: Optional[str] = None):
self.token = token or live_settings.TELEGRAM_TOKEN
if self.token is None:
raise InvalidToken()
@property
def api_client(self) -> Bot:
return Bot(self.token, request=Request(read_timeout=15))
def is_chat_member(self, chat_id: Union[int, str]) -> bool:
try:
self.api_client.get_chat(chat_id=chat_id)
return True
except Unauthorized:
return False
def register_webhook(self, webhook_url: Optional[str] = None) -> None:
webhook_url = webhook_url or create_engine_url("/telegram/", override_base=live_settings.TELEGRAM_WEBHOOK_HOST)
# avoid unnecessary set_webhook calls to make sure Telegram rate limits are not exceeded
webhook_info = self.api_client.get_webhook_info()
if webhook_info.url == webhook_url:
return
self.api_client.set_webhook(webhook_url, allowed_updates=self.ALLOWED_UPDATES)
def delete_webhook(self):
webhook_info = self.api_client.get_webhook_info()
if webhook_info.url == "":
return
self.api_client.delete_webhook()
def send_message(
self,
chat_id: Union[int, str],
message_type: int,
alert_group: AlertGroup,
reply_to_message_id: Optional[int] = None,
) -> TelegramMessage:
text, keyboard = self._get_message_and_keyboard(message_type=message_type, alert_group=alert_group)
raw_message = self.send_raw_message(
chat_id=chat_id, text=text, keyboard=keyboard, reply_to_message_id=reply_to_message_id
)
message = TelegramMessage.create_from_message(
message=raw_message, alert_group=alert_group, message_type=message_type
)
return message
def send_raw_message(
self,
chat_id: Union[int, str],
text: str,
keyboard: Optional[InlineKeyboardMarkup] = None,
reply_to_message_id: Optional[int] = None,
) -> Message:
try:
message = self.api_client.send_message(
chat_id=chat_id,
text=text,
reply_markup=keyboard,
reply_to_message_id=reply_to_message_id,
parse_mode=self.PARSE_MODE,
disable_web_page_preview=False,
)
except BadRequest as e:
logger.warning("Telegram BadRequest: {}".format(e.message))
raise
return message
def edit_message(self, message: TelegramMessage) -> TelegramMessage:
text, keyboard = self._get_message_and_keyboard(
message_type=message.message_type, alert_group=message.alert_group
)
self.edit_raw_message(chat_id=message.chat_id, message_id=message.message_id, text=text, keyboard=keyboard)
return message
def edit_raw_message(
self,
chat_id: Union[int, str],
message_id: Union[int, str],
text: str,
keyboard: Optional[InlineKeyboardMarkup] = None,
) -> Union[Message, bool]:
return self.api_client.edit_message_text(
chat_id=chat_id,
message_id=message_id,
text=text,
reply_markup=keyboard,
parse_mode=self.PARSE_MODE,
disable_web_page_preview=False,
)
@staticmethod
def _get_message_and_keyboard(
message_type: int, alert_group: AlertGroup
) -> Tuple[str, Optional[InlineKeyboardMarkup]]:
message_renderer = TelegramMessageRenderer(alert_group=alert_group)
keyboard_renderer = TelegramKeyboardRenderer(alert_group=alert_group)
if message_type == TelegramMessage.ALERT_GROUP_MESSAGE:
text = message_renderer.render_alert_group_message()
keyboard = None
elif message_type == TelegramMessage.LOG_MESSAGE:
text = message_renderer.render_log_message()
keyboard = None
elif message_type == TelegramMessage.ACTIONS_MESSAGE:
text = message_renderer.render_actions_message()
keyboard = keyboard_renderer.render_actions_keyboard()
elif message_type == TelegramMessage.PERSONAL_MESSAGE:
text = message_renderer.render_personal_message()
keyboard = keyboard_renderer.render_actions_keyboard()
elif message_type == TelegramMessage.FORMATTING_ERROR:
text = message_renderer.render_formatting_error_message()
keyboard = None
elif message_type in (
TelegramMessage.LINK_TO_CHANNEL_MESSAGE,
TelegramMessage.LINK_TO_CHANNEL_MESSAGE_WITHOUT_TITLE,
):
alert_group_message = alert_group.telegram_messages.filter(
chat_id__startswith="-",
message_type__in=[TelegramMessage.ALERT_GROUP_MESSAGE, TelegramMessage.FORMATTING_ERROR],
).first()
if alert_group_message is None:
raise Exception("No alert group message found, probably it is not saved to database yet")
include_title = message_type == TelegramMessage.LINK_TO_CHANNEL_MESSAGE
link = alert_group_message.link
text = message_renderer.render_link_to_channel_message(include_title=include_title)
keyboard = keyboard_renderer.render_link_to_channel_keyboard(link=link)
else:
raise Exception(f"_get_message_and_keyboard with type {message_type} is not implemented")
return text, keyboard