# -*- coding: utf-8 -*-

import bcrypt
import dns.exception
import dns.resolver
import os
import requests
import shutil
import socket
import subprocess

from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.x509.oid import NameOID


def compile_url(domain, port, proto='https'):
    # Build the base URL '<proto>://<domain>[:<port>]'.
    # The port is left out when it is the default one for the protocol
    # (443 for https, 80 for http). The port may be given as str or int.
    port = str(port)
    if (proto == 'https' and port == '443') or (proto == 'http' and port == '80'):
        return '{}://{}'.format(proto, domain)
    return '{}://{}:{}'.format(proto, domain, port)


def get_local_ipv4():
    # Return first routable IPv4 address of the VM (container host)
    # Takes the last field of `ip route get 1` output. Returns None on any failure.
    try:
        result = subprocess.run(['/sbin/ip', 'route', 'get', '1'], check=True, stdout=subprocess.PIPE)
        return result.stdout.decode().split()[-1]
    except Exception:
        return None


def get_local_ipv6():
    # Return first routable IPv6 address of the VM (container host)
    # Takes the third field from the end of `ip route get 2003::` output. Returns None on any failure.
    try:
        result = subprocess.run(['/sbin/ip', 'route', 'get', '2003::'], check=True, stdout=subprocess.PIPE)
        return result.stdout.decode().split()[-3]
    except Exception:
        return None


def get_external_ip(family):
    # Return external IP address of given family via 3rd party service
    # `family` is a socket address family (socket.AF_INET or socket.AF_INET6).
    # urllib3's allowed_gai_family hook is overridden for the duration of the request
    # and always restored afterwards. Returns None on any failure.
    allowed_gai_family = requests.packages.urllib3.util.connection.allowed_gai_family
    try:
        requests.packages.urllib3.util.connection.allowed_gai_family = lambda: family
        return requests.get('https://tools.dasm.cz/myip.php', timeout=5).text
    except Exception:
        return None
    finally:
        requests.packages.urllib3.util.connection.allowed_gai_family = allowed_gai_family


def get_external_ipv4():
    # Return external IPv4 address
    return get_external_ip(socket.AF_INET)


def get_external_ipv6():
    # Return external IPv6 address
    return get_external_ip(socket.AF_INET6)


# Shared DNS resolver pointed at Google Public DNS with 3 second timeouts
resolver = dns.resolver.Resolver()
resolver.timeout = 3
resolver.lifetime = 3
resolver.nameservers = ['8.8.8.8', '8.8.4.4', '2001:4860:4860::8888', '2001:4860:4860::8844']


def resolve_ip(domain, type):
    # Resolve domain name using Google Public DNS
    # `type` is the DNS record type passed to the resolver (presumably 'A' or 'AAAA').
    # Returns the address of the first answer, or None if the lookup fails.
    # DNS timeouts are deliberately propagated to the caller.
    try:
        return resolver.query(domain, type)[0].address
    except dns.exception.Timeout:
        raise
    except Exception:
        return None


def ping_url(url):
    # Ask the 3rd party service to check the given URL
    # Returns True only if the service answers 'vm-pong', False on any other answer or error.
    # Request timeouts are deliberately propagated to the caller.
    try:
        response = requests.get('https://tools.dasm.cz/vm-ping.php', params={'url': url}, timeout=5)
        return response.text == 'vm-pong'
    except requests.exceptions.Timeout:
        raise
    except Exception:
        return False


def is_service_started(app):
    # Check OpenRC service status without calling any binary
    return os.path.exists(os.path.join('/run/openrc/started', app))


def is_service_autostarted(app):
    # Check OpenRC service enablement
    return os.path.exists(os.path.join('/etc/runlevels/default', app))


def start_service(service):
    # Start the service; raises subprocess.CalledProcessError on non-zero exit code
    subprocess.run(['/sbin/service', service, 'start'], check=True)


def stop_service(service):
    # Stop the service; raises subprocess.CalledProcessError on non-zero exit code
    subprocess.run(['/sbin/service', service, 'stop'], check=True)


def restart_service(service):
    # Restart the service; the exit code is not checked
    subprocess.run(['/sbin/service', service, 'restart'])


