# irgramd: IRC-Telegram gateway
# utils.py: Helper functions
#
# Copyright (c) 2019 Peter Bui <pbui@bx612.space>
# Copyright (c) 2020-2024 E. Bosch <presidev@AT@gmail.com>
#
# Use of this source code is governed by a MIT style license that
# can be found in the LICENSE file included in this project.

import itertools
import textwrap
import re
import datetime
import zoneinfo
import difflib
import logging

# Constants

# NOTE(review): inside the character class, `\\|` is an escaped pipe, so a
# literal backslash is NOT part of the invalid set -- confirm this is intended.
FILENAME_INVALID_CHARS = re.compile('[/{}<>()"\'\\|&#%?]')
SIMPLE_URL = re.compile('http(|s)://[^ ]+')

# Utilities

class command:
    async def parse_command(self, line, nick):
        ''' Dispatch a command line to its handler and return the reply

        The first word of `line` (case-insensitive) is looked up in
        `self.commands`, whose values are unpacked as
        (handler, min_args, max_args, maxsplit). The remaining words are
        passed to the handler as positional arguments. `nick` is stored in
        `self.tmp_ircnick` before the lookup, even for unknown commands.

        Returns the awaited handler result, or a one-element tuple with an
        error message when the command is unknown or the number of
        arguments is out of range.
        '''
        command = line.partition(' ')[0].lower()
        self.tmp_ircnick = nick

        if command not in self.commands:
            return ('Unknown command',)

        handler, min_args, max_args, maxsplit = self.commands[command]
        # Drop the command word itself, keep only its arguments
        words = line.split(maxsplit=maxsplit)[1:]
        if not min_args <= len(words) <= max_args:
            return ('Wrong number of arguments',)

        return await handler(*words)

class HELP:
    # Selector constants for help output
    desc = 1
    brief = 2

def chunks(iterable, n, fillvalue=None):
    ''' Return iterable consisting of a sequence of n-length chunks

    The last chunk is padded with `fillvalue` when the input length is not
    a multiple of n.
    '''
    # The same iterator repeated n times makes zip_longest consume
    # n consecutive items per output tuple
    args = [iter(iterable)] * n
    return itertools.zip_longest(*args, fillvalue=fillvalue)

def set_replace(set, item, new_item):
    ''' Replace `item` by `new_item` in `set`, in place

    Nothing is done when `item` is not in the set.
    '''
    if item in set:
        set.remove(item)
        set.add(new_item)

def get_continued(items, mark, length):
    ''' Return a generator of `items` with `mark` appended to each one
    except the item at (1-based) position `length`
    '''
    # Add "continued" mark to lines, except last one
    return (x + mark if n != length else x for n, x in enumerate(items, start=1))

def split_lines(message):
    ''' Split a message into a list of lines of at most 400 characters,
    before continuation marks are appended

    Original line breaks produce lines ending in ' \\' (except the last),
    lines wrapped because of their length end in ' \\\\' (except the last
    fragment).
    '''
    MAX = 400
    messages_limited = []
    wr = textwrap.TextWrapper(width=MAX)

    # Split when Telegram original message has breaks
    messages = message.splitlines()
    lm = len(messages)
    if lm > 1:
        # Add "continued line" mark (\) for lines that belong to the same message
        # (split previously)
        messages = get_continued(messages, ' \\', lm)
    for m in messages:
        wrapped = wr.wrap(text=m)
        lw = len(wrapped)
        if lw > 1:
            # Add double "continued line" mark (\\) for lines that belong to the same message
            # and have been wrapped to not exceed IRC limits
            messages_limited += get_continued(wrapped, ' \\\\', lw)
        else:
            messages_limited += wrapped
    return messages_limited

def sanitize_filename(fn):
    ''' Return `fn` with invalid characters replaced and spaces as '_'

    Every character matched by FILENAME_INVALID_CHARS is replaced by the
    current value of the counter `sanitize_filename.cn`; the counter is
    increased by one after a call that replaced something. Leading and
    trailing '-' are stripped.
    '''
    cn = str(sanitize_filename.cn)
    new_fn, ns = FILENAME_INVALID_CHARS.subn(cn, fn)
    if ns:
        sanitize_filename.cn += 1
    return new_fn.strip('-').replace(' ', '_')
# Replacement counter, kept as a function attribute across calls
sanitize_filename.cn = 0

def add_filename(filename, add):
    ''' Insert '-<add>' before the extension of `filename`

    Returns `filename` unchanged when `add` is falsy. A name without any
    dot gets the suffix appended with no trailing dot.
    '''
    if not add:
        return filename

    parts = filename.rsplit('.', 1)
    if len(parts) == 1:
        # No extension at all: do not leave a dangling '.'
        return '{}-{}'.format(filename, add)
    name, ext = parts
    return '{}-{}.{}'.format(name, add, ext)

def remove_slash(url):
    ''' Return `url` without one trailing '/', if present '''
    return url[:-1] if url[-1:] == '/' else url

def remove_http_s(url):
    ''' Return `url` without a leading 'https://' or 'http://' scheme and
    without one trailing '/'
    '''
    if url.startswith('https://'):
        surl = url[8:]
    elif url.startswith('http://'):
        surl = url[7:]
    else:
        surl = url
    return remove_slash(surl)

