diff --git a/usr/lib/python3.6/vmmgr/appmgr.py b/usr/lib/python3.6/vmmgr/appmgr.py index c1f63f6..577e05f 100644 --- a/usr/lib/python3.6/vmmgr/appmgr.py +++ b/usr/lib/python3.6/vmmgr/appmgr.py @@ -8,14 +8,9 @@ import shutil import subprocess from cryptography.exceptions import InvalidSignature -from cryptography.hazmat.backends import default_backend -from cryptography.hazmat.primitives import hashes -from cryptography.hazmat.primitives.asymmetric import ec -from cryptography.hazmat.primitives.serialization import load_pem_public_key -from . import tools +from . import crypto -PUB_FILE = '/etc/vmmgr/packages.pub' LXC_ROOT = '/var/lib/lxc' class InstallItem: @@ -45,9 +40,7 @@ class AppMgr: return packages.status_code packages = packages.content packages_sig = self.get_repo_resource('packages.sig').content - with open(PUB_FILE, 'rb') as f: - pub_key = load_pem_public_key(f.read(), default_backend()) - pub_key.verify(packages_sig, packages, ec.ECDSA(hashes.SHA512())) + crypto.verify_signature(packages, packages_sig) online_packages = json.loads(packages) # Minimze the time when self.online_packages is out of sync self.online_packages = online_packages diff --git a/usr/lib/python3.6/vmmgr/crypto.py b/usr/lib/python3.6/vmmgr/crypto.py new file mode 100644 index 0000000..7d409bc --- /dev/null +++ b/usr/lib/python3.6/vmmgr/crypto.py @@ -0,0 +1,95 @@ +# -*- coding: utf-8 -*- + +import bcrypt +import datetime +import os + +from cryptography import x509 +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives import hashes, serialization +from cryptography.hazmat.primitives.asymmetric import ec +from cryptography.hazmat.primitives.serialization import load_pem_public_key +from cryptography.x509.oid import NameOID, ExtendedKeyUsageOID + +CERT_PUB_FILE = '/etc/ssl/services.pem' +CERT_KEY_FILE = '/etc/ssl/services.key' +SIG_PUB_FILE = '/etc/vmmgr/packages.pub' + +def create_cert(domain): + # Create selfsigned certificate with wildcard alternative subject name + private_key = ec.generate_private_key(ec.SECP384R1(), default_backend()) + public_key = private_key.public_key() + subject = x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, domain)]) + now = datetime.datetime.utcnow() + cert = x509.CertificateBuilder() \ + .subject_name(subject) \ + .issuer_name(subject) \ + .public_key(public_key) \ + .serial_number(x509.random_serial_number()) \ + .not_valid_before(now) \ + .not_valid_after(now + datetime.timedelta(days=7305)) \ + .add_extension( + x509.SubjectAlternativeName(( + x509.DNSName(domain), + x509.DNSName('*.{}'.format(domain)), + )), + critical=False) \ + .add_extension( + x509.SubjectKeyIdentifier.from_public_key(public_key), + critical=False) \ + .add_extension( + x509.AuthorityKeyIdentifier.from_issuer_public_key(public_key), + critical=False) \ + .add_extension( + x509.BasicConstraints(ca=False, path_length=None), + critical=True) \ + .add_extension( + x509.KeyUsage(digital_signature=True, + content_commitment=False, + key_encipherment=False, + data_encipherment=False, + key_agreement=False, + key_cert_sign=False, + crl_sign=False, + encipher_only=False, + decipher_only=False), + critical=True) \ + .add_extension( + x509.ExtendedKeyUsage(( + ExtendedKeyUsageOID.SERVER_AUTH, + ExtendedKeyUsageOID.CLIENT_AUTH)), + critical=False) \ + .sign(private_key, hashes.SHA256(), default_backend()) + with open(CERT_PUB_FILE, 'wb') as f: + f.write(cert.public_bytes(serialization.Encoding.PEM)) + with open(CERT_KEY_FILE, 'wb') as f: + f.write(private_key.private_bytes(serialization.Encoding.PEM, serialization.PrivateFormat.PKCS8, serialization.NoEncryption())) + os.chmod(CERT_KEY_FILE, 0o640) + +def get_cert_info(): + # Gather certificate data important for setup-host + with open(CERT_PUB_FILE, 'rb') as f: + cert = x509.load_pem_x509_certificate(f.read(), default_backend()) + data = {'subject': cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value, + 'issuer': cert.issuer.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value, + 'expires': '{} UTC'.format(cert.not_valid_after), + 'method': 'manual'} + if os.path.exists('/etc/periodic/daily/acme-sh'): + data['method'] = 'letsencrypt' + # Naive method of inferring if the cert is selfsigned + # Good enough as reputable CAs will never have the same subject and issuer CN + # and the 'method' field is used just to populate a GUI element and not for any real cryptography + elif data['subject'] == data['issuer']: + data['method'] = 'selfsigned' + return data + +def verify_signature(file, signature): + with open(SIG_PUB_FILE, 'rb') as f: + pub_key = load_pem_public_key(f.read(), default_backend()) + pub_key.verify(packages_sig, packages, ec.ECDSA(hashes.SHA512())) + +def adminpwd_hash(password): + return bcrypt.hashpw(password.encode(), bcrypt.gensalt()).decode() + +def adminpwd_verify(password, hash): + return bcrypt.checkpw(password.encode(), hash.encode()) diff --git a/usr/lib/python3.6/vmmgr/templates.py b/usr/lib/python3.6/vmmgr/templates.py index 570d687..e55d98e 100644 --- a/usr/lib/python3.6/vmmgr/templates.py +++ b/usr/lib/python3.6/vmmgr/templates.py @@ -117,11 +117,3 @@ ACME_CRON = '''#!/bin/sh [ -x /usr/bin/acme.sh ] && /usr/bin/acme.sh --cron >/dev/null ''' - -CERT_SAN = '''[ req ] -distinguished_name = dn -x509_extensions = ext -[ dn ] -[ ext ] -subjectAltName=DNS:{domain},DNS:*.{domain}" -''' diff --git a/usr/lib/python3.6/vmmgr/tools.py b/usr/lib/python3.6/vmmgr/tools.py index 86887eb..7c376c0 100644 --- a/usr/lib/python3.6/vmmgr/tools.py +++ b/usr/lib/python3.6/vmmgr/tools.py @@ -1,6 +1,5 @@ # -*- coding: utf-8 -*- -import bcrypt import dns.exception import dns.resolver import os @@ -59,11 +58,3 @@ def ping_url(url): raise except: return False - -# Admin password tools - -def adminpwd_hash(password): - return bcrypt.hashpw(password.encode(), bcrypt.gensalt()).decode() - -def adminpwd_verify(password, hash): - return bcrypt.checkpw(password.encode(), hash.encode()) diff --git a/usr/lib/python3.6/vmmgr/vmmgr.py b/usr/lib/python3.6/vmmgr/vmmgr.py index 3e3b57b..1a6de4a 100644 --- a/usr/lib/python3.6/vmmgr/vmmgr.py +++ b/usr/lib/python3.6/vmmgr/vmmgr.py @@ -5,10 +5,7 @@ import os import shutil import subprocess -from cryptography import x509 -from cryptography.hazmat.backends import default_backend -from cryptography.x509.oid import NameOID - +from . import crypto from . import templates from . import tools from .config import Config @@ -18,9 +15,6 @@ VERSION = '0.0.1' ISSUE_FILE = '/etc/issue' NGINX_DIR = '/etc/nginx/conf.d' ACME_CRON = '/etc/periodic/daily/acme-sh' -CERT_PUB_FILE = '/etc/ssl/services.pem' -CERT_KEY_FILE = '/etc/ssl/services.key' -CERT_SAN_FILE = '/etc/ssl/san.cnf' class VMMgr: def __init__(self, conf): @@ -66,7 +60,7 @@ class VMMgr: input = '{}\n{}'.format(oldpassword, newpassword).encode() subprocess.run(['cryptsetup', 'luksChangeKey', '/dev/sda2'], input=input, check=True) # Update bcrypt-hashed password in config - self.conf['host']['adminpwd'] = tools.adminpwd_hash(newpassword) + self.conf['host']['adminpwd'] = crypto.adminpwd_hash(newpassword) # Save config to file self.conf.save() @@ -75,10 +69,7 @@ class VMMgr: if os.path.exists(ACME_CRON): os.unlink(ACME_CRON) # Create selfsigned certificate with wildcard alternative subject name - with open(os.path.join(CERT_SAN_FILE), 'w') as f: - f.write(templates.CERT_SAN.format(domain=self.domain)) - subprocess.run(['openssl', 'req', '-config', CERT_SAN_FILE, '-x509', '-new', '-out', CERT_PUB_FILE, '-keyout', CERT_KEY_FILE, '-nodes', '-days', '7305', '-subj', '/CN={}'.format(self.domain)], check=True) - os.chmod(CERT_KEY_FILE, 0o640) + crypto.create_cert(self.domain) def request_acme_cert(self): # Remove all possible conflicting certificates requested in the past @@ -103,7 +94,7 @@ class VMMgr: if e.returncode != 2: raise # Install the issued certificate - subprocess.run(['/usr/bin/acme.sh', '--install-cert', '-d', self.domain, '--key-file', CERT_KEY_FILE, '--fullchain-file', CERT_PUB_FILE, '--reloadcmd', '/sbin/service nginx reload'], check=True) + subprocess.run(['/usr/bin/acme.sh', '--install-cert', '-d', self.domain, '--key-file', crypto.CERT_KEY_FILE, '--fullchain-file', crypto.CERT_PUB_FILE, '--reloadcmd', '/sbin/service nginx reload'], check=True) # Install acme.sh cronjob with open(ACME_CRON, 'w') as f: f.write(templates.ACME_CRON) @@ -113,27 +104,12 @@ class VMMgr: if os.path.exists(ACME_CRON): os.unlink(ACME_CRON) # Copy certificate files - shutil.copyfile(public_file, CERT_PUB_FILE) - shutil.copyfile(private_file, CERT_KEY_FILE) - os.chmod(CERT_KEY_FILE, 0o640) + shutil.copyfile(public_file, crypto.CERT_PUB_FILE) + shutil.copyfile(private_file, crypto.CERT_KEY_FILE) + os.chmod(crypto.CERT_KEY_FILE, 0o640) # Reload nginx self.reload_nginx() - def get_cert_info(self): - # Gather certificate data important for setup-host - with open(CERT_PUB_FILE, 'rb') as f: - cert = x509.load_pem_x509_certificate(f.read(), default_backend()) - data = {'subject': cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value, - 'issuer': cert.issuer.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value, - 'expires': '{} UTC'.format(cert.not_valid_after), - 'method': 'manual'} - if os.path.exists('/etc/periodic/daily/acme-sh'): - data['method'] = 'letsencrypt' - # This is really naive method of inferring if the cert is selfsigned and should never be used in production :) - elif data['subject'] == data['issuer']: - data['method'] = 'selfsigned' - return data - def prepare_container(self): # Extract the variables from values given via lxc.hook.pre-start hook app = os.environ['LXC_NAME'] diff --git a/usr/lib/python3.6/vmmgr/wsgiapp.py b/usr/lib/python3.6/vmmgr/wsgiapp.py index 1cc4e7b..f9b1e2b 100644 --- a/usr/lib/python3.6/vmmgr/wsgiapp.py +++ b/usr/lib/python3.6/vmmgr/wsgiapp.py @@ -12,6 +12,7 @@ from jinja2 import Environment, FileSystemLoader from cryptography.exceptions import InvalidSignature +from . import crypto from . import tools from . import validator from .actionqueue import ActionQueue @@ -128,7 +129,7 @@ class WSGIApp(object): def login_action(self, request): password = request.form['password'] redir = request.form['redir'] - if tools.adminpwd_verify(password, self.conf['host']['adminpwd']): + if crypto.adminpwd_verify(password, self.conf['host']['adminpwd']): request.session['admin'] = True return redirect('/{}'.format(redir)) request.session['msg'] = 'login:error:{}'.format(request.session.lang.bad_password()) @@ -151,7 +152,7 @@ class WSGIApp(object): ex_ipv6 = tools.get_external_ip(6) in_ipv4 = tools.get_local_ip(4) in_ipv6 = tools.get_local_ip(6) - cert_info = self.vmmgr.get_cert_info() + cert_info = crypto.get_cert_info() return self.render_html('setup-host.html', request, ex_ipv4=ex_ipv4, ex_ipv6=ex_ipv6, in_ipv4=in_ipv4, in_ipv6=in_ipv6, cert_info=cert_info) def setup_apps_view(self, request):