Make observer optional for all tasks
This commit is contained in:
parent
eeab3cb54c
commit
929e8d9a60
@ -46,20 +46,22 @@ class App:
|
||||
definition['containers'] = [container.name for container in self.containers]
|
||||
return definition
|
||||
|
||||
def download(self, observer):
|
||||
def download(self, observer=None):
|
||||
# Download the archive with application scripts and install data
|
||||
os.makedirs(TMP_APPS_DIR, 0o700, True)
|
||||
archive_url = urllib.parse.urljoin(ONLINE_APPS_URL, f'{self.name}.tar.xz')
|
||||
archive_path = os.path.join(TMP_APPS_DIR, f'{self.name}.tar.xz')
|
||||
definition = repo_online.get_app(self.name)
|
||||
observer.units_total = definition['dlsize']
|
||||
if observer:
|
||||
observer.units_total = definition['dlsize']
|
||||
repo_online.download_archive(archive_url, archive_path, definition['hash'], observer)
|
||||
|
||||
def unpack_downloaded(self, observer):
|
||||
def unpack_downloaded(self, observer=None):
|
||||
# Unpack downloaded archive with application scripts and install data
|
||||
archive_path = os.path.join(TMP_APPS_DIR, f'{self.name}.tar.xz')
|
||||
definition = repo_online.get_app(self.name)
|
||||
observer.units_total = definition['size']
|
||||
if observer:
|
||||
observer.units_total = definition['size']
|
||||
repo_online.unpack_archive(archive_path, APPS_DIR, definition['hash'], observer)
|
||||
|
||||
def run_script(self, action):
|
||||
@ -87,7 +89,7 @@ class App:
|
||||
container.create()
|
||||
self.containers.append(container)
|
||||
|
||||
def install(self):
|
||||
def install(self, observer=None):
|
||||
# Install the application
|
||||
definition = repo_online.get_app(self.name)
|
||||
self.version = definition['version']
|
||||
@ -100,7 +102,7 @@ class App:
|
||||
self.run_script('install')
|
||||
repo_local.register_app(self.name, self.get_definition())
|
||||
|
||||
def update(self):
|
||||
def update(self, observer=None):
|
||||
# Remove containers
|
||||
for container in self.containers:
|
||||
container.destroy()
|
||||
@ -115,7 +117,7 @@ class App:
|
||||
self.run_script('update')
|
||||
repo_local.register_app(self.name, self.get_definition())
|
||||
|
||||
def uninstall(self):
|
||||
def uninstall(self, observer=None):
|
||||
# Make sure the containers are stopped
|
||||
for container in self.containers:
|
||||
container.stop()
|
||||
|
@ -8,31 +8,27 @@ from math import floor
|
||||
SIZE_PREFIXES = ('', 'k', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y')
|
||||
|
||||
class ActionItem:
|
||||
def __init__(self, text, action, show_progress=True):
|
||||
def __init__(self, text, action):
|
||||
self.text = text
|
||||
self.action = action
|
||||
self.show_progress = show_progress
|
||||
self.units_total = 1
|
||||
self.units_total = 0
|
||||
self.units_done = 0
|
||||
|
||||
def run(self):
|
||||
if self.show_progress:
|
||||
with ThreadPoolExecutor() as executor:
|
||||
future = executor.submit(self.action, self)
|
||||
while not future.done():
|
||||
time.sleep(0.2)
|
||||
self.print_progress()
|
||||
# Get the result of the future and let it raise exception, if there was any
|
||||
data = future.result()
|
||||
self.print_progress('\n')
|
||||
else:
|
||||
self.print_progress()
|
||||
self.action()
|
||||
self.units_done = 1
|
||||
self.print_progress('\n')
|
||||
with ThreadPoolExecutor() as executor:
|
||||
future = executor.submit(self.action, self)
|
||||
while not future.done():
|
||||
time.sleep(0.2)
|
||||
self.print_progress()
|
||||
# Get the result of the future and let it raise exception, if there was any
|
||||
data = future.result()
|
||||
self.print_progress('\n')
|
||||
|
||||
def print_progress(self, end='\r'):
|
||||
print(f'\x1b[K{self.text} ({self.units_done}/{self.units_total}) [{floor(self.units_done/self.units_total*100)} %]', end=end)
|
||||
text = self.text
|
||||
if self.units_total:
|
||||
text = f'{text} ({self.units_done}/{self.units_total}) [{floor(self.units_done/self.units_total*100)} %]'
|
||||
print(f'\x1b[K{text}', end=end)
|
||||
|
||||
class ActionQueue:
|
||||
def __init__(self):
|
||||
@ -43,20 +39,20 @@ class ActionQueue:
|
||||
self.queue.append(ActionItem(f'Unpacking image {image.name}', image.unpack_downloaded))
|
||||
|
||||
def delete_image(self, image):
|
||||
self.queue.append(ActionItem(f'Deleting image {image.name}', image.delete, False))
|
||||
self.queue.append(ActionItem(f'Deleting image {image.name}', image.delete))
|
||||
|
||||
def install_app(self, app):
|
||||
self.queue.append(ActionItem(f'Downloading application {app.name}', app.download))
|
||||
self.queue.append(ActionItem(f'Unpacking application {app.name}', app.unpack_downloaded))
|
||||
self.queue.append(ActionItem(f'Installing application {app.name}', app.install, False))
|
||||
self.queue.append(ActionItem(f'Installing application {app.name}', app.install))
|
||||
|
||||
def update_app(self, app):
|
||||
self.queue.append(ActionItem(f'Downloading application {app.name}', app.download))
|
||||
self.queue.append(ActionItem(f'Unpacking application {app.name}', app.unpack_downloaded))
|
||||
self.queue.append(ActionItem(f'Updating application {app.name}', app.update, False))
|
||||
self.queue.append(ActionItem(f'Updating application {app.name}', app.update))
|
||||
|
||||
def uninstall_app(self, app):
|
||||
self.queue.append(ActionItem(f'Uninstalling application {app.name}', app.uninstall, False))
|
||||
self.queue.append(ActionItem(f'Uninstalling application {app.name}', app.uninstall))
|
||||
|
||||
def process(self):
|
||||
index = 0
|
||||
|
@ -50,7 +50,7 @@ class Image:
|
||||
imagebuilder.build(self, filename)
|
||||
repo_local.register_image(self.name, self.get_definition())
|
||||
|
||||
def delete(self):
|
||||
def delete(self, observer=None):
|
||||
# Remove the layer from local repository and filesystem
|
||||
repo_local.unregister_image(self.name)
|
||||
try:
|
||||
@ -58,20 +58,22 @@ class Image:
|
||||
except FileNotFoundError:
|
||||
pass
|
||||
|
||||
def download(self, observer):
|
||||
def download(self, observer=None):
|
||||
# Download the archive with layer data
|
||||
os.makedirs(TMP_LAYERS_DIR, 0o700, True)
|
||||
archive_url = urllib.parse.urljoin(ONLINE_LAYERS_URL, f'{self.name}.tar.xz')
|
||||
archive_path = os.path.join(TMP_LAYERS_DIR, f'{self.name}.tar.xz')
|
||||
definition = repo_online.get_image(self.name)
|
||||
observer.units_total = definition['dlsize']
|
||||
if observer:
|
||||
observer.units_total = definition['dlsize']
|
||||
repo_online.download_archive(archive_url, archive_path, definition['hash'], observer)
|
||||
|
||||
def unpack_downloaded(self, observer):
|
||||
def unpack_downloaded(self, observer=None):
|
||||
# Unpack downloaded archive with layer data
|
||||
archive_path = os.path.join(TMP_LAYERS_DIR, f'{self.name}.tar.xz')
|
||||
definition = repo_online.get_image(self.name)
|
||||
observer.units_total = definition['size']
|
||||
if observer:
|
||||
observer.units_total = definition['size']
|
||||
repo_online.unpack_archive(archive_path, LAYERS_DIR, definition['hash'], observer)
|
||||
self.set_definition(definition)
|
||||
repo_local.register_image(self.name, definition)
|
||||
|
@ -30,7 +30,7 @@ def verify_fileobj(fileobj, expected_hash):
|
||||
hasher.update(data)
|
||||
PUBLIC_KEY.verify(bytes.fromhex(expected_hash), hasher.finalize(), ec.ECDSA(utils.Prehashed(hashes.SHA512())))
|
||||
|
||||
def download_archive(archive_url, archive_path, expected_hash, observer):
|
||||
def download_archive(archive_url, archive_path, expected_hash, observer=None):
|
||||
# Check if an archive needs to be downloaded via http(s)
|
||||
do_download = True
|
||||
# If the file already exists in the temporary directory, verify the signature
|
||||
@ -39,7 +39,8 @@ def download_archive(archive_url, archive_path, expected_hash, observer):
|
||||
with open(archive_path, 'rb') as f:
|
||||
verify_fileobj(f, expected_hash)
|
||||
# If the signature matches, skip download
|
||||
observer.units_done = os.path.getsize(archive_path)
|
||||
if observer:
|
||||
observer.units_done = os.path.getsize(archive_path)
|
||||
do_download = False
|
||||
except InvalidSignature:
|
||||
# If the signature is invalid, redownload the file
|
||||
@ -49,9 +50,14 @@ def download_archive(archive_url, archive_path, expected_hash, observer):
|
||||
with open(archive_path, 'wb') as f, requests.Session() as session:
|
||||
resource = session.get(archive_url, stream=True)
|
||||
resource.raise_for_status()
|
||||
for chunk in resource.iter_content(chunk_size=64*1024):
|
||||
if chunk:
|
||||
observer.units_done += f.write(chunk)
|
||||
if observer:
|
||||
for chunk in resource.iter_content(chunk_size=64*1024):
|
||||
if chunk:
|
||||
observer.units_done += f.write(chunk)
|
||||
else:
|
||||
for chunk in resource.iter_content(chunk_size=64*1024):
|
||||
if chunk:
|
||||
f.write(chunk)
|
||||
|
||||
def unpack_archive(archive_path, destination, expected_hash, observer):
|
||||
with open(archive_path, 'rb') as f:
|
||||
@ -67,9 +73,12 @@ def unpack_archive(archive_path, destination, expected_hash, observer):
|
||||
# Extract the tar members while counting their size
|
||||
# If this is done as non-root, extractall() from https://github.com/python/cpython/blob/master/Lib/tarfile.py needs to be reimplemented instead
|
||||
tar = tarfile.open(fileobj=f)
|
||||
for tarinfo in tar:
|
||||
tar.extract(tarinfo, destination, numeric_owner=True)
|
||||
observer.units_done += tarinfo.size
|
||||
if observer:
|
||||
for tarinfo in tar:
|
||||
tar.extract(tarinfo, destination, numeric_owner=True)
|
||||
observer.units_done += tarinfo.size
|
||||
else:
|
||||
tar.extractall(destination, numeric_owner=True)
|
||||
# Remove the archive
|
||||
os.unlink(archive_path)
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user