Prototype implementation of ActionItem + observers
This commit is contained in:
parent
a320661727
commit
dae215424e
@ -8,7 +8,7 @@ from spoc import repo_online
|
||||
from spoc import repo_publish
|
||||
from spoc.image import Image
|
||||
from spoc.imagebuilder import ImageBuilder
|
||||
from spoc.utils import readable_size
|
||||
from spoc.utils import ActionItem, readable_size
|
||||
|
||||
ACTION_LIST = 1
|
||||
ACTION_DOWNLOAD = 2
|
||||
@ -36,11 +36,15 @@ def listing(repo_type):
|
||||
print(image)
|
||||
|
||||
def download(image_name):
|
||||
plan = []
|
||||
local_images = repo_local.get_images()
|
||||
for layer in repo_online.get_image(image_name)['layers']:
|
||||
if layer not in local_images:
|
||||
print(f'Downloading {layer}...')
|
||||
Image(layer, False).download()
|
||||
image = Image(layer, False)
|
||||
plan.append(ActionItem(f'Downloading {image_name}', image.download))
|
||||
plan.append(ActionItem(f'Unpacking {image_name}', image.unpack_downloaded))
|
||||
for item in plan:
|
||||
item.run()
|
||||
|
||||
def delete(image_name):
|
||||
Image(image_name, False).delete()
|
||||
@ -50,9 +54,9 @@ def build(filename, force, do_publish):
|
||||
image_name = get_image_name(filename)
|
||||
if force or image_name not in repo_local.get_images():
|
||||
image = Image(image_name, False)
|
||||
image.delete()
|
||||
print(f'Building image {image_name} from file {filename}')
|
||||
ImageBuilder().build(image, filename)
|
||||
image.delete()
|
||||
image.create(ImageBuilder(), filename)
|
||||
print(f'Image {image_name} built successfully')
|
||||
# If publishing was requested, force publish after successful build
|
||||
force = True
|
||||
@ -119,6 +123,6 @@ elif args.action == ACTION_BUILD:
|
||||
elif args.action == ACTION_PUBLISH:
|
||||
publish(args.image, args.force)
|
||||
elif args.action == ACTION_UNPUBLISH:
|
||||
publishreop.unpublish_image(args.image)
|
||||
unpublish(args.image)
|
||||
else:
|
||||
parser.print_usage()
|
||||
|
@ -14,6 +14,8 @@ APPS_DIR = os.path.join(DATA_DIR, 'apps/')
|
||||
CONTAINERS_DIR = os.path.join(DATA_DIR, 'containers/')
|
||||
LAYERS_DIR = os.path.join(DATA_DIR, 'layers/')
|
||||
VOLUME_DIR = os.path.join(DATA_DIR, 'volumes/')
|
||||
TMP_APPS_DIR = os.path.join(DATA_DIR, 'tmp/apps/')
|
||||
TMP_LAYERS_DIR = os.path.join(DATA_DIR, 'tmp/layers/')
|
||||
HOSTS_FILE = os.path.join(DATA_DIR, 'hosts')
|
||||
REPO_FILE = os.path.join(DATA_DIR, 'repository.json')
|
||||
LOG_DIR = config.get('general', 'log-dir', fallback='/var/log/spoc')
|
||||
|
@ -8,7 +8,7 @@ import urllib.parse
|
||||
from . import repo_local
|
||||
from . import repo_online
|
||||
from . import repo_publish
|
||||
from .config import LAYERS_DIR, PUB_LAYERS_DIR, ONLINE_LAYERS_URL
|
||||
from .config import LAYERS_DIR, PUB_LAYERS_DIR, ONLINE_LAYERS_URL, TMP_LAYERS_DIR
|
||||
|
||||
DEFINITION_MEMBERS = {'layers', 'env', 'uid', 'gid', 'cmd', 'cwd', 'ready', 'halt', 'size', 'dlsize', 'hash'}
|
||||
|
||||
@ -16,8 +16,6 @@ class Image:
|
||||
def __init__(self, name, load_from_repo=True):
|
||||
self.name = name
|
||||
self.layer_path = os.path.join(LAYERS_DIR, name)
|
||||
self.archive_path = os.path.join(PUB_LAYERS_DIR, f'{name}.tar.xz')
|
||||
self.online_path = urllib.parse.urljoin(ONLINE_LAYERS_URL, f'{name}.tar.xz')
|
||||
self.layers = [name]
|
||||
self.env = {}
|
||||
self.uid = None
|
||||
@ -44,12 +42,12 @@ class Image:
|
||||
definition[key] = value
|
||||
return definition
|
||||
|
||||
def create(self, imagebuilder):
|
||||
def create(self, imagebuilder, filename):
|
||||
# Build the container from image file and save to local repository
|
||||
# Chown is possible only when the process is running as root, for user namespaces, see https://linuxcontainers.org/lxc/manpages/man1/lxc-usernsexec.1.html
|
||||
os.makedirs(self.layer_path, 0o755, True)
|
||||
os.chown(self.layer_path, 100000, 100000)
|
||||
imagebuilder.process_file()
|
||||
imagebuilder.build(self, filename)
|
||||
repo_local.register_image(self.name, self.get_definition())
|
||||
|
||||
def delete(self):
|
||||
@ -60,24 +58,36 @@ class Image:
|
||||
pass
|
||||
|
||||
def publish(self):
|
||||
os.makedirs(PUB_LAYERS_DIR, 0o755, True)
|
||||
os.makedirs(PUB_LAYERS_DIR, 0o700, True)
|
||||
files = repo_publish.TarSizeCounter()
|
||||
with tarfile.open(self.archive_path, 'w:xz') as tar:
|
||||
archive_path = os.path.join(PUB_LAYERS_DIR, f'{self.name}.tar.xz')
|
||||
with tarfile.open(archive_path, 'w:xz') as tar:
|
||||
tar.add(self.layer_path, self.name, filter=files.add_file)
|
||||
self.size = files.size
|
||||
self.dlsize = os.path.getsize(self.archive_path)
|
||||
self.hash = repo_publish.sign_file(self.archive_path).hex()
|
||||
self.dlsize = os.path.getsize(archive_path)
|
||||
self.hash = repo_publish.sign_file(archive_path).hex()
|
||||
repo_publish.register_image(self.name, self.get_definition())
|
||||
|
||||
def unpublish(self):
|
||||
repo_publish.unregister_image(self.name)
|
||||
archive_path = os.path.join(PUB_LAYERS_DIR, f'{self.name}.tar.xz')
|
||||
try:
|
||||
os.unlink(self.archive_path)
|
||||
os.unlink(archive_path)
|
||||
except FileNotFoundError:
|
||||
pass
|
||||
|
||||
def download(self):
|
||||
def download(self, observer):
|
||||
os.makedirs(TMP_LAYERS_DIR, 0o700, True)
|
||||
definition = repo_online.get_image(self.name)
|
||||
repo_online.download_archive(self.online_path, LAYERS_DIR, definition['hash'])
|
||||
observer.units_total = definition['dlsize']
|
||||
archive_url = urllib.parse.urljoin(ONLINE_LAYERS_URL, f'{self.name}.tar.xz')
|
||||
archive_path = os.path.join(TMP_LAYERS_DIR, f'{self.name}.tar.xz')
|
||||
repo_online.download_archive(archive_url, archive_path, definition['hash'], observer)
|
||||
|
||||
def unpack_downloaded(self, observer):
|
||||
definition = repo_online.get_image(self.name)
|
||||
observer.units_total = definition['size']
|
||||
archive_path = os.path.join(TMP_LAYERS_DIR, f'{self.name}.tar.xz')
|
||||
repo_online.unpack_archive(archive_path, LAYERS_DIR, definition['hash'], observer)
|
||||
self.set_definition(definition)
|
||||
repo_local.register_image(self.name, definition)
|
||||
|
@ -10,19 +10,16 @@ import zipfile
|
||||
|
||||
from .container import Container
|
||||
from .image import Image
|
||||
from .config import VOLUME_DIR
|
||||
from .config import LAYERS_DIR
|
||||
|
||||
class ImageBuilder:
|
||||
def build(self, image, filename):
|
||||
# Reset internal state, read and process lines from filename
|
||||
self.image = image
|
||||
self.filename = filename
|
||||
self.builddir = os.path.dirname(filename)
|
||||
self.script_eof = None
|
||||
self.script_lines = []
|
||||
self.image.create(self)
|
||||
|
||||
def process_file(self):
|
||||
with open(self.filename, 'r') as f:
|
||||
with open(filename, 'r') as f:
|
||||
for line in f:
|
||||
self.process_line(line.strip())
|
||||
|
||||
@ -71,7 +68,6 @@ class ImageBuilder:
|
||||
def run_script(self, script_lines):
|
||||
# Creates a temporary container, runs a script in its namespace, and stores the files modified by it as part of the layer
|
||||
# Note: If USER or WORKDIR directive has already been set, the command is run under that UID/GID or working directory
|
||||
os.makedirs(VOLUME_DIR, 0o755, True)
|
||||
script_fd, script_path = tempfile.mkstemp(suffix='.sh', dir=self.image.layer_path, text=True)
|
||||
script_name = os.path.basename(script_path)
|
||||
script_lines = '\n'.join(script_lines)
|
||||
@ -116,7 +112,7 @@ class ImageBuilder:
|
||||
if src.startswith('http://') or src.startswith('https://'):
|
||||
unpack_http_archive(src, dst)
|
||||
else:
|
||||
src = os.path.join(os.path.dirname(self.filename), src)
|
||||
src = os.path.join(self.builddir, src)
|
||||
copy_tree(src, dst)
|
||||
# Shift UID/GID of the files to the unprivileged range
|
||||
shift_uid(dst, os.stat(dst, follow_symlinks=False))
|
||||
|
@ -1,5 +1,6 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
import copy
|
||||
import hashlib
|
||||
import json
|
||||
import os
|
||||
@ -8,6 +9,7 @@ import tarfile
|
||||
import tempfile
|
||||
import time
|
||||
|
||||
from cryptography.exceptions import InvalidSignature
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
from cryptography.hazmat.primitives import hashes
|
||||
from cryptography.hazmat.primitives.asymmetric import ec, utils
|
||||
@ -27,24 +29,68 @@ PUBLIC_KEY = get_pubkey()
|
||||
|
||||
# TODO: HTTP Error handling for all downloads (including imagebuilder)
|
||||
|
||||
def download_archive(src, dst, expected_hash):
|
||||
# Download archive via http(s), verify hash and decompress
|
||||
with tempfile.TemporaryFile() as tmp_archive:
|
||||
sha512 = hashes.SHA512()
|
||||
hasher = hashes.Hash(sha512, default_backend())
|
||||
# Download the file via http(s) and store as temporary file
|
||||
with requests.Session() as session:
|
||||
resource = session.get(src, stream=True)
|
||||
def verify_fileobj(fileobj, expected_hash):
|
||||
hasher = hashes.Hash(hashes.SHA512(), default_backend())
|
||||
while True:
|
||||
data = fileobj.read(64*1024)
|
||||
if not data:
|
||||
break
|
||||
hasher.update(data)
|
||||
PUBLIC_KEY.verify(bytes.fromhex(expected_hash), hasher.finalize(), ec.ECDSA(utils.Prehashed(hashes.SHA512())))
|
||||
|
||||
def download_archive(archive_url, archive_path, expected_hash, observer):
|
||||
# Check if an archive needs to be downloaded via http(s)
|
||||
do_download = True
|
||||
# If the file already exists in the temporary directory, verify the signature
|
||||
if os.path.exists(archive_path):
|
||||
try:
|
||||
with open(archive_path, 'rb') as f:
|
||||
verify_fileobj(f, expected_hash)
|
||||
# If the signature matches, skip download
|
||||
observer.units_done = os.path.getsize(archive_path)
|
||||
do_download = False
|
||||
except InvalidSignature:
|
||||
pass
|
||||
if do_download:
|
||||
# Download archive via http(s) and store in temporary directory
|
||||
with open(archive_path, 'wb') as f, requests.Session() as session:
|
||||
resource = session.get(archive_url, stream=True)
|
||||
for chunk in resource.iter_content(chunk_size=None):
|
||||
if chunk:
|
||||
tmp_archive.write(chunk)
|
||||
hasher.update(chunk)
|
||||
# Verify hash
|
||||
PUBLIC_KEY.verify(bytes.fromhex(expected_hash), hasher.finalize(), ec.ECDSA(utils.Prehashed(sha512)))
|
||||
# Extract the tar.xz file
|
||||
tmp_archive.seek(0)
|
||||
with tarfile.open(fileobj=tmp_archive) as tar:
|
||||
tar.extractall(dst, numeric_owner=True)
|
||||
observer.units_done += f.write(chunk)
|
||||
|
||||
def unpack_archive(archive_path, destination, expected_hash, observer):
|
||||
with open(archive_path, 'rb') as f:
|
||||
# Verify file object, then seek back and open it as tar without losing handle, thus preventing posible malicious race conditions
|
||||
verify_fileobj(f, expected_hash)
|
||||
f.seek(0)
|
||||
tar = tarfile.open(fileobj=f)
|
||||
# Extract the tar members while counting their size, taken from https://github.com/python/cpython/blob/master/Lib/tarfile.py
|
||||
directories = []
|
||||
for tarinfo in tar:
|
||||
if tarinfo.isdir():
|
||||
# Extract directories with a safe mode
|
||||
directories.append(tarinfo)
|
||||
tarinfo = copy.copy(tarinfo)
|
||||
tarinfo.mode = 0o700
|
||||
# Do not set_attrs directories, as we will do that further down
|
||||
tar.extract(tarinfo, destination, set_attrs=not tarinfo.isdir(), numeric_owner=True)
|
||||
observer.units_done += tarinfo.size
|
||||
# Reverse sort directories
|
||||
directories.sort(key=lambda a: a.name)
|
||||
directories.reverse()
|
||||
# Set correct owner, mtime and filemode on directories
|
||||
for tarinfo in directories:
|
||||
dirpath = os.path.join(destination, tarinfo.name)
|
||||
try:
|
||||
tar.chown(tarinfo, dirpath, numeric_owner=True)
|
||||
tar.utime(tarinfo, dirpath)
|
||||
tar.chmod(tarinfo, dirpath)
|
||||
except ExtractError as e:
|
||||
if tar.errorlevel > 1:
|
||||
raise
|
||||
# Remove the archive
|
||||
os.unlink(archive_path)
|
||||
|
||||
def download_metadata():
|
||||
with requests.Session() as session:
|
||||
|
@ -2,6 +2,17 @@
|
||||
|
||||
SIZE_PREFIXES = ('', 'k', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y')
|
||||
|
||||
class ActionItem:
|
||||
def __init__(self, text, action):
|
||||
self.text = text
|
||||
self.action = action
|
||||
self.units_total = 0
|
||||
self.units_done = 0
|
||||
|
||||
def run(self):
|
||||
print(self.text)
|
||||
self.action(self)
|
||||
|
||||
def readable_size(bytes):
|
||||
i = 0
|
||||
while bytes > 1024:
|
||||
|
Loading…
x
Reference in New Issue
Block a user