# -*- coding: utf-8 -*- import json import os from cryptography.exceptions import InvalidSignature from jinja2 import Environment, FileSystemLoader from math import floor from pkg_resources import parse_version from spoc import repo_online, repo_local from werkzeug.exceptions import HTTPException, NotFound, Unauthorized from werkzeug.routing import Map, Rule, RequestRedirect from werkzeug.utils import redirect from werkzeug.wrappers import Request, Response from . import config, crypto, net, remote, validator, vmmgr from .actionqueue import ActionQueue, ActionItemType from .wsgilang import WSGILang from .wsgisession import WSGISession SESSION_KEY = os.urandom(26) class WSGIApp: def __init__(self): self.queue = ActionQueue() self.jinja_env = Environment(loader=FileSystemLoader('/usr/share/vmmgr/templates'), autoescape=True, lstrip_blocks=True, trim_blocks=True) self.url_map = Map(( Rule('/', endpoint='portal_view'), Rule('/login', methods=['GET'], endpoint='login_view'), Rule('/login', methods=['POST'], endpoint='login_action'), Rule('/setup-host', redirect_to='/login?redir=setup-host'), Rule('/setup-apps', redirect_to='/login?redir=setup-apps'), Rule('/setup-remote', redirect_to='/login?redir=setup-remote'), )) self.admin_url_map = Map(( Rule('/', endpoint='portal_view'), Rule('/logout', endpoint='logout_action'), Rule('/setup-host', endpoint='setup_host_view'), Rule('/setup-apps', endpoint='setup_apps_view'), Rule('/setup-remote', endpoint='setup_remote_view'), Rule('/update-host', endpoint='update_host_action'), Rule('/verify-dns', endpoint='verify_dns_action'), Rule('/verify-https', endpoint='verify_http_action', defaults={'proto': 'https'}), Rule('/verify-http', endpoint='verify_http_action', defaults={'proto': 'http'}), Rule('/update-cert', endpoint='update_cert_action'), Rule('/update-repo', endpoint='update_repo_action'), Rule('/update-app-visibility', endpoint='update_app_visibility_action'), Rule('/update-app-autostart', endpoint='update_app_autostart_action'), Rule('/start-app', endpoint='start_app_action'), Rule('/stop-app', endpoint='stop_app_action'), Rule('/install-app', endpoint='install_app_action'), Rule('/get-app-status', endpoint='get_app_status_action'), Rule('/clear-app-status', endpoint='clear_app_status_action'), Rule('/uninstall-app', endpoint='uninstall_app_action'), Rule('/update-app', endpoint='update_app_action'), Rule('/update-password', endpoint='update_password_action'), Rule('/shutdown-vm', endpoint='shutdown_vm_action'), Rule('/reboot-vm', endpoint='reboot_vm_action'), Rule('/update-ssh-keys', endpoint='update_ssh_keys_action'), Rule('/update-vpn', endpoint='update_vpn_action'), Rule('/generate-vpn-key', endpoint='generate_vpn_key_action'), Rule('/start-vpn', endpoint='start_vpn_action'), Rule('/stop-vpn', endpoint='stop_vpn_action'), )) def __call__(self, environ, start_response): return self.wsgi_app(environ, start_response) def wsgi_app(self, environ, start_response): request = Request(environ) # Enhance request request.session = WSGISession(request.cookies, SESSION_KEY) request.session.lang = WSGILang() # Dispatch request response = self.dispatch_request(request) return response(environ, start_response) def dispatch_request(self, request): url_map = self.admin_url_map if request.session['admin'] else self.url_map adapter = url_map.bind_to_environ(request.environ) try: endpoint, values = adapter.match() response = getattr(self, endpoint)(request, **values) # Save session if changed request.session.save(response) return response except NotFound as e: # Return custom 404 page response = self.render_html('404.html', request) response.status_code = 404 return response except HTTPException as e: # Return MethodNotAllowed or RequestRedirect as they are return e def render_template(self, template_name, request, **context): # Enhance context context['config'] = config context['session'] = request.session context['lang'] = request.session.lang # Render template template = self.jinja_env.get_template(template_name) return template.render(context) def render_html(self, template_name, request, **context): html = self.render_template(template_name, request, **context) return Response(html, mimetype='text/html') def render_json(self, data): return Response(json.dumps(data), mimetype='application/json') def get_session_message(self, request): # Consume and retrieve message stored in session if 'msg' not in request.session: return None message = request.session['msg'] del request.session['msg'] # Message is in format location:type:text return message.split(':', 2) def login_view(self, request): redir = request.args.get('redir', '') message = self.get_session_message(request) return self.render_html('login.html', request, redir=redir, message=message) def login_action(self, request): password = request.form['password'] redir = request.form['redir'] hash = config.get_host()['adminpwd'] if crypto.adminpwd_verify(password, hash): request.session['admin'] = True return redirect(f'/{redir}') request.session['msg'] = f'login:error:{request.session.lang.bad_password()}' return redirect(f'/login?redir={redir}') if redir else redirect('/login') def logout_action(self, request): request.session.reset() return redirect('/') def portal_view(self, request): # Default portal view. host = config.get_host() host = net.compile_url(host['domain'], host['port'], None) apps = config.get_apps() visible_apps = [app for app,definition in apps.items() if definition['visible'] and vmmgr.is_app_started(app)] if request.session['admin']: return self.render_html('portal-admin.html', request, host=host, apps=apps, visible_apps=visible_apps) return self.render_html('portal-user.html', request, host=host, apps=apps, visible_apps=visible_apps) def setup_host_view(self, request): # Host setup view. ex_ipv4 = net.get_external_ip(4) ex_ipv6 = net.get_external_ip(6) in_ipv4 = net.get_local_ip(4) in_ipv6 = net.get_local_ip(6) cert_info = crypto.get_cert_info() apps = config.get_apps() host = config.get_host() return self.render_html('setup-host.html', request, ex_ipv4=ex_ipv4, ex_ipv6=ex_ipv6, in_ipv4=in_ipv4, in_ipv6=in_ipv6, cert_info=cert_info, apps=apps, domain=host['domain'], port=host['port']) def setup_apps_view(self, request): # Application manager view. repo_error = None try: # Repopulate online_repo cache or fail early when the repo can't be reached repo_online.load(True) except InvalidSignature: repo_error = request.session.lang.invalid_packages_signature() except Unauthorized: repo_error = request.session.lang.repo_invalid_credentials() except: repo_error = request.session.lang.repo_unavailable() table = self.render_setup_apps_table(request) message = self.get_session_message(request) repo_conf = vmmgr.get_repo_conf() return self.render_html('setup-apps.html', request, repo_conf=repo_conf, repo_error=repo_error, table=table, message=message) def render_setup_apps_table(self, request): lang = request.session.lang local_apps = repo_local.get_apps() try: online_apps = repo_online.get_apps() except: online_apps = {} apps_config = config.get_apps() actionable_apps = sorted(set(online_apps) | set(local_apps)) pending_actions = self.queue.get_actions() app_data = {} for app in actionable_apps: installed = app in local_apps title = local_apps[app]['meta']['title'] if installed else online_apps[app]['meta']['title'] try: visible = apps_config[app]['visible'] except KeyError: visible = False try: autostarted = local_apps[app]['autostart'] except KeyError: autostarted = False if app in pending_actions: # Display queued or currently processed actions app_queue = pending_actions[app] if app_queue.index: if app_queue.exception: # Display failed task if isinstance(app_queue.exception, InvalidSignature): status = lang.repo_package_invalid_signature() elif isinstance(app_queue.exception, NotFound): status = lang.repo_package_missing() elif isinstance(app_queue.exception, BaseException): if app_queue.action in (vmmgr.start_app, vmmgr.stop_app): status = lang.stop_start_error() else: status = lang.package_manager_error() status = f'<span class="error">{status}<span> <a href="#" class="app-clear-status">OK</a>' actions = None else: # Display task/subtask progress action_item = app_queue.queue[app_queue.index-1] if action_item.type in (ActionItemType.IMAGE_DOWNLOAD, ActionItemType.APP_DOWNLOAD): status = lang.status_downloading(action_item.key) elif action_item.type in (ActionItemType.IMAGE_UNPACK, ActionItemType.APP_UNPACK): status = lang.status_unpacking(action_item.key) elif action_item.type == ActionItemType.IMAGE_DELETE: status = lang.status_deleting(action_item.key) elif action_item.type == ActionItemType.APP_START: status = lang.status_starting() elif action_item.type == ActionItemType.APP_STOP: status = lang.status_stopping() elif action_item.type == ActionItemType.APP_INSTALL: status = lang.status_installing() elif action_item.type == ActionItemType.APP_UPDATE: status = lang.status_updating() elif action_item.type == ActionItemType.APP_UNINSTALL: status = lang.status_uninstalling() if app_queue.action not in (vmmgr.start_app, vmmgr.stop_app): # For tasks other than start/stop which have only a single subtask, display also index of the subtask in queue status = f'[{app_queue.index}/{len(app_queue.queue)}] {status}' if action_item.units_total: # Show progress for tasks which have measurable progress status = f'{status} ({floor(action_item.units_done/action_item.units_total*100)} %)' actions = '<div class="loader"></div>' else: # Display queued (pending, not started) task if app_queue.action == vmmgr.start_app: status = lang.status_starting() elif app_queue.action == vmmgr.stop_app: status = lang.status_stopping() elif app_queue.action == vmmgr.install_app: status = lang.status_installing() elif app_queue.action == vmmgr.update_app: status = lang.status_updating() elif app_queue.action == vmmgr.uninstall_app: status = lang.status_uninstalling() status = f'{status} ({lang.status_queued()})' actions = '<div class="loader"></div>' else: # Diplay apps with no queued or currently processed action if not installed: status = lang.status_not_installed() actions = f'<a href="#" class="app-install">{lang.action_install()}</a>' else: if vmmgr.is_app_started(app): status = f'<span class="info">{lang.status_started()}</span>' actions = f'<a href="#" class="app-stop">{lang.action_stop()}</a>' else: status = f'<span class="error">{lang.status_stopped()}</span>' actions = f'<a href="#" class="app-start">{lang.action_start()}</a>, <a href="#" class="app-uninstall">{lang.action_uninstall()}</a>' try: if parse_version(online_apps[app]['version']) > parse_version(local_apps[app]['version']): actions = f'{actions}, <a href="#" class="app-update">{lang.action_update()}</a>' except KeyError: pass app_data[app] = {'title': title, 'visible': visible, 'installed': installed, 'autostarted': autostarted, 'status': status, 'actions': actions} return self.render_template('setup-apps-table.html', request, app_data=app_data) def setup_remote_view(self, request): # Remote access setup view authorized_keys = remote.get_authorized_keys() wg_conf = remote.get_wireguard_conf() wg_conf['status'] = ('info', request.session.lang.status_started()) if wg_conf['running'] else ('error', request.session.lang.status_stopped()) message = self.get_session_message(request) return self.render_html('setup-remote.html', request, authorized_keys=authorized_keys, wg_conf=wg_conf, message=message) def update_host_action(self, request): # Update domain and port, then restart nginx domain = request.form['domain'] port = request.form['port'] if not validator.is_valid_domain(domain): return self.render_json({'error': request.session.lang.invalid_domain()}) if not validator.is_valid_port(port): return self.render_json({'error': request.session.lang.invalid_port()}) vmmgr.update_host(domain, port) url = f'{net.compile_url(net.get_local_ip(), port)}/setup-host' response = self.render_json({'ok': request.session.lang.host_updated(url, url)}) response.call_on_close(vmmgr.restart_nginx) return response def verify_dns_action(self, request): # Check if all FQDNs for all applications are resolvable and point to current external IP domain = config.get_host()['domain'] domains = [domain]+[f'{definition["host"]}.{domain}' for app,definition in config.get_apps().items()] ipv4 = net.get_external_ip(4) ipv6 = net.get_external_ip(6) for domain in domains: try: a = net.resolve_ip(domain, 'A') aaaa = net.resolve_ip(domain, 'AAAA') if not a and not aaaa: return self.render_json({'error': request.session.lang.dns_record_does_not_exist(domain)}) if a and a != ipv4: return self.render_json({'error': request.session.lang.dns_record_mismatch(domain, a, ipv4)}) if aaaa and aaaa != ipv6: return self.render_json({'error': request.session.lang.dns_record_mismatch(domain, aaaa, ipv6)}) except: return self.render_json({'error': request.session.lang.dns_timeout()}) return self.render_json({'ok': request.session.lang.dns_records_ok()}) def verify_http_action(self, request, **kwargs): # Check if all applications are accessible from the internet using 3rd party ping service proto = kwargs['proto'] host = config.get_host() port = host['port'] if proto == 'https' else '80' domains = [host['domain']]+[f'{definition["host"]}.{host["domain"]}' for app,definition in config.get_apps().items()] for domain in domains: url = net.compile_url(domain, port, proto) try: if not net.ping_url(url): return self.render_json({'error': request.session.lang.http_host_not_reachable(url)}) except: return self.render_json({'error': request.session.lang.http_timeout()}) return self.render_json({'ok': request.session.lang.http_hosts_ok(port)}) def update_cert_action(self, request): # Update certificate - either request via Let's Encrypt or manually upload files if request.form['method'] == 'selfsigned': vmmgr.create_selfsigned_cert() elif request.form['method'] == 'automatic': vmmgr.request_acme_cert() elif request.form['method'] == 'manual': if not request.files['public']: return self.render_json({'error': request.session.lang.cert_file_missing()}) if not request.files['private']: return self.render_json({'error': request.session.lang.key_file_missing()}) request.files['public'].save('/tmp/public.pem') request.files['private'].save('/tmp/private.pem') vmmgr.install_manual_cert('/tmp/public.pem', '/tmp/private.pem') os.unlink('/tmp/public.pem') os.unlink('/tmp/private.pem') else: return self.render_json({'error': request.session.lang.cert_request_error()}) host = config.get_host() url = net.compile_url(host['domain'], host['port']) return self.render_json({'ok': request.session.lang.cert_installed(url, url)}) def update_repo_action(self, request): # Update repository URL and credentials url = request.form['repourl'] if not validator.is_valid_repo_url(url): request.session['msg'] = f'repo:error:{request.session.lang.invalid_url(url)}' else: vmmgr.update_repo_conf(url, request.form['repousername'], request.form['repopassword']) request.session['msg'] = f'repo:info:{request.session.lang.repo_updated()}' return redirect('/setup-apps') def update_app_visibility_action(self, request): # Update application visibility on portal page vmmgr.update_app_visibility(request.form['app'], request.form['value'] == 'true') return self.render_json({'ok': 'ok'}) def update_app_autostart_action(self, request): # Update value determining if the app should be automatically started after VM boot vmmgr.update_app_autostart(request.form['app'], request.form['value'] == 'true') return self.render_json({'ok': 'ok'}) def enqueue_app_action(self, request, action): # Common method for enqueuing app actions app = request.form['app'] self.queue.enqueue_action(app, action) response = self.render_json({'ok': self.render_setup_apps_table(request)}) response.call_on_close(self.queue.process) return response def start_app_action(self, request): # Queues application start along with its dependencies return self.enqueue_app_action(request, vmmgr.start_app) def stop_app_action(self, request): # Queues application stop along with its dependencies return self.enqueue_app_action(request, vmmgr.stop_app) def install_app_action(self, request): # Queues application installation return self.enqueue_app_action(request, vmmgr.install_app) def uninstall_app_action(self, request): # Queues application uninstallation return self.enqueue_app_action(request, vmmgr.uninstall_app) def update_app_action(self, request): # Queues application update return self.enqueue_app_action(request, vmmgr.update_app) def get_app_status_action(self, request): # Gets application and queue status return self.render_json({'ok': self.render_setup_apps_table(request)}) def clear_app_status_action(self, request): # Clears error status for an application self.queue.clear_action(request.form['app']) return self.render_json({'ok': self.render_setup_apps_table(request)}) def update_password_action(self, request): # Updates password for both HDD encryption (LUKS-on-LVM) and web interface admin account try: if request.form['newpassword'] != request.form['newpassword2']: return self.render_json({'error': request.session.lang.password_mismatch()}) if request.form['newpassword'] == '': return self.render_json({'error': request.session.lang.password_empty()}) # No need to explicitly validate old password, vmmgr.update_password will raise exception if it's wrong vmmgr.update_password(request.form['oldpassword'], request.form['newpassword']) except: return self.render_json({'error': request.session.lang.bad_password()}) return self.render_json({'ok': request.session.lang.password_changed()}) def reboot_vm_action(self, request): # Reboots VM response = self.render_json({'ok': request.session.lang.reboot_initiated()}) response.call_on_close(vmmgr.reboot_vm) return response def shutdown_vm_action(self, request): # Shuts down VM response = self.render_json({'ok': request.session.lang.shutdown_initiated()}) response.call_on_close(vmmgr.shutdown_vm) return response def update_ssh_keys_action(self, request): # Update authorized_keys file remote.set_authorized_keys(request.form['ssh-keys'].replace('\r', '')) request.session['msg'] = f'ssh:info:{request.session.lang.ssh_keys_installed()}' return redirect('/setup-remote') def update_vpn_action(self, request): # Update WireGuard VPN listen port, virtual IP and peer list ip = request.form['vpn-lip'] if not ip.isdigit() or not 0 < int(ip) < 255: request.session['msg'] = f'vpn:error:{request.session.lang.invalid_ip()}' return redirect('/setup-remote') port = request.form['vpn-port'] if not port.isdigit() or not 0 < int(port) < 65536: request.session['msg'] = f'vpn:error:{request.session.lang.invalid_port()}' return redirect('/setup-remote') peers = request.form['vpn-peers'].replace('\r', '') remote.set_wireguard_conf(ip, port, peers) request.session['msg'] = f'vpn:info:{request.session.lang.vpn_updated()}' return redirect('/setup-remote') def generate_vpn_key_action(self, request): # Regenerate WireGuard key pair remote.regenerate_wireguard_key() return redirect('/setup-remote') def start_vpn_action(self, request): # Start WireGuard VPN remote.start_wireguard() return redirect('/setup-remote') def stop_vpn_action(self, request): # Stop WireGuard VPN remote.stop_wireguard() return redirect('/setup-remote')