/
utils.py
  1 # irgramd: IRC-Telegram gateway
  2 # utils.py: Helper functions
  3 #
  4 # Copyright (c) 2019 Peter Bui <pbui@bx612.space>
  5 # Copyright (c) 2020-2024 E. Bosch <presidev@AT@gmail.com>
  6 #
  7 # Use of this source code is governed by a MIT style license that
  8 # can be found in the LICENSE file included in this project.
  9 
 10 import itertools
 11 import textwrap
 12 import re
 13 import datetime
 14 import zoneinfo
 15 import difflib
 16 import logging
 17 
 18 # Constants
 19 
# Characters that sanitize_filename() substitutes in file names. The class
# matches any one of: / { } < > ( ) " ' | & # % ?
# NOTE(review): inside the character class the sequence backslash-pipe is an
# escaped pipe, so a literal backslash is NOT part of the set -- confirm that
# this is intended.
FILENAME_INVALID_CHARS = re.compile('[/{}<>()"\'\\|&#%?]')
# Matches "http://" or "https://" followed by one or more non-space
# characters; used by extract_url() to find the first URL in a text.
SIMPLE_URL = re.compile('http(|s)://[^ ]+')
 22 
 23 # Utilities
 24 
 25 class command:
 26     async def parse_command(self, line, nick):
 27         command = line.partition(' ')[0].lower()
 28         self.tmp_ircnick = nick
 29         if command in self.commands.keys():
 30             handler, min_args, max_args, maxsplit = self.commands[command]
 31             words = line.split(maxsplit=maxsplit)[1:]
 32             num_words = len(words)
 33             if num_words < min_args or num_words > max_args:
 34                 reply = ('Wrong number of arguments',)
 35             else:
 36                 reply = await handler(*words)
 37         else:
 38             reply = ('Unknown command',)
 39 
 40         return reply
 41 
 42 class HELP:
 43     desc = 1
 44     brief = 2
 45 
# Holder for module-wide logging state
class LOGL:
    # Set to True by parse_loglevel() when the DEBUG level is requested;
    # read by pretty() to decide whether to call stringify() on objects
    debug = False
 48 
 49 def chunks(iterable, n, fillvalue=None):
 50     ''' Return iterable consisting of a sequence of n-length chunks '''
 51     args = [iter(iterable)] * n
 52     return itertools.zip_longest(*args, fillvalue=fillvalue)
 53 
 54 def set_replace(set, item, new_item):
 55     if item in set:
 56         set.remove(item)
 57         set.add(new_item)
 58 
 59 def get_continued(items, mark, length):
 60     # Add "continued" mark to lines, except last one
 61     return (x + mark if n != length else x for n, x in enumerate(items, start=1))
 62 
 63 def split_lines(message):
 64     MAX = 400
 65     messages_limited = []
 66     wr = textwrap.TextWrapper(width=MAX)
 67 
 68     # Split when Telegram original message has breaks
 69     messages = message.splitlines()
 70     lm = len(messages)
 71     if lm > 1:
 72         # Add "continued line" mark (\) for lines that belong to the same message
 73         # (split previously)
 74         messages = get_continued(messages, ' \\', lm)
 75     for m in messages:
 76         wrapped = wr.wrap(text=m)
 77         lw = len(wrapped)
 78         if lw > 1:
 79             # Add double "continued line" mark (\\) for lines that belong to the same message
 80             # and have been wrapped to not exceed IRC limits
 81             messages_limited += get_continued(wrapped, ' \\\\', lw)
 82         else:
 83             messages_limited += wrapped
 84     del wr
 85     return messages_limited
 86 
 87 def sanitize_filename(fn):
 88     cn = str(sanitize_filename.cn)
 89     new_fn, ns = FILENAME_INVALID_CHARS.subn(cn, fn)
 90     if ns:
 91         sanitize_filename.cn += 1
 92     return new_fn.strip('-').replace(' ','_')
 93 sanitize_filename.cn = 0
 94 
 95 def add_filename(filename, add):
 96     if add:
 97         aux = filename.rsplit('.', 1)
 98         name = aux[0]
 99         try:
100             ext = aux[1]
101         except:
102             ext = ''
103         return '{}-{}.{}'.format(name, add, ext)
104     else:
105         return filename
106 
def remove_slash(url):
    ''' Return url without its final "/" (only one is removed), if any '''
    if url[-1:] == '/':
        return url[:-1]
    return url
109 
def remove_http_s(url):
    '''
    Return url without a leading "https://" or "http://" and without one
    trailing "/" (removed by remove_slash).
    '''
    for scheme in ('https://', 'http://'):
        if url[:len(scheme)] == scheme:
            return remove_slash(url[len(scheme):])
    return remove_slash(url)
118 
def is_url_equiv(url1, url2):
    '''
    Tell whether two URLs are equal, either literally or after dropping
    the "http://" / "https://" prefix and trailing slashes.

    Returns False when either URL is empty or None.
    '''
    if not (url1 and url2):
        return False
    if url1 == url2:
        return True
    # remove_http_s() already drops one trailing slash, the extra
    # remove_slash() drops a second one if present
    bare1 = remove_slash(remove_http_s(url1))
    bare2 = remove_slash(remove_http_s(url2))
    return bare1 == bare2
