Rework validator
This commit is contained in:
parent
b64f9c2c9e
commit
75e86b0dcb
@ -5,7 +5,6 @@ import shutil
|
||||
import subprocess
|
||||
|
||||
from . import tools
|
||||
from . import validator
|
||||
from .config import Config
|
||||
|
||||
VERSION = '0.0.1'
|
||||
@ -153,8 +152,6 @@ class VMMgr:
|
||||
def register_app(self, app, login, password):
|
||||
# Write a file with credentials of a newly installed application which
|
||||
# will be picked up by thread performing the installation after the install script finishes
|
||||
if app not in self.conf['packages']:
|
||||
raise validator.InvalidValueException('app', app)
|
||||
login = login if login else 'N/A'
|
||||
password = password if password else 'N/A'
|
||||
with open('/tmp/{}.credentials'.format(app), 'w') as f:
|
||||
@ -207,10 +204,6 @@ class VMMgr:
|
||||
|
||||
def update_host(self, domain, port):
|
||||
# Update domain and port and rebuild all configuration. Web interface calls tools.restart_nginx() in WSGI close handler
|
||||
if not validator.is_valid_domain(domain):
|
||||
raise validator.InvalidValueException('domain', domain)
|
||||
if not validator.is_valid_port(port):
|
||||
raise validator.InvalidValueException('port', port)
|
||||
self.domain = self.conf['host']['domain'] = domain
|
||||
self.port = self.conf['host']['port'] = port
|
||||
self.conf.save()
|
||||
@ -223,15 +216,9 @@ class VMMgr:
|
||||
f.write(NGINX_DEFAULT_TEMPLATE.format(port=self.port, domain_esc=self.domain.replace('.', '\.')))
|
||||
|
||||
def rebuild_issue(self):
|
||||
# Compile the URLs displayed in terminal banner
|
||||
ip = tools.get_local_ip(4)
|
||||
if not ip:
|
||||
ip = tools.get_local_ip(6)
|
||||
if not ip:
|
||||
ip = '127.0.0.1'
|
||||
# Rebuild the terminal banner
|
||||
# Compile the URLs displayed in terminal banner and rebuild the file
|
||||
with open(ISSUE_FILE, 'w') as f:
|
||||
f.write(ISSUE_TEMPLATE.format(url=tools.compile_url(self.domain, self.port), ip=tools.compile_url(ip, self.port)))
|
||||
f.write(ISSUE_TEMPLATE.format(url=tools.compile_url(self.domain, self.port), ip=tools.compile_url(tools.get_local_ip(), self.port)))
|
||||
|
||||
def update_password(self, oldpassword, newpassword):
|
||||
# Update LUKS password and adminpwd for WSGI application
|
||||
|
@ -14,7 +14,6 @@ from cryptography.hazmat.primitives.asymmetric import ec
|
||||
from cryptography.hazmat.primitives.serialization import load_pem_public_key
|
||||
|
||||
from . import tools
|
||||
from . import validator
|
||||
|
||||
PUB_FILE = '/etc/vmmgr/packages.pub'
|
||||
LXC_ROOT = '/var/lib/lxc'
|
||||
@ -53,17 +52,13 @@ class AppMgr:
|
||||
def start_app(self, item):
|
||||
# Start the actual app service
|
||||
app = item.key
|
||||
if app not in self.conf['apps']:
|
||||
raise validator.InvalidValueException('app', app)
|
||||
if not tools.is_service_started(app):
|
||||
if app in self.conf['apps'] and not tools.is_service_started(app):
|
||||
tools.start_service(app)
|
||||
|
||||
def stop_app(self, item):
|
||||
# Stop the actual app service
|
||||
app = item.key
|
||||
if app not in self.conf['apps']:
|
||||
raise validator.InvalidValueException('app', app)
|
||||
if tools.is_service_started(app):
|
||||
if app in self.conf['apps'] and tools.is_service_started(app):
|
||||
tools.stop_service(app)
|
||||
# Stop the app service's dependencies if they are not used by another running app
|
||||
deps = self.get_services_deps()
|
||||
@ -73,16 +68,14 @@ class AppMgr:
|
||||
|
||||
def update_app_visibility(self, app, visible):
|
||||
# Update visibility for the app in the configuration
|
||||
if app not in self.conf['apps']:
|
||||
raise validator.InvalidValueException('app', app)
|
||||
self.conf['apps'][app]['visible'] = visible
|
||||
self.conf.save()
|
||||
if app in self.conf['apps']:
|
||||
self.conf['apps'][app]['visible'] = visible
|
||||
self.conf.save()
|
||||
|
||||
def update_app_autostart(self, app, enabled):
|
||||
# Add/remove the app to OpenRC default runlevel
|
||||
if app not in self.conf['apps']:
|
||||
raise validator.InvalidValueException('app', app)
|
||||
subprocess.run(['/sbin/rc-update', 'add' if enabled else 'del', app])
|
||||
if app in self.conf['apps']:
|
||||
subprocess.run(['/sbin/rc-update', 'add' if enabled else 'del', app])
|
||||
|
||||
def install_app(self, item):
|
||||
# Main installation function. Wrapper for download, registration and install script
|
||||
@ -235,8 +228,6 @@ class AppMgr:
|
||||
|
||||
def update_common_settings(self, email, gmaps_api_key):
|
||||
# Update common configuration values
|
||||
if not validator.is_valid_email(email):
|
||||
raise validator.InvalidValueException('email', email)
|
||||
self.conf['common']['email'] = email
|
||||
self.conf['common']['gmaps-api-key'] = gmaps_api_key
|
||||
self.conf.save()
|
||||
|
@ -18,8 +18,11 @@ def compile_url(domain, port, proto='https'):
|
||||
port = '' if (proto == 'https' and port == '443') or (proto == 'http' and port == '80') else ':{}'.format(port)
|
||||
return '{}://{}{}'.format(proto, domain, port)
|
||||
|
||||
def get_local_ip(version):
|
||||
def get_local_ip(version=None):
|
||||
# Return first routable IPv4/6 address of the VM (container host)
|
||||
# If version is not given, try getting IPv4, if that fails, try getting IPv6, if that fails too, fall back to 127.0.0.1
|
||||
if not version:
|
||||
return get_local_ip(4) or get_local_ip(6) or '127.0.0.1'
|
||||
try:
|
||||
output = subprocess.run(['/sbin/ip', 'route', 'get', '1' if version == 4 else '2003::'], check=True, stdout=subprocess.PIPE).stdout.decode().split()
|
||||
# Get field right after 'src'
|
||||
|
@ -1,25 +1,30 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
import re
|
||||
from urllib.parse import urlparse
|
||||
|
||||
domain_re = re.compile(r'^(?!-)[a-z0-9-]{1,63}(?<!-)(?:\.(?!-)[a-z0-9-]{1,63}(?<!-)){0,125}\.(?!-)(?![0-9]+$)[a-z0-9-]{1,63}(?<!-)$')
|
||||
box_re = re.compile(r'^[a-z0-9!#$%&\'*+/=?^_`{|}~-]+(?:\.[a-z0-9!#$%&\'*+/=?^_`{|}~-]+)*$')
|
||||
|
||||
class InvalidValueException(Exception):
|
||||
pass
|
||||
|
||||
def is_valid_domain(domain):
|
||||
return bool(domain_re.match(domain))
|
||||
|
||||
def is_valid_port(port):
|
||||
try:
|
||||
port = int(port)
|
||||
return port > 0 and port < 65536
|
||||
return port > 0 and port < 65536 and port not in (22, 25, 80, 8080)
|
||||
except:
|
||||
return False
|
||||
pass
|
||||
return False
|
||||
|
||||
def is_valid_email(email):
|
||||
parts = email.split('@')
|
||||
if len(parts) != 2:
|
||||
return False
|
||||
return bool(box_re.match(parts[0])) and bool(domain_re.match(parts[1]))
|
||||
return len(parts) == 2 and bool(box_re.match(parts[0])) and bool(domain_re.match(parts[1]))
|
||||
|
||||
def is_valid_url(url):
|
||||
try:
|
||||
parsed = urlparse(url)
|
||||
return parsed.scheme in ('http', 'https')
|
||||
except:
|
||||
pass
|
||||
return False
|
||||
|
@ -14,10 +14,10 @@ from cryptography.exceptions import InvalidSignature
|
||||
|
||||
from . import VMMgr, CERT_PUB_FILE
|
||||
from . import tools
|
||||
from . import validator
|
||||
from .actionqueue import ActionQueue
|
||||
from .appmgr import AppMgr
|
||||
from .config import Config
|
||||
from .validator import InvalidValueException
|
||||
from .wsgilang import WSGILang
|
||||
from .wsgisession import WSGISession
|
||||
|
||||
@ -232,22 +232,17 @@ class WSGIApp(object):
|
||||
|
||||
def update_host_action(self, request):
|
||||
# Update domain and port, then restart nginx
|
||||
try:
|
||||
domain = request.form['domain']
|
||||
port = request.form['port']
|
||||
self.vmmgr.update_host(domain, port)
|
||||
ip = tools.get_local_ip(4)
|
||||
if not ip:
|
||||
ip = tools.get_local_ip(6)
|
||||
url = '{}/setup-host'.format(tools.compile_url(ip, port))
|
||||
response = self.render_json({'ok': request.session.lang.host_updated(url, url)})
|
||||
response.call_on_close(tools.restart_nginx)
|
||||
return response
|
||||
except InvalidValueException as e:
|
||||
if e.args[0] == 'domain':
|
||||
return self.render_json({'error': request.session.lang.invalid_domain(domain)})
|
||||
if e.args[0] == 'port':
|
||||
return self.render_json({'error': request.session.lang.invalid_port(port)})
|
||||
domain = request.form['domain']
|
||||
port = request.form['port']
|
||||
if not validator.is_valid_domain(domain)
|
||||
return self.render_json({'error': request.session.lang.invalid_domain(domain)})
|
||||
elif not validator.is_valid_port(port):
|
||||
return self.render_json({'error': request.session.lang.invalid_port(port)})
|
||||
self.vmmgr.update_host(domain, port)
|
||||
url = '{}/setup-host'.format(tools.compile_url(tools.get_local_ip(), port))
|
||||
response = self.render_json({'ok': request.session.lang.host_updated(url, url)})
|
||||
response.call_on_close(tools.restart_nginx)
|
||||
return response
|
||||
|
||||
def verify_dns_action(self, request):
|
||||
# Check if all FQDNs for all applications are resolvable and point to current external IP
|
||||
@ -284,43 +279,43 @@ class WSGIApp(object):
|
||||
|
||||
def update_cert_action(self, request):
|
||||
# Update certificate - either request via Let's Encrypt or manually upload files
|
||||
try:
|
||||
if request.form['method'] not in ['selfsigned', 'automatic', 'manual']:
|
||||
raise BadRequest()
|
||||
if request.form['method'] == 'selfsigned':
|
||||
self.vmmgr.create_selfsigned_cert()
|
||||
elif request.form['method'] == 'automatic':
|
||||
self.vmmgr.request_acme_cert()
|
||||
else:
|
||||
if not request.files['public']:
|
||||
return self.render_json({'error': request.session.lang.cert_file_missing()})
|
||||
if not request.files['private']:
|
||||
return self.render_json({'error': request.session.lang.key_file_missing()})
|
||||
request.files['public'].save('/tmp/public.pem')
|
||||
request.files['private'].save('/tmp/private.pem')
|
||||
self.vmmgr.install_manual_cert('/tmp/public.pem', '/tmp/private.pem')
|
||||
os.unlink('/tmp/public.pem')
|
||||
os.unlink('/tmp/private.pem')
|
||||
except:
|
||||
if request.form['method'] == 'selfsigned':
|
||||
self.vmmgr.create_selfsigned_cert()
|
||||
elif request.form['method'] == 'automatic':
|
||||
self.vmmgr.request_acme_cert()
|
||||
elif request.form['method'] == 'manual':
|
||||
if not request.files['public']:
|
||||
return self.render_json({'error': request.session.lang.cert_file_missing()})
|
||||
if not request.files['private']:
|
||||
return self.render_json({'error': request.session.lang.key_file_missing()})
|
||||
request.files['public'].save('/tmp/public.pem')
|
||||
request.files['private'].save('/tmp/private.pem')
|
||||
self.vmmgr.install_manual_cert('/tmp/public.pem', '/tmp/private.pem')
|
||||
os.unlink('/tmp/public.pem')
|
||||
os.unlink('/tmp/private.pem')
|
||||
else:
|
||||
return self.render_json({'error': request.session.lang.cert_request_error()})
|
||||
url = tools.compile_url(self.vmmgr.domain, self.vmmgr.port)
|
||||
return self.render_json({'ok': request.session.lang.cert_installed(url, url)})
|
||||
|
||||
def update_common_action(self, request):
|
||||
# Update common settings shared between apps - admin e-mail address, Google Maps API key
|
||||
try:
|
||||
email = request.form['email']
|
||||
gmaps_api_key = request.form['gmaps-api-key']
|
||||
self.appmgr.update_common_settings(email, gmaps_api_key)
|
||||
request.session['msg'] = 'common:info:{}'.format(request.session.lang.common_updated())
|
||||
except InvalidValueException:
|
||||
email = request.form['email']
|
||||
if not validator.is_valid_email(email):
|
||||
request.session['msg'] = 'common:error:{}'.format(request.session.lang.invalid_email(email))
|
||||
else:
|
||||
self.appmgr.update_common_settings(email, request.form['gmaps-api-key'])
|
||||
request.session['msg'] = 'common:info:{}'.format(request.session.lang.common_updated())
|
||||
return redirect('/setup-apps')
|
||||
|
||||
def update_repo_action(self, request):
|
||||
# Update repository URL and credentials
|
||||
self.appmgr.update_repo_settings(request.form['repourl'], request.form['repousername'], request.form['repopassword'])
|
||||
request.session['msg'] = 'repo:info:{}'.format(request.session.lang.repo_updated())
|
||||
url = request.form['repourl']
|
||||
if not validator.is_valid_url(url):
|
||||
request.session['msg'] = 'repo:error:{}'.format(request.session.lang.invalid_url(request.form['repourl']))
|
||||
else:
|
||||
self.appmgr.update_repo_settings(url, request.form['repousername'], request.form['repopassword'])
|
||||
request.session['msg'] = 'repo:info:{}'.format(request.session.lang.repo_updated())
|
||||
return redirect('/setup-apps')
|
||||
|
||||
def update_app_visibility_action(self, request):
|
||||
|
@ -17,6 +17,7 @@ class WSGILang:
|
||||
'cert_request_error': 'Došlo k chybě při žádosti o certifikát. Zkontrolujte, zda je virtuální stroj dostupný z internetu na portu 80.',
|
||||
'cert_installed': 'Certifikát byl úspěšně nainstalován. Přejděte na URL <a href="{}">{}</a> nebo restartujte webový prohlížeč pro jeho načtení.',
|
||||
'invalid_email': 'Zadaný e-mail "{}" není platný.',
|
||||
'invalid_url': 'Zadaná adresa "{}" není platná.',
|
||||
'common_updated': 'Nastavení aplikací bylo úspěšně změněno. Pokud je některá z aplikací spuštěna, změny se projeví po jejím restartu.',
|
||||
'repo_updated': 'Nastavení distribučního serveru bylo úspěšně změněno.',
|
||||
'stop_start_error': 'Došlo k chybě při spouštění/zastavování. Zkuste akci opakovat nebo restartuje virtuální stroj.',
|
||||
|
@ -14,7 +14,7 @@
|
||||
<tr>
|
||||
<td>Port</td>
|
||||
<td><input type="text" name="port" id="port" value="{{ conf['host']['port'] }}"></td>
|
||||
<td class="remark">HTTPS port na kterém budou dostupné aplikace. Výchozí HTTPS port je 443.</td>
|
||||
<td class="remark">HTTPS port na kterém budou dostupné aplikace. Porty 22, 25, 80 a 8080 jsou vyhrazeny k jiným účelům. Výchozí HTTPS port je 443.</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td> </td>
|
||||
|
Loading…
x
Reference in New Issue
Block a user