patch 0015ec4a3cf416b0e597b5f97681b4f443977e37 Author: E. Bosch Date: Tue Feb 8 02:04:55 CET 2022 * telegram: Improve webpage (media) handler Add support for webpage update after webpending diff -rN -u old-irgramd/telegram.py new-irgramd/telegram.py --- old-irgramd/telegram.py 2024-11-22 21:22:39.052123140 +0100 +++ new-irgramd/telegram.py 2024-11-22 21:22:39.052123140 +0100 @@ -10,7 +10,7 @@ from include import CHAN_MAX_LENGHT, NICK_MAX_LENGTH from irc import IRCUser -from utils import sanitize_filename +from utils import sanitize_filename, remove_slash, remove_http_s # Constants @@ -39,6 +39,7 @@ self.tg_username = None self.channels_date = {} self.mid = mesg_id('0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ!#$%+./_~') + self.webpending = {} async def initialize_telegram(self): # Setup media folder @@ -63,6 +64,7 @@ # Register Telegram callbacks callbacks = ( (self.handle_telegram_message , telethon.events.NewMessage), + (self.handle_raw, telethon.events.Raw), (self.handle_telegram_chat_action, telethon.events.ChatAction), ) for handler, event in callbacks: @@ -241,7 +243,13 @@ def get_entity_type(self, entity): return type(entity).__name__ - async def handle_telegram_message(self, event): + async def handle_raw(self, update): + if isinstance(update, tgty.UpdateWebPage) and isinstance(update.webpage, tgty.WebPage): + event = self.webpending.pop(update.webpage.id, None) + if event: + await self.handle_telegram_message(event, update.webpage) + + async def handle_telegram_message(self, event, upd_to_webpend=None): self.logger.debug('Handling Telegram Message: %s', event) if self.mid.mesg_base is None: @@ -250,8 +258,10 @@ user = self.get_irc_user_from_telegram(event.sender_id) mid = self.mid.num_to_id(event.message.id - self.mid.mesg_base) - if event.message.media: - text = await self.handle_telegram_media(event.message) + if upd_to_webpend: + text = await self.handle_webpage(upd_to_webpend, event.message) + elif event.message.media: + text = await 
self.handle_telegram_media(event) else: text = event.message.message @@ -307,25 +317,26 @@ self.irc.iid_to_tid[channel] = chat.id await self.irc.join_irc_channel(self.irc.irc_nick, channel, True) - async def handle_telegram_media(self, message): + async def handle_telegram_media(self, event): + message = event.message caption = ' | {}'.format(message.message) if message.message else '' to_download = True media_url_or_data = '' - if message.web_preview: - media_type = 'web' - logo = await self.download_telegram_media(message) + if isinstance(message.media, tgty.MessageMediaWebPage): to_download = False - media_url_or_data = message.message - if message.media.webpage.title and logo: - caption = ' | {} | {}'.format(message.media.webpage.title, logo) - elif message.media.webpage.title: - caption = ' | {}'.format(message.media.webpage.title) - elif logo: - caption = ' | {}'.format(logo) + if isinstance(message.media.webpage, tgty.WebPage): + # web + return await self.handle_webpage(message.media.webpage, message) + elif isinstance(message.media.webpage, tgty.WebPagePending): + media_type = 'webpending' + media_url_or_data = message.message + caption = '' + self.webpending[message.media.webpage.id] = event else: + media_type = 'webunknown' + media_url_or_data = message.message caption = '' - elif message.photo: size = message.media.photo.sizes[-1] if hasattr(size, 'w') and hasattr(size, 'h'): @@ -381,10 +392,48 @@ caption = '' to_download = False media_url_or_data = '' + else: + media_type = 'unknown' + caption = '' + to_download = False + media_url_or_data = message.message if to_download: media_url_or_data = await self.download_telegram_media(message) + return self.format_media(media_type, media_url_or_data, caption) + + async def handle_webpage(self, webpage, message): + media_type = 'web' + logo = await self.download_telegram_media(message) + if webpage.url != webpage.display_url \ + and remove_slash(webpage.url) != webpage.display_url \ + and 
remove_http_s(webpage.url) != webpage.display_url: + media_url_or_data = '{} | {}'.format(webpage.url, webpage.display_url) + else: + media_url_or_data = webpage.url + if message: + # sometimes the 1st line of message contains the title, don't repeat it + message_line = message.message.splitlines()[0] + if message_line != webpage.title: + title = webpage.title + else: + title = '' + else: + title = webpage.title + + if title and logo: + caption = ' | {} | {}'.format(title, logo) + elif title: + caption = ' | {}'.format(title) + elif logo: + caption = ' | {}'.format(logo) + else: + caption = '' + + return self.format_media(media_type, media_url_or_data, caption) + + def format_media(self, media_type, media_url_or_data, caption): return '[{}] {}{}'.format(media_type, media_url_or_data, caption) async def download_telegram_media(self, message): diff -rN -u old-irgramd/utils.py new-irgramd/utils.py --- old-irgramd/utils.py 2024-11-22 21:22:39.052123140 +0100 +++ new-irgramd/utils.py 2024-11-22 21:22:39.052123140 +0100 @@ -45,3 +45,15 @@ def sanitize_filename(fn): return FILENAME_INVALID_CHARS.sub('', fn).strip('-').replace(' ','_') + +def remove_slash(url): + return url[:-1] if url[-1:] == '/' else url + +def remove_http_s(url): + if url[:8] == 'https://': + surl = url[8:] + elif url[:7] == 'http://': + surl = url[7:] + else: + surl = url + return remove_slash(surl)