/
/service.py
  1 # irgramd: IRC-Telegram gateway
  2 # service.py: IRC service/control command handlers
  3 #
  4 # Copyright (c) 2022,2023 E. Bosch <presidev@AT@gmail.com>
  5 #
  6 # Use of this source code is governed by a MIT style license that
  7 # can be found in the LICENSE file included in this project.
  8 
  9 from utils import compact_date, command, HELP
 10 from telethon import utils as tgutils
 11 
 12 class service(command):
 13     def __init__(self, settings, telegram):
 14         self.commands = \
 15         { # Command         Handler                       Arguments  Min Max Maxsplit
 16             'code':        (self.handle_command_code,                 1,  1, -1),
 17             'dialog':      (self.handle_command_dialog,               1,  2, -1),
 18             'get':         (self.handle_command_get,                  2,  2, -1),
 19             'help':        (self.handle_command_help,                 0,  1, -1),
 20             'history':     (self.handle_command_history,              1,  3, -1),
 21             'mark_read':   (self.handle_command_mark_read,            1,  1, -1),
 22         }
 23         self.ask_code = settings['ask_code']
 24         self.tg = telegram
 25         self.irc = telegram.irc
 26         self.tmp_ircnick = None
 27 
 28     async def handle_command_code(self, code=None, help=None):
 29         if not help:
 30             if self.ask_code:
 31                 reply = ('Code will be asked on console',)
 32             elif code.isdigit():
 33                 try:
 34                     await self.tg.telegram_client.sign_in(code=code)
 35                 except:
 36                     reply = ('Invalid code',)
 37                 else:
 38                     reply = ('Valid code', 'Telegram account authorized')
 39                     await self.tg.continue_auth()
 40             else: # not isdigit
 41                 reply = ('Code must be numeric',)
 42 
 43         else: # HELP.brief or HELP.desc (first line)
 44             reply = ('   code        Enter authorization code',)
 45         if help == HELP.desc:  # rest of HELP.desc
 46             reply += \
 47             (
 48               '   code <code>',
 49               'Enter authorization code sent by Telegram to the phone or to',
 50               'another client connected.',
 51               'This authorization code usually is only needed the first time',
 52               'that irgramd connects to Telegram with a given account.',
 53             )
 54         return reply
 55 
 56     async def handle_command_dialog(self, command=None, id=None, help=None):
 57         if not help:
 58             if command == 'archive':
 59                 pass
 60             elif command == 'delete':
 61                 pass
 62             elif command == 'list':
 63                 reply = \
 64                 (
 65                   'Dialogs:',
 66                   ' {:<11} {:<9} {:<9} {:5} {:<3} {:<4} {:<6}  {}'.format(
 67                       'Id', 'Unread', 'Mentions', 'Type', 'Pin', 'Arch', 'Last', 'Name'),
 68                 )
 69                 async for dialog in self.tg.telegram_client.iter_dialogs():
 70                     id, type = tgutils.resolve_id(dialog.id)
 71                     unr = dialog.unread_count
 72                     men = dialog.unread_mentions_count
 73                     ty = self.tg.get_entity_type(dialog.entity, format='short')
 74                     pin = 'Yes' if dialog.pinned else 'No'
 75                     arch = 'Yes' if dialog.archived else 'No'
 76                     last = compact_date(dialog.date, self.tg.timezone)
 77                     if id == self.tg.id:
 78                         name_in_irc = self.tmp_ircnick
 79                     else:
 80                         name_in_irc = self.tg.get_irc_name_from_telegram_id(id)
 81 
 82                     reply += (' {:<11d} {:<9d} {:<9d} {:5} {:<3} {:<4} {:<6}  {}'.format(
 83                                 id,     unr,   men,   ty,  pin,  arch, last, name_in_irc),
 84                              )
 85 
 86         else: # HELP.brief or HELP.desc (first line)
 87             reply = ('   dialog      Manage conversations (dialogs)',)
 88         if help == HELP.desc:  # rest of HELP.desc
 89             reply += \
 90             (
 91               '   dialog <subcommand> [id]',
 92               'Manage conversations (dialogs) established in Telegram, the',
 93               'following subcommands are available:',
 94 #              '   archive <id>   Archive the dialog specified by id',
 95 #              '   delete <id>    Delete the dialog specified by id',
 96               '   list           Show all dialogs',
 97             )
 98         return reply
 99 
