Define own layer in image, simplify copy_tree + extractall, raise HTTP errors
This commit is contained in:
parent
9e9b3da0dd
commit
e32a76e174
@ -6,7 +6,7 @@ import os
|
||||
import shlex
|
||||
|
||||
from spoc import repo_local
|
||||
from spoc.container import Container, STATE_RUNNING, STATE_STOPPED
|
||||
from spoc.container import Container, State
|
||||
from spoc.image import Image
|
||||
from spoc.config import VOLUME_DIR
|
||||
|
||||
@ -14,9 +14,9 @@ def listing(state):
|
||||
if state == 'all':
|
||||
containers = repo_local.get_containers().keys()
|
||||
elif state == 'running':
|
||||
containers = [c for c in repo_local.get_containers() if Container(c).get_state() == STATE_RUNNING]
|
||||
containers = [c for c in repo_local.get_containers() if Container(c).get_state() == State.RUNNING]
|
||||
elif state == 'stopped':
|
||||
containers = [c for c in repo_local.get_containers() if Container(c).get_state() == STATE_STOPPED]
|
||||
containers = [c for c in repo_local.get_containers() if Container(c).get_state() == State.STOPPED]
|
||||
for container in containers:
|
||||
print(container)
|
||||
|
||||
@ -71,7 +71,7 @@ def modify_container(container, depends, mounts, envs, uid, gid, cmd, cwd, ready
|
||||
def create(container_name, image_name, depends, mounts, env, uid, gid, cmd, cwd, ready, halt, autostart):
|
||||
# Create container based on image definition and extra fields
|
||||
container = Container(container_name, False)
|
||||
container.set_definition(Image(image_name).get_definition(True))
|
||||
container.set_definition(Image(image_name).get_definition())
|
||||
modify_container(container, depends, mounts, env, uid, gid, cmd, cwd, ready, halt, autostart)
|
||||
container.create()
|
||||
|
||||
|
@ -3,6 +3,7 @@
|
||||
import time
|
||||
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
from math import floor
|
||||
|
||||
SIZE_PREFIXES = ('', 'k', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y')
|
||||
|
||||
@ -50,7 +51,7 @@ class ActionItem:
|
||||
self.print_progress('\n')
|
||||
|
||||
def print_progress(self, end='\r'):
|
||||
print(f'\x1b[K{self.text} ({self.units_done}/{self.units_total}) [{self.units_done/self.units_total*100:.2f} %]', end=end)
|
||||
print(f'\x1b[K{self.text} ({self.units_done}/{self.units_total}) [{floor(self.units_done/self.units_total*100)} %]', end=end)
|
||||
|
||||
def readable_size(bytes):
|
||||
i = 0
|
||||
|
@ -14,10 +14,12 @@ APPS_DIR = os.path.join(DATA_DIR, 'apps/')
|
||||
CONTAINERS_DIR = os.path.join(DATA_DIR, 'containers/')
|
||||
LAYERS_DIR = os.path.join(DATA_DIR, 'layers/')
|
||||
VOLUME_DIR = os.path.join(DATA_DIR, 'volumes/')
|
||||
TMP_APPS_DIR = os.path.join(DATA_DIR, 'tmp/apps/')
|
||||
TMP_LAYERS_DIR = os.path.join(DATA_DIR, 'tmp/layers/')
|
||||
HOSTS_FILE = os.path.join(DATA_DIR, 'hosts')
|
||||
REPO_FILE = os.path.join(DATA_DIR, 'repository.json')
|
||||
|
||||
TMP_DIR = os.path.join(DATA_DIR, 'tmp/')
|
||||
TMP_APPS_DIR = os.path.join(TMP_DIR, 'apps/')
|
||||
TMP_LAYERS_DIR = os.path.join(TMP_DIR, 'layers/')
|
||||
LOG_DIR = config.get('general', 'log-dir', fallback='/var/log/spoc')
|
||||
LOCK_FILE = '/run/lock/spoc-local.lock'
|
||||
|
||||
@ -35,5 +37,5 @@ ONLINE_LAYERS_URL = urllib.parse.urljoin(ONLINE_BASE_URL, 'layers/')
|
||||
ONLINE_APPS_URL = urllib.parse.urljoin(ONLINE_BASE_URL, 'apps/')
|
||||
ONLINE_REPO_URL = urllib.parse.urljoin(ONLINE_BASE_URL, 'repository.json')
|
||||
ONLINE_SIG_URL = urllib.parse.urljoin(ONLINE_BASE_URL, 'repository.sig')
|
||||
ONLINE_REPO_FILE = os.path.join(DATA_DIR, 'online.json')
|
||||
ONLINE_REPO_FILE = os.path.join(TMP_DIR, 'online.json')
|
||||
ONLINE_PUBKEY = config.get('repo', 'public-key', fallback='')
|
||||
|
@ -1,6 +1,7 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
import copy
|
||||
import enum
|
||||
import os
|
||||
import shlex
|
||||
import shutil
|
||||
@ -17,14 +18,15 @@ from .config import CONTAINERS_DIR, LAYERS_DIR, LOG_DIR, HOSTS_FILE, VOLUME_DIR
|
||||
from .templates import LXC_CONTAINER_TEMPLATE
|
||||
|
||||
# States taken from https://github.com/lxc/lxc/blob/master/src/lxc/state.h
|
||||
STATE_STOPPED = 'STOPPED'
|
||||
STATE_STARTING = 'STARTING'
|
||||
STATE_RUNNING = 'RUNNING'
|
||||
STATE_STOPPING = 'STOPPING'
|
||||
STATE_ABORTING = 'ABORTING'
|
||||
STATE_FREEZING = 'FREEZING'
|
||||
STATE_FROZEN = 'FROZEN'
|
||||
STATE_THAWED = 'THAWED'
|
||||
class State(enum.Enum):
|
||||
STOPPED = 'STOPPED'
|
||||
STARTING = 'STARTING'
|
||||
RUNNING = 'RUNNING'
|
||||
STOPPING = 'STOPPING'
|
||||
ABORTING = 'ABORTING'
|
||||
FREEZING = 'FREEZING'
|
||||
FROZEN = 'FROZEN'
|
||||
THAWED = 'THAWED'
|
||||
|
||||
DEFINITION_MEMBERS = {'build', 'depends', 'layers', 'mounts', 'env', 'uid', 'gid', 'cmd', 'cwd', 'ready', 'halt', 'autostart'}
|
||||
|
||||
@ -68,11 +70,11 @@ class Container:
|
||||
def get_state(self):
|
||||
# Get current state of the container, uses LXC monitor socket accessible only in container's namespace
|
||||
state = subprocess.run(['lxc-info', '-sH', '-P', CONTAINERS_DIR, self.name], capture_output=True, check=True)
|
||||
return state.stdout.strip().decode()
|
||||
return State[state.stdout.strip().decode()]
|
||||
|
||||
def await_state(self, awaited_state):
|
||||
# Block execution until the container reaches the desired state or until timeout
|
||||
subprocess.run(['lxc-wait', '-P', CONTAINERS_DIR, '-s', awaited_state, '-t', '30', self.name], check=True)
|
||||
subprocess.run(['lxc-wait', '-P', CONTAINERS_DIR, '-s', awaited_state.value, '-t', '30', self.name], check=True)
|
||||
|
||||
def mount_rootfs(self):
|
||||
# Prepares container rootfs
|
||||
@ -150,13 +152,13 @@ class Container:
|
||||
depsolver = DepSolver()
|
||||
self.get_start_dependencies(depsolver)
|
||||
for dependency in depsolver.solve():
|
||||
if dependency.get_state() != STATE_RUNNING:
|
||||
if dependency.get_state() != State.RUNNING:
|
||||
dependency.do_start()
|
||||
|
||||
def do_start(self):
|
||||
# Start the current container, wait until it is reported as started and execute application readiness check
|
||||
subprocess.Popen(['lxc-start', '-P', CONTAINERS_DIR, self.name])
|
||||
self.await_state(STATE_RUNNING)
|
||||
self.await_state(State.RUNNING)
|
||||
# Launch the readiness check in a separate thread, so it can be reliably cancelled after timeout
|
||||
with ThreadPoolExecutor(max_workers=1) as pool:
|
||||
# Create anonymous object to pass the task cancellation information
|
||||
@ -170,7 +172,7 @@ class Container:
|
||||
ready_cmd = shlex.split(self.ready) if self.ready else ['/bin/true']
|
||||
while not guard.cancel:
|
||||
state = self.get_state()
|
||||
if state != STATE_RUNNING:
|
||||
if state != State.RUNNING:
|
||||
raise InvalidContainerStateError(self.name, state)
|
||||
check = subprocess.run(['lxc-attach', '-P', CONTAINERS_DIR, '--clear-env', self.name, '--']+ready_cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, timeout=30)
|
||||
if check.returncode == 0:
|
||||
@ -182,22 +184,22 @@ class Container:
|
||||
depsolver = DepSolver()
|
||||
self.get_stop_dependencies(depsolver)
|
||||
for dependency in depsolver.solve():
|
||||
if dependency.get_state() != STATE_STOPPED:
|
||||
if dependency.get_state() != State.STOPPED:
|
||||
dependency.do_stop()
|
||||
|
||||
def do_stop(self):
|
||||
# Stop the current container and wait until it stops completely
|
||||
subprocess.Popen(['lxc-stop', '-P', CONTAINERS_DIR, self.name])
|
||||
self.await_state(STATE_STOPPED)
|
||||
self.await_state(State.STOPPED)
|
||||
|
||||
def execute(self, cmd, uid=None, gid=None, **kwargs):
|
||||
# If the container is starting or stopping, wait until the operation is finished
|
||||
state = self.get_state()
|
||||
if state == STATE_STARTING:
|
||||
self.await_state(STATE_RUNNING)
|
||||
if state == State.STARTING:
|
||||
self.await_state(State.RUNNING)
|
||||
state = self.get_state()
|
||||
elif state == STATE_STOPPING:
|
||||
self.await_state(STATE_STOPPED)
|
||||
elif state == State.STOPPING:
|
||||
self.await_state(State.STOPPED)
|
||||
state = self.get_state()
|
||||
# Resolve UID/GID, if they have been given
|
||||
uidgid_param = []
|
||||
@ -207,9 +209,9 @@ class Container:
|
||||
if gid:
|
||||
uidgid_param.extend(('-g', gid))
|
||||
# If the container is stopped, use lxc-execute, otherwise use lxc-attach
|
||||
if state == STATE_STOPPED:
|
||||
if state == State.STOPPED:
|
||||
return subprocess.run(['lxc-execute', '-P', CONTAINERS_DIR]+uidgid_param+[self.name, '--']+cmd, **kwargs)
|
||||
elif state == STATE_RUNNING:
|
||||
elif state == State.RUNNING:
|
||||
return subprocess.run(['lxc-attach', '-P', CONTAINERS_DIR, '--clear-env']+uidgid_param+[self.name, '--']+cmd, **kwargs)
|
||||
else:
|
||||
raise InvalidContainerStateError(self.name, state)
|
||||
|
@ -5,7 +5,8 @@ from .exceptions import CircularDependencyError
|
||||
class Node:
|
||||
def __init__(self, name, depends, instance):
|
||||
self.name = name
|
||||
self.depends = set(depends)
|
||||
# Remove the node from its own dependencies
|
||||
self.depends = set(depends) - {name}
|
||||
self.instance = instance
|
||||
|
||||
class DepSolver:
|
||||
|
@ -17,7 +17,7 @@ class Image:
|
||||
def __init__(self, name, load_from_repo=True):
|
||||
self.name = name
|
||||
self.layer_path = os.path.join(LAYERS_DIR, name)
|
||||
self.layers = []
|
||||
self.layers = [name]
|
||||
self.env = {}
|
||||
self.uid = None
|
||||
self.gid = None
|
||||
@ -35,19 +35,13 @@ class Image:
|
||||
for key in DEFINITION_MEMBERS.intersection(definition):
|
||||
setattr(self, key, definition[key])
|
||||
|
||||
def get_definition(self, including_self_layer=False):
|
||||
def get_definition(self):
|
||||
# Return shallow copy of image definition as dictionary
|
||||
definition = {}
|
||||
for key in DEFINITION_MEMBERS:
|
||||
value = getattr(self, key)
|
||||
if value:
|
||||
definition[key] = copy.copy(value)
|
||||
# Always add layers in definition even if there are none to ease processing elsewhere
|
||||
if 'layers' not in definition:
|
||||
definition['layers'] = []
|
||||
# Add the image's layer as topmost, if requested (useful for container creation)
|
||||
if including_self_layer:
|
||||
definition['layers'].append(self.name)
|
||||
return definition
|
||||
|
||||
def create(self, imagebuilder, filename):
|
||||
|
@ -41,7 +41,8 @@ class ImageBuilder:
|
||||
self.script_eof = args
|
||||
elif 'FROM' == directive:
|
||||
# Set the values of image from which this one inherits
|
||||
self.image.set_definition(Image(args).get_definition(True))
|
||||
self.image.set_definition(Image(args).get_definition())
|
||||
self.image.layers.append(self.image.name)
|
||||
elif 'COPY' == directive:
|
||||
srcdst = args.split()
|
||||
self.copy_files(srcdst[0], srcdst[1] if len(srcdst) > 1 else '')
|
||||
@ -76,7 +77,7 @@ class ImageBuilder:
|
||||
os.chown(script_path, 100000, 100000)
|
||||
# Create a temporary container from the current image definition and execute the script within the container
|
||||
container = Container(self.image.name, False)
|
||||
container.set_definition(self.image.get_definition(True))
|
||||
container.set_definition(self.image.get_definition())
|
||||
container.build = True
|
||||
container.create()
|
||||
container.execute(['/bin/sh', '-lc', os.path.join('/', script_name)], check=True)
|
||||
@ -98,7 +99,7 @@ class ImageBuilder:
|
||||
if not uid.isdigit() or not gid.isdigit():
|
||||
# Resolve the UID/GID from container if either of them is entered as string
|
||||
container = Container(self.image.name, False)
|
||||
container.set_definition(self.image.get_definition(True))
|
||||
container.set_definition(self.image.get_definition())
|
||||
container.create()
|
||||
uid,gid = container.get_uidgid(uid, gid)
|
||||
container.destroy()
|
||||
@ -112,7 +113,10 @@ class ImageBuilder:
|
||||
unpack_http_archive(src, dst)
|
||||
else:
|
||||
src = os.path.join(self.builddir, src)
|
||||
copy_tree(src, dst)
|
||||
if not os.path.isdir(src):
|
||||
shutil.copy2(src, dst)
|
||||
else:
|
||||
shutil.copytree(src, dst, symlinks=True, ignore_dangling_symlinks=True, dirs_exist_ok=True)
|
||||
# Shift UID/GID of the files to the unprivileged range
|
||||
shift_uid(dst, os.stat(dst, follow_symlinks=False))
|
||||
|
||||
@ -122,6 +126,7 @@ def unpack_http_archive(src, dst):
|
||||
# Download the file via http(s) and store as temporary file
|
||||
with requests.Session() as session:
|
||||
resource = session.get(src, stream=True)
|
||||
resource.raise_for_status()
|
||||
for chunk in resource.iter_content(chunk_size=None):
|
||||
if chunk:
|
||||
tmp_archive.write(chunk)
|
||||
@ -137,17 +142,6 @@ def unpack_http_archive(src, dst):
|
||||
with tarfile.open(fileobj=tmp_archive) as tar:
|
||||
tar.extractall(dst, numeric_owner=True)
|
||||
|
||||
def copy_tree(src, dst):
|
||||
# TODO: shutil.copytree?
|
||||
# Copies files from the host
|
||||
if not os.path.isdir(src):
|
||||
shutil.copy2(src, dst)
|
||||
else:
|
||||
os.makedirs(dst, exist_ok=True)
|
||||
for name in os.listdir(src):
|
||||
copy_tree(os.path.join(src, name), os.path.join(dst, name))
|
||||
shutil.copystat(src, dst)
|
||||
|
||||
def shift_uid(path, path_stat):
|
||||
# Shifts UID/GID of a file or a directory and its contents to the unprivileged range
|
||||
# The function parameters could arguably be more friendly, but os.scandir() already calls stat() on the entries,
|
||||
|
@ -16,7 +16,7 @@ from cryptography.hazmat.primitives.asymmetric import ec, utils
|
||||
from cryptography.hazmat.primitives.serialization import load_pem_public_key
|
||||
|
||||
from .exceptions import AppNotFoundError, ImageNotFoundError
|
||||
from .config import ONLINE_REPO_FILE, ONLINE_PUBKEY, ONLINE_REPO_URL, ONLINE_SIG_URL
|
||||
from .config import ONLINE_REPO_FILE, ONLINE_PUBKEY, ONLINE_REPO_URL, ONLINE_SIG_URL, TMP_DIR
|
||||
|
||||
TYPE_APP = 'apps'
|
||||
TYPE_IMAGE = 'images'
|
||||
@ -27,8 +27,6 @@ def get_pubkey():
|
||||
|
||||
PUBLIC_KEY = get_pubkey()
|
||||
|
||||
# TODO: HTTP Error handling for all downloads (including imagebuilder)
|
||||
|
||||
def verify_fileobj(fileobj, expected_hash):
|
||||
hasher = hashes.Hash(hashes.SHA512(), default_backend())
|
||||
while True:
|
||||
@ -55,48 +53,35 @@ def download_archive(archive_url, archive_path, expected_hash, observer):
|
||||
# Download archive via http(s) and store in temporary directory
|
||||
with open(archive_path, 'wb') as f, requests.Session() as session:
|
||||
resource = session.get(archive_url, stream=True)
|
||||
resource.raise_for_status()
|
||||
for chunk in resource.iter_content(chunk_size=64*1024):
|
||||
if chunk:
|
||||
observer.units_done += f.write(chunk)
|
||||
|
||||
def unpack_archive(archive_path, destination, expected_hash, observer):
|
||||
with open(archive_path, 'rb') as f:
|
||||
# Verify file object, then seek back and open it as tar without losing handle, thus preventing posible malicious race conditions
|
||||
# Verify file object, then seek back and open it as tar without losing handle, preventing possible malicious race conditions
|
||||
verify_fileobj(f, expected_hash)
|
||||
f.seek(0)
|
||||
tar = tarfile.open(fileobj=f)
|
||||
# Extract the tar members while counting their size, taken from https://github.com/python/cpython/blob/master/Lib/tarfile.py
|
||||
directories = []
|
||||
# Extract the tar members while counting their size
|
||||
# If this is done as non-root, extractall() from https://github.com/python/cpython/blob/master/Lib/tarfile.py needs to be reimplemented instead
|
||||
for tarinfo in tar:
|
||||
if tarinfo.isdir():
|
||||
# Extract directories with a safe mode
|
||||
directories.append(tarinfo)
|
||||
tarinfo = copy.copy(tarinfo)
|
||||
tarinfo.mode = 0o700
|
||||
# Do not set_attrs directories, as we will do that further down
|
||||
tar.extract(tarinfo, destination, set_attrs=not tarinfo.isdir(), numeric_owner=True)
|
||||
tar.extract(tarinfo, destination, numeric_owner=True)
|
||||
observer.units_done += tarinfo.size
|
||||
# Reverse sort directories
|
||||
directories.sort(key=lambda a: a.name)
|
||||
directories.reverse()
|
||||
# Set correct owner, mtime and filemode on directories
|
||||
for tarinfo in directories:
|
||||
dirpath = os.path.join(destination, tarinfo.name)
|
||||
try:
|
||||
tar.chown(tarinfo, dirpath, numeric_owner=True)
|
||||
tar.utime(tarinfo, dirpath)
|
||||
tar.chmod(tarinfo, dirpath)
|
||||
except ExtractError as e:
|
||||
if tar.errorlevel > 1:
|
||||
raise
|
||||
# Remove the archive
|
||||
os.unlink(archive_path)
|
||||
|
||||
def download_metadata():
|
||||
with requests.Session() as session:
|
||||
packages = session.get(ONLINE_REPO_URL, timeout=5).content
|
||||
packages_sig = session.get(ONLINE_SIG_URL, timeout=5).content
|
||||
resource = session.get(ONLINE_REPO_URL, timeout=5)
|
||||
resource.raise_for_status()
|
||||
packages = resource.content
|
||||
resource = session.get(ONLINE_SIG_URL, timeout=5)
|
||||
resource.raise_for_status()
|
||||
packages_sig = resource.content
|
||||
PUBLIC_KEY.verify(packages_sig, packages, ec.ECDSA(hashes.SHA512()))
|
||||
os.makedirs(TMP_DIR, 0o700, True)
|
||||
with open(ONLINE_REPO_FILE, 'wb') as f:
|
||||
f.write(packages)
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user