Address some pylint issues

This commit is contained in:
Disassembler 2020-05-02 23:36:16 +02:00
parent 15c70db229
commit 5517b428fc
Signed by: Disassembler
GPG Key ID: 524BD33A0EE29499
7 changed files with 64 additions and 56 deletions

View File

@ -119,6 +119,8 @@ class ActionQueue:
app_queue.process()
# If the actions finished without errors, restore nominal state by deleting the item from action list
self.clear_action(app_name)
except (SystemExit, KeyboardInterrupt):
raise
except BaseException as e:
# If the action failed, store the exception and leave it in the list for manual clearance
with self.lock:

View File

@ -1,6 +1,5 @@
# -*- coding: utf-8 -*-
import dns.exception
import dns.resolver
import requests
import socket
@ -21,7 +20,7 @@ def get_local_ip(version=None):
output = subprocess.run(['/sbin/ip', 'route', 'get', '1' if version == 4 else '2003::'], check=True, stdout=subprocess.PIPE).stdout.decode().split()
# Get field right after 'src'
return output[output.index('src')+1]
except:
except (subprocess.CalledProcessError, ValueError, IndexError):
return None
def get_external_ip(version):
@ -31,7 +30,7 @@ def get_external_ip(version):
try:
requests.packages.urllib3.util.connection.allowed_gai_family = lambda: family
return requests.get(paths.MYIP_URL, timeout=5).text
except:
except requests.RequestException:
return None
finally:
requests.packages.urllib3.util.connection.allowed_gai_family = allowed_gai_family
@ -45,15 +44,8 @@ def resolve_ip(domain, query_type):
# Resolve domain name using Google Public DNS
try:
return resolver.query(domain, query_type)[0].address
except dns.exception.Timeout:
raise
except:
except dns.resolver.NXDOMAIN:
return None
def ping_url(url):
try:
return requests.get(paths.PING_URL, params={'url': url}, timeout=5).text == 'vm-pong'
except requests.exceptions.Timeout:
raise
except:
return False
return requests.get(paths.PING_URL, params={'url': url}, timeout=5).text == 'vm-pong'

View File

@ -9,6 +9,8 @@ ACME_CRON = '/etc/periodic/daily/acme.sh'
ACME_DIR = '/etc/acme.sh.d'
CERT_KEY_FILE = '/etc/ssl/services.key'
CERT_PUB_FILE = '/etc/ssl/services.pem'
TMP_KEY_FILE = '/tmp/private.pem'
TMP_PUB_FILE = '/tmp/public.pem'
# OS
ISSUE_FILE = '/etc/issue'

View File

@ -13,9 +13,8 @@ def is_valid_port(port):
try:
port = int(port)
return 0 < port < 65536 and port not in (22, 25, 80, 8080)
except:
pass
return False
except TypeError:
return False
def is_valid_email(email):
parts = email.split('@')
@ -26,6 +25,5 @@ def is_valid_repo_url(url):
try:
parsed = urlparse(url)
return parsed.scheme in ('http', 'https') and not parsed.params and not parsed.query and not parsed.fragment
except:
pass
return False
except ValueError:
return False

View File

