1 # irgramd: IRC-Telegram gateway 2 # service.py: IRC service/control command handlers 3 # 4 # Copyright (c) 2022,2023 E. Bosch <presidev@AT@gmail.com> 5 # 6 # Use of this source code is governed by a MIT style license that 7 # can be found in the LICENSE file included in this project. 8 9 from utils import compact_date, command, HELP 10 from telethon import utils as tgutils 11 12 class service(command): 13 def __init__(self, settings, telegram): 14 self.commands = \ 15 { # Command Handler Arguments Min Max Maxsplit 16 'code': (self.handle_command_code, 1, 1, -1), 17 'dialog': (self.handle_command_dialog, 1, 2, -1), 18 'get': (self.handle_command_get, 2, 2, -1), 19 'help': (self.handle_command_help, 0, 1, -1), 20 'history': (self.handle_command_history, 1, 3, -1), 21 'mark_read': (self.handle_command_mark_read, 1, 1, -1), 22 } 23 self.ask_code = settings['ask_code'] 24 self.tg = telegram 25 self.irc = telegram.irc 26 self.tmp_ircnick = None 27 28 async def handle_command_code(self, code=None, help=None): 29 if not help: 30 if self.ask_code: 31 reply = ('Code will be asked on console',) 32 elif code.isdigit(): 33 try: 34 await self.tg.telegram_client.sign_in(code=code) 35 except: 36 reply = ('Invalid code',) 37 else: 38 reply = ('Valid code', 'Telegram account authorized') 39 await self.tg.continue_auth() 40 else: # not isdigit 41 reply = ('Code must be numeric',) 42 43 else: # HELP.brief or HELP.desc (first line) 44 reply = (' code Enter authorization code',) 45 if help == HELP.desc: # rest of HELP.desc 46 reply += \ 47 ( 48 ' code <code>', 49 'Enter authorization code sent by Telegram to the phone or to', 50 'another client connected.', 51 'This authorization code usually is only needed the first time', 52 'that irgramd connects to Telegram with a given account.', 53 ) 54 return reply 55 56 async def handle_command_dialog(self, command=None, id=None, help=None): 57 if not help: 58 if command == 'archive': 59 pass 60 elif command == 'delete': 61 pass 62 elif command == 'list': 63 reply = \ 64 ( 65 'Dialogs:', 66 ' {:<11} {:<9} {:<9} {:5} {:<3} {:<4} {:<6} {}'.format( 67 'Id', 'Unread', 'Mentions', 'Type', 'Pin', 'Arch', 'Last', 'Name'), 68 ) 69 async for dialog in self.tg.telegram_client.iter_dialogs(): 70 id, type = tgutils.resolve_id(dialog.id) 71 unr = dialog.unread_count 72 men = dialog.unread_mentions_count 73 ty = self.tg.get_entity_type(dialog.entity, format='short') 74 pin = 'Yes' if dialog.pinned else 'No' 75 arch = 'Yes' if dialog.archived else 'No' 76 last = compact_date(dialog.date, self.tg.timezone) 77 if id == self.tg.id: 78 name_in_irc = self.tmp_ircnick 79 else: 80 name_in_irc = self.tg.get_irc_name_from_telegram_id(id) 81 82 reply += (' {:<11d} {:<9d} {:<9d} {:5} {:<3} {:<4} {:<6} {}'.format( 83 id, unr, men, ty, pin, arch, last, name_in_irc), 84 ) 85 86 else: # HELP.brief or HELP.desc (first line) 87 reply = (' dialog Manage conversations (dialogs)',) 88 if help == HELP.desc: # rest of HELP.desc 89 reply += \ 90 ( 91 ' dialog <subcommand> [id]', 92 'Manage conversations (dialogs) established in Telegram, the', 93 'following subcommands are available:', 94 # ' archive <id> Archive the dialog specified by id', 95 # ' delete <id> Delete the dialog specified by id', 96 ' list Show all dialogs', 97 ) 98 return reply 99 100 async def handle_command_get(self, peer=None, mid=None, help=None): 101 if not help: 102 msg = None 103 peer_id, reply = self.get_peer_id(peer.lower()) 104 if reply: return reply 105 else: reply = () 106 107 # If the ID starts with '=' is absolute ID, not compact ID 108 # character '=' is not used by compact IDs 109 if mid[0] == '=': 110 id = int(mid[1:]) 111 else: 112 id = self.tg.mid.id_to_num_offset(peer_id, mid) 113 if id is not None: 114 msg = await self.tg.telegram_client.get_messages(entity=peer_id, ids=id) 115 if msg is not None: 116 await self.tg.handle_telegram_message(event=None, message=msg, history=True) 117 else: 118 reply = ('Message not found',) 119 return reply 120 121 else: # HELP.brief or HELP.desc (first line) 122 reply = (' get Get a message by id and peer',) 123 if help == HELP.desc: # rest of HELP.desc 124 reply += \ 125 ( 126 ' get <peer> <compact_id|=ID>', 127 'Get one message from peer with the compact or absolute ID', 128 ) 129 return reply 130 131 async def handle_command_help(self, help_command=None, help=None): 132 133 start_help = ('*** Telegram Service Help ***',) 134 end_help = ('*** End of Help ***',) 135 136 if help == HELP.brief: 137 help_text = (' help This help',) 138 elif not help_command or help_command == 'help': 139 help_text = start_help 140 help_text += \ 141 ( 142 'This service contains specific Telegram commands that irgramd', 143 'cannot map to IRC commands. The following commands are available:', 144 ) 145 for command in self.commands.values(): 146 handler = command[0] 147 help_text += await handler(help=HELP.brief) 148 help_text += \ 149 ( 150 'The commands begining with ! (exclamation) must be used directly', 151 'in channels or chats. The following ! commands are available:', 152 ) 153 for command in self.irc.exclam.commands.values(): 154 handler = command[0] 155 help_text += await handler(help=HELP.brief) 156 help_text += \ 157 ( 158 'If you need more information about a specific command you can use', 159 'help <command>', 160 ) 161 help_text += end_help 162 elif help_command in (all_commands := dict(**self.commands, **self.irc.exclam.commands)).keys(): 163 handler = all_commands[help_command][0] 164 help_text = start_help 165 help_text += await handler(help=HELP.desc) 166 help_text += end_help 167 else: 168 help_text = ('help: Unknown command',) 169 return help_text 170 171 async def handle_command_history(self, peer=None, limit='10', add_unread=None, help=None): 172 if not help: 173 async def get_unread(tgt): 174 async for dialog in self.tg.telegram_client.iter_dialogs(): 175 id, type = tgutils.resolve_id(dialog.id) 176 if id in self.tg.tid_to_iid.keys(): 177 name = self.tg.tid_to_iid[id] 178 if tgt == name.lower(): 179 count = dialog.unread_count 180 reply = None 181 break 182 else: 183 count = None 184 reply = ('Unknown unread',) 185 return count, reply 186 187 def conv_int(num_str): 188 if num_str.isdigit(): 189 n = int(num_str) 190 err = None 191 else: 192 n = None 193 err = ('Invalid argument',) 194 return n, err 195 196 tgt = peer.lower() 197 peer_id, reply = self.get_peer_id(tgt) 198 if reply: return reply 199 200 if limit == 'unread': 201 add_unread = '0' if add_unread is None else add_unread 202 add_unread_int, reply = conv_int(add_unread) 203 if reply: return reply 204 205 li, reply = await get_unread(tgt) 206 if reply: return reply 207 li += add_unread_int 208 elif add_unread is not None: 209 reply = ('Wrong number of arguments',) 210 return reply 211 elif limit == 'all': 212 li = None 213 else: 214 li, reply = conv_int(limit) 215 if reply: return reply 216 217 his = await self.tg.telegram_client.get_messages(peer_id, limit=li) 218 for msg in reversed(his): 219 await self.tg.handle_telegram_message(event=None, message=msg, history=True) 220 reply = () 221 return reply 222 223 else: # HELP.brief or HELP.desc (first line) 224 reply = (' history Get messages from history',) 225 if help == HELP.desc: # rest of HELP.desc 226 reply += \ 227 ( 228 ' history <peer> [<limit>|all|unread [<plusN>]]', 229 'Get last <limit> number of messages already sent to <peer>', 230 '(channel or user). If not set <limit> is 10.', 231 'Instead of <limit>, "unread" is for messages not marked as read,', 232 'optionally <plusN> number of previous messages to the first unread.', 233 'Instead of <limit>, "all" is for retrieving all available messages', 234 'in <peer>.', 235 ) 236 return reply 237 238 async def handle_command_mark_read(self, peer=None, help=None): 239 if not help: 240 peer_id, reply = self.get_peer_id(peer.lower()) 241 if reply: return reply 242 243 await self.tg.telegram_client.send_read_acknowledge(peer_id, clear_mentions=True) 244 reply = ('',) 245 else: # HELP.brief or HELP.desc (first line) 246 reply = (' mark_read Mark messages as read',) 247 if help == HELP.desc: # rest of HELP.desc 248 reply += \ 249 ( 250 ' mark_read <peer>', 251 'Mark all messages on <peer> (channel or user) as read, this also will', 252 'reset the number of mentions to you on <peer>.' 253 ) 254 return reply 255 256 def get_peer_id(self, tgt): 257 if tgt in self.irc.users or tgt in self.irc.irc_channels: 258 peer_id = self.tg.get_tid(tgt) 259 reply = None 260 else: 261 peer_id = None 262 reply = ('Unknown user or channel',) 263 return peer_id, reply