def reload_nginx():
    # Send reload signal to nginx; the exit code is not checked
    subprocess.run(['/usr/sbin/nginx', '-s', 'reload'])


def restart_nginx():
    # Restart nginx service
    restart_service('nginx')


def get_cert_info(cert):
    # Gather certificate data important for setup-host
    # `cert` is a path to a PEM encoded certificate file.
    # Returns dict with keys 'subject', 'issuer', 'expires' and 'method',
    # where method is one of 'manual', 'letsencrypt' or 'selfsigned'.
    with open(cert, 'rb') as f:
        certificate = x509.load_pem_x509_certificate(f.read(), default_backend())
    data = {'subject': certificate.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value,
            'issuer': certificate.issuer.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value,
            'expires': '{} UTC'.format(certificate.not_valid_after),
            'method': 'manual'}
    if os.path.exists('/etc/periodic/daily/acme-sh'):
        data['method'] = 'letsencrypt'
    # This is really naive method of inferring if the cert is selfsigned and should never be used in production :)
    elif data['subject'] == data['issuer']:
        data['method'] = 'selfsigned'
    return data


def adminpwd_hash(password):
    # Return bcrypt hash of the password as str, using freshly generated salt
    return bcrypt.hashpw(password.encode(), bcrypt.gensalt()).decode()


def adminpwd_verify(password, hash):
    # Return True if the password matches the given bcrypt hash
    return bcrypt.checkpw(password.encode(), hash.encode())


def shutdown_vm():
    # Power off the VM
    subprocess.run(['/sbin/poweroff'])


def reboot_vm():
    # Reboot the VM
    subprocess.run(['/sbin/reboot'])


def get_unused_ip():
    # This is a poor man's DHCP server which uses /etc/hosts as lease database
    # Leases the first unused IP from range 172.17.0.0/16
    leased = set()
    with open('/etc/hosts', 'r') as f:
        for line in f:
            # The trailing dot keeps unrelated networks (e.g. 172.170.x.x) out of the lease table
            if not line.startswith('172.17.'):
                continue
            try:
                octets = line.split()[0].split('.')
                leased.add(int(octets[2]) * 256 + int(octets[3]))
            except (IndexError, ValueError):
                # Malformed entry, it can't represent a valid lease
                continue
    # Lease is tracked as offset within the /16 network
    for i in range(1, 65534):
        if i not in leased:
            break
    # NOTE: when the whole range is leased, the loop ends without break
    # and the last address (172.17.255.253) is returned even though it is taken
    return '172.17.{}.{}'.format(i // 256, i % 256)


def update_hosts_lease(ip, app):
    # Rewrite /etc/hosts so it contains no previous lease for the app
    # and, if ip is given (truthy), append the new '<ip> <app>' lease
    suffix = ' {}'.format(app)
    with open('/etc/hosts', 'r') as f:
        hosts = [line for line in f if not line.strip().endswith(suffix)]
    if ip:
        # Make sure the new lease doesn't get glued to the last line if it lacks trailing newline
        if hosts and not hosts[-1].endswith('\n'):
            hosts[-1] += '\n'
        hosts.append('{} {}\n'.format(ip, app))
    with open('/etc/hosts', 'w') as f:
        f.writelines(hosts)


def set_container_ip(pid, ip):
    # Set IP in container based on PID given via lxc.hook.start-host hook
    # The commands are run via nsenter in namespaces of the given PID. The exit code is not checked.
    cmd = 'ip addr add {}/16 broadcast 172.17.255.255 dev eth0 && ip route add default via 172.17.0.1'.format(ip)
    # subprocess accepts only str arguments, so the PID may be given also as int
    subprocess.run(['nsenter', '-a', '-t', str(pid), '--', '/bin/sh', '-c', cmd])


def clean_ephemeral_layer(app):
    # Remove everything from delta0 directory of the app container, keeping the directory itself
    layer = os.path.join('/var/lib/lxc', app, 'delta0')
    if os.path.exists(layer):
        # List the entries before anything gets deleted
        for item in list(os.scandir(layer)):
            # Symlinks to directories need to be unlinked, rmtree refuses to work on them
            if item.is_dir(follow_symlinks=False):
                shutil.rmtree(item.path)
            else:
                os.unlink(item.path)