def is_url_equiv(url1, url2):
    ''' Return True if both URLs are non-empty and are equal, or equal
    after removing the http(s) scheme and trailing slashes; False otherwise
    '''
    if not (url1 and url2):
        return False
    if url1 == url2:
        return True
    # remove_http_s() already strips one trailing slash, so up to two
    # trailing slashes are ignored in the comparison
    return remove_slash(remove_http_s(url1)) == remove_slash(remove_http_s(url2))

def extract_url(text):
    ''' Return the first http(s) URL found in `text`, or None '''
    url = SIMPLE_URL.search(text)
    return url.group() if url else None

def get_human_size(size):
    ''' Return `size` as a short string scaled by powers of 1024 with a
    unit suffix ('', K, M, ... Y)

    One decimal is shown when the scaled number is below 10, none otherwise.
    '''
    human_units = ('', 'K', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y')

    if size <= 1237940039285380274899124224: # 1024Y
        num = size
        unit_pos = 0
        # Scale down while the next unit would still be greater than 1
        while num / 1024.0 > 1:
            num = num / 1024.0
            unit_pos += 1
        unit = human_units[unit_pos]
    else:
        # Beyond the largest unit: express everything in Y
        num = size / 1208925819614629174706176 # 1Y
        unit = 'Y'

    fs = '{:.1f}{}' if num < 10 else '{:.0f}{}'

    return fs.format(num, unit)

def get_human_duration(duration):
    ''' Return a duration given in seconds as '<h>h<m>m<s>s'

    Zero components are omitted, except that seconds are always shown
    when the duration is shorter than one minute.
    '''
    res = ''
    x, s = divmod(duration, 60)
    h, m = divmod(x, 60)

    if h > 0: res = str(h) + 'h'
    if m > 0: res += str(m) + 'm'
    if s > 0 or duration < 60: res += str(s) + 's'
    return res

def compact_date(date, tz):
    ''' Return `date` formatted in timezone `tz` according to its age

    Less than one day old: 'HH:MM'; less than 365 days: 'DD-Mon';
    otherwise the year. `tz` is a zone name accepted by zoneinfo.ZoneInfo.
    `date` is subtracted from an aware UTC "now", so it is expected to be
    timezone-aware.
    '''
    delta = datetime.datetime.now(datetime.timezone.utc) - date
    date_local = date.astimezone(zoneinfo.ZoneInfo(tz))

    if delta.days < 1:
        fmt = '%H:%M'
    elif delta.days < 365:
        fmt = '%d-%b'
    else:
        fmt = '%Y'

    return date_local.strftime(fmt)

def get_highlighted(a, b):
    ''' Return (text, highlighted) describing the change from `a` to `b`

    When `a` is empty the text is '> ' followed by `b`. When the texts
    differ too much (word count differs by more than 5, or more than 3
    words of `b` are not unchanged words) `b` is returned as is and
    highlighted is False. Otherwise the text is a word diff where removed
    words are shown as -word- and added words as +word+.
    '''
    awl = len(a.split())
    bwl = len(b.split())
    delta_size = abs(awl - bwl)
    highlighted = True

    if not a:
        res = '> {}'.format(b)
    elif delta_size > 5:
        res = b
        highlighted = False
    else:
        al = a.split(' ')
        bl = b.split(' ')
        parts = []
        eq = 0

        # Every ndiff line is a two-character code followed by the token
        for i in difflib.ndiff(al, bl):
            if i == '- ' or i[0] == '?':
                # removed empty token, or ndiff hint line
                continue
            elif i == '  ' or i == '+ ':
                # unchanged or added empty token (from consecutive spaces):
                # keep the space but do not count it as an equal word
                parts.append(' ')
            # deletion of words
            elif i[0] == '-':
                parts.append('-{}- '.format(i[2:]))
            # addition of words
            elif i[0] == '+':
                parts.append('+{}+ '.format(i[2:]))
            else:
                parts.append('{} '.format(i[2:]))
                eq += 1

        res = ''.join(parts)
        delta_eq = bwl - eq
        if delta_eq > 3:
            res = b
            highlighted = False

    return res, highlighted

def fix_braces(text):
    ''' Return `text` with a trailing ' {...' replaced by '...', unless
    the text before it contains '{}'
    '''
    # Remove braces not closed, if the text was truncated
    if text.endswith(' {...'):
        subtext = text[:-5]
        if '{}' not in subtext:
            return '{}...'.format(subtext)
    return text

def format_timestamp(format, tz, date):
    ''' Return `date` converted to timezone `tz` and formatted with the
    strftime pattern `format`
    '''
    date_local = date.astimezone(zoneinfo.ZoneInfo(tz))
    return date_local.strftime(format)

def parse_loglevel(level):
    ''' Translate a level name (case-insensitive) to a logging level

    Returns None for 'NONE', the matching `logging` constant for DEBUG,
    INFO, WARNING, ERROR and CRITICAL, and False for anything else.
    '''
    levelu = level.upper()
    if levelu == 'NONE':
        return None
    if levelu in ('DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'):
        return getattr(logging, levelu)
    return False