134 lines
4.9 KiB
Python

# -*- coding: utf-8 -*-
import copy
import hashlib
import json
import os
import requests
import tarfile
import tempfile
import time
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import ec, utils
from cryptography.hazmat.primitives.serialization import load_pem_public_key
from .exceptions import AppNotFoundError, ImageNotFoundError
from .config import ONLINE_REPO_FILE, ONLINE_PUBKEY, ONLINE_REPO_URL, ONLINE_SIG_URL
TYPE_APP = 'apps'
TYPE_IMAGE = 'images'
def get_pubkey():
pubkey = f'-----BEGIN PUBLIC KEY-----\n{ONLINE_PUBKEY}\n-----END PUBLIC KEY-----'
return load_pem_public_key(pubkey.encode(), default_backend())
PUBLIC_KEY = get_pubkey()
# TODO: HTTP Error handling for all downloads (including imagebuilder)
def verify_fileobj(fileobj, expected_hash):
hasher = hashes.Hash(hashes.SHA512(), default_backend())
while True:
data = fileobj.read(64*1024)
if not data:
break
hasher.update(data)
PUBLIC_KEY.verify(bytes.fromhex(expected_hash), hasher.finalize(), ec.ECDSA(utils.Prehashed(hashes.SHA512())))
def download_archive(archive_url, archive_path, expected_hash, observer):
# Check if an archive needs to be downloaded via http(s)
do_download = True
# If the file already exists in the temporary directory, verify the signature
if os.path.exists(archive_path):
try:
with open(archive_path, 'rb') as f:
verify_fileobj(f, expected_hash)
# If the signature matches, skip download
observer.units_done = os.path.getsize(archive_path)
do_download = False
except InvalidSignature:
pass
if do_download:
# Download archive via http(s) and store in temporary directory
with open(archive_path, 'wb') as f, requests.Session() as session:
resource = session.get(archive_url, stream=True)
for chunk in resource.iter_content(chunk_size=64*1024):
if chunk:
observer.units_done += f.write(chunk)
def unpack_archive(archive_path, destination, expected_hash, observer):
with open(archive_path, 'rb') as f:
# Verify file object, then seek back and open it as tar without losing handle, thus preventing posible malicious race conditions
verify_fileobj(f, expected_hash)
f.seek(0)
tar = tarfile.open(fileobj=f)
# Extract the tar members while counting their size, taken from https://github.com/python/cpython/blob/master/Lib/tarfile.py
directories = []
for tarinfo in tar:
if tarinfo.isdir():
# Extract directories with a safe mode
directories.append(tarinfo)
tarinfo = copy.copy(tarinfo)
tarinfo.mode = 0o700
# Do not set_attrs directories, as we will do that further down
tar.extract(tarinfo, destination, set_attrs=not tarinfo.isdir(), numeric_owner=True)
observer.units_done += tarinfo.size
# Reverse sort directories
directories.sort(key=lambda a: a.name)
directories.reverse()
# Set correct owner, mtime and filemode on directories
for tarinfo in directories:
dirpath = os.path.join(destination, tarinfo.name)
try:
tar.chown(tarinfo, dirpath, numeric_owner=True)
tar.utime(tarinfo, dirpath)
tar.chmod(tarinfo, dirpath)
except ExtractError as e:
if tar.errorlevel > 1:
raise
# Remove the archive
os.unlink(archive_path)
def download_metadata():
with requests.Session() as session:
packages = session.get(ONLINE_REPO_URL, timeout=5).content
packages_sig = session.get(ONLINE_SIG_URL, timeout=5).content
PUBLIC_KEY.verify(packages_sig, packages, ec.ECDSA(hashes.SHA512()))
with open(ONLINE_REPO_FILE, 'wb') as f:
f.write(packages)
def load():
if not os.path.exists(ONLINE_REPO_FILE) or os.stat(ONLINE_REPO_FILE).st_mtime+300 < time.time():
# Download and the metadata file if local cache doesn't exist or is older than 5 minutes
download_metadata()
with open(ONLINE_REPO_FILE) as f:
return json.load(f)
def get_entries(entry_type):
data = load()
return data[entry_type]
def get_entry(entry_type, name):
return get_entries(entry_type)[name]
def get_images():
return get_entries(TYPE_IMAGE)
def get_image(image_name):
try:
return get_entry(TYPE_IMAGE, image_name)
except KeyError as e:
raise ImageNotFoundError(image_name) from e
def get_apps():
return get_entries(TYPE_APP)
def get_app(app_name):
try:
return get_entry(TYPE_APP, image_name)
except KeyError as e:
raise ImageNotFoundError(image_name) from e