/
/utils.py
  1 # irgramd: IRC-Telegram gateway
  2 # utils.py: Helper functions
  3 #
  4 # Copyright (c) 2019 Peter Bui <pbui@bx612.space>
  5 # Copyright (c) 2020-2024 E. Bosch <presidev@AT@gmail.com>
  6 #
  7 # Use of this source code is governed by a MIT style license that
  8 # can be found in the LICENSE file included in this project.
  9 
 10 import itertools
 11 import textwrap
 12 import re
 13 import datetime
 14 import zoneinfo
 15 import difflib
 16 import logging
 17 
 18 # Constants
 19 
# Characters that sanitize_filename() substitutes: slash, braces, angle
# brackets, parentheses, both quote kinds, pipe, and & # % ?
# NOTE(review): inside the character class `\|` is an escaped pipe, so a
# literal backslash is NOT matched -- confirm whether that is intended.
FILENAME_INVALID_CHARS = re.compile('[/{}<>()"\'\\|&#%?]')
# Loose URL matcher: "http://" or "https://" followed by every character
# up to (not including) the next space.
SIMPLE_URL = re.compile('http(|s)://[^ ]+')
 22 
 23 # Utilities
 24 
 25 class command:
 26     async def parse_command(self, line, nick):
 27         command = line.partition(' ')[0].lower()
 28         self.tmp_ircnick = nick
 29         if command in self.commands.keys():
 30             handler, min_args, max_args, maxsplit = self.commands[command]
 31             words = line.split(maxsplit=maxsplit)[1:]
 32             num_words = len(words)
 33             if num_words < min_args or num_words > max_args:
 34                 reply = ('Wrong number of arguments',)
 35             else:
 36                 reply = await handler(*words)
 37         else:
 38             reply = ('Unknown command',)
 39 
 40         return reply
 41 
 42 class HELP:
 43     desc = 1
 44     brief = 2
 45 
 46 def chunks(iterable, n, fillvalue=None):
 47     ''' Return iterable consisting of a sequence of n-length chunks '''
 48     args = [iter(iterable)] * n
 49     return itertools.zip_longest(*args, fillvalue=fillvalue)
 50 
 51 def set_replace(set, item, new_item):
 52     if item in set:
 53         set.remove(item)
 54         set.add(new_item)
 55 
 56 def get_continued(items, mark, length):
 57     # Add "continued" mark to lines, except last one
 58     return (x + mark if n != length else x for n, x in enumerate(items, start=1))
 59 
 60 def split_lines(message):
 61     MAX = 400
 62     messages_limited = []
 63     wr = textwrap.TextWrapper(width=MAX)
 64 
 65     # Split when Telegram original message has breaks
 66     messages = message.splitlines()
 67     lm = len(messages)
 68     if lm > 1:
 69         # Add "continued line" mark (\) for lines that belong to the same message
 70         # (split previously)
 71         messages = get_continued(messages, ' \\', lm)
 72     for m in messages:
 73         wrapped = wr.wrap(text=m)
 74         lw = len(wrapped)
 75         if lw > 1:
 76             # Add double "continued line" mark (\\) for lines that belong to the same message
 77             # and have been wrapped to not exceed IRC limits
 78             messages_limited += get_continued(wrapped, ' \\\\', lw)
 79         else:
 80             messages_limited += wrapped
 81     del wr
 82     return messages_limited
 83 
 84 def sanitize_filename(fn):
 85     cn = str(sanitize_filename.cn)
 86     new_fn, ns = FILENAME_INVALID_CHARS.subn(cn, fn)
 87     if ns:
 88         sanitize_filename.cn += 1
 89     return new_fn.strip('-').replace(' ','_')
 90 sanitize_filename.cn = 0
 91 
 92 def add_filename(filename, add):
 93     if add:
 94         aux = filename.rsplit('.', 1)
 95         name = aux[0]
 96         try:
 97             ext = aux[1]
 98         except:
 99             ext = ''
100         return '{}-{}.{}'.format(name, add, ext)
101     else:
102         return filename
103 
def remove_slash(url):
    ''' Return url without its final character when that character is a slash '''
    if url.endswith('/'):
        return url[:-1]
    return url
106 
def remove_http_s(url):
    ''' Strip a leading "https://" or "http://" (case sensitive), then one trailing slash '''
    for scheme in ('https://', 'http://'):
        if url.startswith(scheme):
            return remove_slash(url[len(scheme):])
    return remove_slash(url)
115 
def is_url_equiv(url1, url2):
    '''
    Tell whether two URLs are the same, ignoring the http(s):// prefix
    and trailing slashes. Returns False if either URL is empty or None.
    '''
    if not (url1 and url2):
        return False
    if url1 == url2:
        return True
    # remove_http_s() already drops one trailing slash, remove_slash()
    # drops one more
    bare1 = remove_slash(remove_http_s(url1))
    bare2 = remove_slash(remove_http_s(url2))
    return bare1 == bare2
