spoc/usr/lib/python3.8/spoc/repo_publish.py

115 lines
3.1 KiB
Python

# -*- coding: utf-8 -*-
import fcntl
import json
import os
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import ec, utils
from cryptography.hazmat.primitives.serialization import load_pem_private_key
from . import config
from .exceptions import AppNotFoundError, ImageNotFoundError
from .flock import locked
# Keys of the two top-level sections of the repository data
TYPE_APP = 'apps'
TYPE_IMAGE = 'images'
class TarSizeCounter:
    """Accumulate the total size of the tar members passed to add_file()."""

    def __init__(self):
        # Running total of the `size` attributes seen so far
        self.size = 0

    def add_file(self, tarinfo):
        """Add ``tarinfo.size`` to the running total and return ``tarinfo``.

        The object is handed back unchanged, which is the shape of a tarfile
        ``filter`` callback (the call site is not part of this module).
        """
        self.size = self.size + tarinfo.size
        return tarinfo
def sign_file(file_path):
    """Return an ECDSA signature over the SHA-512 digest of a file.

    The file at ``file_path`` is hashed in 64 KiB chunks, then the digest is
    signed (as a pre-hashed value) with the unencrypted PEM private key read
    from ``config.PUB_PRIVKEY_FILE``.
    """
    digest_algorithm = hashes.SHA512()
    digest = hashes.Hash(digest_algorithm, default_backend())
    # Stream the file so that large files are never held in memory at once
    with open(file_path, 'rb') as source:
        for chunk in iter(lambda: source.read(64 * 1024), b''):
            digest.update(chunk)
    # The key is loaded only after hashing, without a password
    with open(config.PUB_PRIVKEY_FILE, 'rb') as key_file:
        pem_bytes = key_file.read()
    private_key = load_pem_private_key(pem_bytes, None, default_backend())
    signature_scheme = ec.ECDSA(utils.Prehashed(digest_algorithm))
    return private_key.sign(digest.finalize(), signature_scheme)
# In-memory repository contents; load() rebinds this to the parsed JSON file
data = {TYPE_IMAGE: {}, TYPE_APP: {}}
# st_mtime of config.PUB_REPO_FILE as of the last load() / save(); 0 until then
mtime = 0
def load():
    """Refresh the in-memory repository data from ``config.PUB_REPO_FILE``.

    The file is parsed only when its modification time differs from the
    cached ``mtime``. A missing file is ignored and leaves the current data
    untouched.
    """
    global data, mtime
    try:
        current_mtime = os.stat(config.PUB_REPO_FILE).st_mtime
        if current_mtime == mtime:
            # Cached data is still up to date
            return
        with open(config.PUB_REPO_FILE) as repo_file:
            data = json.load(repo_file)
        mtime = current_mtime
    except FileNotFoundError:
        # Nothing has been published yet (or the file vanished in between)
        return
def save():
    """Write the in-memory repository data to disk and sign the result.

    The data is dumped as sorted, indented JSON to ``config.PUB_REPO_FILE``
    and the signature of that file is written to ``config.PUB_SIG_FILE``.
    """
    global mtime
    # Rewrite the repository file from scratch; this function takes no lock itself
    with open(config.PUB_REPO_FILE, 'w') as repo_file:
        json.dump(data, repo_file, sort_keys=True, indent=4)
    # Remember the new modification time so load() does not re-read our own write
    repo_stat = os.stat(config.PUB_REPO_FILE)
    mtime = repo_stat.st_mtime
    # Cryptographically sign the repository file
    repo_signature = sign_file(config.PUB_REPO_FILE)
    with open(config.PUB_SIG_FILE, 'wb') as sig_file:
        sig_file.write(repo_signature)
@locked(config.PUB_LOCK_FILE)
def get_entries(entry_type):
    """Reload the repository if needed and return one of its sections.

    ``entry_type`` is a key of the repository data (``TYPE_IMAGE`` or
    ``TYPE_APP``). The returned mapping is the live in-memory object, not a
    copy.
    """
    load()
    entries = data[entry_type]
    return entries
def get_entry(entry_type, name, exception):
    """Return the entry called ``name`` from the ``entry_type`` section.

    Any ``KeyError`` raised during the lookup is re-raised as
    ``exception(name)``, chained to the original error.
    """
    try:
        entry = get_entries(entry_type)[name]
    except KeyError as missing:
        raise exception(name) from missing
    return entry
@locked(config.PUB_LOCK_FILE)
def add_entry(entry_type, name, definition):
    """Store ``definition`` under ``name`` in the ``entry_type`` section.

    The repository is reloaded first so the change is applied on top of the
    current file contents, then saved (and signed) again. An existing entry
    with the same name is overwritten.
    """
    load()
    section = data[entry_type]
    section[name] = definition
    save()
@locked(config.PUB_LOCK_FILE)
def delete_entry(entry_type, name):
    """Remove the entry called ``name`` from the ``entry_type`` section.

    The repository is saved only when an entry was actually removed; a
    missing entry is silently ignored.
    """
    load()
    try:
        data[entry_type].pop(name)
        save()
    except KeyError:
        # Nothing to delete, so the repository file is left as it is
        return
def get_images():
    """Return the mapping of all entries in the images section."""
    images = get_entries(TYPE_IMAGE)
    return images
def get_image(image_name):
    """Return the definition of the image called ``image_name``.

    Raises ``ImageNotFoundError`` when the lookup fails.
    """
    image = get_entry(TYPE_IMAGE, image_name, ImageNotFoundError)
    return image
def register_image(image_name, definition):
    """Add or overwrite the image ``image_name`` in the repository."""
    add_entry(
        TYPE_IMAGE,
        image_name,
        definition,
    )
def unregister_image(image_name):
    """Remove the image ``image_name`` from the repository, if present."""
    delete_entry(
        TYPE_IMAGE,
        image_name,
    )
def get_apps():
    """Return the mapping of all entries in the apps section."""
    apps = get_entries(TYPE_APP)
    return apps
def get_app(app_name):
    """Return the definition of the application called ``app_name``.

    Raises:
        AppNotFoundError: when the lookup fails.
    """
    # A missing application must be reported as AppNotFoundError; the
    # image-specific ImageNotFoundError belongs to get_image() only.
    return get_entry(TYPE_APP, app_name, AppNotFoundError)
def register_app(app_name, definition):
    """Add or overwrite the application ``app_name`` in the repository."""
    add_entry(
        TYPE_APP,
        app_name,
        definition,
    )
def unregister_app(app_name):
    """Remove the application ``app_name`` from the repository, if present."""
    delete_entry(
        TYPE_APP,
        app_name,
    )