Implement exclusive locking with mtime-based cache

This commit is contained in:
Disassembler 2020-03-12 20:56:52 +01:00
parent ebb45e502a
commit ac4b7f9995
No known key found for this signature in database
GPG Key ID: 524BD33A0EE29499
10 changed files with 197 additions and 130 deletions

View File

@ -9,7 +9,9 @@ from spoc import repo_local
from spoc import repo_online
from spoc import repo_publish
from spoc.app import App
from spoc.cli import ActionQueue, readable_size
from spoc.cli import ActionQueue, print_lock, readable_size
from spoc.config import LOCK_FILE
from spoc.flock import locked
from spoc.image import Image
def listing(list_type):
@ -25,6 +27,7 @@ def listing(list_type):
for app in apps:
print(app)
@locked(LOCK_FILE, print_lock)
def install(app_name):
queue = ActionQueue()
required_images = []
@ -37,9 +40,11 @@ def install(app_name):
queue.download_app(App(app_name, False))
queue.process()
@locked(LOCK_FILE, print_lock)
def update(app_name):
App(app_name).update()
@locked(LOCK_FILE, print_lock)
def uninstall(app_name):
App(app_name).uninstall()

View File

@ -6,9 +6,9 @@ import os
import shlex
from spoc import repo_local
from spoc.config import VOLUMES_DIR
from spoc.container import Container, State
from spoc.image import Image
from spoc.config import VOLUMES_DIR
def listing(state):
if state == 'all':

View File

@ -8,11 +8,13 @@ import sys
from spoc import repo_local
from spoc import repo_online
from spoc import repo_publish
from spoc.cli import ActionQueue, print_lock, readable_size
from spoc.config import LOCK_FILE
from spoc.depsolver import DepSolver
from spoc.exceptions import ImageNotFoundError
from spoc.flock import locked
from spoc.image import Image
from spoc.imagebuilder import ImageBuilder
from spoc.cli import ActionQueue, readable_size
def get_image_name(file_path):
# Read and return image name from image file
@ -32,6 +34,7 @@ def listing(list_type):
for image in images:
print(image)
@locked(LOCK_FILE, print_lock)
def download(image_name):
queue = ActionQueue()
local_images = repo_local.get_images()
@ -40,6 +43,7 @@ def download(image_name):
queue.download_image(Image(layer, False))
queue.process()
@locked(LOCK_FILE, print_lock)
def delete(image_name):
# Remove the image including all images that have it as one of its parents
# Check if image is in use
@ -58,6 +62,7 @@ def delete(image_name):
queue.delete_image(image)
queue.process()
@locked(LOCK_FILE, print_lock)
def clean():
# Remove images which aren't used in any locally defined containers
used = set()
@ -74,6 +79,7 @@ def clean():
queue.delete_image(image)
queue.process()
@locked(LOCK_FILE, print_lock)
def build(filename, force, do_publish):
# Check if a build is needed and attempt to build the image from image file
image_name = get_image_name(filename)

View File

@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
import os
import time
from concurrent.futures import ThreadPoolExecutor
from math import floor
@ -63,3 +64,8 @@ def readable_size(bytes):
i += 1
bytes /= 1024
return f'{bytes:.2f} {SIZE_PREFIXES[i]}B'
def print_lock(pid):
with open(os.path.join('/proc', pid, 'cmdline')) as f:
cmdline = f.read().replace('\0', ' ').strip()
print(f'Waiting for lock currently held by process {pid} - {cmdline}')

View File

