/
service.py
  1 # irgramd: IRC-Telegram gateway
  2 # service.py: IRC service/control command handlers
  3 #
  4 # Copyright (c) 2022,2023 E. Bosch <presidev@AT@gmail.com>
  5 #
  6 # Use of this source code is governed by a MIT style license that
  7 # can be found in the LICENSE file included in this project.
  8 
  9 from utils import compact_date, command, HELP
 10 from telethon import utils as tgutils
 11 
 12 class service(command):
 13     def __init__(self, settings, telegram):
 14         self.commands = \
 15         { # Command         Handler                       Arguments  Min Max Maxsplit
 16             'code':        (self.handle_command_code,                 1,  1, -1),
 17             'dialog':      (self.handle_command_dialog,               1,  2, -1),
 18             'get':         (self.handle_command_get,                  2,  2, -1),
 19             'help':        (self.handle_command_help,                 0,  1, -1),
 20             'history':     (self.handle_command_history,              1,  3, -1),
 21             'mark_read':   (self.handle_command_mark_read,            1,  1, -1),
 22         }
 23         self.ask_code = settings['ask_code']
 24         self.tg = telegram
 25         self.irc = telegram.irc
 26         self.tmp_ircnick = None
 27 
 28     def initial_help(self):
 29         return (
 30                   'Welcome to irgramd service',
 31                   'use /msg {} help'.format(self.irc.service_user.irc_nick),
 32                   'or equivalent in your IRC client',
 33                   'to get help',
 34                )
 35 
 36     async def handle_command_code(self, code=None, help=None):
 37         if not help:
 38             if self.ask_code:
 39                 reply = ('Code will be asked on console',)
 40             elif code.isdigit():
 41                 try:
 42                     await self.tg.telegram_client.sign_in(code=code)
 43                 except:
 44                     reply = ('Invalid code',)
 45                 else:
 46                     reply = ('Valid code', 'Telegram account authorized')
 47                     await self.tg.continue_auth()
 48             else: # not isdigit
 49                 reply = ('Code must be numeric',)
 50 
 51         else: # HELP.brief or HELP.desc (first line)
 52             reply = ('   code        Enter authorization code',)
 53         if help == HELP.desc:  # rest of HELP.desc
 54             reply += \
 55             (
 56               '   code <code>',
 57               'Enter authorization code sent by Telegram to the phone or to',
 58               'another client connected.',
 59               'This authorization code usually is only needed the first time',
 60               'that irgramd connects to Telegram with a given account.',
 61             )
 62         return reply
 63 
 64     async def handle_command_dialog(self, command=None, id=None, help=None):
 65         if not help:
 66             if command == 'archive':
 67                 pass
 68             elif command == 'delete':
 69                 pass
 70             elif command == 'list':
 71                 reply = \
 72                 (
 73                   'Dialogs:',
 74                   ' {:<11} {:<9} {:<9} {:5} {:<3} {:<4} {:<6}  {}'.format(
 75                       'Id', 'Unread', 'Mentions', 'Type', 'Pin', 'Arch', 'Last', 'Name'),
 76                 )
 77                 async for dialog in self.tg.telegram_client.iter_dialogs():
 78                     id, type = tgutils.resolve_id(dialog.id)
 79                     unr = dialog.unread_count
 80                     men = dialog.unread_mentions_count
 81                     ty = self.tg.get_entity_type(dialog.entity, format='short')
 82                     pin = 'Yes' if dialog.pinned else 'No'
 83                     arch = 'Yes' if dialog.archived else 'No'
 84                     last = compact_date(dialog.date, self.tg.timezone)
 85                     if id == self.tg.id:
 86                         name_in_irc = self.tmp_ircnick
 87                     else:
 88                         name_in_irc = self.tg.get_irc_name_from_telegram_id(id)
 89 
 90                     reply += (' {:<11d} {:<9d} {:<9d} {:5} {:<3} {:<4} {:<6}  {}'.format(
 91                                 id,     unr,   men,   ty,  pin,  arch, last, name_in_irc),
 92                              )
 93 
 94         else: # HELP.brief or HELP.desc (first line)
 95             reply = ('   dialog      Manage conversations (dialogs)',)
 96         if help == HELP.desc:  # rest of HELP.desc
 97             reply += \
 98             (
 99               '   dialog <subcommand> [id]',
100               'Manage conversations (dialogs) established in Telegram, the',
101               'following subcommands are available:',
102 #              '   archive <id>   Archive the dialog specified by id',
103 #              '   delete <id>    Delete the dialog specified by id',
104               '   list           Show all dialogs',
105             )
106         return reply
107 
108     async def handle_command_get(self, peer=None, mid=None, help=None):
109         if not help:
110             msg = None
111             peer_id, reply = self.get_peer_id(peer.lower())
112             if reply: return reply
113             else: reply = ()
114 
115             # If the ID starts with '=' is absolute ID, not compact ID
116             # character '=' is not used by compact IDs
117             if mid[0] == '=':
118                 id = int(mid[1:])
119             else:
120                 id = self.tg.mid.id_to_num_offset(peer_id, mid)
121             if id is not None:
122                 msg = await self.tg.telegram_client.get_messages(entity=peer_id, ids=id)
123             if msg is not None:
124                 await self.tg.handle_telegram_message(event=None, message=msg, history=True)
125             else:
126                 reply = ('Message not found',)
127             return reply
128 
129         else: # HELP.brief or HELP.desc (first line)
130             reply = ('   get         Get a message by id and peer',)
131         if help == HELP.desc:  # rest of HELP.desc
132             reply += \
133             (
134               '   get <peer> <compact_id|=ID>',
135               'Get one message from peer with the compact or absolute ID',
136             )
137         return reply
138 
139     async def handle_command_help(self, help_command=None, help=None):
140 
141         start_help = ('*** Telegram Service Help ***',)
142         end_help = ('*** End of Help ***',)
143 
144         if help == HELP.brief:
145             help_text = ('   help        This help',)
146         elif not help_command or help_command == 'help':
147             help_text = start_help
148             help_text += \
149             (
150               'This service contains specific Telegram commands that irgramd',
151               'cannot map to IRC commands. The following commands are available:',
152             )
153             for command in self.commands.values():
154                 handler = command[0]
155                 help_text += await handler(help=HELP.brief)
156             help_text += \
157             (
158               'The commands begining with ! (exclamation) must be used directly',
159               'in channels or chats. The following ! commands are available:',
160             )
161             for command in self.irc.exclam.commands.values():
162                 handler = command[0]
163                 help_text += await handler(help=HELP.brief)
164             help_text += \
165             (
166               'If you need more information about a specific command you can use',
167               'help <command>',
168             )
169             help_text += end_help
170         elif help_command in (all_commands := dict(**self.commands, **self.irc.exclam.commands)).keys():
171             handler = all_commands[help_command][0]
172             help_text = start_help
173             help_text += await handler(help=HELP.desc)
174             help_text += end_help
175         else:
176             help_text = ('help: Unknown command',)
177         return help_text
178 
179     async def handle_command_history(self, peer=None, limit='10', add_unread=None, help=None):
180         if not help:
181             async def get_unread(tgt):
182                 async for dialog in self.tg.telegram_client.iter_dialogs():
183                     id, type = tgutils.resolve_id(dialog.id)
184                     if id in self.tg.tid_to_iid.keys():
185                         name = self.tg.tid_to_iid[id]
186                         if tgt == name.lower():
187                             count = dialog.unread_count
188                             reply = None
189                             break
190                 else:
191                     count = None
192                     reply = ('Unknown unread',)
193                 return count, reply
194 
195             def conv_int(num_str):
196                 if num_str.isdigit():
197                     n = int(num_str)
198                     err = None
199                 else:
200                     n = None
201                     err = ('Invalid argument',)
202                 return n, err
203 
204             tgt = peer.lower()
205             peer_id, reply = self.get_peer_id(tgt)
206             if reply: return reply
207 
208             if limit == 'unread':
209                 add_unread = '0' if add_unread is None else add_unread
210                 add_unread_int, reply = conv_int(add_unread)
211                 if reply: return reply
212 
213                 li, reply = await get_unread(tgt)
214                 if reply: return reply
215                 li += add_unread_int
216             elif add_unread is not None:
217                 reply = ('Wrong number of arguments',)
218                 return reply
219             elif limit == 'all':
220                 li = None
221             else:
222                 li, reply = conv_int(limit)
223                 if reply: return reply
224 
225             his = await self.tg.telegram_client.get_messages(peer_id, limit=li)
226             for msg in reversed(his):
227                 await self.tg.handle_telegram_message(event=None, message=msg, history=True)
228             reply = ()
229             return reply
230 
231         else: # HELP.brief or HELP.desc (first line)
232             reply = ('   history     Get messages from history',)
233         if help == HELP.desc:  # rest of HELP.desc
234             reply += \
235             (
236               '   history <peer> [<limit>|all|unread [<plusN>]]',
237               'Get last <limit> number of messages already sent to <peer>',
238               '(channel or user). If not set <limit> is 10.',
239               'Instead of <limit>, "unread" is for messages not marked as read,',
240               'optionally <plusN> number of previous messages to the first unread.',
241               'Instead of <limit>, "all" is for retrieving all available messages',
242               'in <peer>.',
243             )
244         return reply
245 
246     async def handle_command_mark_read(self, peer=None, help=None):
247         if not help:
248             peer_id, reply = self.get_peer_id(peer.lower())
249             if reply: return reply
250 
251             await self.tg.telegram_client.send_read_acknowledge(peer_id, clear_mentions=True)
252             reply = ('',)
253         else: # HELP.brief or HELP.desc (first line)
254             reply = ('   mark_read   Mark messages as read',)
255         if help == HELP.desc:  # rest of HELP.desc
256             reply += \
257             (
258               '   mark_read <peer>',
259               'Mark all messages on <peer> (channel or user) as read, this also will',
260               'reset the number of mentions to you on <peer>.',
261             )
262         return reply
263 
264     def get_peer_id(self, tgt):
265         if tgt in self.irc.users or tgt in self.irc.irc_channels:
266             peer_id = self.tg.get_tid(tgt)
267             reply = None
268         else:
269             peer_id = None
270             reply = ('Unknown user or channel',)
271         return peer_id, reply