telegram: Support media in messages for using a webserver (not included in
patch 3576c9bc94562bf539d31f44a3ac08d9ed4a7d05
Author: E. Bosch <presidev@AT@gmail.com>
Date: Sun Jan 30 01:29:08 CET 2022
* telegram: Support media in messages for using a webserver (not included in
irgramd) to get the downloaded media from Telegram, also support
non-downloadable media (geo, contact, etc.)
diff -rN -u old-irgramd/irgramd new-irgramd/irgramd
--- old-irgramd/irgramd 2024-10-23 06:37:49.906812274 +0200
+++ new-irgramd/irgramd 2024-10-23 06:37:49.910812268 +0200
@@ -65,6 +65,7 @@
tornado.options.define('tls', default=False, help='Use TLS/SSL encrypted connection for IRC server')
tornado.options.define('tls_cert', default=None, metavar='CERTFILE', help='IRC server certificate chain for TLS/SSL, also can contain private key if not defined with `tls_key`')
tornado.options.define('tls_key', default=None, metavar='KEYFILE', help='IRC server private key for TLS/SSL')
+ tornado.options.define('media_url', default=None, metavar='BASE_URL', help='Base URL for media files, should be configured in the external (to irgramd) webserver')
tornado.options.define('pam', default=False, help='Use PAM for IRC authentication, if not set you should set `irc_password`')
tornado.options.define('pam_group', default=None, metavar='GROUP', help='Unix group allowed if `pam` enabled, if empty any user is allowed')
tornado.options.define('irc_nicks', type=str, multiple=True, metavar='nick,..', help='List of nicks allowed for IRC, if `pam` and optionally `pam_group` are set, PAM authentication will be used instead')
diff -rN -u old-irgramd/telegram.py new-irgramd/telegram.py
--- old-irgramd/telegram.py 2024-10-23 06:37:49.906812274 +0200
+++ new-irgramd/telegram.py 2024-10-23 06:37:49.910812268 +0200
@@ -10,6 +10,7 @@
from include import CHAN_MAX_LENGHT, NICK_MAX_LENGTH
from irc import IRCUser
+from utils import sanitize_filename
# Constants
@@ -30,6 +31,8 @@
def __init__(self, irc, settings):
self.logger = logging.getLogger()
self.config_dir = settings['config_dir']
+ self.media_url = settings['media_url']
+ self.media_cn = 0
self.irc = irc
self.authorized = False
self.id = None
@@ -240,7 +243,13 @@
user = self.get_irc_user_from_telegram(event.sender_id)
mid = self.mid.num_to_id(event.message.id - self.mid.mesg_base)
- message = '[{}] {}'.format(mid, event.message.message)
+
+ if event.message.media:
+ text = await self.handle_telegram_media(event.message)
+ else:
+ text = event.message.message
+
+ message = '[{}] {}'.format(mid, text)
if event.message.is_private:
await self.handle_telegram_private_message(user, message)
@@ -257,15 +266,6 @@
entity = await event.message.get_chat()
channel = await self.get_irc_channel_from_telegram_id(event.message.chat_id, entity)
-
- # Format messages with media
-# if event.message.media and (event.message.photo or event.message.gif):
-# message = await self.download_telegram_media(event.message, 'Image')
-# if message:
-# messages.insert(0, message)
-# elif event.message.media and (event.message.sticker):
-# messages.insert(0, 'Sticker: {}'.format(event.message.sticker.id))
-
await self.irc.send_msg(user, channel, message)
async def handle_telegram_chat_action(self, event):
@@ -301,21 +301,98 @@
self.irc.iid_to_tid[channel] = chat.id
await self.irc.join_irc_channel(self.irc.irc_nick, channel, True)
- async def download_telegram_media(self, message, tag):
+ async def handle_telegram_media(self, message):
+ caption = ' | {}'.format(message.message) if message.message else ''
+ to_download = True
+ media_url_or_data = ''
+
+ if message.web_preview:
+ media_type = 'web'
+ logo = await self.download_telegram_media(message)
+ to_download = False
+ media_url_or_data = message.message
+ if message.media.webpage.title and logo:
+ caption = ' | {} | {}'.format(message.media.webpage.title, logo)
+ elif message.media.webpage.title:
+ caption = ' | {}'.format(message.media.webpage.title)
+ elif logo:
+ caption = ' | {}'.format(logo)
+ else:
+ caption = ''
+
+ elif message.photo: media_type = 'photo'
+ elif message.audio: media_type = 'audio'
+ elif message.voice: media_type = 'rec'
+ elif message.video: media_type = 'video'
+ elif message.video_note: media_type = 'videorec'
+ elif message.gif: media_type = 'anim'
+ elif message.sticker: media_type = 'sticker'
+ elif message.document: media_type = 'file'
+
+ elif message.contact:
+ media_type = 'contact'
+ caption = ''
+ to_download = False
+ if message.media.contact.first_name:
+ media_url_or_data += message.media.contact.first_name + ' '
+ if message.media.contact.last_name:
+ media_url_or_data += message.media.contact.last_name + ' '
+ if message.media.contact.phone_number:
+ media_url_or_data += message.media.contact.phone_number
+
+ elif message.game:
+ media_type = 'game'
+ caption = ''
+ to_download = False
+ if message.media.game.title:
+ media_url_or_data = message.media.game.title
+
+ elif message.geo:
+ media_type = 'geo'
+ caption = ''
+ to_download = False
+ media_url_or_data = 'lat: {}, long: {}'.format(message.media.geo.lat, message.media.geo.long)
+
+ elif message.invoice:
+ media_type = 'invoice'
+ caption = ''
+ to_download = False
+ media_url_or_data = ''
+
+ elif message.poll:
+ media_type = 'poll'
+ caption = ''
+ to_download = False
+ media_url_or_data = ''
+
+ elif message.venue:
+ media_type = 'venue'
+ caption = ''
+ to_download = False
+ media_url_or_data = ''
+
+ if to_download:
+ media_url_or_data = await self.download_telegram_media(message)
+
+ return '[{}] {}{}'.format(media_type, media_url_or_data, caption)
+
+ async def download_telegram_media(self, message):
local_path = await message.download_media(self.telegram_media_dir)
- return
- if not local_path:
- return
-
- request = tornado.httpclient.HTTPRequest(
- url = 'https://yld.me/paste',
- method = 'POST',
- body = open(local_path, 'rb').read(),
- )
- response = await tornado.httpclient.AsyncHTTPClient().fetch(request)
+ if not local_path: return ''
- os.unlink(local_path)
- return tag + ': ' + response.body.decode().strip()
+ if message.document:
+ new_file = sanitize_filename(os.path.basename(local_path))
+ else:
+ filetype = os.path.splitext(local_path)[1]
+ new_file = str(self.media_cn) + filetype
+ self.media_cn += 1
+
+ new_path = os.path.join(self.telegram_media_dir, new_file)
+ if local_path != new_path:
+ os.replace(local_path, new_path)
+ if self.media_url[-1:] != '/':
+ self.media_url += '/'
+ return self.media_url + new_file
class mesg_id:
def __init__(self, alpha):
diff -rN -u old-irgramd/utils.py new-irgramd/utils.py
--- old-irgramd/utils.py 2024-10-23 06:37:49.906812274 +0200
+++ new-irgramd/utils.py 2024-10-23 06:37:49.910812268 +0200
@@ -1,6 +1,7 @@
import itertools
import textwrap
+import re
# Utilities
@@ -41,3 +42,6 @@
messages_limited += wrapped
del wr
return messages_limited
+
+def sanitize_filename(fn):
+ return re.sub('[/{}<>()"\'\\|&]', '', fn).strip('-')