1 # irgramd: IRC-Telegram gateway 2 # telegram.py: Interface to Telethon Telegram library 3 # 4 # Copyright (c) 2019 Peter Bui <pbui@bx612.space> 5 # Copyright (c) 2020-2024 E. Bosch <presidev@AT@gmail.com> 6 # 7 # Use of this source code is governed by a MIT style license that 8 # can be found in the LICENSE file included in this project. 9 10 import logging 11 import os 12 import datetime 13 import re 14 import aioconsole 15 import asyncio 16 import collections 17 import telethon 18 from telethon import types as tgty, utils as tgutils 19 from telethon.tl.functions.messages import GetMessagesReactionsRequest 20 21 # Local modules 22 23 from include import CHAN_MAX_LENGHT, NICK_MAX_LENGTH 24 from irc import IRCUser 25 from utils import sanitize_filename, add_filename, is_url_equiv, extract_url, get_human_size, get_human_duration, get_highlighted, fix_braces, format_timestamp 26 import emoji2emoticon as e 27 28 # Test IP table 29 30 TEST_IPS = { 1: '149.154.175.10', 31 2: '149.154.167.40', 32 3: '149.154.175.117' 33 } 34 35 # Telegram 36 37 class TelegramHandler(object): 38 def __init__(self, irc, settings): 39 self.logger = logging.getLogger() 40 self.config_dir = settings['config_dir'] 41 self.cache_dir = settings['cache_dir'] 42 self.download = settings['download_media'] 43 self.notice_size = settings['download_notice'] * 1048576 44 self.media_dir = settings['media_dir'] 45 self.media_url = settings['media_url'] 46 self.upload_dir = settings['upload_dir'] 47 self.api_id = settings['api_id'] 48 self.api_hash = settings['api_hash'] 49 self.phone = settings['phone'] 50 self.test = settings['test'] 51 self.test_dc = settings['test_datacenter'] 52 self.test_ip = settings['test_host'] if settings['test_host'] else TEST_IPS[self.test_dc] 53 self.test_port = settings['test_port'] 54 self.ask_code = settings['ask_code'] 55 self.quote_len = settings['quote_length'] 56 self.hist_fmt = settings['hist_timestamp_format'] 57 self.timezone = settings['timezone'] 58 self.geo_url = settings['geo_url'] 59 if not settings['emoji_ascii']: 60 e.emo = {} 61 self.media_cn = 0 62 self.irc = irc 63 self.authorized = False 64 self.id = None 65 self.tg_username = None 66 self.channels_date = {} 67 self.mid = mesg_id('0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ!#$%+./_~') 68 self.webpending = {} 69 self.refwd_me = False 70 self.cache = collections.OrderedDict() 71 self.sorted_len_usernames = [] 72 # Set event to be waited by irc.check_telegram_auth() 73 self.auth_checked = asyncio.Event() 74 75 async def initialize_telegram(self): 76 # Setup media folder 77 self.telegram_media_dir = os.path.expanduser(self.media_dir or os.path.join(self.cache_dir, 'media')) 78 if not os.path.exists(self.telegram_media_dir): 79 os.makedirs(self.telegram_media_dir) 80 81 # Setup upload folder 82 self.telegram_upload_dir = os.path.expanduser(self.upload_dir or os.path.join(self.cache_dir, 'upload')) 83 if not os.path.exists(self.telegram_upload_dir): 84 os.makedirs(self.telegram_upload_dir) 85 86 # Setup session folder 87 self.telegram_session_dir = os.path.join(self.config_dir, 'session') 88 if not os.path.exists(self.telegram_session_dir): 89 os.makedirs(self.telegram_session_dir) 90 91 # Construct Telegram client 92 if self.test: 93 self.telegram_client = telethon.TelegramClient(None, self.api_id, self.api_hash) 94 self.telegram_client.session.set_dc(self.test_dc, self.test_ip, self.test_port) 95 else: 96 telegram_session = os.path.join(self.telegram_session_dir, 'telegram') 97 self.telegram_client = telethon.TelegramClient(telegram_session, self.api_id, self.api_hash) 98 99 # Initialize Telegram ID to IRC nick mapping 100 self.tid_to_iid = {} 101 102 # Register Telegram callbacks 103 callbacks = ( 104 (self.handle_telegram_message , telethon.events.NewMessage), 105 (self.handle_raw, telethon.events.Raw), 106 (self.handle_telegram_chat_action, telethon.events.ChatAction), 107 (self.handle_telegram_deleted , telethon.events.MessageDeleted), 108 (self.handle_telegram_edited, telethon.events.MessageEdited), 109 ) 110 for handler, event in callbacks: 111 self.telegram_client.add_event_handler(handler, event) 112 113 # Start Telegram client 114 if self.test: 115 await self.telegram_client.start(self.phone, code_callback=lambda: str(self.test_dc) * 5) 116 else: 117 await self.telegram_client.connect() 118 119 while not await self.telegram_client.is_user_authorized(): 120 self.logger.info('Telegram account not authorized') 121 await self.telegram_client.send_code_request(self.phone) 122 self.auth_checked.set() 123 if not self.ask_code: 124 return 125 self.logger.info('You must provide the Login code that Telegram will ' 126 'sent you via SMS or another connected client') 127 code = await aioconsole.ainput('Login code: ') 128 try: 129 await self.telegram_client.sign_in(code=code) 130 except: 131 pass 132 133 await self.continue_auth() 134 135 async def continue_auth(self): 136 self.logger.info('Telegram account authorized') 137 self.authorized = True 138 self.auth_checked.set() 139 await self.init_mapping() 140 141 async def init_mapping(self): 142 # Update IRC <-> Telegram mapping 143 tg_user = await self.telegram_client.get_me() 144 self.id = tg_user.id 145 self.tg_username = self.get_telegram_nick(tg_user) 146 self.add_sorted_len_usernames(self.tg_username) 147 self.set_ircuser_from_telegram(tg_user) 148 async for dialog in self.telegram_client.iter_dialogs(): 149 chat = dialog.entity 150 if isinstance(chat, tgty.User): 151 self.set_ircuser_from_telegram(chat) 152 else: 153 await self.set_irc_channel_from_telegram(chat) 154 155 def set_ircuser_from_telegram(self, user): 156 if user.id not in self.tid_to_iid: 157 tg_nick = self.get_telegram_nick(user) 158 tg_ni = tg_nick.lower() 159 if not user.is_self: 160 irc_user = IRCUser(None, ('Telegram',''), tg_nick, user.id, self.get_telegram_display_name(user)) 161 self.irc.users[tg_ni] = irc_user 162 self.add_sorted_len_usernames(tg_ni) 163 self.tid_to_iid[user.id] = tg_nick 164 self.irc.iid_to_tid[tg_ni] = user.id 165 else: 166 tg_nick = self.tid_to_iid[user.id] 167 return tg_nick 168 169 async def set_irc_channel_from_telegram(self, chat): 170 channel = self.get_telegram_channel(chat) 171 self.tid_to_iid[chat.id] = channel 172 chan = channel.lower() 173 self.irc.iid_to_tid[chan] = chat.id 174 self.irc.irc_channels[chan] = set() 175 # Add users from the channel 176 async for user in self.telegram_client.iter_participants(chat.id): 177 user_nick = self.set_ircuser_from_telegram(user) 178 if not user.is_self: 179 self.irc.irc_channels[chan].add(user_nick) 180 # Add admin users as ops in irc 181 if isinstance(user.participant, tgty.ChatParticipantAdmin): 182 self.irc.irc_channels_ops[chan].add(user_nick) 183 # Add creator users as founders in irc 184 elif isinstance(user.participant, tgty.ChatParticipantCreator): 185 self.irc.irc_channels_founder[chan].add(user_nick) 186 187 def get_telegram_nick(self, user): 188 nick = (user.username 189 or self.get_telegram_display_name(user) 190 or str(user.id)) 191 nick = nick[:NICK_MAX_LENGTH] 192 while nick in self.irc.iid_to_tid: 193 nick += '_' 194 return nick 195 196 def get_telegram_display_name(self, user): 197 name = telethon.utils.get_display_name(user) 198 name = name.replace(' ', '_') 199 return name 200 201 async def get_telegram_display_name_me(self): 202 tg_user = await self.telegram_client.get_me() 203 return self.get_telegram_display_name(tg_user) 204 205 def get_telegram_channel(self, chat): 206 chan = '#' + chat.title.replace(' ', '-').replace(',', '-') 207 while chan.lower() in self.irc.iid_to_tid: 208 chan += '_' 209 return chan 210 211 def get_irc_user_from_telegram(self, tid): 212 nick = self.tid_to_iid[tid] 213 if nick == self.tg_username: return None 214 return self.irc.users[nick.lower()] 215 216 def get_irc_name_from_telegram_id(self, tid): 217 if tid in self.tid_to_iid.keys(): 218 name_in_irc = self.tid_to_iid[tid] 219 else: 220 name_in_irc = '<Unknown>' 221 return name_in_irc 222 223 async def get_irc_name_from_telegram_forward(self, fwd, saved): 224 from_id = fwd.saved_from_peer if saved else fwd.from_id 225 if from_id is None: 226 # telegram user has privacy options to show only the name 227 # or was a broadcast from a channel (no user) 228 name = fwd.from_name 229 else: 230 peer_id, type = self.get_peer_id_and_type(from_id) 231 if type == 'user': 232 try: 233 user = self.get_irc_user_from_telegram(peer_id) 234 except: 235 name = str(peer_id) 236 else: 237 if user is None: 238 name = '{}' 239 self.refwd_me = True 240 else: 241 name = user.irc_nick 242 else: 243 try: 244 name = await self.get_irc_channel_from_telegram_id(peer_id) 245 except: 246 name = '' 247 return name 248 249 async def get_irc_nick_from_telegram_id(self, tid, entity=None): 250 if tid not in self.tid_to_iid: 251 user = entity or await self.telegram_client.get_entity(tid) 252 nick = self.get_telegram_nick(user) 253 self.tid_to_iid[tid] = nick 254 self.irc.iid_to_tid[nick] = tid 255 256 return self.tid_to_iid[tid] 257 258 async def get_irc_channel_from_telegram_id(self, tid, entity=None): 259 rtid, type = tgutils.resolve_id(tid) 260 if rtid not in self.tid_to_iid: 261 chat = entity or await self.telegram_client.get_entity(tid) 262 channel = self.get_telegram_channel(chat) 263 self.tid_to_iid[rtid] = channel 264 self.irc.iid_to_tid[channel] = rtid 265 266 return self.tid_to_iid[rtid] 267 268 async def get_telegram_channel_participants(self, tid): 269 channel = self.tid_to_iid[tid] 270 nicks = [] 271 async for user in self.telegram_client.iter_participants(tid): 272 user_nick = await self.get_irc_nick_from_telegram_id(user.id, user) 273 274 nicks.append(user_nick) 275 self.irc.irc_channels[channel].add(user_nick) 276 277 return nicks 278 279 async def get_telegram_idle(self, irc_nick, tid=None): 280 if self.irc.users[irc_nick].is_service: 281 return None 282 tid = self.get_tid(irc_nick, tid) 283 user = await self.telegram_client.get_entity(tid) 284 if isinstance(user.status,tgty.UserStatusRecently) or \ 285 isinstance(user.status,tgty.UserStatusOnline): 286 idle = 0 287 elif isinstance(user.status,tgty.UserStatusOffline): 288 last = user.status.was_online 289 current = datetime.datetime.now(datetime.timezone.utc) 290 idle = int((current - last).total_seconds()) 291 elif isinstance(user.status,tgty.UserStatusLastWeek): 292 idle = 604800 293 elif isinstance(user.status,tgty.UserStatusLastMonth): 294 idle = 2678400 295 else: 296 idle = None 297 return idle 298 299 async def get_channel_topic(self, channel, entity_cache): 300 tid = self.get_tid(channel) 301 # entity_cache should be a list to be a persistent and by reference value 302 if entity_cache[0]: 303 entity = entity_cache[0] 304 else: 305 entity = await self.telegram_client.get_entity(tid) 306 entity_cache[0] = entity 307 entity_type = self.get_entity_type(entity, format='long') 308 return 'Telegram ' + entity_type + ' ' + str(tid) + ': ' + entity.title 309 310 async def get_channel_creation(self, channel, entity_cache): 311 tid = self.get_tid(channel) 312 if tid in self.channels_date.keys(): 313 timestamp = self.channels_date[tid] 314 else: 315 # entity_cache should be a list to be a persistent and by reference value 316 if entity_cache[0]: 317 entity = entity_cache[0] 318 else: 319 entity = await self.telegram_client.get_entity(tid) 320 entity_cache[0] = entity 321 timestamp = entity.date.timestamp() 322 self.channels_date[tid] = timestamp 323 return int(timestamp) 324 325 def get_tid(self, irc_item, tid=None): 326 it = irc_item.lower() 327 if tid: 328 pass 329 elif it in self.irc.iid_to_tid: 330 tid = self.irc.iid_to_tid[it] 331 else: 332 tid = self.id 333 return tid 334 335 def get_entity_type(self, entity, format): 336 if isinstance(entity, tgty.User): 337 short = long = 'User' 338 elif isinstance(entity, tgty.Chat): 339 short = 'Chat' 340 long = 'Chat/Basic Group' 341 elif isinstance(entity, tgty.Channel): 342 if entity.broadcast: 343 short = 'Broad' 344 long = 'Broadcast Channel' 345 elif entity.megagroup: 346 short = 'Mega' 347 long = 'Super/Megagroup Channel' 348 elif entity.gigagroup: 349 short = 'Giga' 350 long = 'Broadcast Gigagroup Channel' 351 352 return short if format == 'short' else long 353 354 def get_peer_id_and_type(self, peer): 355 if isinstance(peer, tgty.PeerChannel): 356 id = peer.channel_id 357 type = 'chan' 358 elif isinstance(peer, tgty.PeerChat): 359 id = peer.chat_id 360 type = 'chan' 361 elif isinstance(peer, tgty.PeerUser): 362 id = peer.user_id 363 type = 'user' 364 else: 365 id = peer 366 type = '' 367 return id, type 368 369 async def is_bot(self, irc_nick, tid=None): 370 user = self.irc.users[irc_nick] 371 if user.stream or user.is_service: 372 bot = False 373 else: 374 bot = user.bot 375 if bot == None: 376 tid = self.get_tid(irc_nick, tid) 377 tg_user = await self.telegram_client.get_entity(tid) 378 bot = tg_user.bot 379 user.bot = bot 380 return bot 381 382 async def edition_case(self, msg): 383 def msg_edited(m): 384 return m.id in self.cache and \ 385 ( m.message != self.cache[m.id]['text'] 386 or m.media != self.cache[m.id]['media'] 387 ) 388 async def get_reactions(m): 389 react = await self.telegram_client(GetMessagesReactionsRequest(m.peer_id, id=[m.id])) 390 return react.updates[0].reactions.recent_reactions 391 392 react = None 393 if msg.reactions is None: 394 case = 'edition' 395 elif (reactions := await get_reactions(msg)) is None: 396 if msg_edited(msg): 397 case = 'edition' 398 else: 399 case = 'react-del' 400 elif react := next((x for x in reactions if x.date == msg.edit_date), None): 401 case = 'react-add' 402 else: 403 if msg_edited(msg): 404 case = 'edition' 405 else: 406 case = 'react-del' 407 react = None 408 return case, react 409 410 def to_cache(self, id, mid, message, proc_message, user, chan, media): 411 if len(self.cache) >= 10000: 412 self.cache.popitem(last=False) 413 self.cache[id] = { 414 'mid': mid, 415 'text': message, 416 'rendered_text': proc_message, 417 'user': user, 418 'channel': chan, 419 'media': media 420 } 421 422 def replace_mentions(self, text, me_nick='', received=True): 423 # For received replace @mention to ~mention~ 424 # For sent replace mention: to @mention 425 rargs = {} 426 def repl_mentioned(text, me_nick, received, mark, repl_pref, repl_suff): 427 new_text = text 428 429 for user in self.sorted_len_usernames: 430 if user == self.tg_username: 431 if me_nick: 432 username = me_nick 433 else: 434 continue 435 else: 436 username = self.irc.users[user].irc_nick 437 438 if received: 439 mention = mark + user 440 mention_case = mark + username 441 else: # sent 442 mention = user + mark 443 mention_case = username + mark 444 replcmnt = repl_pref + username + repl_suff 445 446 # Start of the text 447 for ment in (mention, mention_case): 448 if new_text.startswith(ment): 449 new_text = new_text.replace(ment, replcmnt, 1) 450 451 # Next words (with space as separator) 452 mention = ' ' + mention 453 mention_case = ' ' + mention_case 454 replcmnt = ' ' + replcmnt 455 new_text = new_text.replace(mention, replcmnt).replace(mention_case, replcmnt) 456 457 return new_text 458 459 if received: 460 mark = '@' 461 rargs['repl_pref'] = '~' 462 rargs['repl_suff'] = '~' 463 else: # sent 464 mark = ':' 465 rargs['repl_pref'] = '@' 466 rargs['repl_suff'] = '' 467 468 if text.find(mark) != -1: 469 text_replaced = repl_mentioned(text, me_nick, received, mark, **rargs) 470 else: 471 text_replaced = text 472 return text_replaced 473 474 def filters(self, text): 475 filtered = e.replace_mult(text, e.emo) 476 filtered = self.replace_mentions(filtered) 477 return filtered 478 479 def add_sorted_len_usernames(self, username): 480 self.sorted_len_usernames.append(username) 481 self.sorted_len_usernames.sort(key=lambda k: len(k), reverse=True) 482 483 async def handle_telegram_edited(self, event): 484 self.logger.debug('Handling Telegram Message Edited: %s', event) 485 486 id = event.message.id 487 mid = self.mid.num_to_id_offset(event.message.peer_id, id) 488 fmid = '[{}]'.format(mid) 489 message = self.filters(event.message.message) 490 message_rendered = await self.render_text(event.message, mid, upd_to_webpend=None) 491 492 edition_case, reaction = await self.edition_case(event.message) 493 if edition_case == 'edition': 494 action = 'Edited' 495 user = self.get_irc_user_from_telegram(event.sender_id) 496 if id in self.cache: 497 t = self.filters(self.cache[id]['text']) 498 rt = self.cache[id]['rendered_text'] 499 500 ht, is_ht = get_highlighted(t, message) 501 else: 502 rt = fmid 503 is_ht = False 504 505 if is_ht: 506 edition_react = ht 507 text_old = fmid 508 else: 509 edition_react = message 510 text_old = rt 511 if user is None: 512 self.refwd_me = True 513 514 # Reactions 515 else: 516 action = 'React' 517 if len(message_rendered) > self.quote_len: 518 text_old = '{}...'.format(message_rendered[:self.quote_len]) 519 text_old = fix_braces(text_old) 520 else: 521 text_old = message_rendered 522 523 if edition_case == 'react-add': 524 user = self.get_irc_user_from_telegram(reaction.peer_id.user_id) 525 emoji = reaction.reaction.emoticon 526 react_action = '+' 527 react_icon = e.emo[emoji] if emoji in e.emo else emoji 528 elif edition_case == 'react-del': 529 user = self.get_irc_user_from_telegram(event.sender_id) 530 react_action = '-' 531 react_icon = '' 532 edition_react = '{}{}'.format(react_action, react_icon) 533 534 text = '|{} {}| {}'.format(action, text_old, edition_react) 535 536 chan = await self.relay_telegram_message(event, user, text) 537 538 self.to_cache(id, mid, message, message_rendered, user, chan, event.message.media) 539 540 async def handle_telegram_deleted(self, event): 541 self.logger.debug('Handling Telegram Message Deleted: %s', event) 542 543 for deleted_id in event.original_update.messages: 544 if deleted_id in self.cache: 545 recovered_text = self.cache[deleted_id]['rendered_text'] 546 text = '|Deleted| {}'.format(recovered_text) 547 user = self.cache[deleted_id]['user'] 548 chan = self.cache[deleted_id]['channel'] 549 await self.relay_telegram_message(message=None, user=user, text=text, channel=chan) 550 else: 551 text = 'Message id {} deleted not in cache'.format(deleted_id) 552 await self.relay_telegram_private_message(self.irc.service_user, text) 553 554 async def handle_raw(self, update): 555 self.logger.debug('Handling Telegram Raw Event: %s', update) 556 557 if isinstance(update, tgty.UpdateWebPage) and isinstance(update.webpage, tgty.WebPage): 558 message = self.webpending.pop(update.webpage.id, None) 559 if message: 560 await self.handle_telegram_message(event=None, message=message, upd_to_webpend=update.webpage) 561 562 async def handle_telegram_message(self, event, message=None, upd_to_webpend=None, history=False): 563 self.logger.debug('Handling Telegram Message: %s', event or message) 564 565 msg = event.message if event else message 566 567 user = self.get_irc_user_from_telegram(msg.sender_id) 568 mid = self.mid.num_to_id_offset(msg.peer_id, msg.id) 569 text = await self.render_text(msg, mid, upd_to_webpend, user) 570 text_send = self.set_history_timestamp(text, history, msg.date, msg.action) 571 chan = await self.relay_telegram_message(msg, user, text_send) 572 573 self.to_cache(msg.id, mid, msg.message, text, user, chan, msg.media) 574 575 self.refwd_me = False 576 577 async def render_text(self, message, mid, upd_to_webpend, user=None): 578 if upd_to_webpend: 579 text = await self.handle_webpage(upd_to_webpend, message, mid) 580 elif message.media: 581 text = await self.handle_telegram_media(message, user, mid) 582 else: 583 text = message.message 584 585 if message.action: 586 final_text = await self.handle_telegram_action(message, mid) 587 return final_text 588 elif message.is_reply: 589 refwd_text = await self.handle_telegram_reply(message) 590 elif message.forward: 591 refwd_text = await self.handle_telegram_forward(message) 592 else: 593 refwd_text = '' 594 595 target_mine = self.handle_target_mine(message.peer_id, user) 596 597 final_text = '[{}] {}{}{}'.format(mid, target_mine, refwd_text, text) 598 final_text = self.filters(final_text) 599 return final_text 600 601 def set_history_timestamp(self, text, history, date, action): 602 if history and self.hist_fmt: 603 timestamp = format_timestamp(self.hist_fmt, self.timezone, date) 604 if action: 605 res = '{} {}'.format(text, timestamp) 606 else: 607 res = '{} {}'.format(timestamp, text) 608 else: 609 res = text 610 return res 611 612 async def relay_telegram_message(self, message, user, text, channel=None): 613 private = (message and message.is_private) or (not message and not channel) 614 action = (message and message.action) 615 if private: 616 await self.relay_telegram_private_message(user, text, action) 617 chan = None 618 else: 619 chan = await self.relay_telegram_channel_message(message, user, text, channel, action) 620 return chan 621 622 async def relay_telegram_private_message(self, user, message, action=None): 623 self.logger.debug('Handling Telegram Private Message: %s, %s', user, message) 624 625 if action: 626 await self.irc.send_action(user, None, message) 627 else: 628 await self.irc.send_msg(user, None, message) 629 630 async def relay_telegram_channel_message(self, message, user, text, channel, action): 631 self.logger.debug('Handling Telegram Channel Message: %s', message or text) 632 633 if message: 634 entity = await message.get_chat() 635 chan = await self.get_irc_channel_from_telegram_id(message.chat_id, entity) 636 else: 637 chan = channel 638 639 if action: 640 await self.irc.send_action(user, chan, text) 641 else: 642 await self.irc.send_msg(user, chan, text) 643 644 return chan 645 646 async def handle_telegram_chat_action(self, event): 647 self.logger.debug('Handling Telegram Chat Action: %s', event) 648 649 try: 650 tid = event.action_message.to_id.channel_id 651 except AttributeError: 652 tid = event.action_message.to_id.chat_id 653 finally: 654 irc_channel = await self.get_irc_channel_from_telegram_id(tid) 655 await self.get_telegram_channel_participants(tid) 656 657 try: # Join Chats 658 irc_nick = await self.get_irc_nick_from_telegram_id(event.action_message.action.users[0]) 659 except (IndexError, AttributeError): 660 try: # Kick 661 irc_nick = await self.get_irc_nick_from_telegram_id(event.action_message.action.user_id) 662 except (IndexError, AttributeError): # Join Channels 663 irc_nick = await self.get_irc_nick_from_telegram_id(event.action_message.sender_id) 664 665 if event.user_added or event.user_joined: 666 await self.irc.join_irc_channel(irc_nick, irc_channel, full_join=False) 667 elif event.user_kicked or event.user_left: 668 await self.irc.part_irc_channel(irc_nick, irc_channel) 669 670 async def join_all_telegram_channels(self): 671 async for dialog in self.telegram_client.iter_dialogs(): 672 chat = dialog.entity 673 if not isinstance(chat, tgty.User): 674 channel = self.get_telegram_channel(chat) 675 self.tid_to_iid[chat.id] = channel 676 self.irc.iid_to_tid[channel] = chat.id 677 await self.irc.join_irc_channel(self.irc.irc_nick, channel, full_join=True) 678 679 async def handle_telegram_action(self, message, mid): 680 if isinstance(message.action, tgty.MessageActionPinMessage): 681 replied = await message.get_reply_message() 682 cid = self.mid.num_to_id_offset(replied.peer_id, replied.id) 683 action_text = 'has pinned message [{}]'.format(cid) 684 elif isinstance(message.action, tgty.MessageActionChatEditPhoto): 685 _, media_type = self.scan_photo_attributes(message.action.photo) 686 photo_url = await self.download_telegram_media(message, mid) 687 action_text = 'has changed chat [{}] {}'.format(media_type, photo_url) 688 else: 689 action_text = '' 690 return action_text 691 692 async def handle_telegram_reply(self, message): 693 space = ' ' 694 trunc = '' 695 replied = await message.get_reply_message() 696 replied_msg = replied.message 697 cid = self.mid.num_to_id_offset(replied.peer_id, replied.id) 698 if not replied_msg: 699 replied_msg = '' 700 space = '' 701 elif len(replied_msg) > self.quote_len: 702 replied_msg = replied_msg[:self.quote_len] 703 trunc = '...' 704 replied_user = self.get_irc_user_from_telegram(replied.sender_id) 705 if replied_user is None: 706 replied_nick = '{}' 707 self.refwd_me = True 708 else: 709 replied_nick = replied_user.irc_nick 710 711 return '|Re {}: [{}]{}{}{}| '.format(replied_nick, cid, space, replied_msg, trunc) 712 713 async def handle_telegram_forward(self, message): 714 space = space2 = ' ' 715 if not (forwarded_peer_name := await self.get_irc_name_from_telegram_forward(message.fwd_from, saved=False)): 716 space = '' 717 saved_peer_name = await self.get_irc_name_from_telegram_forward(message.fwd_from, saved=True) 718 if saved_peer_name and saved_peer_name != forwarded_peer_name: 719 secondary_name = saved_peer_name 720 else: 721 # if it's from me I want to know who was the destination of a message (user) 722 if self.refwd_me and (saved_from_peer := message.fwd_from.saved_from_peer) is not None: 723 secondary_name = self.get_irc_user_from_telegram(saved_from_peer.user_id).irc_nick 724 else: 725 secondary_name = '' 726 space2 = '' 727 728 return '|Fwd{}{}{}{}| '.format(space, forwarded_peer_name, space2, secondary_name) 729 730 async def handle_telegram_media(self, message, user, mid): 731 caption = ' | {}'.format(message.message) if message.message else '' 732 to_download = True 733 media_url_or_data = '' 734 size = 0 735 filename = None 736 737 def scan_doc_attributes(document): 738 attrib_file = attrib_av = filename = None 739 size = document.size 740 h_size = get_human_size(size) 741 for x in document.attributes: 742 if isinstance(x, tgty.DocumentAttributeVideo) or isinstance(x, tgty.DocumentAttributeAudio): 743 attrib_av = x 744 if isinstance(x, tgty.DocumentAttributeFilename): 745 attrib_file = x 746 filename = attrib_file.file_name if attrib_file else None 747 748 return size, h_size, attrib_av, filename 749 750 if isinstance(message.media, tgty.MessageMediaWebPage): 751 to_download = False 752 if isinstance(message.media.webpage, tgty.WebPage): 753 # web 754 return await self.handle_webpage(message.media.webpage, message, mid) 755 elif isinstance(message.media.webpage, tgty.WebPagePending): 756 media_type = 'webpending' 757 media_url_or_data = message.message 758 caption = '' 759 self.webpending[message.media.webpage.id] = message 760 else: 761 media_type = 'webunknown' 762 media_url_or_data = message.message 763 caption = '' 764 elif message.photo: 765 size, media_type = self.scan_photo_attributes(message.media.photo) 766 elif message.audio: 767 size, h_size, attrib_audio, filename = scan_doc_attributes(message.media.document) 768 dur = get_human_duration(attrib_audio.duration) if attrib_audio else '' 769 per = attrib_audio.performer or '' 770 tit = attrib_audio.title or '' 771 theme = ',{}/{}'.format(per, tit) if per or tit else '' 772 media_type = 'audio:{},{}{}'.format(h_size, dur, theme) 773 elif message.voice: 774 size, _, attrib_audio, filename = scan_doc_attributes(message.media.document) 775 dur = get_human_duration(attrib_audio.duration) if attrib_audio else '' 776 media_type = 'rec:{}'.format(dur) 777 elif message.video: 778 size, h_size, attrib_video, filename = scan_doc_attributes(message.media.document) 779 dur = get_human_duration(attrib_video.duration) if attrib_video else '' 780 media_type = 'video:{},{}'.format(h_size, dur) 781 elif message.video_note: media_type = 'videorec' 782 elif message.gif: media_type = 'anim' 783 elif message.sticker: media_type = 'sticker' 784 elif message.document: 785 size, h_size, _, filename = scan_doc_attributes(message.media.document) 786 media_type = 'file:{}'.format(h_size) 787 elif message.contact: 788 media_type = 'contact' 789 caption = '' 790 to_download = False 791 if message.media.first_name: 792 media_url_or_data += message.media.first_name + ' ' 793 if message.media.last_name: 794 media_url_or_data += message.media.last_name + ' ' 795 if message.media.phone_number: 796 media_url_or_data += message.media.phone_number 797 798 elif message.game: 799 media_type = 'game' 800 caption = '' 801 to_download = False 802 if message.media.game.title: 803 media_url_or_data = message.media.game.title 804 805 elif message.geo: 806 media_type = 'geo' 807 caption = '' 808 to_download = False 809 if self.geo_url: 810 geo_url = ' | ' + self.geo_url 811 else: 812 geo_url = '' 813 lat_long_template = 'lat: {lat}, long: {long}' + geo_url 814 media_url_or_data = lat_long_template.format(lat=message.media.geo.lat, long=message.media.geo.long) 815 816 elif message.invoice: 817 media_type = 'invoice' 818 caption = '' 819 to_download = False 820 media_url_or_data = '' 821 822 elif message.poll: 823 media_type = 'poll' 824 caption = '' 825 to_download = False 826 media_url_or_data = self.handle_poll(message.media.poll) 827 828 elif message.venue: 829 media_type = 'venue' 830 caption = '' 831 to_download = False 832 media_url_or_data = '' 833 else: 834 media_type = 'unknown' 835 caption = '' 836 to_download = False 837 media_url_or_data = message.message 838 839 if to_download: 840 relay_attr = (message, user, mid, media_type) 841 media_url_or_data = await self.download_telegram_media(message, mid, filename, size, relay_attr) 842 843 return self.format_media(media_type, media_url_or_data, caption) 844 845 def handle_poll(self, poll): 846 text = poll.question 847 for ans in poll.answers: 848 text += '\n* ' + ans.text 849 return text 850 851 def handle_target_mine(self, target, user): 852 # Add the target of messages sent by self user (me) 853 # received in other clients 854 target_id, target_type = self.get_peer_id_and_type(target) 855 if user is None and target_type == 'user' and target_id != self.id: 856 # self user^ 857 # as sender 858 irc_id = self.get_irc_name_from_telegram_id(target_id) 859 target_mine = '[T: {}] '.format(irc_id) 860 else: 861 target_mine = '' 862 return target_mine 863 864 async def handle_webpage(self, webpage, message, mid): 865 media_type = 'web' 866 logo = await self.download_telegram_media(message, mid) 867 if is_url_equiv(webpage.url, webpage.display_url): 868 url_data = webpage.url 869 else: 870 url_data = '{} | {}'.format(webpage.url, webpage.display_url) 871 if message: 872 # sometimes the 1st line of message contains the title, don't repeat it 873 message_line = message.message.splitlines()[0] 874 if message_line != webpage.title: 875 title = webpage.title 876 else: 877 title = '' 878 # extract the URL in the message, don't repeat it 879 message_url = extract_url(message.message) 880 if is_url_equiv(message_url, webpage.url): 881 if is_url_equiv(message_url, webpage.display_url): 882 media_url_or_data = message.message 883 else: 884 media_url_or_data = '{} | {}'.format(message.message, webpage.display_url) 885 else: 886 media_url_or_data = '{} | {}'.format(message.message, url_data) 887 else: 888 title = webpage.title 889 media_url_or_data = url_data 890 891 if title and logo: 892 caption = ' | {} | {}'.format(title, logo) 893 elif title: 894 caption = ' | {}'.format(title) 895 elif logo: 896 caption = ' | {}'.format(logo) 897 else: 898 caption = '' 899 900 return self.format_media(media_type, media_url_or_data, caption) 901 902 def format_media(self, media_type, media_url_or_data, caption): 903 return '[{}] {}{}'.format(media_type, media_url_or_data, caption) 904 905 def scan_photo_attributes(self, photo): 906 size = 0 907 sizes = photo.sizes 908 ph_size = sizes[-1] 909 if isinstance(ph_size, tgty.PhotoSizeProgressive): 910 size = ph_size.sizes[-1] 911 else: 912 for x in sizes: 913 if isinstance(x, tgty.PhotoSize): 914 if x.size > size: 915 size = x.size 916 ph_size = x 917 if hasattr(ph_size, 'w') and hasattr(ph_size, 'h'): 918 media_type = 'photo:{}x{}'.format(ph_size.w, ph_size.h) 919 else: 920 media_type = 'photo' 921 922 return size, media_type 923 924 async def download_telegram_media(self, message, mid, filename=None, size=0, relay_attr=None): 925 if not self.download: 926 return '' 927 if filename: 928 idd_file = add_filename(filename, mid) 929 new_file = sanitize_filename(idd_file) 930 new_path = os.path.join(self.telegram_media_dir, new_file) 931 if os.path.exists(new_path): 932 local_path = new_path 933 else: 934 await self.notice_downloading(size, relay_attr) 935 local_path = await message.download_media(new_path) 936 if not local_path: return '' 937 else: 938 await self.notice_downloading(size, relay_attr) 939 local_path = await message.download_media(self.telegram_media_dir) 940 if not local_path: return '' 941 filetype = os.path.splitext(local_path)[1] 942 gen_file = str(self.media_cn) + filetype 943 idd_file = add_filename(gen_file, mid) 944 new_file = sanitize_filename(idd_file) 945 self.media_cn += 1 946 new_path = os.path.join(self.telegram_media_dir, new_file) 947 948 if local_path != new_path: 949 os.replace(local_path, new_path) 950 if self.media_url[-1:] != '/': 951 self.media_url += '/' 952 return self.media_url + new_file 953 954 async def notice_downloading(self, size, relay_attr): 955 if relay_attr and size > self.notice_size: 956 message, user, mid, media_type = relay_attr 957 await self.relay_telegram_message(message, user, '[{}] [{}] [Downloading]'.format(mid, media_type)) 958 959 class mesg_id: 960 def __init__(self, alpha): 961 self.alpha = alpha 962 self.base = len(alpha) 963 self.alphaval = { i:v for v, i in enumerate(alpha) } 964 self.mesg_base = {} 965 966 def num_to_id(self, num, neg=''): 967 if num < 0: return self.num_to_id(-num, '-') 968 (high, low) = divmod(num, self.base) 969 if high >= self.base: 970 aux = self.num_to_id(high) 971 return neg + aux + self.alpha[low] 972 else: 973 return neg + self.alpha[high] + self.alpha[low] 974 975 def num_to_id_offset(self, peer, num): 976 peer_id = self.get_peer_id(peer) 977 if peer_id not in self.mesg_base: 978 self.mesg_base[peer_id] = num 979 return self.num_to_id(num - self.mesg_base[peer_id]) 980 981 def id_to_num(self, id, n=1): 982 if id: 983 if id[0] == '-': return self.id_to_num(id[1:], -1) 984 aux = self.alphaval[id[-1:]] * n 985 sum = self.id_to_num(id[:-1], n * self.base) 986 return sum + aux 987 else: 988 return 0 989 990 def id_to_num_offset(self, peer, mid): 991 peer_id = self.get_peer_id(peer) 992 if peer_id in self.mesg_base: 993 id_rel = self.id_to_num(mid) 994 id = id_rel + self.mesg_base[peer_id] 995 else: 996 id = None 997 return id 998 999 def get_peer_id(self, peer): 1000 if isinstance(peer, tgty.PeerChannel): 1001 id = peer.channel_id 1002 elif isinstance(peer, tgty.PeerChat): 1003 id = peer.chat_id 1004 elif isinstance(peer, tgty.PeerUser): 1005 id = peer.user_id 1006 else: 1007 id = peer 1008 return id