# -*- coding: utf-8 -*- import bcrypt import dns.exception import dns.resolver import os import requests import socket import ssl import subprocess NULL_IP = '[100::1]' def compile_url(domain, port, proto='https'): port = ':{}'.format(port) if (proto == 'https' and port != '443') or (proto == 'http' and port != '80') else '' host = '{}{}'.format(domain, port) return '{}://{}'.format(proto, host) if proto is not None else host def get_container_ip(app): # Return an IP address of a container. If the container is not running, return address from IPv6 discard prefix instead try: return subprocess.run(['/usr/bin/docker', 'inspect', '-f', '{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}', app], check=True, stdout=subprocess.PIPE).stdout.decode().strip() except: return NULL_IP def get_local_ipv4(): # Return first routable IPv4 address try: return subprocess.run(['/sbin/ip', 'route', 'get', '1'], check=True, stdout=subprocess.PIPE).stdout.decode().split()[-1] except: return None def get_local_ipv6(): # Return first routable IPv6 address try: return subprocess.run(['/sbin/ip', 'route', 'get', '2003::'], check=True, stdout=subprocess.PIPE).stdout.decode().split()[-3] except: return None def get_external_ip(family): # Return external IP address of given family via 3rd party service allowed_gai_family = requests.packages.urllib3.util.connection.allowed_gai_family try: requests.packages.urllib3.util.connection.allowed_gai_family = lambda: family return requests.get('https://tools.dasm.cz/myip.php', timeout=5).text except: return None finally: requests.packages.urllib3.util.connection.allowed_gai_family = allowed_gai_family def get_external_ipv4(): # Return external IPv4 address return get_external_ip(socket.AF_INET) def get_external_ipv6(): # Return external IPv6 address return get_external_ip(socket.AF_INET6) resolver = dns.resolver.Resolver() resolver.timeout = 3 resolver.lifetime = 3 resolver.nameservers = ['8.8.8.8', '8.8.4.4', '2001:4860:4860::8888', '2001:4860:4860::8844'] def resolve_ip(domain, type): # Resolve domain name using Google Public DNS try: return resolver.query(domain, type)[0].address except dns.exception.Timeout: raise except: return None def ping_url(url): try: return requests.post('https://tools.dasm.cz/spotter-ping.php', data = {'url': url}, timeout=5).text == 'spotter-pong' except requests.exceptions.Timeout: raise except: return False def is_service_started(app): # Check OpenRC service status without calling any binary return os.path.exists(os.path.join('/run/openrc/started', app)) def is_service_autostarted(app): # Check OpenRC service enablement return os.path.exists(os.path.join('/etc/runlevels/default', app)) def start_service(service): subprocess.run(['/sbin/service', service, 'start'], check=True) def stop_service(service): subprocess.run(['/sbin/service', service, 'stop'], check=True) def restart_service(service): subprocess.run(['/sbin/service', service, 'restart']) def reload_nginx(): subprocess.run(['/sbin/service', 'nginx', 'reload']) def restart_nginx(): restart_service('nginx') def get_cert_info(): data = ssl._ssl._test_decode_cert('/etc/ssl/certs/services.pem') data['subject'] = dict(data['subject'][i][0] for i in range(len(data['subject']))) data['issuer'] = dict(data['issuer'][i][0] for i in range(len(data['issuer']))) return data def adminpwd_hash(password): return bcrypt.hashpw(password.encode(), bcrypt.gensalt()).decode() def adminpwd_verify(password, hash): return bcrypt.checkpw(password.encode(), hash.encode()) def update_luks_password(oldpassword, newpassword): input = '{}\n{}'.format(oldpassword, newpassword).encode() subprocess.run(['cryptsetup', 'luksChangeKey', '/dev/sda2'], input=input, check=True) def shutdown_vm(): subprocess.run(['/sbin/poweroff']) def reboot_vm(): subprocess.run(['/sbin/reboot'])