108 lines
3.6 KiB
Python
108 lines
3.6 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
import os
|
|
import subprocess
|
|
|
|
from .paths import AUTHORIZED_KEYS, INTERFACES_FILE, WG_CONF_FILE, WG_CONF_FILE_DISABLED
|
|
|
|
def get_authorized_keys():
    """Return the text stored in the file at AUTHORIZED_KEYS.

    A missing file is not an error: it yields an empty string.
    """
    try:
        handle = open(AUTHORIZED_KEYS)
    except FileNotFoundError:
        return ''
    with handle:
        return handle.read()
|
|
|
|
def set_authorized_keys(keys):
    """Overwrite the AUTHORIZED_KEYS file and switch the sshd service.

    keys -- complete text to store in the file.

    If `keys` holds anything besides whitespace, sshd is added to the
    `boot` runlevel and started. Otherwise sshd is stopped and removed
    from that runlevel. Exit statuses of the commands are not inspected.
    """
    with open(AUTHORIZED_KEYS, 'w') as handle:
        handle.write(keys)

    # Pick the command sequence that enables or disables the SSH service
    if not keys.strip():
        commands = (
            ['/sbin/service', 'sshd', 'stop'],
            ['/sbin/rc-update', 'del', 'sshd', 'boot'],
        )
    else:
        commands = (
            ['/sbin/rc-update', 'add', 'sshd', 'boot'],
            ['/sbin/service', 'sshd', 'start'],
        )
    for command in commands:
        subprocess.run(command)
|
|
|
|
def is_wireguard_running():
    """Report whether the wg0 interface is considered up.

    The state is inferred solely from the presence of WG_CONF_FILE on
    disk; the interface itself is not queried.
    """
    config_present = os.path.exists(WG_CONF_FILE)
    return config_present
|
|
|
|
def regenerate_wireguard_key():
    """Generate a new WireGuard private key and store it in the wg0 config.

    The key is produced with `wg genkey` before anything else is touched,
    so a missing or failing `wg` binary leaves both the interface and its
    config file exactly as they were. If wg0 is running, it is stopped
    while the config is rewritten and started again afterwards.

    Returns the new private key as a string.

    Raises subprocess.CalledProcessError when `wg genkey` exits with a
    non-zero status (instead of silently storing an empty key).
    """
    # Generate first: this step has no dependency on the interface state,
    # and failing here must not leave wg0 stopped or the config damaged.
    genkey = subprocess.run(['wg', 'genkey'], stdout=subprocess.PIPE, check=True)
    privkey = genkey.stdout.decode().strip()

    was_running = is_wireguard_running()
    if was_running:
        # stop_wireguard() renames the active config to
        # WG_CONF_FILE_DISABLED, which is the path edited below.
        stop_wireguard()

    with open(WG_CONF_FILE_DISABLED) as f:
        conf_lines = f.readlines()
    # The PrivateKey entry is expected on the third line of the config.
    conf_lines[2] = f'PrivateKey = {privkey}\n'
    with open(WG_CONF_FILE_DISABLED, 'w') as f:
        f.writelines(conf_lines)

    if was_running:
        start_wireguard()
    return privkey
|
|
|
|
def get_wireguard_conf():
    """Collect the wg0 settings into a dictionary.

    Keys of the returned dict:
      running -- result of is_wireguard_running()
      ip      -- text after the last dot of the first INTERFACES_FILE line
                 containing '172.17.255' (key is absent if no line matches)
      port    -- third whitespace-separated token of the config's 2nd line
      peers   -- config lines from the 5th onward, joined with newlines
      pubkey  -- output of `wg pubkey` fed with the configured private key

    The config is read from WG_CONF_FILE when running, otherwise from
    WG_CONF_FILE_DISABLED. A private key stored as the literal 'None' is
    replaced by a freshly generated one before the public key is derived.
    """
    running = is_wireguard_running()
    result = {'running': running}

    # VPN address: only the first matching line is considered
    with open(INTERFACES_FILE) as f:
        address_line = next(
            (row for row in f.read().splitlines() if '172.17.255' in row),
            None,
        )
    if address_line is not None:
        result['ip'] = address_line.rsplit('.', 1)[-1]

    # Listen port and peers come from fixed positions in the config file
    conf_path = WG_CONF_FILE if running else WG_CONF_FILE_DISABLED
    with open(conf_path) as f:
        conf_lines = f.read().splitlines()
    result['port'] = conf_lines[1].split()[2]
    result['peers'] = '\n'.join(conf_lines[4:])

    # Public key is derived from the private key via `wg pubkey`
    privkey = conf_lines[2].split()[2]
    if privkey == 'None':
        privkey = regenerate_wireguard_key()
    wg_pubkey = subprocess.Popen(
        ['wg', 'pubkey'],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
    )
    stdout_data, _ = wg_pubkey.communicate(privkey.encode())
    result['pubkey'] = stdout_data.decode().strip()
    return result
|
|
|
|
def set_wireguard_conf(ip, port, peers):
    """Store a new VPN address, listen port and peer section for wg0.

    ip    -- last octet; every INTERFACES_FILE line containing '172.17.255'
             is replaced by an `address 172.17.255.<ip>` line
    port  -- value written to the `ListenPort` line (2nd config line)
    peers -- text written verbatim after the first four config lines

    A running interface is stopped before the files are edited and
    started again once both files have been rewritten.
    """
    restart = is_wireguard_running()
    if restart:
        stop_wireguard()

    # Rewrite the address line(s), passing all other lines through untouched
    with open(INTERFACES_FILE) as f:
        interface_lines = [
            f' address 172.17.255.{ip}\n' if '172.17.255' in row else row
            for row in f.readlines()
        ]
    with open(INTERFACES_FILE, 'w') as f:
        f.writelines(interface_lines)

    # Keep the first four config lines, swap in the port, append the peers
    with open(WG_CONF_FILE_DISABLED) as f:
        header = f.readlines()[:4]
    header[1] = f'ListenPort = {port}\n'
    with open(WG_CONF_FILE_DISABLED, 'w') as f:
        f.writelines(header)
        f.write(peers)

    if restart:
        start_wireguard()
|
|
|
|
def start_wireguard():
    """Bring the wg0 interface up.

    WG_CONF_FILE_DISABLED is renamed to WG_CONF_FILE. If the disabled
    config does not exist, `ifdown wg0` is run first. In either case
    `ifup wg0` is executed last.
    """
    disabled_config_missing = False
    try:
        os.rename(WG_CONF_FILE_DISABLED, WG_CONF_FILE)
    except FileNotFoundError:
        disabled_config_missing = True

    if disabled_config_missing:
        # Nothing to enable: take the interface down so the ifup below
        # acts as a restart.
        subprocess.run(['ifdown', 'wg0'])
    subprocess.run(['ifup', 'wg0'])
|
|
|
|
def stop_wireguard():
    """Take the wg0 interface down and park its config file.

    Runs `ifdown wg0`, then renames WG_CONF_FILE to WG_CONF_FILE_DISABLED.
    A missing WG_CONF_FILE is silently ignored.
    """
    subprocess.run(['ifdown', 'wg0'])
    try:
        os.rename(WG_CONF_FILE, WG_CONF_FILE_DISABLED)
    except FileNotFoundError:
        # No active config to move; nothing else to do.
        return
|