vmmgr/usr/lib/python3.8/vmmgr/appmgr.py

101 lines
3.6 KiB
Python

# -*- coding: utf-8 -*-
import os
import subprocess
#from .pkgmgr import Pkg, PkgMgr
class AppMgr:
def __init__(self, conf):
self.conf = conf
self.pkgmgr = None #PkgMgr(conf)
def start_app(self, item):
# Start the actual app service
app = item.key
if app in self.conf['apps'] and not self.is_service_started(app):
self.start_service(app)
def start_service(self, service):
subprocess.run(['/sbin/service', service, 'start'], check=True)
def stop_app(self, item):
# Stop the actual app service
app = item.key
if app in self.conf['apps'] and self.is_service_started(app):
self.stop_service(app)
# Stop the app service's dependencies if they are not used by another running app
deps = self.get_services_deps()
for dep in self.get_service_deps(app):
if not any([self.is_service_started(d) for d in deps[dep]]):
self.stop_service(dep)
def stop_service(self, service):
subprocess.run(['/sbin/service', service, 'stop'], check=True)
def update_app_visibility(self, app, visible):
# Update visibility for the app in the configuration
if app in self.conf['apps']:
self.conf['apps'][app]['visible'] = visible
self.conf.save()
def update_app_autostart(self, app, enabled):
# Add/remove the app to OpenRC default runlevel
if app in self.conf['apps']:
subprocess.run(['/sbin/rc-update', 'add' if enabled else 'del', app])
def is_service_started(self, app):
# Check OpenRC service status without calling any binary
return os.path.exists(os.path.join('/run/openrc/started', app))
def is_service_autostarted(self, app):
# Check OpenRC service enablement
return os.path.exists(os.path.join('/etc/runlevels/default', app))
def install_app(self, item):
# Main installation function. Wrapper for download, registration and install script
item.data = None #Pkg()
self.pkgmgr.install_app(item.key, item.data)
def uninstall_app(self, item):
# Main uninstallation function. Wrapper for uninstall script, filesystem purge and unregistration
app = item.key
self.stop_app(item)
if self.is_service_autostarted(app):
self.update_app_autostart(app, False)
self.pkgmgr.uninstall_app(app)
def update_app(self, item):
# Main update function. Wrapper for download and update script
self.stop_app(item)
item.data = None #Pkg()
self.pkgmgr.update_app(item.key, item.data)
def get_services_deps(self):
# Fisrt, build a dictionary of {app: [needs]}
needs = {}
for app in self.conf['apps'].copy():
needs[app] = self.get_service_deps(app)
# Then reverse it to {need: [apps]}
deps = {}
for app, need in needs.items():
for n in need:
deps.setdefault(n, []).append(app)
return deps
def get_service_deps(self, app):
# Get "need" line from init script and split it to a list
try:
with open(os.path.join('/etc/init.d', app), 'r') as f:
return [l for l in f.readlines() if l.strip().startswith('need')][0].split()[1:]
except:
pass
return []
def update_repo_settings(self, url, user, pwd):
# Update lxc repository configuration
self.conf['repo']['url'] = url
self.conf['repo']['user'] = user
self.conf['repo']['pwd'] = pwd
self.conf.save()