100     async def handle_command_get(self, peer=None, mid=None, help=None):
101         if not help:
102             msg = None
103             peer_id, reply = self.get_peer_id(peer.lower())
104             if reply: return reply
105             else: reply = ()
106 
107             # If the ID starts with '=' is absolute ID, not compact ID
108             # character '=' is not used by compact IDs
109             if mid[0] == '=':
110                 id = int(mid[1:])
111             else:
112                 id = self.tg.mid.id_to_num_offset(peer_id, mid)
113             if id is not None:
114                 msg = await self.tg.telegram_client.get_messages(entity=peer_id, ids=id)
115             if msg is not None:
116                 await self.tg.handle_telegram_message(event=None, message=msg, history=True)
117             else:
118                 reply = ('Message not found',)
119             return reply
120 
121         else: # HELP.brief or HELP.desc (first line)
122             reply = ('   get         Get a message by id and peer',)
123         if help == HELP.desc:  # rest of HELP.desc
124             reply += \
125             (
126               '   get <peer> <compact_id|=ID>',
127               'Get one message from peer with the compact or absolute ID',
128             )
129         return reply
130 
131     async def handle_command_help(self, help_command=None, help=None):
132 
133         start_help = ('*** Telegram Service Help ***',)
134         end_help = ('*** End of Help ***',)
135 
136         if help == HELP.brief:
137             help_text = ('   help        This help',)
138         elif not help_command or help_command == 'help':
139             help_text = start_help
140             help_text += \
141             (
142               'This service contains specific Telegram commands that irgramd',
143               'cannot map to IRC commands. The following commands are available:',
144             )
145             for command in self.commands.values():
146                 handler = command[0]
147                 help_text += await handler(help=HELP.brief)
148             help_text += \
149             (
150               'The commands begining with ! (exclamation) must be used directly',
151               'in channels or chats. The following ! commands are available:',
152             )
153             for command in self.irc.exclam.commands.values():
154                 handler = command[0]
155                 help_text += await handler(help=HELP.brief)
156             help_text += \
157             (
158               'If you need more information about a specific command you can use',
159               'help <command>',
160             )
161             help_text += end_help
162         elif help_command in (all_commands := dict(**self.commands, **self.irc.exclam.commands)).keys():
163             handler = all_commands[help_command][0]
164             help_text = start_help
165             help_text += await handler(help=HELP.desc)
166             help_text += end_help
167         else:
168             help_text = ('help: Unknown command',)
169         return help_text
170 
171     async def handle_command_history(self, peer=None, limit='10', add_unread=None, help=None):
172         if not help:
173             async def get_unread(tgt):
174                 async for dialog in self.tg.telegram_client.iter_dialogs():
175                     id, type = tgutils.resolve_id(dialog.id)
176                     if id in self.tg.tid_to_iid.keys():
177                         name = self.tg.tid_to_iid[id]
178                         if tgt == name.lower():
179                             count = dialog.unread_count
180                             reply = None
181                             break
182                 else:
183                     count = None
184                     reply = ('Unknown unread',)
185                 return count, reply
186 
187             def conv_int(num_str):
188                 if num_str.isdigit():
189                     n = int(num_str)
190                     err = None
191                 else:
192                     n = None
193                     err = ('Invalid argument',)
194                 return n, err
195 
196             tgt = peer.lower()
197             peer_id, reply = self.get_peer_id(tgt)
198             if reply: return reply
199 
200             if limit == 'unread':
201                 add_unread = '0' if add_unread is None else add_unread
202                 add_unread_int, reply = conv_int(add_unread)
203                 if reply: return reply
204 
205                 li, reply = await get_unread(tgt)
206                 if reply: return reply
207                 li += add_unread_int
208             elif add_unread is not None:
209                 reply = ('Wrong number of arguments',)
210                 return reply
211             elif limit == 'all':
212                 li = None
213             else:
214                 li, reply = conv_int(limit)
215                 if reply: return reply
216 
217             his = await self.tg.telegram_client.get_messages(peer_id, limit=li)
218             for msg in reversed(his):
219                 await self.tg.handle_telegram_message(event=None, message=msg, history=True)
220             reply = ()
221             return reply
222 
223         else: # HELP.brief or HELP.desc (first line)
224             reply = ('   history     Get messages from history',)
225         if help == HELP.desc:  # rest of HELP.desc
226             reply += \
227             (
228               '   history <peer> [<limit>|all|unread [<plusN>]]',
229               'Get last <limit> number of messages already sent to <peer>',
230               '(channel or user). If not set <limit> is 10.',
231               'Instead of <limit>, "unread" is for messages not marked as read,',
232               'optionally <plusN> number of previous messages to the first unread.',
233               'Instead of <limit>, "all" is for retrieving all available messages',
234               'in <peer>.',
235             )
236         return reply
237 
238     async def handle_command_mark_read(self, peer=None, help=None):
239         if not help:
240             peer_id, reply = self.get_peer_id(peer.lower())
241             if reply: return reply
242 
243             await self.tg.telegram_client.send_read_acknowledge(peer_id, clear_mentions=True)
244             reply = ('',)
245         else: # HELP.brief or HELP.desc (first line)
246             reply = ('   mark_read   Mark messages as read',)
247         if help == HELP.desc:  # rest of HELP.desc
248             reply += \
249             (
250               '   mark_read <peer>',
251               'Mark all messages on <peer> (channel or user) as read, this also will',
252               'reset the number of mentions to you on <peer>.'
253             )
254         return reply
255 
256     def get_peer_id(self, tgt):
257         if tgt in self.irc.users or tgt in self.irc.irc_channels:
258             peer_id = self.tg.get_tid(tgt)
259             reply = None
260         else:
261             peer_id = None
262             reply = ('Unknown user or channel',)
263         return peer_id, reply