@ -17,11 +17,15 @@ VOLUMES_DIR = os.path.join(DATA_DIR, 'volumes/')
HOSTS_FILE = os.path.join(DATA_DIR, 'hosts')
REPO_FILE = os.path.join(DATA_DIR, 'repository.json')
LOCK_DIR = '/run/lock'
LOCK_FILE = os.path.join(LOCK_DIR, 'spoc.lock')
HOSTS_LOCK_FILE = os.path.join(LOCK_DIR, 'spoc-hosts.lock')
REPO_LOCK_FILE = os.path.join(LOCK_DIR, 'spoc-local.lock')
TMP_DIR = os.path.join(DATA_DIR, 'tmp/')
TMP_APPS_DIR = os.path.join(TMP_DIR, 'apps/')
TMP_LAYERS_DIR = os.path.join(TMP_DIR, 'layers/')
LOG_DIR = config.get('general', 'log-dir', fallback='/var/log/spoc')
LOCK_FILE = '/run/lock/spoc-local.lock'
PUB_DIR = config.get('publish', 'publish-dir', fallback=os.path.join(DATA_DIR, 'publish'))
PUB_LAYERS_DIR = os.path.join(PUB_DIR, 'layers/')
@ -29,7 +33,7 @@ PUB_APPS_DIR = os.path.join(PUB_DIR, 'apps/')
PUB_REPO_FILE = os.path.join(PUB_DIR, 'repository.json')
PUB_SIG_FILE = os.path.join(PUB_DIR, 'repository.sig')
PUB_PRIVKEY_FILE = config.get('publish', 'signing-key', fallback='/etc/spoc/publish.key')
PUB_LOCK_FILE = '/run/lock/spoc-publish.lock'
PUB_LOCK_FILE = os.path.join(LOCK_DIR, 'spoc-publish.lock')
# URLs which are an actual directories need to end with trailing slash
ONLINE_BASE_URL = '{}/'.format(config.get('repo', 'url', fallback='https://localhost').rstrip('/'))
@ -39,3 +43,8 @@ ONLINE_REPO_URL = urllib.parse.urljoin(ONLINE_BASE_URL, 'repository.json')
ONLINE_SIG_URL = urllib.parse.urljoin(ONLINE_BASE_URL, 'repository.sig')
ONLINE_REPO_FILE = os.path.join(TMP_DIR, 'online.json')
ONLINE_PUBKEY = config.get('repo', 'public-key', fallback='')
# Repo entry types constants
TYPE_APP = 'apps'
TYPE_CONTAINER = 'containers'
TYPE_IMAGE = 'images'

View File

@ -1,18 +1,48 @@
# -*- coding: utf-8 -*-
import errno
import fcntl
import os
import time
from contextlib import contextmanager
def locked_ex(lock_file):
@contextmanager
def lock(lock_file, fail_callback=None):
# Open the lock file in append mode first to ensure its existence but not modify any data if it already exists
with open(lock_file, 'a'):
pass
# Open the lock file in read + write mode without truncation
with open(lock_file, 'r+') as f:
while True:
try:
# Try to obtain exclusive lock in non-blocking mode
fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
break
except OSError as e:
# If lock is already locked by another process
if e.errno == errno.EAGAIN:
if fail_callback:
# Call the callback function with contents of the lock file (PID of the process holding the lock)
fail_callback(f.read())
# Remove the callback function so it's not called in every loop
fail_callback = None
# Set the position for future truncation
f.seek(0)
# Wait for the lock to be freed
time.sleep(0.1)
else:
raise
# If the lock was obtained, truncate the file and write PID of the process holding the lock
f.truncate()
f.write(str(os.getpid()))
f.flush()
yield f
# Function decorator
def locked(lock_file, fail_callback=None):
def decorator(target):
def wrapper(*args, **kwargs):
with lock_ex(lock_file):
with lock(lock_file, fail_callback):
return target(*args, **kwargs)
return wrapper
return decorator
@contextmanager
def lock_ex(lock_file):
with open(lock_file, 'w') as lock:
fcntl.lockf(lock, fcntl.LOCK_EX)
yield lock

View File

