/
utils.py
  1 # irgramd: IRC-Telegram gateway
  2 # utils.py: Helper functions
  3 #
  4 # Copyright (c) 2019 Peter Bui <pbui@bx612.space>
  5 # Copyright (c) 2020-2024 E. Bosch <presidev@AT@gmail.com>
  6 #
  7 # Use of this source code is governed by a MIT style license that
  8 # can be found in the LICENSE file included in this project.
  9 
 10 import itertools
 11 import textwrap
 12 import re
 13 import datetime
 14 import zoneinfo
 15 import difflib
 16 import logging
 17 
 18 # Constants
 19 
 20 FILENAME_INVALID_CHARS = re.compile('[/{}<>()"\'\\|&#%?]')
 21 SIMPLE_URL = re.compile('http(|s)://[^ ]+')
 22 
 23 from include import MAX_LINE
 24 
 25 # Utilities
 26 
 27 class command:
 28     async def parse_command(self, line, nick):
 29         command = line.partition(' ')[0].lower()
 30         self.tmp_ircnick = nick
 31         if command in self.commands.keys():
 32             handler, min_args, max_args, maxsplit = self.commands[command]
 33             words = line.split(maxsplit=maxsplit)[1:]
 34             num_words = len(words)
 35             if num_words < min_args or num_words > max_args:
 36                 reply = ('Wrong number of arguments',)
 37             else:
 38                 reply = await handler(*words)
 39         else:
 40             reply = ('Unknown command',)
 41 
 42         return reply
 43 
 44 class HELP:
 45     desc = 1
 46     brief = 2
 47 
 48 class LOGL:
 49     debug = False
 50 
 51 def chunks(iterable, n, fillvalue=None):
 52     ''' Return iterable consisting of a sequence of n-length chunks '''
 53     args = [iter(iterable)] * n
 54     return itertools.zip_longest(*args, fillvalue=fillvalue)
 55 
 56 def set_replace(set, item, new_item):
 57     if item in set:
 58         set.remove(item)
 59         set.add(new_item)
 60 
 61 def get_continued(items, mark, length):
 62     # Add "continued" mark to lines, except last one
 63     return (x + mark if n != length else x for n, x in enumerate(items, start=1))
 64 
 65 def split_lines(message):
 66     messages_limited = []
 67     wr = textwrap.TextWrapper(width=MAX_LINE)
 68 
 69     # Split when Telegram original message has breaks
 70     messages = message.splitlines()
 71     lm = len(messages)
 72     if lm > 1:
 73         # Add "continued line" mark (\) for lines that belong to the same message
 74         # (split previously)
 75         messages = get_continued(messages, ' \\', lm)
 76     for m in messages:
 77         wrapped = wr.wrap(text=m)
 78         lw = len(wrapped)
 79         if lw > 1:
 80             # Add double "continued line" mark (\\) for lines that belong to the same message
 81             # and have been wrapped to not exceed IRC limits
 82             messages_limited += get_continued(wrapped, ' \\\\', lw)
 83         else:
 84             messages_limited += wrapped
 85     del wr
 86     return messages_limited
 87 
 88 def sanitize_filename(fn):
 89     cn = str(sanitize_filename.cn)
 90     new_fn, ns = FILENAME_INVALID_CHARS.subn(cn, fn)
 91     if ns:
 92         sanitize_filename.cn += 1
 93     return new_fn.strip('-').replace(' ','_')
 94 sanitize_filename.cn = 0
 95 
 96 def add_filename(filename, add):
 97     if add:
 98         aux = filename.rsplit('.', 1)
 99         name = aux[0]
