Apply a few pylint suggestions

This commit is contained in:
Disassembler 2018-11-07 18:35:48 +01:00
parent bd066710f5
commit 2e97742d56
No known key found for this signature in database
GPG Key ID: 524BD33A0EE29499
7 changed files with 28 additions and 55 deletions

View File

@ -28,37 +28,12 @@ def create_cert(domain):
.serial_number(x509.random_serial_number()) \ .serial_number(x509.random_serial_number()) \
.not_valid_before(now) \ .not_valid_before(now) \
.not_valid_after(now + datetime.timedelta(days=7305)) \ .not_valid_after(now + datetime.timedelta(days=7305)) \
.add_extension( .add_extension(x509.SubjectAlternativeName((x509.DNSName(domain), x509.DNSName('*.{}'.format(domain)))), critical=False) \
x509.SubjectAlternativeName(( .add_extension(x509.SubjectKeyIdentifier.from_public_key(public_key), critical=False) \
x509.DNSName(domain), .add_extension(x509.AuthorityKeyIdentifier.from_issuer_public_key(public_key), critical=False) \
x509.DNSName('*.{}'.format(domain)), .add_extension(x509.BasicConstraints(ca=False, path_length=None), critical=True) \
)), .add_extension(x509.KeyUsage(digital_signature=True, content_commitment=False, key_encipherment=False, data_encipherment=False, key_agreement=False, key_cert_sign=False, crl_sign=False, encipher_only=False, decipher_only=False), critical=True) \
critical=False) \ .add_extension(x509.ExtendedKeyUsage((ExtendedKeyUsageOID.SERVER_AUTH, ExtendedKeyUsageOID.CLIENT_AUTH)), critical=False) \
.add_extension(
x509.SubjectKeyIdentifier.from_public_key(public_key),
critical=False) \
.add_extension(
x509.AuthorityKeyIdentifier.from_issuer_public_key(public_key),
critical=False) \
.add_extension(
x509.BasicConstraints(ca=False, path_length=None),
critical=True) \
.add_extension(
x509.KeyUsage(digital_signature=True,
content_commitment=False,
key_encipherment=False,
data_encipherment=False,
key_agreement=False,
key_cert_sign=False,
crl_sign=False,
encipher_only=False,
decipher_only=False),
critical=True) \
.add_extension(
x509.ExtendedKeyUsage((
ExtendedKeyUsageOID.SERVER_AUTH,
ExtendedKeyUsageOID.CLIENT_AUTH)),
critical=False) \
.sign(private_key, hashes.SHA256(), default_backend()) .sign(private_key, hashes.SHA256(), default_backend())
with open(CERT_PUB_FILE, 'wb') as f: with open(CERT_PUB_FILE, 'wb') as f:
f.write(cert.public_bytes(serialization.Encoding.PEM)) f.write(cert.public_bytes(serialization.Encoding.PEM))
@ -91,5 +66,5 @@ def verify_signature(file, signature):
def adminpwd_hash(password): def adminpwd_hash(password):
return bcrypt.hashpw(password.encode(), bcrypt.gensalt()).decode() return bcrypt.hashpw(password.encode(), bcrypt.gensalt()).decode()
def adminpwd_verify(password, hash): def adminpwd_verify(password, pwhash):
return bcrypt.checkpw(password.encode(), hash.encode()) return bcrypt.checkpw(password.encode(), pwhash.encode())

View File

@ -2,7 +2,6 @@
import dns.exception import dns.exception
import dns.resolver import dns.resolver
import os
import requests import requests
import socket import socket
import subprocess import subprocess
@ -40,10 +39,10 @@ resolver.timeout = 3
resolver.lifetime = 3 resolver.lifetime = 3
resolver.nameservers = ['8.8.8.8', '8.8.4.4', '2001:4860:4860::8888', '2001:4860:4860::8844'] resolver.nameservers = ['8.8.8.8', '8.8.4.4', '2001:4860:4860::8888', '2001:4860:4860::8844']
def resolve_ip(domain, type): def resolve_ip(domain, qtype):
# Resolve domain name using Google Public DNS # Resolve domain name using Google Public DNS
try: try:
return resolver.query(domain, type)[0].address return resolver.query(domain, qtype)[0].address
except dns.exception.Timeout: except dns.exception.Timeout:
raise raise
except: except:
@ -51,7 +50,7 @@ def resolve_ip(domain, type):
def ping_url(url): def ping_url(url):
try: try:
return requests.get('https://tools.dasm.cz/vm-ping.php', params = {'url': url}, timeout=5).text == 'vm-pong' return requests.get('https://tools.dasm.cz/vm-ping.php', params={'url': url}, timeout=5).text == 'vm-pong'
except requests.exceptions.Timeout: except requests.exceptions.Timeout:
raise raise
except: except:

