# irgramd: IRC-Telegram gateway
# service.py: IRC service/control command handlers
#
# Copyright (c) 2022,2023 E. Bosch <presidev@AT@gmail.com>
#
# Use of this source code is governed by a MIT style license that
# can be found in the LICENSE file included in this project.

from utils import compact_date, command, HELP
from telethon import utils as tgutils

class service(command):
    """Handlers for the commands offered by the irgramd service user.

    Every handler is a coroutine that returns a tuple of reply lines
    (strings). Every handler also accepts a 'help' keyword: when it is set
    (HELP.brief or HELP.desc) the command is not executed and the handler
    returns its own help text instead, HELP.desc being the long version.
    """

    def __init__(self, settings, telegram):
        """Build the command table and keep references to the collaborators.

        settings: mapping, only the 'ask_code' entry is read here.
        telegram: Telegram side object, its 'irc' attribute is stored as well.
        """
        self.commands = \
        { # Command         Handler                         Arguments  Min Max Maxsplit
            'code':        (self.handle_command_code,                   1,  1,  -1),
            'dialog':      (self.handle_command_dialog,                 1,  2,  -1),
            'get':         (self.handle_command_get,                    2,  2,  -1),
            'help':        (self.handle_command_help,                   0,  1,  -1),
            'history':     (self.handle_command_history,                1,  3,  -1),
            'mark_read':   (self.handle_command_mark_read,              1,  1,  -1),
        }
        self.ask_code = settings['ask_code']
        self.tg = telegram
        self.irc = telegram.irc
        self.tmp_ircnick = None

    def initial_help(self):
        """Return the welcome lines that point the user to the help command."""
        return (
                'Welcome to irgramd service',
                'use /msg {} help'.format(self.irc.service_user.irc_nick),
                'or equivalent in your IRC client',
                'to get help',
               )

    async def handle_command_code(self, code=None, help=None):
        """Handle 'code <code>': sign in to Telegram with an authorization code.

        Returns a tuple of reply lines, or the help text when 'help' is set.
        """
        if not help:
            if self.ask_code:
                reply = ('Code will be asked on console',)
            elif code.isdigit():
                try:
                    await self.tg.telegram_client.sign_in(code=code)
                except Exception:
                    # Any sign in failure is reported as an invalid code.
                    # Not a bare "except:" so that task cancellation and
                    # interpreter exit (BaseException) still propagate
                    reply = ('Invalid code',)
                else:
                    reply = ('Valid code', 'Telegram account authorized')
                    await self.tg.continue_auth()
            else: # not isdigit
                reply = ('Code must be numeric',)

        else: # HELP.brief or HELP.desc (first line)
            reply = (' code Enter authorization code',)
        if help == HELP.desc: # rest of HELP.desc
            reply += \
            (
              ' code <code>',
              'Enter authorization code sent by Telegram to the phone or to',
              'another client connected.',
              'This authorization code usually is only needed the first time',
              'that irgramd connects to Telegram with a given account.',
            )
        return reply

    async def handle_command_dialog(self, command=None, id=None, help=None):
        """Handle 'dialog <subcommand> [id]': manage Telegram dialogs.

        Only the 'list' subcommand does something: it returns a header plus
        one formatted line per dialog. 'archive' and 'delete' are recognized
        placeholders without implementation.
        Returns a tuple of reply lines, or the help text when 'help' is set.
        """
        if not help:
            if command in ('archive', 'delete'):
                # Placeholders: previously no reply was bound here and the
                # final return raised UnboundLocalError
                reply = ('dialog: Subcommand not implemented',)
            elif command == 'list':
                reply = \
                (
                  'Dialogs:',
                  ' {:<11} {:<9} {:<9} {:5} {:<3} {:<4} {:<6} {}'.format(
                      'Id', 'Unread', 'Mentions', 'Type', 'Pin', 'Arch', 'Last', 'Name'),
                )
                async for dialog in self.tg.telegram_client.iter_dialogs():
                    # resolve_id() also returns the peer type, not needed here
                    tid, _ = tgutils.resolve_id(dialog.id)
                    unr = dialog.unread_count
                    men = dialog.unread_mentions_count
                    ty = self.tg.get_entity_type(dialog.entity, format='short')
                    pin = 'Yes' if dialog.pinned else 'No'
                    arch = 'Yes' if dialog.archived else 'No'
                    last = compact_date(dialog.date, self.tg.timezone)
                    if tid == self.tg.id:
                        # Dialog with the own account
                        name_in_irc = self.tmp_ircnick
                    else:
                        name_in_irc = self.tg.get_irc_name_from_telegram_id(tid)

                    reply += (' {:<11d} {:<9d} {:<9d} {:5} {:<3} {:<4} {:<6} {}'.format(
                                tid, unr, men, ty, pin, arch, last, name_in_irc),
                             )
            else:
                # Any other subcommand also left reply unbound before
                reply = ('dialog: Unknown subcommand',)

        else: # HELP.brief or HELP.desc (first line)
            reply = (' dialog Manage conversations (dialogs)',)
        if help == HELP.desc: # rest of HELP.desc
            reply += \
            (
              ' dialog <subcommand> [id]',
              'Manage conversations (dialogs) established in Telegram, the',
              'following subcommands are available:',
#              ' archive <id> Archive the dialog specified by id',
#              ' delete <id> Delete the dialog specified by id',
              ' list Show all dialogs',
            )
        return reply

    async def handle_command_get(self, peer=None, mid=None, help=None):
        """Handle 'get <peer> <compact_id|=ID>': fetch and relay one message.

        The message, when found, is passed to handle_telegram_message() with
        history=True and the reply is empty. Otherwise the reply carries an
        error line. Returns the help text when 'help' is set.
        """
        if not help:
            msg = None
            peer_id, reply = self.get_peer_id(peer.lower())
            if reply:
                return reply
            reply = ()

            # If the ID starts with '=' is absolute ID, not compact ID
            # character '=' is not used by compact IDs
            if mid[0] == '=':
                try:
                    msg_id = int(mid[1:])
                except ValueError:
                    # e.g. "=abc" or a lone "=", formerly an uncaught exception
                    return ('Invalid argument',)
            else:
                msg_id = self.tg.mid.id_to_num_offset(peer_id, mid)
            if msg_id is not None:
                msg = await self.tg.telegram_client.get_messages(entity=peer_id, ids=msg_id)
            if msg is not None:
                await self.tg.handle_telegram_message(event=None, message=msg, history=True)
            else:
                reply = ('Message not found',)
            return reply

        else: # HELP.brief or HELP.desc (first line)
            reply = (' get Get a message by id and peer',)
        if help == HELP.desc: # rest of HELP.desc
            reply += \
            (
              ' get <peer> <compact_id|=ID>',
              'Get one message from peer with the compact or absolute ID',
            )
        return reply

    async def handle_command_help(self, help_command=None, help=None):
        """Handle 'help [command]': general help or help of one command.

        Without argument (or with 'help') the brief help of every service
        command and every exclamation command is collected. With a known
        command name its long help is returned. Returns a tuple of lines.
        """

        start_help = ('*** Telegram Service Help ***',)
        end_help = ('*** End of Help ***',)

        if help == HELP.brief:
            help_text = (' help This help',)
        elif not help_command or help_command == 'help':
            help_text = start_help
            help_text += \
            (
              'This service contains specific Telegram commands that irgramd',
              'cannot map to IRC commands. The following commands are available:',
            )
            for entry in self.commands.values():
                handler = entry[0]
                help_text += await handler(help=HELP.brief)
            help_text += \
            (
              'The commands begining with ! (exclamation) must be used directly',
              'in channels or chats. The following ! commands are available:',
            )
            for entry in self.irc.exclam.commands.values():
                handler = entry[0]
                help_text += await handler(help=HELP.brief)
            help_text += \
            (
              'If you need more information about a specific command you can use',
              'help <command>',
            )
            help_text += end_help
        # A dict display tolerates a name present in both tables, whereas
        # dict(**a, **b) raises TypeError on a duplicated key
        elif help_command in (all_commands := {**self.commands, **self.irc.exclam.commands}):
            handler = all_commands[help_command][0]
            help_text = start_help
            help_text += await handler(help=HELP.desc)
            help_text += end_help
        else:
            help_text = ('help: Unknown command',)
        return help_text

    async def handle_command_history(self, peer=None, limit='10', add_unread=None, help=None):
        """Handle 'history <peer> [<limit>|all|unread [<plusN>]]'.

        Fetches messages of the peer and passes them, oldest first, to
        handle_telegram_message() with history=True. The reply is empty on
        success or an error line. Returns the help text when 'help' is set.
        """
        if not help:
            async def get_unread(tgt):
                # Look for the dialog whose IRC name matches tgt and return
                # its unread count as (count, None), or (None, error reply)
                async for dialog in self.tg.telegram_client.iter_dialogs():
                    tid, _ = tgutils.resolve_id(dialog.id)
                    if tid in self.tg.tid_to_iid:
                        name = self.tg.tid_to_iid[tid]
                        if tgt == name.lower():
                            count = dialog.unread_count
                            reply = None
                            break
                else:
                    # Loop ended without break: no dialog matched
                    count = None
                    reply = ('Unknown unread',)
                return count, reply

            def conv_int(num_str):
                # isdecimal() and not isdigit(): the latter also accepts
                # characters like superscript digits that int() rejects
                if num_str.isdecimal():
                    n = int(num_str)
                    err = None
                else:
                    n = None
                    err = ('Invalid argument',)
                return n, err

            tgt = peer.lower()
            peer_id, reply = self.get_peer_id(tgt)
            if reply:
                return reply

            if limit == 'unread':
                add_unread = '0' if add_unread is None else add_unread
                add_unread_int, reply = conv_int(add_unread)
                if reply:
                    return reply

                li, reply = await get_unread(tgt)
                if reply:
                    return reply
                li += add_unread_int
            elif add_unread is not None:
                # A third argument is only valid after "unread"
                reply = ('Wrong number of arguments',)
                return reply
            elif limit == 'all':
                li = None
            else:
                li, reply = conv_int(limit)
                if reply:
                    return reply

            his = await self.tg.telegram_client.get_messages(peer_id, limit=li)
            for msg in reversed(his):
                await self.tg.handle_telegram_message(event=None, message=msg, history=True)
            reply = ()
            return reply

        else: # HELP.brief or HELP.desc (first line)
            reply = (' history Get messages from history',)
        if help == HELP.desc: # rest of HELP.desc
            reply += \
            (
              ' history <peer> [<limit>|all|unread [<plusN>]]',
              'Get last <limit> number of messages already sent to <peer>',
              '(channel or user). If not set <limit> is 10.',
              'Instead of <limit>, "unread" is for messages not marked as read,',
              'optionally <plusN> number of previous messages to the first unread.',
              'Instead of <limit>, "all" is for retrieving all available messages',
              'in <peer>.',
            )
        return reply

    async def handle_command_mark_read(self, peer=None, help=None):
        """Handle 'mark_read <peer>': acknowledge messages and mentions.

        Returns a tuple with one empty line on success, an error line for an
        unknown peer, or the help text when 'help' is set.
        """
        if not help:
            peer_id, reply = self.get_peer_id(peer.lower())
            if reply:
                return reply

            await self.tg.telegram_client.send_read_acknowledge(peer_id, clear_mentions=True)
            reply = ('',)
        else: # HELP.brief or HELP.desc (first line)
            reply = (' mark_read Mark messages as read',)
        if help == HELP.desc: # rest of HELP.desc
            reply += \
            (
              ' mark_read <peer>',
              'Mark all messages on <peer> (channel or user) as read, this also will',
              'reset the number of mentions to you on <peer>.',
            )
        return reply

    def get_peer_id(self, tgt):
        """Map an IRC user or channel name to its Telegram id.

        tgt is looked up as given in the IRC users and channels (callers
        pass it lowercased). Returns (peer_id, None) when known, otherwise
        (None, error reply tuple).
        """
        if tgt in self.irc.users or tgt in self.irc.irc_channels:
            peer_id = self.tg.get_tid(tgt)
            reply = None
        else:
            peer_id = None
            reply = ('Unknown user or channel',)
        return peer_id, reply