@ -2,30 +2,44 @@
import fcntl
import ipaddress
import os
import socket
import struct
from .config import HOSTS_FILE, NETWORK_INTERFACE
from .config import HOSTS_FILE, HOSTS_LOCK_FILE, NETWORK_INTERFACE
from .flock import locked
# ioctl magic constants taken from https://git.musl-libc.org/cgit/musl/tree/include/sys/ioctl.h (same as glibc)
IOCTL_SIOCGIFADDR = 0x8915
IOCTL_SIOCGIFNETMASK = 0x891b
def read_leases():
leases = {}
mtime = None
@locked(HOSTS_LOCK_FILE)
def load_leases():
# Read and parse all IP-hostname pairs from the global hosts file
global leases
global mtime
try:
file_mtime = os.stat(HOSTS_FILE).st_mtime
if mtime != file_mtime:
with open(HOSTS_FILE, 'r') as f:
leases = [lease.strip().split(' ', 1) for lease in f]
return {ip: hostname for ip, hostname in leases}
leases = [lease.strip().split(None, 1) for lease in f]
leases = {ip: hostname for ip, hostname in leases}
mtime = file_mtime
except:
interface = get_bridge_interface()
return {str(interface.ip): 'host'}
leases = {str(interface.ip): 'host'}
def write_leases(leases):
@locked(HOSTS_LOCK_FILE)
def save_leases():
# write all IP-hostname pairs to the global hosts file
global mtime
with open(HOSTS_FILE, 'w') as f:
for ip, hostname in sorted(leases.items(), key=lambda lease: socket.inet_aton(lease[0])):
f.write(f'{ip} {hostname}\n')
mtime = os.stat(HOSTS_FILE).st_mtime
def get_bridge_interface():
# Returns bridge interface's IP address and netmask
@ -39,7 +53,7 @@ def get_bridge_interface():
def request_ip(container_name):
# Find if and IP hasn't been leased for the hostname
interface = get_bridge_interface()
leases = read_leases()
load_leases()
for ip in leases:
if leases[ip] == container_name:
return (ip, str(interface.network.prefixlen), str(interface.ip))
@ -48,10 +62,12 @@ def request_ip(container_name):
ip = str(ip)
if ip not in leases:
leases[ip] = container_name
write_leases(leases)
save_leases()
return (ip, str(interface.network.prefixlen), str(interface.ip))
def release_ip(container_name):
# Delete the lease from hosts file
leases = {ip: h for ip, h in read_leases().items() if h != container_name}
write_leases(leases)
global leases
load_leases()
leases = {ip: h for ip, h in leases.items() if h != container_name}
save_leases()

View File

@ -1,46 +1,57 @@
# -*- coding: utf-8 -*-
import fcntl
import json
import os
from .exceptions import AppNotFoundError, ContainerNotFoundError, ImageNotFoundError
from .flock import lock_ex
from .config import REPO_FILE, LOCK_FILE
from .config import REPO_FILE, REPO_LOCK_FILE, TYPE_APP, TYPE_CONTAINER, TYPE_IMAGE
from .flock import locked
TYPE_APP = 'apps'
TYPE_CONTAINER = 'containers'
TYPE_IMAGE = 'images'
data = {TYPE_IMAGE: {}, TYPE_CONTAINER: {}, TYPE_APP: {}}
mtime = None
def load():
global data
global mtime
try:
file_mtime = os.stat(REPO_FILE).st_mtime
if mtime != file_mtime:
with open(REPO_FILE) as f:
return json.load(f)
data = json.load(f)
mtime = file_mtime
except FileNotFoundError:
return {TYPE_IMAGE: {}, TYPE_CONTAINER: {}, TYPE_APP: {}}
pass
def save(data):
def save():
global mtime
with open(REPO_FILE, 'w') as f:
json.dump(data, f, sort_keys=True, indent=4)
mtime = os.stat(REPO_FILE).st_mtime
@locked(REPO_LOCK_FILE)
def get_entries(entry_type):
with lock_ex(LOCK_FILE):
data = load()
load()
return data[entry_type]
def get_entry(entry_type, name):
def get_entry(entry_type, name, exception):
try:
return get_entries(entry_type)[name]
except KeyError as e:
raise exception(name) from e
@locked(REPO_LOCK_FILE)
def add_entry(entry_type, name, definition):
with lock_ex(LOCK_FILE):
data = load()
load()
data[entry_type][name] = definition
save(data)
save()
@locked(REPO_LOCK_FILE)
def delete_entry(entry_type, name):
with lock_ex(LOCK_FILE):
data = load()
load()
try:
del data[entry_type][name]
save(data)
save()
except KeyError:
pass
@ -48,10 +59,7 @@ def get_images():
return get_entries(TYPE_IMAGE)
def get_image(image_name):
try:
return get_entry(TYPE_IMAGE, image_name)
except KeyError as e:
raise ImageNotFoundError(image_name) from e
return get_entry(TYPE_IMAGE, image_name, ImageNotFoundError)
def register_image(image_name, definition):
add_entry(TYPE_IMAGE, image_name, definition)
@ -63,10 +71,7 @@ def get_containers():
return get_entries(TYPE_CONTAINER)
def get_container(container_name):
try:
return get_entry(TYPE_CONTAINER, container_name)
except KeyError as e:
raise ContainerNotFoundError(container_name) from e
return get_entry(TYPE_CONTAINER, container_name, ContainerNotFoundError)
def register_container(container_name, definition):
add_entry(TYPE_CONTAINER, container_name, definition)
@ -78,10 +83,7 @@ def get_apps():
return get_entries(TYPE_APP)
def get_app(app_name):
try:
return get_entry(TYPE_APP, app_name)
except KeyError as e:
raise ImageNotFoundError(app_name) from e
return get_entry(TYPE_APP, app_name, AppNotFoundError)
def register_app(app_name, definition):
add_entry(TYPE_APP, app_name, definition)