View File

@ -64,7 +64,7 @@ server {{
server {{ server {{
listen [::]:{port} ssl http2; listen [::]:{port} ssl http2;
server_name ~^(.*)\.{domain_esc}$; server_name ~^(.*)\\.{domain_esc}$;
location / {{ location / {{
return 503; return 503;

View File

@ -12,7 +12,7 @@ def is_valid_domain(domain):
def is_valid_port(port): def is_valid_port(port):
try: try:
port = int(port) port = int(port)
return port > 0 and port < 65536 and port not in (22, 25, 80, 8080) return 0 < port < 65536 and port not in (22, 25, 80, 8080)
except: except:
pass pass
return False return False

View File

@ -8,7 +8,6 @@ import subprocess
from . import crypto from . import crypto
from . import templates from . import templates
from . import net from . import net
from .config import Config
VERSION = '0.0.1' VERSION = '0.0.1'
@ -42,7 +41,7 @@ class VMMgr:
def rebuild_nginx(self): def rebuild_nginx(self):
# Rebuild nginx config for the portal app. Web interface calls restart_nginx() in WSGI close handler # Rebuild nginx config for the portal app. Web interface calls restart_nginx() in WSGI close handler
with open(os.path.join(NGINX_DIR, 'default.conf'), 'w') as f: with open(os.path.join(NGINX_DIR, 'default.conf'), 'w') as f:
f.write(templates.NGINX_DEFAULT.format(port=self.port, domain_esc=self.domain.replace('.', '\.'))) f.write(templates.NGINX_DEFAULT.format(port=self.port, domain_esc=self.domain.replace('.', '\\.')))
def reload_nginx(self): def reload_nginx(self):
subprocess.run(['/usr/sbin/nginx', '-s', 'reload']) subprocess.run(['/usr/sbin/nginx', '-s', 'reload'])
@ -57,8 +56,8 @@ class VMMgr:
def update_password(self, oldpassword, newpassword): def update_password(self, oldpassword, newpassword):
# Update LUKS password and adminpwd for WSGI application # Update LUKS password and adminpwd for WSGI application
input = '{}\n{}'.format(oldpassword, newpassword).encode() pwinput = '{}\n{}'.format(oldpassword, newpassword).encode()
subprocess.run(['cryptsetup', 'luksChangeKey', '/dev/sda2'], input=input, check=True) subprocess.run(['cryptsetup', 'luksChangeKey', '/dev/sda2'], input=pwinput, check=True)
# Update bcrypt-hashed password in config # Update bcrypt-hashed password in config
self.conf['host']['adminpwd'] = crypto.adminpwd_hash(newpassword) self.conf['host']['adminpwd'] = crypto.adminpwd_hash(newpassword)
# Save config to file # Save config to file

View File

@ -23,7 +23,7 @@ from .wsgisession import WSGISession
SESSION_KEY = os.urandom(26) SESSION_KEY = os.urandom(26)
class WSGIApp(object): class WSGIApp:
def __init__(self): def __init__(self):
self.conf = Config() self.conf = Config()
self.vmmgr = VMMgr(self.conf) self.vmmgr = VMMgr(self.conf)
@ -78,8 +78,8 @@ class WSGIApp(object):
return response(environ, start_response) return response(environ, start_response)
def dispatch_request(self, request): def dispatch_request(self, request):
map = self.admin_url_map if request.session['admin'] else self.url_map url_map = self.admin_url_map if request.session['admin'] else self.url_map
adapter = map.bind_to_environ(request.environ) adapter = url_map.bind_to_environ(request.environ)
try: try:
endpoint, values = adapter.match() endpoint, values = adapter.match()
response = getattr(self, endpoint)(request, **values) response = getattr(self, endpoint)(request, **values)
@ -120,7 +120,7 @@ class WSGIApp(object):
# Message is in format location:type:text # Message is in format location:type:text
return message.split(':', 3) return message.split(':', 3)
def login_view(self, request, **kwargs): def login_view(self, request):
redir = request.args.get('redir', '') redir = request.args.get('redir', '')
message = self.get_session_message(request) message = self.get_session_message(request)
return self.render_html('login.html', request, redir=redir, message=message) return self.render_html('login.html', request, redir=redir, message=message)
@ -172,7 +172,7 @@ class WSGIApp(object):
def render_setup_apps_table(self, request): def render_setup_apps_table(self, request):
lang = request.session.lang lang = request.session.lang
pending_actions = self.queue.get_actions() pending_actions = self.queue.get_actions()
actionable_apps = sorted(set([k for k,v in self.appmgr.online_packages.items() if 'host' in v] + list(self.conf['apps'].keys()))) actionable_apps = sorted(set([k for k, v in self.appmgr.online_packages.items() if 'host' in v] + list(self.conf['apps'].keys())))
app_data = {} app_data = {}
for app in actionable_apps: for app in actionable_apps:
installed = app in self.conf['apps'] installed = app in self.conf['apps']
@ -229,7 +229,7 @@ class WSGIApp(object):
else: else:
status = '<span class="error">{}</span>'.format(lang.status_stopped()) status = '<span class="error">{}</span>'.format(lang.status_stopped())
actions = '<a href="#" class="app-start">{}</a>, <a href="#" class="app-uninstall">{}</a>'.format(lang.action_start(), lang.action_uninstall()) actions = '<a href="#" class="app-start">{}</a>, <a href="#" class="app-uninstall">{}</a>'.format(lang.action_start(), lang.action_uninstall())
app_data[app] = {'title': title, 'visible': visible, 'installed': installed,'autostarted': autostarted, 'status': status, 'actions': actions} app_data[app] = {'title': title, 'visible': visible, 'installed': installed, 'autostarted': autostarted, 'status': status, 'actions': actions}
return self.render_template('setup-apps-table.html', request, app_data=app_data) return self.render_template('setup-apps-table.html', request, app_data=app_data)
def update_host_action(self, request): def update_host_action(self, request):
@ -238,7 +238,7 @@ class WSGIApp(object):
port = request.form['port'] port = request.form['port']
if not validator.is_valid_domain(domain): if not validator.is_valid_domain(domain):
return self.render_json({'error': request.session.lang.invalid_domain(domain)}) return self.render_json({'error': request.session.lang.invalid_domain(domain)})
elif not validator.is_valid_port(port): if not validator.is_valid_port(port):
return self.render_json({'error': request.session.lang.invalid_port(port)}) return self.render_json({'error': request.session.lang.invalid_port(port)})
self.vmmgr.update_host(domain, port) self.vmmgr.update_host(domain, port)
url = '{}/setup-host'.format(net.compile_url(net.get_local_ip(), port)) url = '{}/setup-host'.format(net.compile_url(net.get_local_ip(), port))
@ -276,7 +276,7 @@ class WSGIApp(object):
if not net.ping_url(url): if not net.ping_url(url):
return self.render_json({'error': request.session.lang.http_host_not_reachable(url)}) return self.render_json({'error': request.session.lang.http_host_not_reachable(url)})
except: except:
return self.render_json({'error': request.session.lang.http_timeout()}) return self.render_json({'error': request.session.lang.http_timeout()})
return self.render_json({'ok': request.session.lang.http_hosts_ok(port)}) return self.render_json({'ok': request.session.lang.http_hosts_ok(port)})
def update_cert_action(self, request): def update_cert_action(self, request):

View File

@ -1,4 +1,4 @@
{% for app,data in app_data.items() %} {% for app, data in app_data.items() %}
<tr data-app="{{ app }}"> <tr data-app="{{ app }}">
<td>{{ data['title'] }}</td> <td>{{ data['title'] }}</td>
<td class="center"><input type="checkbox" class="app-visible"{% if not data['installed'] %} disabled{% elif data['visible'] %} checked{% endif %}></td> <td class="center"><input type="checkbox" class="app-visible"{% if not data['installed'] %} disabled{% elif data['visible'] %} checked{% endif %}></td>