Move crypto-related functions to a separate file
This commit is contained in:
parent
c2b383e5c8
commit
3e0363b904
@ -8,14 +8,9 @@ import shutil
|
|||||||
import subprocess
|
import subprocess
|
||||||
|
|
||||||
from cryptography.exceptions import InvalidSignature
|
from cryptography.exceptions import InvalidSignature
|
||||||
from cryptography.hazmat.backends import default_backend
|
|
||||||
from cryptography.hazmat.primitives import hashes
|
|
||||||
from cryptography.hazmat.primitives.asymmetric import ec
|
|
||||||
from cryptography.hazmat.primitives.serialization import load_pem_public_key
|
|
||||||
|
|
||||||
from . import tools
|
from . import crypto
|
||||||
|
|
||||||
PUB_FILE = '/etc/vmmgr/packages.pub'
|
|
||||||
LXC_ROOT = '/var/lib/lxc'
|
LXC_ROOT = '/var/lib/lxc'
|
||||||
|
|
||||||
class InstallItem:
|
class InstallItem:
|
||||||
@ -45,9 +40,7 @@ class AppMgr:
|
|||||||
return packages.status_code
|
return packages.status_code
|
||||||
packages = packages.content
|
packages = packages.content
|
||||||
packages_sig = self.get_repo_resource('packages.sig').content
|
packages_sig = self.get_repo_resource('packages.sig').content
|
||||||
with open(PUB_FILE, 'rb') as f:
|
crypto.verify_signature(packages, packages_sig)
|
||||||
pub_key = load_pem_public_key(f.read(), default_backend())
|
|
||||||
pub_key.verify(packages_sig, packages, ec.ECDSA(hashes.SHA512()))
|
|
||||||
online_packages = json.loads(packages)
|
online_packages = json.loads(packages)
|
||||||
# Minimze the time when self.online_packages is out of sync
|
# Minimze the time when self.online_packages is out of sync
|
||||||
self.online_packages = online_packages
|
self.online_packages = online_packages
|
||||||
|
95
usr/lib/python3.6/vmmgr/crypto.py
Normal file
95
usr/lib/python3.6/vmmgr/crypto.py
Normal file
@ -0,0 +1,95 @@
|
|||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
|
import bcrypt
|
||||||
|
import datetime
|
||||||
|
import os
|
||||||
|
|
||||||
|
from cryptography import x509
|
||||||
|
from cryptography.hazmat.backends import default_backend
|
||||||
|
from cryptography.hazmat.primitives import hashes, serialization
|
||||||
|
from cryptography.hazmat.primitives.asymmetric import ec
|
||||||
|
from cryptography.hazmat.primitives.serialization import load_pem_public_key
|
||||||
|
from cryptography.x509.oid import NameOID, ExtendedKeyUsageOID
|
||||||
|
|
||||||
|
CERT_PUB_FILE = '/etc/ssl/services.pem'
|
||||||
|
CERT_KEY_FILE = '/etc/ssl/services.key'
|
||||||
|
SIG_PUB_FILE = '/etc/vmmgr/packages.pub'
|
||||||
|
|
||||||
|
def create_cert(domain):
|
||||||
|
# Create selfsigned certificate with wildcard alternative subject name
|
||||||
|
private_key = ec.generate_private_key(ec.SECP384R1(), default_backend())
|
||||||
|
public_key = private_key.public_key()
|
||||||
|
subject = x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, domain)])
|
||||||
|
now = datetime.datetime.utcnow()
|
||||||
|
cert = x509.CertificateBuilder() \
|
||||||
|
.subject_name(subject) \
|
||||||
|
.issuer_name(subject) \
|
||||||
|
.public_key(public_key) \
|
||||||
|
.serial_number(x509.random_serial_number()) \
|
||||||
|
.not_valid_before(now) \
|
||||||
|
.not_valid_after(now + datetime.timedelta(days=7305)) \
|
||||||
|
.add_extension(
|
||||||
|
x509.SubjectAlternativeName((
|
||||||
|
x509.DNSName(domain),
|
||||||
|
x509.DNSName('*.{}'.format(domain)),
|
||||||
|
)),
|
||||||
|
critical=False) \
|
||||||
|
.add_extension(
|
||||||
|
x509.SubjectKeyIdentifier.from_public_key(public_key),
|
||||||
|
critical=False) \
|
||||||
|
.add_extension(
|
||||||
|
x509.AuthorityKeyIdentifier.from_issuer_public_key(public_key),
|
||||||
|
critical=False) \
|
||||||
|
.add_extension(
|
||||||
|
x509.BasicConstraints(ca=False, path_length=None),
|
||||||
|
critical=True) \
|
||||||
|
.add_extension(
|
||||||
|
x509.KeyUsage(digital_signature=True,
|
||||||
|
content_commitment=False,
|
||||||
|
key_encipherment=False,
|
||||||
|
data_encipherment=False,
|
||||||
|
key_agreement=False,
|
||||||
|
key_cert_sign=False,
|
||||||
|
crl_sign=False,
|
||||||
|
encipher_only=False,
|
||||||
|
decipher_only=False),
|
||||||
|
critical=True) \
|
||||||
|
.add_extension(
|
||||||
|
x509.ExtendedKeyUsage((
|
||||||
|
ExtendedKeyUsageOID.SERVER_AUTH,
|
||||||
|
ExtendedKeyUsageOID.CLIENT_AUTH)),
|
||||||
|
critical=False) \
|
||||||
|
.sign(private_key, hashes.SHA256(), default_backend())
|
||||||
|
with open(CERT_PUB_FILE, 'wb') as f:
|
||||||
|
f.write(cert.public_bytes(serialization.Encoding.PEM))
|
||||||
|
with open(CERT_KEY_FILE, 'wb') as f:
|
||||||
|
f.write(private_key.private_bytes(serialization.Encoding.PEM, serialization.PrivateFormat.PKCS8, serialization.NoEncryption()))
|
||||||
|
os.chmod(CERT_KEY_FILE, 0o640)
|
||||||
|
|
||||||
|
def get_cert_info():
|
||||||
|
# Gather certificate data important for setup-host
|
||||||
|
with open(CERT_PUB_FILE, 'rb') as f:
|
||||||
|
cert = x509.load_pem_x509_certificate(f.read(), default_backend())
|
||||||
|
data = {'subject': cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value,
|
||||||
|
'issuer': cert.issuer.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value,
|
||||||
|
'expires': '{} UTC'.format(cert.not_valid_after),
|
||||||
|
'method': 'manual'}
|
||||||
|
if os.path.exists('/etc/periodic/daily/acme-sh'):
|
||||||
|
data['method'] = 'letsencrypt'
|
||||||
|
# Naive method of inferring if the cert is selfsigned
|
||||||
|
# Good enough as reputable CAs will never have the same subject and issuer CN
|
||||||
|
# and the 'method' field is used just to populate a GUI element and not for any real cryptography
|
||||||
|
elif data['subject'] == data['issuer']:
|
||||||
|
data['method'] = 'selfsigned'
|
||||||
|
return data
|
||||||
|
|
||||||
|
def verify_signature(file, signature):
|
||||||
|
with open(SIG_PUB_FILE, 'rb') as f:
|
||||||
|
pub_key = load_pem_public_key(f.read(), default_backend())
|
||||||
|
pub_key.verify(packages_sig, packages, ec.ECDSA(hashes.SHA512()))
|
||||||
|
|
||||||
|
def adminpwd_hash(password):
|
||||||
|
return bcrypt.hashpw(password.encode(), bcrypt.gensalt()).decode()
|
||||||
|
|
||||||
|
def adminpwd_verify(password, hash):
|
||||||
|
return bcrypt.checkpw(password.encode(), hash.encode())
|
@ -117,11 +117,3 @@ ACME_CRON = '''#!/bin/sh
|
|||||||
|
|
||||||
[ -x /usr/bin/acme.sh ] && /usr/bin/acme.sh --cron >/dev/null
|
[ -x /usr/bin/acme.sh ] && /usr/bin/acme.sh --cron >/dev/null
|
||||||
'''
|
'''
|
||||||
|
|
||||||
CERT_SAN = '''[ req ]
|
|
||||||
distinguished_name = dn
|
|
||||||
x509_extensions = ext
|
|
||||||
[ dn ]
|
|
||||||
[ ext ]
|
|
||||||
subjectAltName=DNS:{domain},DNS:*.{domain}"
|
|
||||||
'''
|
|
||||||
|
@ -1,6 +1,5 @@
|
|||||||
# -*- coding: utf-8 -*-
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
import bcrypt
|
|
||||||
import dns.exception
|
import dns.exception
|
||||||
import dns.resolver
|
import dns.resolver
|
||||||
import os
|
import os
|
||||||
@ -59,11 +58,3 @@ def ping_url(url):
|
|||||||
raise
|
raise
|
||||||
except:
|
except:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
# Admin password tools
|
|
||||||
|
|
||||||
def adminpwd_hash(password):
|
|
||||||
return bcrypt.hashpw(password.encode(), bcrypt.gensalt()).decode()
|
|
||||||
|
|
||||||
def adminpwd_verify(password, hash):
|
|
||||||
return bcrypt.checkpw(password.encode(), hash.encode())
|
|
||||||
|
@ -5,10 +5,7 @@ import os
|
|||||||
import shutil
|
import shutil
|
||||||
import subprocess
|
import subprocess
|
||||||
|
|
||||||
from cryptography import x509
|
from . import crypto
|
||||||
from cryptography.hazmat.backends import default_backend
|
|
||||||
from cryptography.x509.oid import NameOID
|
|
||||||
|
|
||||||
from . import templates
|
from . import templates
|
||||||
from . import tools
|
from . import tools
|
||||||
from .config import Config
|
from .config import Config
|
||||||
@ -18,9 +15,6 @@ VERSION = '0.0.1'
|
|||||||
ISSUE_FILE = '/etc/issue'
|
ISSUE_FILE = '/etc/issue'
|
||||||
NGINX_DIR = '/etc/nginx/conf.d'
|
NGINX_DIR = '/etc/nginx/conf.d'
|
||||||
ACME_CRON = '/etc/periodic/daily/acme-sh'
|
ACME_CRON = '/etc/periodic/daily/acme-sh'
|
||||||
CERT_PUB_FILE = '/etc/ssl/services.pem'
|
|
||||||
CERT_KEY_FILE = '/etc/ssl/services.key'
|
|
||||||
CERT_SAN_FILE = '/etc/ssl/san.cnf'
|
|
||||||
|
|
||||||
class VMMgr:
|
class VMMgr:
|
||||||
def __init__(self, conf):
|
def __init__(self, conf):
|
||||||
@ -66,7 +60,7 @@ class VMMgr:
|
|||||||
input = '{}\n{}'.format(oldpassword, newpassword).encode()
|
input = '{}\n{}'.format(oldpassword, newpassword).encode()
|
||||||
subprocess.run(['cryptsetup', 'luksChangeKey', '/dev/sda2'], input=input, check=True)
|
subprocess.run(['cryptsetup', 'luksChangeKey', '/dev/sda2'], input=input, check=True)
|
||||||
# Update bcrypt-hashed password in config
|
# Update bcrypt-hashed password in config
|
||||||
self.conf['host']['adminpwd'] = tools.adminpwd_hash(newpassword)
|
self.conf['host']['adminpwd'] = crypto.adminpwd_hash(newpassword)
|
||||||
# Save config to file
|
# Save config to file
|
||||||
self.conf.save()
|
self.conf.save()
|
||||||
|
|
||||||
@ -75,10 +69,7 @@ class VMMgr:
|
|||||||
if os.path.exists(ACME_CRON):
|
if os.path.exists(ACME_CRON):
|
||||||
os.unlink(ACME_CRON)
|
os.unlink(ACME_CRON)
|
||||||
# Create selfsigned certificate with wildcard alternative subject name
|
# Create selfsigned certificate with wildcard alternative subject name
|
||||||
with open(os.path.join(CERT_SAN_FILE), 'w') as f:
|
crypto.create_cert(self.domain)
|
||||||
f.write(templates.CERT_SAN.format(domain=self.domain))
|
|
||||||
subprocess.run(['openssl', 'req', '-config', CERT_SAN_FILE, '-x509', '-new', '-out', CERT_PUB_FILE, '-keyout', CERT_KEY_FILE, '-nodes', '-days', '7305', '-subj', '/CN={}'.format(self.domain)], check=True)
|
|
||||||
os.chmod(CERT_KEY_FILE, 0o640)
|
|
||||||
|
|
||||||
def request_acme_cert(self):
|
def request_acme_cert(self):
|
||||||
# Remove all possible conflicting certificates requested in the past
|
# Remove all possible conflicting certificates requested in the past
|
||||||
@ -103,7 +94,7 @@ class VMMgr:
|
|||||||
if e.returncode != 2:
|
if e.returncode != 2:
|
||||||
raise
|
raise
|
||||||
# Install the issued certificate
|
# Install the issued certificate
|
||||||
subprocess.run(['/usr/bin/acme.sh', '--install-cert', '-d', self.domain, '--key-file', CERT_KEY_FILE, '--fullchain-file', CERT_PUB_FILE, '--reloadcmd', '/sbin/service nginx reload'], check=True)
|
subprocess.run(['/usr/bin/acme.sh', '--install-cert', '-d', self.domain, '--key-file', crypto.CERT_KEY_FILE, '--fullchain-file', crypto.CERT_PUB_FILE, '--reloadcmd', '/sbin/service nginx reload'], check=True)
|
||||||
# Install acme.sh cronjob
|
# Install acme.sh cronjob
|
||||||
with open(ACME_CRON, 'w') as f:
|
with open(ACME_CRON, 'w') as f:
|
||||||
f.write(templates.ACME_CRON)
|
f.write(templates.ACME_CRON)
|
||||||
@ -113,27 +104,12 @@ class VMMgr:
|
|||||||
if os.path.exists(ACME_CRON):
|
if os.path.exists(ACME_CRON):
|
||||||
os.unlink(ACME_CRON)
|
os.unlink(ACME_CRON)
|
||||||
# Copy certificate files
|
# Copy certificate files
|
||||||
shutil.copyfile(public_file, CERT_PUB_FILE)
|
shutil.copyfile(public_file, crypto.CERT_PUB_FILE)
|
||||||
shutil.copyfile(private_file, CERT_KEY_FILE)
|
shutil.copyfile(private_file, crypto.CERT_KEY_FILE)
|
||||||
os.chmod(CERT_KEY_FILE, 0o640)
|
os.chmod(crypto.CERT_KEY_FILE, 0o640)
|
||||||
# Reload nginx
|
# Reload nginx
|
||||||
self.reload_nginx()
|
self.reload_nginx()
|
||||||
|
|
||||||
def get_cert_info(self):
|
|
||||||
# Gather certificate data important for setup-host
|
|
||||||
with open(CERT_PUB_FILE, 'rb') as f:
|
|
||||||
cert = x509.load_pem_x509_certificate(f.read(), default_backend())
|
|
||||||
data = {'subject': cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value,
|
|
||||||
'issuer': cert.issuer.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value,
|
|
||||||
'expires': '{} UTC'.format(cert.not_valid_after),
|
|
||||||
'method': 'manual'}
|
|
||||||
if os.path.exists('/etc/periodic/daily/acme-sh'):
|
|
||||||
data['method'] = 'letsencrypt'
|
|
||||||
# This is really naive method of inferring if the cert is selfsigned and should never be used in production :)
|
|
||||||
elif data['subject'] == data['issuer']:
|
|
||||||
data['method'] = 'selfsigned'
|
|
||||||
return data
|
|
||||||
|
|
||||||
def prepare_container(self):
|
def prepare_container(self):
|
||||||
# Extract the variables from values given via lxc.hook.pre-start hook
|
# Extract the variables from values given via lxc.hook.pre-start hook
|
||||||
app = os.environ['LXC_NAME']
|
app = os.environ['LXC_NAME']
|
||||||
|
@ -12,6 +12,7 @@ from jinja2 import Environment, FileSystemLoader
|
|||||||
|
|
||||||
from cryptography.exceptions import InvalidSignature
|
from cryptography.exceptions import InvalidSignature
|
||||||
|
|
||||||
|
from . import crypto
|
||||||
from . import tools
|
from . import tools
|
||||||
from . import validator
|
from . import validator
|
||||||
from .actionqueue import ActionQueue
|
from .actionqueue import ActionQueue
|
||||||
@ -128,7 +129,7 @@ class WSGIApp(object):
|
|||||||
def login_action(self, request):
|
def login_action(self, request):
|
||||||
password = request.form['password']
|
password = request.form['password']
|
||||||
redir = request.form['redir']
|
redir = request.form['redir']
|
||||||
if tools.adminpwd_verify(password, self.conf['host']['adminpwd']):
|
if crypto.adminpwd_verify(password, self.conf['host']['adminpwd']):
|
||||||
request.session['admin'] = True
|
request.session['admin'] = True
|
||||||
return redirect('/{}'.format(redir))
|
return redirect('/{}'.format(redir))
|
||||||
request.session['msg'] = 'login:error:{}'.format(request.session.lang.bad_password())
|
request.session['msg'] = 'login:error:{}'.format(request.session.lang.bad_password())
|
||||||
@ -151,7 +152,7 @@ class WSGIApp(object):
|
|||||||
ex_ipv6 = tools.get_external_ip(6)
|
ex_ipv6 = tools.get_external_ip(6)
|
||||||
in_ipv4 = tools.get_local_ip(4)
|
in_ipv4 = tools.get_local_ip(4)
|
||||||
in_ipv6 = tools.get_local_ip(6)
|
in_ipv6 = tools.get_local_ip(6)
|
||||||
cert_info = self.vmmgr.get_cert_info()
|
cert_info = crypto.get_cert_info()
|
||||||
return self.render_html('setup-host.html', request, ex_ipv4=ex_ipv4, ex_ipv6=ex_ipv6, in_ipv4=in_ipv4, in_ipv6=in_ipv6, cert_info=cert_info)
|
return self.render_html('setup-host.html', request, ex_ipv4=ex_ipv4, ex_ipv6=ex_ipv6, in_ipv4=in_ipv4, in_ipv6=in_ipv6, cert_info=cert_info)
|
||||||
|
|
||||||
def setup_apps_view(self, request):
|
def setup_apps_view(self, request):
|
||||||
|
Loading…
Reference in New Issue
Block a user