telegram: Refactor download media code.
patch 8167f7dfd2712a9a103504fe7763e2a497adf723
Author: E. Bosch <>
Date: Sun Nov 19 00:43:04 CET 2023
* telegram: Refactor download media code.
If named media files already downloaded, don't try to download again
diff -rN -u old-irgramd/ new-irgramd/
--- old-irgramd/ 2024-05-18 08:36:18.718236268 +0200
+++ new-irgramd/ 2024-05-18 08:36:18.718236268 +0200
@@ -688,6 +688,20 @@
to_download = True
media_url_or_data = ''
size = 0
+ filename = None
+ def scan_doc_attributes(document):
+ attrib_file = attrib_video = filename = None
+ size = document.size
+ h_size = get_human_size(size)
+ for x in document.attributes:
+ if isinstance(x, tgty.DocumentAttributeVideo):
+ attrib_video = x
+ if isinstance(x, tgty.DocumentAttributeFilename):
+ attrib_file = x
+ filename = attrib_file.file_name if attrib_file else None
+ return size, h_size, attrib_video, filename
if isinstance(, tgty.MessageMediaWebPage):
to_download = False
@@ -716,17 +730,14 @@
elif media_type = 'audio'
elif message.voice: media_type = 'rec'
- size =
- h_size = get_human_size(size)
- attrib = next(x for x in if isinstance(x, tgty.DocumentAttributeVideo))
- dur = get_human_duration(attrib.duration)
+ size, h_size, attrib_video, filename = scan_doc_attributes(
+ dur = get_human_duration(attrib_video.duration) if attrib_video else ''
media_type = 'video:{},{}'.format(h_size, dur)
elif message.video_note: media_type = 'videorec'
elif message.gif: media_type = 'anim'
elif message.sticker: media_type = 'sticker'
elif message.document:
- size =
- h_size = get_human_size(size)
+ size, h_size, _, filename = scan_doc_attributes(
media_type = 'file:{}'.format(h_size)
media_type = 'contact'
@@ -781,10 +792,8 @@
media_url_or_data = message.message
if to_download:
- if and size > self.notice_size:
- await self.relay_telegram_message(message, user, '[{}] [{}] [Downloading]'.format(mid, media_type))
- media_url_or_data = await self.download_telegram_media(message)
+ relay_attr = (message, user, mid, media_type)
+ media_url_or_data = await self.download_telegram_media(message, filename, size, relay_attr)
return self.format_media(media_type, media_url_or_data, caption)
@@ -835,26 +844,38 @@
def format_media(self, media_type, media_url_or_data, caption):
return '[{}] {}{}'.format(media_type, media_url_or_data, caption)
- async def download_telegram_media(self, message):
- local_path = None
- if
- local_path = await message.download_media(self.telegram_media_dir)
- if not local_path: return ''
- if message.document:
- new_file = sanitize_filename(os.path.basename(local_path))
+ async def download_telegram_media(self, message, filename=None, size=0, relay_attr=None):
+ if not
+ return ''
+ if filename:
+ new_file = sanitize_filename(filename)
+ new_path = os.path.join(self.telegram_media_dir, new_file)
+ if os.path.exists(new_path):
+ local_path = new_path
+ else:
+ await self.notice_downloading(size, relay_attr)
+ local_path = await message.download_media(new_path)
+ if not local_path: return ''
+ await self.notice_downloading(size, relay_attr)
+ local_path = await message.download_media(self.telegram_media_dir)
+ if not local_path: return ''
filetype = os.path.splitext(local_path)[1]
new_file = str(self.media_cn) + filetype
self.media_cn += 1
+ new_path = os.path.join(self.telegram_media_dir, new_file)
- new_path = os.path.join(self.telegram_media_dir, new_file)
if local_path != new_path:
os.replace(local_path, new_path)
if self.media_url[-1:] != '/':
self.media_url += '/'
return self.media_url + new_file
+ async def notice_downloading(self, size, relay_attr):
+ if relay_attr and size > self.notice_size:
+ message, user, mid, media_type = relay_attr
+ await self.relay_telegram_message(message, user, '[{}] [{}] [Downloading]'.format(mid, media_type))
class mesg_id:
def __init__(self, alpha):
self.alpha = alpha