124 
def extract_url(text):
    ''' Return the first match of SIMPLE_URL found in text, or None '''
    found = SIMPLE_URL.search(text)
    if not found:
        return None
    return found.group()
128 
def get_human_size(size):
    '''
    Format size as a short string using 1024-based units.

    The unit is one of '', K, M, G, T, P, E, Z, Y: the largest one for which
    the scaled value is at least 1. Values beyond the range of Y are still
    expressed in Y. The number is shown with one decimal when it is below
    10 and with none otherwise, e.g. 500 -> '500', 1024 -> '1.0K',
    1536 -> '1.5K', 10240 -> '10K'.
    '''
    human_units = ('', 'K', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y')
    last_unit = len(human_units) - 1

    # ">=" so that exact powers of 1024 move up to the next unit
    # (1024 is 1.0K, not 1024); capped at the last unit available
    unit_pos = 0
    while unit_pos < last_unit and size >= 1024 ** (unit_pos + 1):
        unit_pos += 1

    num = size / 1024 ** unit_pos
    fs = '{:.1f}{}' if num < 10 else '{:.0f}{}'

    return fs.format(num, human_units[unit_pos])
146 
def get_human_duration(duration):
    '''
    Format a duration given in seconds as "<h>h<m>m<s>s".

    Components that are not greater than zero are omitted, except that the
    seconds are always shown when duration is below 60 (so 0 gives '0s').
    '''
    total_minutes, s = divmod(duration, 60)
    h, m = divmod(total_minutes, 60)

    parts = []
    if h > 0:
        parts.append(str(h) + 'h')
    if m > 0:
        parts.append(str(m) + 'm')
    if s > 0 or duration < 60:
        parts.append(str(s) + 's')
    return ''.join(parts)
156 
def compact_date(date, tz):
    '''
    Return a compact representation of date, expressed in time zone tz
    (a zoneinfo key), whose precision depends on how old the date is
    with respect to current_date():

      less than 1 day     -> 'HH:MM'
      less than 365 days  -> 'DD-Mon'
      otherwise           -> 'YYYY'
    '''
    age = current_date() - date
    local = date.astimezone(zoneinfo.ZoneInfo(tz))

    if age.days >= 365:
        fmt = '%Y'
    elif age.days >= 1:
        fmt = '%d-%b'
    else:
        fmt = '%H:%M'

    return local.strftime(fmt)
169 
def current_date():
    ''' Return the current time as a timezone-aware datetime in UTC '''
    utc = datetime.timezone.utc
    return datetime.datetime.now(tz=utc)
172 
def get_highlighted(a, b):
    '''
    Build a word-level diff of text b against text a.

    Returns a tuple (text, highlighted):
      - a is empty: ('> ' + b, True)
      - the word counts of a and b differ by more than 5, or more than 3
        words of b are not unchanged words of the diff: (b, False)
      - otherwise: (diff, True), where diff is made of the words of the
        difflib.ndiff() comparison, each followed by a space, with deleted
        words written as -word- and added words as +word+
    '''
    words_a = len(a.split())
    words_b = len(b.split())

    if not a:
        return '> {}'.format(b), True
    if abs(words_a - words_b) > 5:
        return b, False

    pieces = []
    unchanged = 0
    for entry in difflib.ndiff(a.split(' '), b.split(' ')):
        tag, word = entry[0], entry[2:]
        # Deleted empty words and ndiff hint lines ("? ...") are dropped
        if entry == '- ' or tag == '?':
            continue
        # Unchanged or added empty words only contribute a space
        if entry in ('  ', '+ '):
            pieces.append(' ')
        elif tag == '-':
            # deletion of words
            pieces.append('-{}- '.format(word))
        elif tag == '+':
            # addition of words
            pieces.append('+{}+ '.format(word))
        else:
            pieces.append('{} '.format(word))
            unchanged += 1

    # Too many changed words: the diff would be unreadable, return b as is
    if words_b - unchanged > 3:
        return b, False

    return ''.join(pieces), True
215 
def fix_braces(text):
    '''
    Remove a brace left unclosed at the end of a truncated text.

    When text ends with ' {...' and the part before it does not contain
    '{}', that ending is replaced by '...'. Any other text is returned
    unchanged.
    '''
    ending = ' {...'
    if not text.endswith(ending):
        return text

    head = text[:-len(ending)]
    if '{}' in head:
        return text
    return head + '...'
223 
def format_timestamp(format, tz, date):
    '''
    Return date converted to time zone tz (a zoneinfo key) and rendered
    with the strftime() pattern given in format.
    '''
    zone = zoneinfo.ZoneInfo(tz)
    return date.astimezone(zone).strftime(format)
227 
def parse_loglevel(level):
    '''
    Translate a log level name (case-insensitive) into a logging level.

    Returns:
      - None for 'NONE'
      - the matching constant of the logging module for 'DEBUG', 'INFO',
        'WARNING', 'ERROR' and 'CRITICAL'
      - False for any other name

    As a side effect, 'DEBUG' sets LOGL.debug to True.
    '''
    name = level.upper()

    if name == 'DEBUG':
        LOGL.debug = True

    if name == 'NONE':
        return None
    if name in ('DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'):
        return getattr(logging, name)
    return False
239 
def pretty(object):
    '''
    Return object.stringify() when LOGL.debug is set and object is truthy;
    otherwise return object itself untouched.
    '''
    if LOGL.debug and object:
        return object.stringify()
    return object