View File

@ -1,12 +1,9 @@
# -*- coding: utf-8 -*-
import copy
import hashlib
import json
import os
import requests
import tarfile
import tempfile
import time
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.backends import default_backend
@ -15,10 +12,7 @@ from cryptography.hazmat.primitives.asymmetric import ec, utils
from cryptography.hazmat.primitives.serialization import load_pem_public_key
from .exceptions import AppNotFoundError, ImageNotFoundError
from .config import ONLINE_REPO_FILE, ONLINE_PUBKEY, ONLINE_REPO_URL, ONLINE_SIG_URL, TMP_DIR
TYPE_APP = 'apps'
TYPE_IMAGE = 'images'
from .config import ONLINE_PUBKEY, ONLINE_REPO_URL, ONLINE_SIG_URL, TYPE_APP, TYPE_IMAGE
def get_pubkey():
pubkey = f'-----BEGIN PUBLIC KEY-----\n{ONLINE_PUBKEY}\n-----END PUBLIC KEY-----'
@ -71,7 +65,11 @@ def unpack_archive(archive_path, destination, expected_hash, observer):
# Remove the archive
os.unlink(archive_path)
def download_metadata():
data = None
def load():
global data
if not data:
with requests.Session() as session:
resource = session.get(ONLINE_REPO_URL, timeout=5)
resource.raise_for_status()
@ -80,38 +78,26 @@ def download_metadata():
resource.raise_for_status()
packages_sig = resource.content
PUBLIC_KEY.verify(packages_sig, packages, ec.ECDSA(hashes.SHA512()))
os.makedirs(TMP_DIR, 0o700, True)
with open(ONLINE_REPO_FILE, 'wb') as f:
f.write(packages)
def load():
if not os.path.exists(ONLINE_REPO_FILE) or os.stat(ONLINE_REPO_FILE).st_mtime+300 < time.time():
# Download and the metadata file if local cache doesn't exist or is older than 5 minutes
download_metadata()
with open(ONLINE_REPO_FILE) as f:
return json.load(f)
data = json.loads(packages.decode())
def get_entries(entry_type):
data = load()
load()
return data[entry_type]
def get_entry(entry_type, name):
def get_entry(entry_type, name, exception):
try:
return get_entries(entry_type)[name]
except KeyError as e:
raise exception(name) from e
def get_images():
return get_entries(TYPE_IMAGE)
def get_image(image_name):
try:
return get_entry(TYPE_IMAGE, image_name)
except KeyError as e:
raise ImageNotFoundError(image_name) from e
return get_entry(TYPE_IMAGE, image_name, ImageNotFoundError)
def get_apps():
return get_entries(TYPE_APP)
def get_app(app_name):
try:
return get_entry(TYPE_APP, app_name)
except KeyError as e:
raise ImageNotFoundError(app_name) from e
return get_entry(TYPE_APP, app_name, AppNotFoundError)

