patch 0015ec4a3cf416b0e597b5f97681b4f443977e37
Author: E. Bosch <presidev@AT@gmail.com>
Date: Tue Feb 8 02:04:55 CET 2022
* telegram: Improve webpage (media) handler
Add suport for webpage update after webpending
diff -rN -u old-irgramd/telegram.py new-irgramd/telegram.py
--- old-irgramd/telegram.py 2024-11-22 16:20:38.324870603 +0100
+++ new-irgramd/telegram.py 2024-11-22 16:20:38.328870597 +0100
@@ -10,7 +10,7 @@
from include import CHAN_MAX_LENGHT, NICK_MAX_LENGTH
from irc import IRCUser
-from utils import sanitize_filename
+from utils import sanitize_filename, remove_slash, remove_http_s
# Constants
@@ -39,6 +39,7 @@
self.tg_username = None
self.channels_date = {}
self.mid = mesg_id('0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ!#$%+./_~')
+ self.webpending = {}
async def initialize_telegram(self):
# Setup media folder
@@ -63,6 +64,7 @@
# Register Telegram callbacks
callbacks = (
(self.handle_telegram_message , telethon.events.NewMessage),
+ (self.handle_raw, telethon.events.Raw),
(self.handle_telegram_chat_action, telethon.events.ChatAction),
)
for handler, event in callbacks:
@@ -241,7 +243,13 @@
def get_entity_type(self, entity):
return type(entity).__name__
- async def handle_telegram_message(self, event):
+ async def handle_raw(self, update):
+ if isinstance(update, tgty.UpdateWebPage) and isinstance(update.webpage, tgty.WebPage):
+ event = self.webpending.pop(update.webpage.id, None)
+ if event:
+ await self.handle_telegram_message(event, update.webpage)
+
+ async def handle_telegram_message(self, event, upd_to_webpend=None):
self.logger.debug('Handling Telegram Message: %s', event)
if self.mid.mesg_base is None:
@@ -250,8 +258,10 @@
user = self.get_irc_user_from_telegram(event.sender_id)
mid = self.mid.num_to_id(event.message.id - self.mid.mesg_base)
- if event.message.media:
- text = await self.handle_telegram_media(event.message)
+ if upd_to_webpend:
+ text = await self.handle_webpage(upd_to_webpend, event.message)
+ elif event.message.media:
+ text = await self.handle_telegram_media(event)
else:
text = event.message.message
@@ -307,25 +317,26 @@
self.irc.iid_to_tid[channel] = chat.id
await self.irc.join_irc_channel(self.irc.irc_nick, channel, True)
- async def handle_telegram_media(self, message):
+ async def handle_telegram_media(self, event):
+ message = event.message
caption = ' | {}'.format(message.message) if message.message else ''
to_download = True
media_url_or_data = ''
- if message.web_preview:
- media_type = 'web'
- logo = await self.download_telegram_media(message)
+ if isinstance(message.media, tgty.MessageMediaWebPage):
to_download = False
- media_url_or_data = message.message
- if message.media.webpage.title and logo:
- caption = ' | {} | {}'.format(message.media.webpage.title, logo)
- elif message.media.webpage.title:
- caption = ' | {}'.format(message.media.webpage.title)
- elif logo:
- caption = ' | {}'.format(logo)
+ if isinstance(message.media.webpage, tgty.WebPage):
+ # web
+ return await self.handle_webpage(message.media.webpage, message)
+ elif isinstance(message.media.webpage, tgty.WebPagePending):
+ media_type = 'webpending'
+ media_url_or_data = message.message
+ caption = ''
+ self.webpending[message.media.webpage.id] = event
else:
+ media_type = 'webunknown'
+ media_url_or_data = message.message
caption = ''
-
elif message.photo:
size = message.media.photo.sizes[-1]
if hasattr(size, 'w') and hasattr(size, 'h'):
@@ -381,10 +392,48 @@
caption = ''
to_download = False
media_url_or_data = ''
+ else:
+ media_type = 'unknown'
+ caption = ''
+ to_download = False
+ media_url_or_data = message.message
if to_download:
media_url_or_data = await self.download_telegram_media(message)
+ return self.format_media(media_type, media_url_or_data, caption)
+
+ async def handle_webpage(self, webpage, message):
+ media_type = 'web'
+ logo = await self.download_telegram_media(message)
+ if webpage.url != webpage.display_url \
+ and remove_slash(webpage.url) != webpage.display_url \
+ and remove_http_s(webpage.url) != webpage.display_url:
+ media_url_or_data = '{} | {}'.format(webpage.url, webpage.display_url)
+ else:
+ media_url_or_data = webpage.url
+ if message:
+ # sometimes the 1st line of message contains the title, don't repeat it
+ message_line = message.message.splitlines()[0]
+ if message_line != webpage.title:
+ title = webpage.title
+ else:
+ title = ''
+ else:
+ title = webpage.title
+
+ if title and logo:
+ caption = ' | {} | {}'.format(title, logo)
+ elif title:
+ caption = ' | {}'.format(title)
+ elif logo:
+ caption = ' | {}'.format(logo)
+ else:
+ caption = ''
+
+ return self.format_media(media_type, media_url_or_data, caption)
+
+ def format_media(self, media_type, media_url_or_data, caption):
return '[{}] {}{}'.format(media_type, media_url_or_data, caption)
async def download_telegram_media(self, message):
diff -rN -u old-irgramd/utils.py new-irgramd/utils.py
--- old-irgramd/utils.py 2024-11-22 16:20:38.324870603 +0100
+++ new-irgramd/utils.py 2024-11-22 16:20:38.328870597 +0100
@@ -45,3 +45,15 @@
def sanitize_filename(fn):
return FILENAME_INVALID_CHARS.sub('', fn).strip('-').replace(' ','_')
+
+def remove_slash(url):
+ return url[:-1] if url[-1:] == '/' else url
+
+def remove_http_s(url):
+ if url[:8] == 'https://':
+ surl = url[8:]
+ elif url[:7] == 'http://':
+ surl = url[7:]
+ else:
+ surl = url
+ return remove_slash(surl)