100         try:
101             ext = aux[1]
102         except:
103             ext = ''
104         return '{}-{}.{}'.format(name, add, ext)
105     else:
106         return filename
107 
108 def remove_slash(url):
109     return url[:-1] if url[-1:] == '/' else url
110 
111 def remove_http_s(url):
112     if url[:8] == 'https://':
113         surl = url[8:]
114     elif url[:7] == 'http://':
115         surl = url[7:]
116     else:
117         surl = url
118     return remove_slash(surl)
119 
120 def is_url_equiv(url1, url2):
121     if url1 and url2:
122         return url1 == url2 or remove_slash(remove_http_s(url1)) == remove_slash(remove_http_s(url2))
123     else:
124         return False
125 
126 def extract_url(text):
127     url = SIMPLE_URL.search(text)
128     return url.group() if url else None
129 
130 def get_human_size(size):
131     human_units = ('', 'K', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y')
132 
133     def get_human_size_values(size, unit_pos=0):
134         aux = size / 1024.0
135         if aux > 1: return get_human_size_values(aux, unit_pos + 1)
136         else: return size, human_units[unit_pos]
137 
138     if size <= 1237940039285380274899124224:    # 1024Y
139         num, unit = get_human_size_values(size)
140     else:
141         num = size / 1208925819614629174706176  # 1Y
142         unit = 'Y'
143 
144     fs = '{:.1f}{}' if num < 10 else '{:.0f}{}'
145 
146     return fs.format(num, unit)
147 
148 def get_human_duration(duration):
149     res = ''
150     x, s = divmod(duration, 60)
151     h, m = divmod(x, 60)
152 
153     if h > 0: res = str(h) + 'h'
154     if m > 0: res += str(m) + 'm'
155     if s > 0 or duration < 60: res += str(s) + 's'
156     return res
157 
158 def compact_date(date, tz):
159     delta = current_date() - date
160     date_local = date.astimezone(zoneinfo.ZoneInfo(tz))
161 
162     if delta.days < 1:
163         compact_date = date_local.strftime('%H:%M')
164     elif delta.days < 365:
165         compact_date = date_local.strftime('%d-%b')
166     else:
167         compact_date = date_local.strftime('%Y')
168 
169     return compact_date
170 
171 def current_date():
172     return datetime.datetime.now(datetime.timezone.utc)
173 
174 def get_highlighted(a, b):
175     awl = len(a.split())
176     bwl = len(b.split())
177     delta_size = abs(awl - bwl)
178     highlighted = True
179 
180     if not a:
181         res = '> {}'.format(b)
182     elif delta_size > 5:
183         res = b
184         highlighted = False
185     else:
186         al = a.split(' ')
187         bl = b.split(' ')
188         diff = difflib.ndiff(al, bl)
189         ld = list(diff)
190         res = ''
191         d = ''
192         eq = 0
193 
194         for i in ld:
195             if i == '- ' or i[0] == '?':
196                 continue
197             elif i == '  ' or i == '+ ':
198                 res += ' '
199                 continue
200             # deletion of words
201             elif i[0] == '-':
202                 res += '-{}- '.format(i[2:])
203             # addition of words
204             elif i[0] == '+':
205                 res += '+{}+ '.format(i[2:])
206             else:
207                 res += '{} '.format(i[2:])
208                 eq += 1
209 
210         delta_eq = bwl - eq
211         if delta_eq > 3:
212             res = b
213             highlighted = False
214 
215     return res, highlighted
216 
217 def fix_braces(text):
218     # Remove braces not closed, if the text was truncated
219     if text.endswith(' {...'):
220         subtext = text[:-5]
221         if not '{}' in subtext:
222             return '{}...'.format(subtext)
223     return text
224 
225 def format_timestamp(format, tz, date):
226     date_local = date.astimezone(zoneinfo.ZoneInfo(tz))
227     return date_local.strftime(format)
228 
229 def parse_loglevel(level):
230     levelu = level.upper()
231     if levelu == 'DEBUG':
232         LOGL.debug = True
233     if levelu == 'NONE':
234         l = None
235     elif levelu in ('DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'):
236         l = getattr(logging, levelu)
237     else:
238         l = False
239     return l
240 
241 def pretty(object):
242     return object.stringify() if LOGL.debug and object else object