View File

@ -1,17 +1,16 @@
# -*- coding: utf-8 -*-
import fcntl
import json
import os
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import ec, utils
from cryptography.hazmat.primitives.serialization import load_pem_private_key
from .exceptions import AppNotFoundError, ImageNotFoundError
from .flock import lock_ex
from .config import PUB_LOCK_FILE, PUB_PRIVKEY_FILE, PUB_REPO_FILE, PUB_SIG_FILE
TYPE_APP = 'apps'
TYPE_IMAGE = 'images'
from .config import PUB_LOCK_FILE, PUB_PRIVKEY_FILE, PUB_REPO_FILE, PUB_SIG_FILE, TYPE_APP, TYPE_IMAGE
from .flock import locked
class TarSizeCounter:
def __init__(self):
@ -35,41 +34,55 @@ def sign_file(file_path):
private_key = load_pem_private_key(f.read(), None, default_backend())
return private_key.sign(hasher.finalize(), ec.ECDSA(utils.Prehashed(sha512)))
def load():
try:
with open(PUB_REPO_FILE) as f:
return json.load(f)
except FileNotFoundError:
return {TYPE_IMAGE: {}, TYPE_APP: {}}
data = {TYPE_IMAGE: {}, TYPE_APP: {}}
mtime = None
def save(data):
def load():
global data
global mtime
try:
file_mtime = os.stat(PUB_REPO_FILE).st_mtime
if mtime != file_mtime:
with open(PUB_REPO_FILE) as f:
data = json.load(f)
mtime = file_mtime
except FileNotFoundError:
pass
def save():
global mtime
# Open the repository file in read + write mode using exclusive lock
with open(PUB_REPO_FILE, 'w') as f:
json.dump(data, f, sort_keys=True, indent=4)
mtime = os.stat(PUB_REPO_FILE).st_mtime
# Cryptographically sign the repository file
signature = sign_file(PUB_REPO_FILE)
with open(PUB_SIG_FILE, 'wb') as f:
f.write(signature)
@locked(PUB_LOCK_FILE)
def get_entries(entry_type):
with lock_ex(PUB_LOCK_FILE):
data = load()
load()
return data[entry_type]
def get_entry(entry_type, name):
def get_entry(entry_type, name, exception):
try:
return get_entries(entry_type)[name]
except KeyError as e:
raise exception(name) from e
@locked(PUB_LOCK_FILE)
def add_entry(entry_type, name, definition):
with lock_ex(PUB_LOCK_FILE):
data = load()
load()
data[entry_type][name] = definition
save(data)
save()
@locked(PUB_LOCK_FILE)
def delete_entry(entry_type, name):
with lock_ex(PUB_LOCK_FILE):
data = load()
load()
try:
del data[entry_type][name]
save(data)
save()
except KeyError:
pass
@ -77,10 +90,7 @@ def get_images():
return get_entries(TYPE_IMAGE)
def get_image(image_name):
try:
return get_entry(TYPE_IMAGE, image_name)
except KeyError as e:
raise ImageNotFoundError(image_name) from e
return get_entry(TYPE_IMAGE, image_name, ImageNotFoundError)
def register_image(image_name, definition):
add_entry(TYPE_IMAGE, image_name, definition)
@ -92,10 +102,7 @@ def get_apps():
return get_entries(TYPE_APP)
def get_app(app_name):
try:
return get_entry(TYPE_APP, app_name)
except KeyError as e:
raise ImageNotFoundError(app_name) from e
return get_entry(TYPE_APP, app_name, ImageNotFoundError)
def register_app(app_name, definition):
add_entry(TYPE_APP, app_name, definition)