From 37c14f113389f967912f3b9aa3a203d2e6befe11 Mon Sep 17 00:00:00 2001 From: CorpNewt <12772521+corpnewt@users.noreply.github.com> Date: Tue, 23 Jul 2024 13:21:48 -0500 Subject: [PATCH] Wrap up progress checks in multiprocessing.Process() --- Scripts/downloader.py | 256 +++++++++++++++++++++++++----------------- 1 file changed, 154 insertions(+), 102 deletions(-) diff --git a/Scripts/downloader.py b/Scripts/downloader.py index 108331e..05c22cf 100755 --- a/Scripts/downloader.py +++ b/Scripts/downloader.py @@ -1,15 +1,130 @@ -import sys, os, time, ssl, gzip +import sys, os, time, ssl, gzip, multiprocessing from io import BytesIO # Python-aware urllib stuff -if sys.version_info >= (3, 0): +try: from urllib.request import urlopen, Request -else: + import queue as q +except ImportError: # Import urllib2 to catch errors import urllib2 from urllib2 import urlopen, Request + import Queue as q TERMINAL_WIDTH = 120 if os.name=="nt" else 80 +def get_size(size, suffix=None, use_1024=False, round_to=2, strip_zeroes=False): + # size is the number of bytes + # suffix is the target suffix to locate (B, KB, MB, etc) - if found + # use_1024 denotes whether or not we display in MiB vs MB + # round_to is the number of decimal points to round our result to (0-15) + # strip_zeroes denotes whether we strip out zeroes + + # Failsafe in case our size is unknown + if size == -1: + return "Unknown" + # Get our suffixes based on use_1024 + ext = ["B","KiB","MiB","GiB","TiB","PiB"] if use_1024 else ["B","KB","MB","GB","TB","PB"] + div = 1024 if use_1024 else 1000 + s = float(size) + s_dict = {} # Initialize our dict + # Iterate the ext list, and divide by 1000 or 1024 each time to setup the dict {ext:val} + for e in ext: + s_dict[e] = s + s /= div + # Get our suffix if provided - will be set to None if not found, or if started as None + suffix = next((x for x in ext if x.lower() == suffix.lower()),None) if suffix else suffix + # Get the largest value that's still over 1 + biggest = suffix if 
suffix else next((x for x in ext[::-1] if s_dict[x] >= 1), "B") + # Determine our rounding approach - first make sure it's an int; default to 2 on error + try:round_to=int(round_to) + except:round_to=2 + round_to = 0 if round_to < 0 else 15 if round_to > 15 else round_to # Ensure it's between 0 and 15 + bval = round(s_dict[biggest], round_to) + # Split our number based on decimal points + a,b = str(bval).split(".") + # Check if we need to strip or pad zeroes + b = b.rstrip("0") if strip_zeroes else b.ljust(round_to,"0") if round_to > 0 else "" + return "{:,}{} {}".format(int(a),"" if not b else "."+b,biggest) + +def _process_hook(queue, total_size, timeout=5, max_packets=1024): + bytes_so_far = 0 + packets = [] + speed = remaining = "" + while True: + try: + packet = queue.get(timeout=timeout) + # Packets should be formatted as a tuple of + # (timestamp, len(bytes_downloaded)) + # If "DONE" is passed, we assume the download + # finished - and bail + if packet == "DONE": + return + # Append our packet to the list and ensure our max + # is 1024 packets + packets.append(packet) + packets = packets[-max_packets:] + # Increment our bytes so far as well + bytes_so_far += packet[1] + except q.Empty: + # Didn't get anything - reset the speed + # and packets + packets = [] + speed = " | 0 B/s" + remaining = " | ?? left" if total_size > 0 else "" + # If we have *any* packets, process + # the info. + if packets: + speed = " | ?? B/s" + if len(packets) > 1: + # Let's calculate the amount downloaded over how long + try: + first,last = packets[0][0],packets[-1][0] + chunks = sum([float(x[1]) for x in packets]) + t = last-first + assert t >= 0 + bytes_speed = 1. 
/ t * chunks + speed = " | {}/s".format(get_size(bytes_speed,round_to=1)) + # Get our remaining time + if total_size > 0: + seconds_left = (total_size-bytes_so_far) / bytes_speed + days = seconds_left // 86400 + hours = (seconds_left - (days*86400)) // 3600 + mins = (seconds_left - (days*86400) - (hours*3600)) // 60 + secs = seconds_left - (days*86400) - (hours*3600) - (mins*60) + if days > 99 or bytes_speed == 0: + remaining = " | ?? left" + else: + remaining = " | {}{:02d}:{:02d}:{:02d} left".format( + "{}:".format(int(days)) if days else "", + int(hours), + int(mins), + int(round(secs)) + ) + except: + pass + if total_size > 0: + percent = float(bytes_so_far) / total_size + percent = round(percent*100, 2) + t_s = get_size(total_size) + try: b_s = get_size(bytes_so_far, t_s.split(" ")[1]) + except: b_s = get_size(bytes_so_far) + perc_str = " {:.2f}%".format(percent) + bar_width = (TERMINAL_WIDTH // 3)-len(perc_str) + progress = "=" * int(bar_width * (percent/100)) + sys.stdout.write("\r\033[K{}/{} | {}{}{}{}{}".format( + b_s, + t_s, + progress, + " " * (bar_width-len(progress)), + perc_str, + speed, + remaining + )) + else: + b_s = get_size(bytes_so_far) + sys.stdout.write("\r\033[K{}{}".format(b_s, speed)) + sys.stdout.flush() + class Downloader: def __init__(self,**kwargs): @@ -46,93 +161,8 @@ class Downloader: return None return response - def get_size(self, size, suffix=None, use_1024=False, round_to=2, strip_zeroes=False): - # size is the number of bytes - # suffix is the target suffix to locate (B, KB, MB, etc) - if found - # use_2014 denotes whether or not we display in MiB vs MB - # round_to is the number of dedimal points to round our result to (0-15) - # strip_zeroes denotes whether we strip out zeroes - - # Failsafe in case our size is unknown - if size == -1: - return "Unknown" - # Get our suffixes based on use_1024 - ext = ["B","KiB","MiB","GiB","TiB","PiB"] if use_1024 else ["B","KB","MB","GB","TB","PB"] - div = 1024 if use_1024 else 1000 - s = 
float(size) - s_dict = {} # Initialize our dict - # Iterate the ext list, and divide by 1000 or 1024 each time to setup the dict {ext:val} - for e in ext: - s_dict[e] = s - s /= div - # Get our suffix if provided - will be set to None if not found, or if started as None - suffix = next((x for x in ext if x.lower() == suffix.lower()),None) if suffix else suffix - # Get the largest value that's still over 1 - biggest = suffix if suffix else next((x for x in ext[::-1] if s_dict[x] >= 1), "B") - # Determine our rounding approach - first make sure it's an int; default to 2 on error - try:round_to=int(round_to) - except:round_to=2 - round_to = 0 if round_to < 0 else 15 if round_to > 15 else round_to # Ensure it's between 0 and 15 - bval = round(s_dict[biggest], round_to) - # Split our number based on decimal points - a,b = str(bval).split(".") - # Check if we need to strip or pad zeroes - b = b.rstrip("0") if strip_zeroes else b.ljust(round_to,"0") if round_to > 0 else "" - return "{:,}{} {}".format(int(a),"" if not b else "."+b,biggest) - - def _progress_hook(self, bytes_so_far, total_size, packets=None): - speed = remaining = "" - if packets: - speed = " | ?? B/s" - if len(packets) > 1: - # Let's calculate the amount downloaded over how long - try: - first,last = packets[0][0],packets[-1][0] - chunks = sum([float(x[1]) for x in packets]) - t = last-first - assert t >= 0 - bytes_speed = 1. / t * chunks - speed = " | {}/s".format(self.get_size(bytes_speed,round_to=1)) - # Get our remaining time - if total_size > 0: - seconds_left = (total_size-bytes_so_far) / bytes_speed - days = seconds_left // 86400 - hours = (seconds_left - (days*86400)) // 3600 - mins = (seconds_left - (days*86400) - (hours*3600)) // 60 - secs = seconds_left - (days*86400) - (hours*3600) - (mins*60) - if days > 99 or bytes_speed == 0: - remaining = " | ?? 
left" - else: - remaining = " | {}{:02d}:{:02d}:{:02d} left".format( - "{}:".format(int(days)) if days else "", - int(hours), - int(mins), - int(round(secs)) - ) - except: - pass - if total_size > 0: - percent = float(bytes_so_far) / total_size - percent = round(percent*100, 2) - t_s = self.get_size(total_size) - try: b_s = self.get_size(bytes_so_far, t_s.split(" ")[1]) - except: b_s = self.get_size(bytes_so_far) - perc_str = " {:.2f}%".format(percent) - bar_width = (TERMINAL_WIDTH // 3)-len(perc_str) - progress = "=" * int(bar_width * (percent/100)) - sys.stdout.write("\r\033[K{}/{} | {}{}{}{}{}".format( - b_s, - t_s, - progress, - " " * (bar_width-len(progress)), - perc_str, - speed, - remaining - )) - else: - b_s = self.get_size(bytes_so_far) - sys.stdout.write("\r\033[K{}{}".format(b_s, speed)) - sys.stdout.flush() + def get_size(self, *args, **kwargs): + return get_size(*args,**kwargs) def get_string(self, url, progress = True, headers = None, expand_gzip = True): response = self.get_bytes(url,progress,headers,expand_gzip) @@ -142,25 +172,35 @@ class Downloader: def get_bytes(self, url, progress = True, headers = None, expand_gzip = True): response = self.open_url(url, headers) if response is None: return None - bytes_so_far = 0 try: total_size = int(response.headers['Content-Length']) except: total_size = -1 chunk_so_far = b"" - packets = [] if progress else None + packets = queue = process = None + if progress: + # Make sure our vars are initialized + packets = [] if progress else None + queue = multiprocessing.Queue() + # Create the multiprocess and start it + process = multiprocessing.Process(target=_process_hook,args=(queue,total_size)) + process.daemon = True + process.start() while True: chunk = response.read(self.chunk) - bytes_so_far += len(chunk) if progress: - packets.append((time.time(),len(chunk))) - packets = packets[-1024:] - self._progress_hook(bytes_so_far,total_size,packets=packets) + # Add our items to the queue + 
queue.put((time.time(),len(chunk))) if not chunk: break chunk_so_far += chunk if expand_gzip and response.headers.get("Content-Encoding","unknown").lower() == "gzip": fileobj = BytesIO(chunk_so_far) gfile = gzip.GzipFile(fileobj=fileobj) return gfile.read() - if progress: print("") # Add a newline so our last progress prints completely + if progress: + # Finalize the queue and wait + queue.put("DONE") + process.join() + # Add a newline so our last progress prints completely + print("") return chunk_so_far def stream_to_file(self, url, file_path, progress = True, headers = None, ensure_size_if_present = True): @@ -169,18 +209,30 @@ class Downloader: bytes_so_far = 0 try: total_size = int(response.headers['Content-Length']) except: total_size = -1 - packets = [] if progress else None + packets = queue = process = None + if progress: + # Make sure our vars are initialized + packets = [] if progress else None + queue = multiprocessing.Queue() + # Create the multiprocess and start it + process = multiprocessing.Process(target=_process_hook,args=(queue,total_size)) + process.daemon = True + process.start() with open(file_path, 'wb') as f: while True: chunk = response.read(self.chunk) bytes_so_far += len(chunk) if progress: - packets.append((time.time(),len(chunk))) - packets = packets[-1024:] - self._progress_hook(bytes_so_far,total_size,packets=packets) + # Add our items to the queue + queue.put((time.time(),len(chunk))) if not chunk: break f.write(chunk) - if progress: print("") # Add a newline so our last progress prints completely + if progress: + # Finalize the queue and wait + queue.put("DONE") + process.join() + # Add a newline so our last progress prints completely + print("") if ensure_size_if_present and total_size != -1: # We're verifying size - make sure we got what we asked for if bytes_so_far != total_size: