spoc/usr/lib/python3.8/spoc/cli.py
2020-04-04 17:59:34 +02:00

82 lines
2.9 KiB
Python

# -*- coding: utf-8 -*-
import os
import time
from concurrent.futures import ThreadPoolExecutor
from math import floor
# Unit prefixes used by readable_size(); the index is the number of times
# the byte count was divided by 1024.
SIZE_PREFIXES = ('', 'k', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y')
class ActionItem:
    """A single unit of work reported on the terminal as one self-updating line.

    The action is called with this item as its only argument, so it can publish
    its progress by setting ``units_total`` and ``units_done`` on the item.
    """

    def __init__(self, text, action):
        """Store the label and the callable; both progress counters start at zero."""
        self.text = text
        self.action = action
        # While units_total is 0, only the plain label is printed.
        self.units_total = 0
        self.units_done = 0

    def run(self):
        """Run the action in a worker thread while refreshing the progress line.

        The progress line is reprinted every 0.2 seconds until the action
        finishes. Any exception raised by the action is re-raised to the caller.
        """
        with ThreadPoolExecutor() as executor:
            future = executor.submit(self.action, self)
            try:
                while not future.done():
                    time.sleep(0.2)
                    self.print_progress()
                # Get the result of the future and let it raise exception, if there was any
                future.result()
            finally:
                # Always terminate the progress line, even when the action failed,
                # so that whatever is printed next does not overwrite it.
                self.print_progress('\n')

    def print_progress(self, end='\r'):
        """Print the label, followed by done/total and a floored percentage if a total is set.

        The default ``end`` is a carriage return, which makes the next call
        overwrite the same terminal line.
        """
        # Read the counters once; the worker thread may change them at any time.
        done = self.units_done
        total = self.units_total
        text = self.text
        if total:
            text = f'{text} ({done}/{total}) [{floor(done / total * 100)} %]'
        # '\x1b[K' erases the rest of the line, clearing leftovers of a longer previous text.
        print(f'\x1b[K{text}', end=end)
class ActionQueue:
    """Ordered list of ActionItem steps, filled by the methods below and executed by process()."""

    def __init__(self):
        self.queue = []

    def _enqueue(self, text, action):
        """Append one step with the given label and callable to the end of the queue."""
        self.queue.append(ActionItem(text, action))

    def download_image(self, image):
        """Queue the download of an image, followed by unpacking of the downloaded data."""
        self._enqueue(f'Downloading image {image.name}', image.download)
        self._enqueue(f'Unpacking image {image.name}', image.unpack_downloaded)

    def delete_image(self, image):
        """Queue the deletion of an image."""
        self._enqueue(f'Deleting image {image.name}', image.delete)

    def install_app(self, app):
        """Queue the download, unpack and install steps of an application."""
        self._enqueue(f'Downloading application {app.name}', app.download)
        self._enqueue(f'Unpacking application {app.name}', app.unpack_downloaded)
        self._enqueue(f'Installing application {app.name}', app.install)

    def update_app(self, app):
        """Queue the download, unpack and update steps of an application."""
        self._enqueue(f'Downloading application {app.name}', app.download)
        self._enqueue(f'Unpacking application {app.name}', app.unpack_downloaded)
        self._enqueue(f'Updating application {app.name}', app.update)

    def uninstall_app(self, app):
        """Queue the uninstallation of an application."""
        self._enqueue(f'Uninstalling application {app.name}', app.uninstall)

    def start_app(self, app):
        """Queue the start of an application."""
        self._enqueue(f'Starting application {app.name}', app.start)

    def stop_app(self, app):
        """Queue the stop of an application."""
        self._enqueue(f'Stopping application {app.name}', app.stop)

    def process(self):
        """Run the queued steps in order, prefixing each label with its [position/total]."""
        total = len(self.queue)
        for position, step in enumerate(self.queue, start=1):
            step.text = f'[{position}/{total}] {step.text}'
            step.run()
def readable_size(bytes):
    """Format a byte count as a string with two decimals and a 1024-based unit prefix.

    The value is divided by 1024 until it drops below 1024 or the largest
    prefix in SIZE_PREFIXES is reached, e.g. 1536 gives '1.50 kB'.
    """
    # Work on a local so the parameter, which shadows the builtin `bytes`
    # but is part of the public signature, is not rebound.
    size = bytes
    index = 0
    # Stop at the last known prefix instead of running off the end of the tuple.
    last_index = len(SIZE_PREFIXES) - 1
    # `>=` so that exact multiples are converted too (1024 -> '1.00 kB').
    while size >= 1024 and index < last_index:
        size /= 1024
        index += 1
    return f'{size:.2f} {SIZE_PREFIXES[index]}B'
def print_lock(pid):
    """Print a notice that a lock is held by process `pid`, with its command line if readable.

    `pid` may be given as a string or an integer. The command line is read
    from /proc/<pid>/cmdline, where the arguments are separated by NUL bytes.
    """
    try:
        # str() lets callers pass an integer pid; os.path.join accepts only path-like parts.
        with open(os.path.join('/proc', str(pid), 'cmdline')) as f:
            cmdline = f.read().replace('\0', ' ').strip()
    except OSError:
        # The process may have exited in the meantime (or /proc is unavailable);
        # the notice is still useful without the command line.
        print(f'Waiting for lock currently held by process {pid}')
        return
    print(f'Waiting for lock currently held by process {pid} - {cmdline}')