1 # irgramd: IRC-Telegram gateway 2 # utils.py: Helper functions 3 # 4 # Copyright (c) 2019 Peter Bui <pbui@bx612.space> 5 # Copyright (c) 2020-2024 E. Bosch <presidev@AT@gmail.com> 6 # 7 # Use of this source code is governed by a MIT style license that 8 # can be found in the LICENSE file included in this project. 9 10 import itertools 11 import textwrap 12 import re 13 import datetime 14 import zoneinfo 15 import difflib 16 import logging 17 18 # Constants 19 20 FILENAME_INVALID_CHARS = re.compile('[/{}<>()"\'\\|&#%?]') 21 SIMPLE_URL = re.compile('http(|s)://[^ ]+') 22 23 from include import MAX_LINE 24 25 # Utilities 26 27 class command: 28 async def parse_command(self, line, nick): 29 command = line.partition(' ')[0].lower() 30 self.tmp_ircnick = nick 31 if command in self.commands.keys(): 32 handler, min_args, max_args, maxsplit = self.commands[command] 33 words = line.split(maxsplit=maxsplit)[1:] 34 num_words = len(words) 35 if num_words < min_args or num_words > max_args: 36 reply = ('Wrong number of arguments',) 37 else: 38 reply = await handler(*words) 39 else: 40 reply = ('Unknown command',) 41 42 return reply 43 44 class HELP: 45 desc = 1 46 brief = 2 47 48 class LOGL: 49 debug = False 50 51 def chunks(iterable, n, fillvalue=None): 52 ''' Return iterable consisting of a sequence of n-length chunks ''' 53 args = [iter(iterable)] * n 54 return itertools.zip_longest(*args, fillvalue=fillvalue) 55 56 def set_replace(set, item, new_item): 57 if item in set: 58 set.remove(item) 59 set.add(new_item) 60 61 def get_continued(items, mark, length): 62 # Add "continued" mark to lines, except last one 63 return (x + mark if n != length else x for n, x in enumerate(items, start=1)) 64 65 def split_lines(message): 66 messages_limited = [] 67 wr = textwrap.TextWrapper(width=MAX_LINE) 68 69 # Split when Telegram original message has breaks 70 messages = message.splitlines() 71 lm = len(messages) 72 if lm > 1: 73 # Add "continued line" mark (\) for lines that belong to the same message 74 # (split previously) 75 messages = get_continued(messages, ' \\', lm) 76 for m in messages: 77 wrapped = wr.wrap(text=m) 78 lw = len(wrapped) 79 if lw > 1: 80 # Add double "continued line" mark (\\) for lines that belong to the same message 81 # and have been wrapped to not exceed IRC limits 82 messages_limited += get_continued(wrapped, ' \\\\', lw) 83 else: 84 messages_limited += wrapped 85 del wr 86 return messages_limited 87 88 def sanitize_filename(fn): 89 cn = str(sanitize_filename.cn) 90 new_fn, ns = FILENAME_INVALID_CHARS.subn(cn, fn) 91 if ns: 92 sanitize_filename.cn += 1 93 return new_fn.strip('-').replace(' ','_') 94 sanitize_filename.cn = 0 95 96 def add_filename(filename, add): 97 if add: 98 aux = filename.rsplit('.', 1) 99 name = aux[0] 100 try: 101 ext = aux[1] 102 except: 103 ext = '' 104 return '{}-{}.{}'.format(name, add, ext) 105 else: 106 return filename 107 108 def remove_slash(url): 109 return url[:-1] if url[-1:] == '/' else url 110 111 def remove_http_s(url): 112 if url[:8] == 'https://': 113 surl = url[8:] 114 elif url[:7] == 'http://': 115 surl = url[7:] 116 else: 117 surl = url 118 return remove_slash(surl) 119 120 def is_url_equiv(url1, url2): 121 if url1 and url2: 122 return url1 == url2 or remove_slash(remove_http_s(url1)) == remove_slash(remove_http_s(url2)) 123 else: 124 return False 125 126 def extract_url(text): 127 url = SIMPLE_URL.search(text) 128 return url.group() if url else None 129 130 def get_human_size(size): 131 human_units = ('', 'K', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y') 132 133 def get_human_size_values(size, unit_pos=0): 134 aux = size / 1024.0 135 if aux > 1: return get_human_size_values(aux, unit_pos + 1) 136 else: return size, human_units[unit_pos] 137 138 if size <= 1237940039285380274899124224: # 1024Y 139 num, unit = get_human_size_values(size) 140 else: 141 num = size / 1208925819614629174706176 # 1Y 142 unit = 'Y' 143 144 fs = '{:.1f}{}' if num < 10 else '{:.0f}{}' 145 146 return fs.format(num, unit) 147 148 def get_human_duration(duration): 149 res = '' 150 x, s = divmod(duration, 60) 151 h, m = divmod(x, 60) 152 153 if h > 0: res = str(h) + 'h' 154 if m > 0: res += str(m) + 'm' 155 if s > 0 or duration < 60: res += str(s) + 's' 156 return res 157 158 def compact_date(date, tz): 159 delta = current_date() - date 160 date_local = date.astimezone(zoneinfo.ZoneInfo(tz)) 161 162 if delta.days < 1: 163 compact_date = date_local.strftime('%H:%M') 164 elif delta.days < 365: 165 compact_date = date_local.strftime('%d-%b') 166 else: 167 compact_date = date_local.strftime('%Y') 168 169 return compact_date 170 171 def current_date(): 172 return datetime.datetime.now(datetime.timezone.utc) 173 174 def get_highlighted(a, b): 175 awl = len(a.split()) 176 bwl = len(b.split()) 177 delta_size = abs(awl - bwl) 178 highlighted = True 179 180 if not a: 181 res = '> {}'.format(b) 182 elif delta_size > 5: 183 res = b 184 highlighted = False 185 else: 186 al = a.split(' ') 187 bl = b.split(' ') 188 diff = difflib.ndiff(al, bl) 189 ld = list(diff) 190 res = '' 191 d = '' 192 eq = 0 193 194 for i in ld: 195 if i == '- ' or i[0] == '?': 196 continue 197 elif i == ' ' or i == '+ ': 198 res += ' ' 199 continue 200 # deletion of words 201 elif i[0] == '-': 202 res += '-{}- '.format(i[2:]) 203 # addition of words 204 elif i[0] == '+': 205 res += '+{}+ '.format(i[2:]) 206 else: 207 res += '{} '.format(i[2:]) 208 eq += 1 209 210 delta_eq = bwl - eq 211 if delta_eq > 3: 212 res = b 213 highlighted = False 214 215 return res, highlighted 216 217 def fix_braces(text): 218 # Remove braces not closed, if the text was truncated 219 if text.endswith(' {...'): 220 subtext = text[:-5] 221 if not '{}' in subtext: 222 return '{}...'.format(subtext) 223 return text 224 225 def format_timestamp(format, tz, date): 226 date_local = date.astimezone(zoneinfo.ZoneInfo(tz)) 227 return date_local.strftime(format) 228 229 def parse_loglevel(level): 230 levelu = level.upper() 231 if levelu == 'DEBUG': 232 LOGL.debug = True 233 if levelu == 'NONE': 234 l = None 235 elif levelu in ('DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'): 236 l = getattr(logging, levelu) 237 else: 238 l = False 239 return l 240 241 def pretty(object): 242 return object.stringify() if LOGL.debug and object else object