@ -14,6 +14,15 @@ from spoc.image import Image
from . import config, crypto, net, paths, templates
BIN_ACME_SH = '/usr/bin/acme.sh'
BIN_BLKID = '/sbin/blkid'
BIN_CRYPTSETUP = '/sbin/cryptsetup'
BIN_NGINX = '/usr/sbin/nginx'
BIN_POWEROFF = '/sbin/poweroff'
BIN_REBOOT = '/sbin/reboot'
BIN_SERVICE = '/sbin/service'
CRYPTTAB_FILE = '/etc/crypttab'
def register_app(app, host, login, password):
# Register newly installed application, its subdomain and credentials (called at the end of package install.sh)
config.register_app(app, {
@ -58,10 +67,10 @@ def update_host(domain, port):
update_app_config()
def reload_nginx():
subprocess.run(['/usr/sbin/nginx', '-s', 'reload'])
subprocess.run([BIN_NGINX, '-s', 'reload'])
def restart_nginx():
subprocess.run(['/sbin/service', 'nginx', 'restart'])
subprocess.run([BIN_SERVICE, 'nginx', 'restart'])
def rebuild_issue():
# Compile the URLs displayed in terminal banner and rebuild the issue and motd files
@ -83,9 +92,9 @@ def update_common_settings(email, gmaps_api_key):
def update_password(oldpassword, newpassword):
# Update LUKS password and adminpwd for WSGI application
pwinput = f'{oldpassword}\n{newpassword}'.encode()
partition_uuid = open('/etc/crypttab').read().split()[1][5:]
partition_name = subprocess.run(['/sbin/blkid', '-U', partition_uuid], check=True, stdout=subprocess.PIPE).stdout.decode().strip()
subprocess.run(['/sbin/cryptsetup', 'luksChangeKey', partition_name], input=pwinput, check=True)
partition_uuid = open(CRYPTTAB_FILE).read().split()[1][5:]
partition_name = subprocess.run([BIN_BLKID, '-U', partition_uuid], check=True, stdout=subprocess.PIPE).stdout.decode().strip()
subprocess.run([BIN_CRYPTSETUP, 'luksChangeKey', partition_name], input=pwinput, check=True)
# Update bcrypt-hashed password in config
hash = crypto.adminpwd_hash(newpassword)
config.set_host('adminpwd', hash)
@ -106,12 +115,12 @@ def request_acme_cert():
certs = [i for i in os.listdir(paths.ACME_DIR) if i not in ('account.conf', 'ca', 'http.header')]
for cert in certs:
if cert != domain:
subprocess.run(['/usr/bin/acme.sh', '--home', paths.ACME_DIR, '--remove', '-d', cert])
subprocess.run([BIN_ACME_SH, '--home', paths.ACME_DIR, '--remove', '-d', cert])
except FileNotFoundError:
pass
# Compile an acme.sh command for certificate requisition only if the certificate hasn't been requested before
if not os.path.exists(os.path.join(paths.ACME_DIR, domain)):
cmd = ['/usr/bin/acme.sh', '--home', paths.ACME_DIR, '--issue', '-d', domain]
cmd = [BIN_ACME_SH, '--home', paths.ACME_DIR, '--issue', '-d', domain]
for app,definition in config.get_apps().items():
cmd += ['-d', f'{definition["host"]}.{domain}']
cmd += ['-w', paths.ACME_DIR]
@ -120,13 +129,13 @@ def request_acme_cert():
# Otherwise just try to renew
else:
try:
subprocess.run(['/usr/bin/acme.sh', '--home', paths.ACME_DIR, '--renew', '-d', domain], check=True)
subprocess.run([BIN_ACME_SH, '--home', paths.ACME_DIR, '--renew', '-d', domain], check=True)
except subprocess.CalledProcessError as e:
# return code 2 means skipped renew, which is OK
if e.returncode != 2:
raise
# Install the issued certificate
subprocess.run(['/usr/bin/acme.sh', '--home', paths.ACME_DIR, '--install-cert', '-d', domain, '--key-file', paths.CERT_KEY_FILE, '--fullchain-file', paths.CERT_PUB_FILE, '--reloadcmd', '/sbin/service nginx reload'], check=True)
subprocess.run([BIN_ACME_SH, '--home', paths.ACME_DIR, '--install-cert', '-d', domain, '--key-file', paths.CERT_KEY_FILE, '--fullchain-file', paths.CERT_PUB_FILE, '--reloadcmd', f'{BIN_SERVICE} nginx reload'], check=True)
# Enable acme.sh cronjob
os.chmod(paths.ACME_CRON, 0o750)
@ -141,10 +150,10 @@ def install_manual_cert(public_file, private_file):
reload_nginx()
def shutdown_vm():
subprocess.run(['/sbin/poweroff'])
subprocess.run([BIN_POWEROFF])
def reboot_vm():
subprocess.run(['/sbin/reboot'])
subprocess.run([BIN_REBOOT])
def start_app(app_name, queue):
# Enqueue application start
@ -182,7 +191,7 @@ def update_app_config(app_name=None):
env['HOST'] = f'{apps[app_name]["host"]}.{host["domain"]}'
try:
subprocess.run(script_path, cwd=install_dir, env=env)
except:
except FileNotFoundError:
pass
def is_app_started(app_name):

View File

@ -1,19 +1,22 @@
# -*- coding: utf-8 -*-
import cryptography.exceptions
import dns.exception
import json
import logging
import os
from cryptography.exceptions import InvalidSignature
import requests.exceptions
import subprocess
from jinja2 import Environment, FileSystemLoader
from math import floor
from pkg_resources import parse_version
from spoc import repo_online, repo_local
from werkzeug.exceptions import HTTPException, NotFound, Unauthorized
from werkzeug.exceptions import HTTPException, NotFound
from werkzeug.routing import Map, Rule
from werkzeug.utils import redirect
from werkzeug.wrappers import Request, Response
from . import config, crypto, net, remote, validator, vmmgr
from . import config, crypto, net, paths, remote, validator, vmmgr
from .actionqueue import ActionQueue, ActionItemType
from .wsgilang import WSGILang
from .wsgisession import WSGISession
@ -166,12 +169,13 @@ class WSGIApp:
try:
# Repopulate online_repo cache or fail early when the repo can't be reached
repo_online.load(True)
except InvalidSignature:
except cryptography.exceptions.InvalidSignature:
repo_error = request.session.lang.invalid_packages_signature()
except Unauthorized:
repo_error = request.session.lang.repo_invalid_credentials()
except:
repo_error = request.session.lang.repo_unavailable()
except requests.exceptions.RequestException as e:
if isinstance(e, requests.exceptions.HTTPError) and e.response.status_code == 401:
repo_error = request.session.lang.repo_invalid_credentials()
else:
repo_error = request.session.lang.repo_unavailable()
table = self.render_setup_apps_table(request)
message = self.get_session_message(request)
repo_conf = vmmgr.get_repo_conf()
@ -183,7 +187,7 @@ class WSGIApp:
local_apps = repo_local.get_apps()
try:
online_apps = repo_online.get_apps()
except:
except (cryptography.exceptions.InvalidSignature, requests.exceptions.RequestException):
online_apps = {}
apps_config = config.get_apps()
actionable_apps = sorted(set(online_apps) | set(local_apps))
@ -205,7 +209,7 @@ class WSGIApp:
app_queue = pending_actions[app]
if app_queue.exception:
# Display failed task
if isinstance(app_queue.exception, InvalidSignature):
if isinstance(app_queue.exception, cryptography.exceptions.InvalidSignature):
status = lang.repo_package_invalid_signature()
elif isinstance(app_queue.exception, NotFound):
status = lang.repo_package_missing()
@ -314,8 +318,8 @@ class WSGIApp:
return self.render_json({'error': request.session.lang.dns_record_mismatch(domain, a, ipv4)})
if aaaa and aaaa != ipv6:
return self.render_json({'error': request.session.lang.dns_record_mismatch(domain, aaaa, ipv6)})
except:
return self.render_json({'error': request.session.lang.dns_timeout()})
except dns.exception.DNSException:
return self.render_json({'error': request.session.lang.dns_error()})
return self.render_json({'ok': request.session.lang.dns_records_ok()})
def verify_http_action(self, request, **kwargs):
@ -327,10 +331,11 @@ class WSGIApp:
for domain in domains:
url = net.compile_url(domain, port, proto)
try:
if not net.ping_url(url):
return self.render_json({'error': request.session.lang.http_host_not_reachable(url)})
except:
net.ping_url(url)
except requests.exceptions.Timeout:
return self.render_json({'error': request.session.lang.http_timeout()})
except requests.exceptions.RequestException:
return self.render_json({'error': request.session.lang.http_host_not_reachable(url)})
return self.render_json({'ok': request.session.lang.http_hosts_ok(port)})
def update_cert_action(self, request):
@ -344,11 +349,11 @@ class WSGIApp:
return self.render_json({'error': request.session.lang.cert_file_missing()})
if not request.files['private']:
return self.render_json({'error': request.session.lang.key_file_missing()})
request.files['public'].save('/tmp/public.pem')
request.files['private'].save('/tmp/private.pem')
vmmgr.install_manual_cert('/tmp/public.pem', '/tmp/private.pem')
os.unlink('/tmp/public.pem')
os.unlink('/tmp/private.pem')
request.files['public'].save(paths.TMP_PUB_FILE)
request.files['private'].save(paths.TMP_KEY_FILE)
vmmgr.install_manual_cert(paths.TMP_PUB_FILE, paths.TMP_KEY_FILE)
os.unlink(paths.TMP_PUB_FILE)
os.unlink(paths.TMP_KEY_FILE)
else:
return self.render_json({'error': request.session.lang.cert_request_error()})
host = config.get_host()
@ -424,14 +429,14 @@ class WSGIApp:
def update_password_action(self, request):
# Updates password for both HDD encryption (LUKS-on-LVM) and web interface admin account
if request.form['newpassword'] != request.form['newpassword2']:
return self.render_json({'error': request.session.lang.password_mismatch()})
if request.form['newpassword'] == '':
return self.render_json({'error': request.session.lang.password_empty()})
try:
if request.form['newpassword'] != request.form['newpassword2']:
return self.render_json({'error': request.session.lang.password_mismatch()})
if request.form['newpassword'] == '':
return self.render_json({'error': request.session.lang.password_empty()})
# No need to explicitly validate old password, vmmgr.update_password will raise exception if it's wrong
vmmgr.update_password(request.form['oldpassword'], request.form['newpassword'])
except:
except subprocess.CalledProcessError:
return self.render_json({'error': request.session.lang.bad_password()})
return self.render_json({'ok': request.session.lang.password_changed()})

View File

@ -8,7 +8,7 @@ class WSGILang:
'host_updated': 'Nastavení hostitele bylo úspěšně změněno. Přejděte na URL <a href="{}">{}</a> a pokračujte následujícími kroky.',
'dns_record_does_not_exist': 'DNS záznam pro název "{}" neexistuje.',
'dns_record_mismatch': 'DNS záznam pro název "{}" směřuje na IP {} místo očekávané {}.',
'dns_timeout': 'Nepodařilo se kontaktovat DNS server. Zkontrolujte, zda má virtuální stroj přístup k internetu.',
'dns_error': 'Nepodařilo se získat platnou odpověď z DNS serveru. Zkontrolujte, zda má virtuální stroj přístup k internetu.',
'dns_records_ok': 'DNS záznamy jsou nastaveny správně.',
'http_host_not_reachable': 'Adresa {} není dostupná z internetu. Zkontrolujte nastavení síťových komponent.',
'http_timeout': 'Nepodařilo se kontaktovat ping server. Zkontrolujte, zda má virtuální stroj přístup k internetu.',