1 # irgramd: IRC-Telegram gateway 2 # utils.py: Helper functions 3 # 4 # Copyright (c) 2019 Peter Bui <pbui@bx612.space> 5 # Copyright (c) 2020-2024 E. Bosch <presidev@AT@gmail.com> 6 # 7 # Use of this source code is governed by a MIT style license that 8 # can be found in the LICENSE file included in this project. 9 10 import itertools 11 import textwrap 12 import re 13 import datetime 14 import zoneinfo 15 import difflib 16 import logging 17 18 # Constants 19 20 FILENAME_INVALID_CHARS = re.compile('[/{}<>()"\'\\|&#%?]') 21 SIMPLE_URL = re.compile('http(|s)://[^ ]+') 22 23 # Utilities 24 25 class command: 26 async def parse_command(self, line, nick): 27 command = line.partition(' ')[0].lower() 28 self.tmp_ircnick = nick 29 if command in self.commands.keys(): 30 handler, min_args, max_args, maxsplit = self.commands[command] 31 words = line.split(maxsplit=maxsplit)[1:] 32 num_words = len(words) 33 if num_words < min_args or num_words > max_args: 34 reply = ('Wrong number of arguments',) 35 else: 36 reply = await handler(*words) 37 else: 38 reply = ('Unknown command',) 39 40 return reply 41 42 class HELP: 43 desc = 1 44 brief = 2 45 46 class LOGL: 47 debug = False 48 49 def chunks(iterable, n, fillvalue=None): 50 ''' Return iterable consisting of a sequence of n-length chunks ''' 51 args = [iter(iterable)] * n 52 return itertools.zip_longest(*args, fillvalue=fillvalue) 53 54 def set_replace(set, item, new_item): 55 if item in set: 56 set.remove(item) 57 set.add(new_item) 58 59 def get_continued(items, mark, length): 60 # Add "continued" mark to lines, except last one 61 return (x + mark if n != length else x for n, x in enumerate(items, start=1)) 62 63 def split_lines(message): 64 MAX = 400 65 messages_limited = [] 66 wr = textwrap.TextWrapper(width=MAX) 67 68 # Split when Telegram original message has breaks 69 messages = message.splitlines() 70 lm = len(messages) 71 if lm > 1: 72 # Add "continued line" mark (\) for lines that belong to the same message 73 # (split previously) 74 messages = get_continued(messages, ' \\', lm) 75 for m in messages: 76 wrapped = wr.wrap(text=m) 77 lw = len(wrapped) 78 if lw > 1: 79 # Add double "continued line" mark (\\) for lines that belong to the same message 80 # and have been wrapped to not exceed IRC limits 81 messages_limited += get_continued(wrapped, ' \\\\', lw) 82 else: 83 messages_limited += wrapped 84 del wr 85 return messages_limited 86 87 def sanitize_filename(fn): 88 cn = str(sanitize_filename.cn) 89 new_fn, ns = FILENAME_INVALID_CHARS.subn(cn, fn) 90 if ns: 91 sanitize_filename.cn += 1 92 return new_fn.strip('-').replace(' ','_') 93 sanitize_filename.cn = 0 94 95 def add_filename(filename, add): 96 if add: 97 aux = filename.rsplit('.', 1) 98 name = aux[0] 99 try: 100 ext = aux[1] 101 except: 102 ext = '' 103 return '{}-{}.{}'.format(name, add, ext) 104 else: 105 return filename 106 107 def remove_slash(url): 108 return url[:-1] if url[-1:] == '/' else url 109 110 def remove_http_s(url): 111 if url[:8] == 'https://': 112 surl = url[8:] 113 elif url[:7] == 'http://': 114 surl = url[7:] 115 else: 116 surl = url 117 return remove_slash(surl) 118 119 def is_url_equiv(url1, url2): 120 if url1 and url2: 121 return url1 == url2 or remove_slash(remove_http_s(url1)) == remove_slash(remove_http_s(url2)) 122 else: 123 return False 124 125 def extract_url(text): 126 url = SIMPLE_URL.search(text) 127 return url.group() if url else None 128 129 def get_human_size(size): 130 human_units = ('', 'K', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y') 131 132 def get_human_size_values(size, unit_pos=0): 133 aux = size / 1024.0 134 if aux > 1: return get_human_size_values(aux, unit_pos + 1) 135 else: return size, human_units[unit_pos] 136 137 if size <= 1237940039285380274899124224: # 1024Y 138 num, unit = get_human_size_values(size) 139 else: 140 num = size / 1208925819614629174706176 # 1Y 141 unit = 'Y' 142 143 fs = '{:.1f}{}' if num < 10 else '{:.0f}{}' 144 145 return fs.format(num, unit) 146 147 def get_human_duration(duration): 148 res = '' 149 x, s = divmod(duration, 60) 150 h, m = divmod(x, 60) 151 152 if h > 0: res = str(h) + 'h' 153 if m > 0: res += str(m) + 'm' 154 if s > 0 or duration < 60: res += str(s) + 's' 155 return res 156 157 def compact_date(date, tz): 158 delta = current_date() - date 159 date_local = date.astimezone(zoneinfo.ZoneInfo(tz)) 160 161 if delta.days < 1: 162 compact_date = date_local.strftime('%H:%M') 163 elif delta.days < 365: 164 compact_date = date_local.strftime('%d-%b') 165 else: 166 compact_date = date_local.strftime('%Y') 167 168 return compact_date 169 170 def current_date(): 171 return datetime.datetime.now(datetime.timezone.utc) 172 173 def get_highlighted(a, b): 174 awl = len(a.split()) 175 bwl = len(b.split()) 176 delta_size = abs(awl - bwl) 177 highlighted = True 178 179 if not a: 180 res = '> {}'.format(b) 181 elif delta_size > 5: 182 res = b 183 highlighted = False 184 else: 185 al = a.split(' ') 186 bl = b.split(' ') 187 diff = difflib.ndiff(al, bl) 188 ld = list(diff) 189 res = '' 190 d = '' 191 eq = 0 192 193 for i in ld: 194 if i == '- ' or i[0] == '?': 195 continue 196 elif i == ' ' or i == '+ ': 197 res += ' ' 198 continue 199 # deletion of words 200 elif i[0] == '-': 201 res += '-{}- '.format(i[2:]) 202 # addition of words 203 elif i[0] == '+': 204 res += '+{}+ '.format(i[2:]) 205 else: 206 res += '{} '.format(i[2:]) 207 eq += 1 208 209 delta_eq = bwl - eq 210 if delta_eq > 3: 211 res = b 212 highlighted = False 213 214 return res, highlighted 215 216 def fix_braces(text): 217 # Remove braces not closed, if the text was truncated 218 if text.endswith(' {...'): 219 subtext = text[:-5] 220 if not '{}' in subtext: 221 return '{}...'.format(subtext) 222 return text 223 224 def format_timestamp(format, tz, date): 225 date_local = date.astimezone(zoneinfo.ZoneInfo(tz)) 226 return date_local.strftime(format) 227 228 def parse_loglevel(level): 229 levelu = level.upper() 230 if levelu == 'DEBUG': 231 LOGL.debug = True 232 if levelu == 'NONE': 233 l = None 234 elif levelu in ('DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'): 235 l = getattr(logging, levelu) 236 else: 237 l = False 238 return l 239 240 def pretty(object): 241 return object.stringify() if LOGL.debug and object else object