  1 # irgramd: IRC-Telegram gateway
  2 # utils.py: Helper functions
  3 #
  4 # Copyright (c) 2019 Peter Bui <pbui@bx612.space>
  5 # Copyright (c) 2020-2024 E. Bosch <presidev@AT@gmail.com>
  6 #
  7 # Use of this source code is governed by a MIT style license that
  8 # can be found in the LICENSE file included in this project.
 10 import itertools
 11 import textwrap
 12 import re
 13 import datetime
 14 import zoneinfo
 15 import difflib
 16 import logging
 18 # Constants
# Characters that sanitize_filename() substitutes when found in a filename.
# NOTE(review): inside the class, the sequence backslash + "|" is read by the
# regex engine as an escaped "|", so a literal backslash is NOT matched --
# confirm this is intended.
FILENAME_INVALID_CHARS = re.compile('[/{}<>()"\'\\|&#%?]')
# Minimal URL matcher used by extract_url(): "http://" or "https://" followed
# by every character up to the next space.
SIMPLE_URL = re.compile('http(|s)://[^ ]+')
 23 from include import MAX_LINE
 25 # Utilities
 27 class command:
 28     async def parse_command(self, line, nick):
 29         command = line.partition(' ')[0].lower()
 30         self.tmp_ircnick = nick
 31         if command in self.commands.keys():
 32             handler, min_args, max_args, maxsplit = self.commands[command]
 33             words = line.split(maxsplit=maxsplit)[1:]
 34             num_words = len(words)
 35             if num_words < min_args or num_words > max_args:
 36                 reply = ('Wrong number of arguments',)
 37             else:
 38                 reply = await handler(*words)
 39         else:
 40             reply = ('Unknown command',)
 42         return reply
 44 class HELP:
 45     desc = 1
 46     brief = 2
class LOGL:
    # Module-wide debug flag: parse_loglevel() sets it to True when the
    # requested level is DEBUG, and pretty() reads it
    debug = False
 51 def chunks(iterable, n, fillvalue=None):
 52     ''' Return iterable consisting of a sequence of n-length chunks '''
 53     args = [iter(iterable)] * n
 54     return itertools.zip_longest(*args, fillvalue=fillvalue)
 56 def set_replace(set, item, new_item):
 57     if item in set:
 58         set.remove(item)
 59         set.add(new_item)
 61 def get_continued(items, mark, length):
 62     # Add "continued" mark to lines, except last one
 63     return (x + mark if n != length else x for n, x in enumerate(items, start=1))
 65 def split_lines(message):
 66     messages_limited = []
 67     wr = textwrap.TextWrapper(width=MAX_LINE)
 69     # Split when Telegram original message has breaks
 70     messages = message.splitlines()
 71     lm = len(messages)
 72     if lm > 1:
 73         # Add "continued line" mark (\) for lines that belong to the same message
 74         # (split previously)
 75         messages = get_continued(messages, ' \\', lm)
 76     for m in messages:
 77         wrapped = wr.wrap(text=m)
 78         lw = len(wrapped)
 79         if lw > 1:
 80             # Add double "continued line" mark (\\) for lines that belong to the same message
 81             # and have been wrapped to not exceed IRC limits
 82             messages_limited += get_continued(wrapped, ' \\\\', lw)
 83         else:
 84             messages_limited += wrapped
 85     del wr
 86     return messages_limited
 88 def sanitize_filename(fn):
 89     cn = str(sanitize_filename.cn)
 90     new_fn, ns = FILENAME_INVALID_CHARS.subn(cn, fn)
 91     if ns:
 92         sanitize_filename.cn += 1
 93     return new_fn.strip('-').replace(' ','_')
 94 sanitize_filename.cn = 0
 96 def add_filename(filename, add):
 97     if add:
 98         aux = filename.rsplit('.', 1)
 99         name = aux[0]
100         try:
101             ext = aux[1]
102         except:
103             ext = ''
104         return '{}-{}.{}'.format(name, add, ext)
105     else:
106         return filename
def remove_slash(url):
    ''' Return url without its final '/' (only one is dropped, if present) '''
    if url[-1:] != '/':
        return url
    return url[:-1]
def remove_http_s(url):
    ''' Return url without a leading "https://" or "http://" and without
        one trailing slash
    '''
    for scheme in ('https://', 'http://'):
        if url[:len(scheme)] == scheme:
            return remove_slash(url[len(scheme):])
    # No known scheme prefix: only the trailing slash is handled
    return remove_slash(url)
def is_url_equiv(url1, url2):
    ''' Tell whether two URLs are equal, ignoring the http/https scheme
        and trailing slashes

    Returns False when either argument is empty or None.
    '''
    if not (url1 and url2):
        return False
    if url1 == url2:
        return True
    # remove_http_s() already drops one trailing slash, the extra
    # remove_slash() drops a second one if present
    bare1 = remove_slash(remove_http_s(url1))
    bare2 = remove_slash(remove_http_s(url2))
    return bare1 == bare2
def extract_url(text):
    ''' Return the first match of SIMPLE_URL found in text, or None '''
    found = SIMPLE_URL.search(text)
    if found is None:
        return None
    return found.group()