121 
def extract_url(text):
    ''' Return the first SIMPLE_URL match found in text, or None if there is none '''
    found = SIMPLE_URL.search(text)
    if found is None:
        return None
    return found.group()
125 
def get_human_size(size):
    '''
    Format size as a short string with a 1024-based unit suffix
    ('', K, M, G, T, P, E, Z, Y).

    The value is divided by 1024 while the quotient stays strictly above 1
    (so exactly 1024 is shown as "1024", not "1.0K"). Values below 10 are
    printed with one decimal, the rest with none. Anything above 1024Y is
    expressed in Y.
    '''
    human_units = ('', 'K', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y')

    if size <= 1237940039285380274899124224:    # 1024Y
        num = size
        unit_pos = 0
        scaled = num / 1024.0
        while scaled > 1:
            num = scaled
            unit_pos += 1
            scaled = num / 1024.0
        unit = human_units[unit_pos]
    else:
        num = size / 1208925819614629174706176  # 1Y
        unit = 'Y'

    if num < 10:
        return '{:.1f}{}'.format(num, unit)
    return '{:.0f}{}'.format(num, unit)
143 
def get_human_duration(duration):
    '''
    Format a duration given in seconds as "<h>h<m>m<s>s", leaving out
    the components that are zero. Durations under one minute always
    show the seconds (e.g. "0s").
    '''
    total_minutes, s = divmod(duration, 60)
    h, m = divmod(total_minutes, 60)

    parts = []
    if h > 0:
        parts.append(str(h) + 'h')
    if m > 0:
        parts.append(str(m) + 'm')
    if s > 0 or duration < 60:
        parts.append(str(s) + 's')
    return ''.join(parts)
153 
def compact_date(date, tz):
    '''
    Render date, converted to the tz time zone, in a short form that
    depends on its age relative to the current UTC time:
    less than 1 day -> "HH:MM", less than 365 days -> "DD-Mon",
    otherwise -> "YYYY".

    date: timezone-aware datetime (it is subtracted from an aware "now")
    tz:   time zone key accepted by zoneinfo.ZoneInfo
    '''
    age = datetime.datetime.now(datetime.timezone.utc) - date
    local = date.astimezone(zoneinfo.ZoneInfo(tz))

    if age.days < 1:
        fmt = '%H:%M'
    elif age.days < 365:
        fmt = '%d-%b'
    else:
        fmt = '%Y'

    return local.strftime(fmt)
166 
def get_highlighted(a, b):
    '''
    Build a word-level diff of text b against text a.

    Returns a (text, highlighted) tuple:
      - a is empty: ('> ' + b, True)
      - word counts differ by more than 5: (b, False)
      - otherwise the ndiff of both texts split on single spaces, where
        removed words are rendered as -word-, added words as +word+ and
        unchanged words as-is, each followed by a space. If b has more
        than 3 words beyond the unchanged ones, (b, False) is returned
        instead of the diff.
    '''
    a_words = len(a.split())
    b_words = len(b.split())

    if not a:
        return '> {}'.format(b), True
    if abs(a_words - b_words) > 5:
        return b, False

    # Rendering of ndiff entries by their tag; anything else is unchanged
    templates = {'-': '-{}- ', '+': '+{}+ '}
    pieces = []
    unchanged = 0

    for entry in difflib.ndiff(a.split(' '), b.split(' ')):
        # Skip removed empty tokens and ndiff hint lines
        if entry == '- ' or entry[0] == '?':
            continue
        # Empty token kept or added (came from consecutive spaces)
        if entry in ('  ', '+ '):
            pieces.append(' ')
            continue
        tag = entry[0]
        if tag not in templates:
            unchanged += 1
        pieces.append(templates.get(tag, '{} ').format(entry[2:]))

    if b_words - unchanged > 3:
        # Too little in common, highlighting would not be readable
        return b, False
    return ''.join(pieces), True
209 
def fix_braces(text):
    '''
    If text ends with " {..." (an opening brace left unclosed by a
    truncation) and the part before it contains no "{}" substring,
    return that part followed by "..."; otherwise return text unchanged.
    '''
    if not text.endswith(' {...'):
        return text
    head = text[:-5]
    if '{}' not in head:
        return '{}...'.format(head)
    return text
217 
def format_timestamp(format, tz, date):
    ''' Convert date to the tz time zone (a zoneinfo key) and render it with the strftime pattern format '''
    zone = zoneinfo.ZoneInfo(tz)
    return date.astimezone(zone).strftime(format)
221 
def parse_loglevel(level):
    '''
    Translate a log level name (case-insensitive) into its value:
    "NONE" -> None; DEBUG/INFO/WARNING/ERROR/CRITICAL -> the matching
    logging module constant; any other name -> False.
    '''
    name = level.upper()
    if name == 'NONE':
        return None
    if name in ('DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'):
        return getattr(logging, name)
    return False