# File-viewer header (not part of the module): 128 lines, 5.6 KiB, Python

# -*- coding: utf-8 -*-
import os
import shutil
import subprocess
import urllib
import urllib.parse

from . import crypto
from . import net
from . import templates
from .paths import ACME_CRON, ACME_DIR, ISSUE_FILE, NGINX_DIR, REPO_FILE


class VMMgr:
    """Manage host-level settings of the virtual machine.

    Covers the nginx configuration of the portal, the terminal banner, the
    LUKS / admin password, the @vm package repository and TLS certificates.
    """

    # Absolute path of the acme.sh client used for all ACME operations
    _ACME_SH = '/usr/bin/acme.sh'
    # Entries of ACME_DIR which are never treated as previously requested certificates
    _ACME_RESERVED = ('account.conf', 'ca', 'http.header')

    def __init__(self, conf):
        """Keep a reference to the loaded configuration and cache domain and port.

        `conf` is used as a mapping with 'host' and 'apps' sections and must
        provide a `save()` method.
        """
        self.conf = conf
        self.domain = conf['host']['domain']
        self.port = conf['host']['port']

    def register_app(self, app, login, password):
        """Register a newly installed application and its credentials.

        Empty or missing credentials are stored as 'N/A'. The application is
        registered as not visible and the configuration is saved.
        """
        self.conf['apps'][app] = {'login': login or 'N/A',
                                  'password': password or 'N/A',
                                  'visible': False}
        self.conf.save()

    def update_host(self, domain, port):
        """Store a new domain and port and rebuild the nginx configuration.

        Web interface calls restart_nginx() in WSGI close handler.
        """
        self.conf['host']['domain'] = domain
        self.conf['host']['port'] = port
        self.domain = domain
        self.port = port
        self.conf.save()
        # Only the config file is rewritten here, nginx itself is not restarted by this method
        self.rebuild_nginx()

    def rebuild_nginx(self):
        """Rewrite the nginx config for the portal app from its template.

        Web interface calls restart_nginx() in WSGI close handler.
        """
        # Dots are escaped because the template receives the domain as `domain_esc`
        domain_esc = self.domain.replace('.', '\\.')
        with open(os.path.join(NGINX_DIR, 'default.conf'), 'w') as f:
            f.write(templates.NGINX_DEFAULT.format(port=self.port, domain_esc=domain_esc))

    def reload_nginx(self):
        """Signal nginx to reload. The exit status is not checked."""
        subprocess.run(['/usr/sbin/nginx', '-s', 'reload'])

    def restart_nginx(self):
        """Restart the nginx service. The exit status is not checked."""
        subprocess.run(['/sbin/service', 'nginx', 'restart'])

    def rebuild_issue(self):
        """Compile the URLs displayed in terminal banner and rebuild the file."""
        domain_url = net.compile_url(self.domain, self.port)
        ip_url = net.compile_url(net.get_local_ip(), self.port)
        with open(ISSUE_FILE, 'w') as f:
            f.write(templates.ISSUE.format(url=domain_url, ip=ip_url))

    def update_password(self, oldpassword, newpassword):
        """Update LUKS password and adminpwd for WSGI application.

        Raises subprocess.CalledProcessError when cryptsetup fails; in that
        case the configuration is left untouched.
        """
        # cryptsetup receives the old and the new passphrase on stdin, separated by a newline
        pwinput = '{}\n{}'.format(oldpassword, newpassword).encode()
        subprocess.run(['cryptsetup', 'luksChangeKey', '/dev/sda2'], input=pwinput, check=True)
        # Store the hash of the new password only after the LUKS key was changed
        self.conf['host']['adminpwd'] = crypto.adminpwd_hash(newpassword)
        self.conf.save()

    @staticmethod
    def _parse_repo_url(raw_url):
        """Split a repository URL into a credential-free URL, user and password."""
        parsed = urllib.parse.urlparse(raw_url)
        # netloc contains 'user:pwd@' when credentials are present; keep only the host[:port] part
        host = parsed.netloc.rpartition('@')[2]
        return {'url': '{}://{}{}'.format(parsed.scheme, host, parsed.path),
                # Credentials are stored percent-encoded, return them decoded
                'user': urllib.parse.unquote(parsed.username) if parsed.username else '',
                'pwd': urllib.parse.unquote(parsed.password) if parsed.password else ''}

    @staticmethod
    def _compose_repo_url(url, user, pwd):
        """Build a repository URL with percent-encoded credentials embedded.

        The password is used only when a user is given. Query string and
        fragment of `url` are not carried over.
        """
        parsed = urllib.parse.urlparse(url)
        netloc = parsed.netloc
        parts = [parsed.scheme, '://']
        if user:
            # Explicit credentials replace any which are already embedded in the URL
            netloc = netloc.rpartition('@')[2]
            parts.append(urllib.parse.quote(user, safe=''))
            if pwd:
                parts.extend((':', urllib.parse.quote(pwd, safe='')))
            parts.append('@')
        parts.extend((netloc, parsed.path))
        return ''.join(parts)

    def get_repo_conf(self):
        """Read, parse and return current @vm repository configuration.

        Returns a dict with 'url' (without credentials), 'user' and 'pwd'.
        Raises IndexError when the repositories file has no usable @vm line.
        """
        with open(REPO_FILE) as f:
            vm_lines = [line for line in f.read().splitlines() if line.startswith('@vm')]
        if not vm_lines:
            raise IndexError('no @vm entry found in {}'.format(REPO_FILE))
        # The URL is the second whitespace-separated field of the line
        return self._parse_repo_url(vm_lines[0].split(None, 2)[1])

    def set_repo_conf(self, url, user, pwd):
        """Update @vm repository configuration with the given URL and credentials."""
        repo_url = self._compose_repo_url(url, user, pwd)
        with open(REPO_FILE, 'w') as f:
            f.write(templates.REPOSITORIES.format(url=repo_url))

    def create_selfsigned_cert(self):
        """Switch to a freshly created self-signed certificate and reload nginx."""
        # Disable acme.sh cronjob
        os.chmod(ACME_CRON, 0o640)
        # Create selfsigned certificate with wildcard alternative subject name
        crypto.create_cert(self.domain)
        self.reload_nginx()

    def request_acme_cert(self):
        """Request or renew a certificate via acme.sh and install it.

        Raises subprocess.CalledProcessError when issuing, renewing (except
        for exit code 2) or installing the certificate fails.
        """
        # Remove all possible conflicting certificates requested in the past
        stale = [entry for entry in os.listdir(ACME_DIR)
                 if entry not in self._ACME_RESERVED and entry != self.domain]
        for entry in stale:
            subprocess.run([self._ACME_SH, '--remove', '-d', entry])
        if not os.path.exists(os.path.join(ACME_DIR, self.domain)):
            # The certificate hasn't been requested before, issue it for the domain and app subdomains
            cmd = [self._ACME_SH, '--issue', '-d', self.domain]
            # Iterate over a snapshot so that concurrent (un)registration can't break the loop
            for app_conf in self.conf['apps'].copy().values():
                # Entries created by register_app() carry no 'host', such apps have no subdomain to add
                host = app_conf.get('host')
                if host:
                    cmd += ['-d', '{}.{}'.format(host, self.domain)]
            cmd += ['-w', ACME_DIR]
            subprocess.run(cmd, check=True)
        else:
            # Otherwise just try to renew. Acme.sh returns code 2 on skipped renew
            try:
                subprocess.run([self._ACME_SH, '--renew', '-d', self.domain], check=True)
            except subprocess.CalledProcessError as e:
                if e.returncode != 2:
                    raise
        # Install the issued certificate
        subprocess.run([self._ACME_SH, '--install-cert', '-d', self.domain,
                        '--key-file', crypto.CERT_KEY_FILE,
                        '--fullchain-file', crypto.CERT_PUB_FILE,
                        '--reloadcmd', '/sbin/service nginx reload'], check=True)
        # Enable acme.sh cronjob
        os.chmod(ACME_CRON, 0o750)

    def install_manual_cert(self, public_file, private_file):
        """Install a user-supplied certificate and key and reload nginx."""
        # Disable acme.sh cronjob
        os.chmod(ACME_CRON, 0o640)
        # Copy certificate files
        shutil.copyfile(public_file, crypto.CERT_PUB_FILE)
        shutil.copyfile(private_file, crypto.CERT_KEY_FILE)
        os.chmod(crypto.CERT_KEY_FILE, 0o640)
        self.reload_nginx()