def get_human_size(size):
    ''' Return size as a short string with a 1024-based unit suffix

    size (presumably a number of bytes) is divided by 1024 until it is
    below 1024 or the largest unit (Y) is reached.  Values below 10 keep
    one decimal, larger ones are rounded to an integer, e.g.
    1536 -> '1.5K', 10240 -> '10K', 500 -> '500'.
    '''
    human_units = ('', 'K', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y')
    last_unit = len(human_units) - 1

    num = size
    unit_pos = 0
    # ">=" makes exact powers of 1024 move up to the next unit
    # (1024 -> '1.0K' instead of '1024'); the loop stops at Y, so anything
    # bigger is expressed in Y and the index never leaves the tuple
    while num >= 1024 and unit_pos < last_unit:
        num = num / 1024.0
        unit_pos += 1

    fs = '{:.1f}{}' if num < 10 else '{:.0f}{}'

    return fs.format(num, human_units[unit_pos])
def get_human_duration(duration):
    ''' Return duration (in seconds) as a compact "XhYmZs" string

    Components that are zero are left out, except that a duration
    under one minute always shows its seconds (0 -> '0s').
    '''
    total_minutes, seconds = divmod(duration, 60)
    hours, minutes = divmod(total_minutes, 60)

    parts = []
    if hours > 0:
        parts.append(str(hours) + 'h')
    if minutes > 0:
        parts.append(str(minutes) + 'm')
    if seconds > 0 or duration < 60:
        parts.append(str(seconds) + 's')
    return ''.join(parts)
def compact_date(date, tz):
    ''' Return a short representation of date, the older the coarser

    The age is measured against current_date(): less than one day gives
    hour and minute, less than 365 days gives day and month name, anything
    older gives only the year.  The value shown is date converted to the
    time zone named tz.
    '''
    age_days = (current_date() - date).days
    local = date.astimezone(zoneinfo.ZoneInfo(tz))

    if age_days < 1:
        fmt = '%H:%M'
    elif age_days < 365:
        fmt = '%d-%b'
    else:
        fmt = '%Y'

    return local.strftime(fmt)
def current_date():
    ''' Return the current moment as a timezone-aware datetime in UTC '''
    utc = datetime.timezone.utc
    return datetime.datetime.now(tz=utc)
def get_highlighted(a, b):
    ''' Return (text, highlighted) describing how b differs from a

    When a is empty the text is b prefixed with "> ".  Otherwise the two
    texts are compared word by word: removed words are wrapped as -word-,
    added words as +word+, unchanged words are kept.  If the texts differ
    too much (word counts more than 5 apart, or more than 3 words of b
    not found unchanged) the text is plain b and highlighted is False.
    '''
    words_a = len(a.split())
    words_b = len(b.split())

    if not a:
        return '> {}'.format(b), True
    if abs(words_a - words_b) > 5:
        return b, False

    pieces = []
    unchanged = 0
    for entry in difflib.ndiff(a.split(' '), b.split(' ')):
        tag = entry[0]
        word = entry[2:]
        # Hint lines ("?") and removed empty words produce no output
        if entry == '- ' or tag == '?':
            continue
        if entry in ('  ', '+ '):
            # Empty word kept or added (comes from consecutive spaces)
            pieces.append(' ')
        elif tag == '-':
            # deletion of words
            pieces.append('-{}- '.format(word))
        elif tag == '+':
            # addition of words
            pieces.append('+{}+ '.format(word))
        else:
            pieces.append('{} '.format(word))
            unchanged += 1

    # Too few words of b survived unchanged: the diff would not be readable
    if words_b - unchanged > 3:
        return b, False

    return ''.join(pieces), True
def fix_braces(text):
    ''' Drop an unclosed brace left at the end of a truncated text

    A text ending in " {..." is returned with that ending replaced by
    "...", unless the part before it contains "{}"; any other text is
    returned as is.
    '''
    tail = ' {...'
    if not text.endswith(tail):
        return text

    subtext = text[:-len(tail)]
    if '{}' in subtext:
        return text
    return '{}...'.format(subtext)
def format_timestamp(format, tz, date):
    ''' Return date converted to the time zone named tz and rendered
        with the strftime pattern format
    '''
    zone = zoneinfo.ZoneInfo(tz)
    return date.astimezone(zone).strftime(format)
def parse_loglevel(level):
    ''' Translate a log level name (any letter case) into a logging value

    Returns the matching constant of the logging module for DEBUG, INFO,
    WARNING, ERROR and CRITICAL, None for NONE, and False for any other
    name.  As a side effect, DEBUG also sets LOGL.debug to True.
    '''
    name = level.upper()
    if name == 'DEBUG':
        LOGL.debug = True

    if name == 'NONE':
        return None
    if name in ('DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'):
        return getattr(logging, name)
    return False
def pretty(object):
    ''' Return object.stringify() when debug logging is enabled
        (LOGL.debug) and object is truthy; otherwise return object itself
    '''
    if LOGL.debug and object:
        return object.stringify()
    return object