Add tox, pylint, pytest, fix reported findings

This commit is contained in:
Disassembler 2021-12-20 22:21:58 +01:00
parent c3a73f2b28
commit 855c5526f6
No known key found for this signature in database
GPG Key ID: 524BD33A0EE29499
18 changed files with 115 additions and 82 deletions

View File

@ -11,7 +11,6 @@ classifiers =
Intended Audience :: System Administrators Intended Audience :: System Administrators
License :: OSI Approved :: GNU General Public License v3 or later License :: OSI Approved :: GNU General Public License v3 or later
Operating System :: POSIX Operating System :: POSIX
Programming Language :: Python :: 3.5
Programming Language :: Python :: 3.6 Programming Language :: Python :: 3.6
Programming Language :: Python :: 3.7 Programming Language :: Python :: 3.7
Programming Language :: Python :: 3.8 Programming Language :: Python :: 3.8
@ -23,7 +22,7 @@ classifiers =
packages = find: packages = find:
package_dir = = src package_dir = = src
py_modules = spoc_cli py_modules = spoc_cli
python_requires = >= 3.5 python_requires = >= 3.6
install_requires = requests install_requires = requests
[options.packages.find] [options.packages.find]
@ -38,3 +37,24 @@ testpaths = tests
[coverage:run] [coverage:run]
branch = True branch = True
omit =
*/dist-packages/*
*/site-packages/*
[pylint.BASIC]
good-names = e,ex,f,_
[pylint.'MESSAGES CONTROL']
disable = missing-docstring
[tox:tox]
[testenv:{pylint,pytest}]
skipsdist = True
usedevelop = True
deps =
pylint
pytest-cov
commands =
pytest: pytest -vv --cov src --cov tests --cov-report term --cov-report xml --cov-fail-under 100 {posargs}
pylint: pylint src tests {posargs}

View File

@ -11,7 +11,7 @@ class App:
self.app_name = app_name self.app_name = app_name
self.env_file = os.path.join(config.DATA_DIR, f'{app_name}.env') self.env_file = os.path.join(config.DATA_DIR, f'{app_name}.env')
def install(self, update=False): def install(self, is_update=False):
definition = repo.get_apps()[self.app_name] definition = repo.get_apps()[self.app_name]
version = definition['version'] version = definition['version']
containers = definition['containers'] containers = definition['containers']
@ -21,7 +21,7 @@ class App:
for container in containers.values(): for container in containers.values():
volumes |= set(container.get('volumes', {})) volumes |= set(container.get('volumes', {}))
existing_volumes = self.get_existing_volumes() existing_volumes = self.get_existing_volumes()
if update: if is_update:
# Remove volumes no longer referenced by the containers # Remove volumes no longer referenced by the containers
volumes_to_remove = existing_volumes - volumes volumes_to_remove = existing_volumes - volumes
volumes -= existing_volumes volumes -= existing_volumes
@ -33,7 +33,7 @@ class App:
# Create env file # Create env file
envs = definition.get('environment', {}) envs = definition.get('environment', {})
if update: if is_update:
# Keep old values on update # Keep old values on update
for key,value in self.read_env_vars().items(): for key,value in self.read_env_vars().items():
if key in envs: if key in envs:
@ -44,6 +44,9 @@ class App:
self.create_pod(version) self.create_pod(version)
self.create_containers(containers) self.create_containers(containers)
def update(self):
self.install(is_update=True)
def uninstall(self): def uninstall(self):
autostart.set_app(self.app_name, False) autostart.set_app(self.app_name, False)
self.remove_pod() self.remove_pod()
@ -60,7 +63,7 @@ class App:
def read_env_vars(self): def read_env_vars(self):
env_vars = {} env_vars = {}
try: try:
with open(self.env_file) as f: with open(self.env_file, encoding='utf-8') as f:
lines = f.read().splitlines() lines = f.read().splitlines()
for line in lines: for line in lines:
key,value = line.split('=', 1) key,value = line.split('=', 1)
@ -71,7 +74,7 @@ class App:
def write_env_vars(self, env_vars): def write_env_vars(self, env_vars):
os.makedirs(config.DATA_DIR, exist_ok=True) os.makedirs(config.DATA_DIR, exist_ok=True)
with open(self.env_file, 'w') as f: with open(self.env_file, 'w', encoding='utf-8') as f:
for key,value in env_vars.items(): for key,value in env_vars.items():
f.write(f'{key}={value}\n') f.write(f'{key}={value}\n')
@ -125,7 +128,7 @@ def install(app_name):
App(app_name).install() App(app_name).install()
def update(app_name): def update(app_name):
App(app_name).install(update=True) App(app_name).update()
def uninstall(app_name): def uninstall(app_name):
App(app_name).uninstall() App(app_name).uninstall()

View File

@ -4,7 +4,7 @@ from . import config
def get_apps(): def get_apps():
try: try:
with open(config.AUTOSTART_FILE) as f: with open(config.AUTOSTART_FILE, encoding='utf-8') as f:
lines = f.read().splitlines() lines = f.read().splitlines()
return set(lines) return set(lines)
except FileNotFoundError: except FileNotFoundError:
@ -20,6 +20,6 @@ def set_app(app_name, enabled):
except KeyError: except KeyError:
return return
os.makedirs(config.DATA_DIR, exist_ok=True) os.makedirs(config.DATA_DIR, exist_ok=True)
with open(config.AUTOSTART_FILE, 'w') as f: with open(config.AUTOSTART_FILE, 'w', encoding='utf-8') as f:
for app in apps: for app in apps:
f.write(f'{app}\n') f.write(f'{app}\n')

View File

@ -14,7 +14,7 @@ def read_auth():
global REGISTRY_HOST, REGISTRY_AUTH, REPO_FILE_URL # pylint: disable=global-statement global REGISTRY_HOST, REGISTRY_AUTH, REPO_FILE_URL # pylint: disable=global-statement
try: try:
with open(REGISTRY_AUTH_FILE) as f: with open(REGISTRY_AUTH_FILE, encoding='utf-8') as f:
data = json.load(f) data = json.load(f)
REGISTRY_HOST = next(iter(data['auths'].keys())) REGISTRY_HOST = next(iter(data['auths'].keys()))
auth = b64decode(data['auths'][REGISTRY_HOST]['auth'].encode()).decode() auth = b64decode(data['auths'][REGISTRY_HOST]['auth'].encode()).decode()
@ -29,7 +29,7 @@ def write_auth(host, username, password):
b64auth = b64encode(f'{username}:{password}'.encode()).decode() b64auth = b64encode(f'{username}:{password}'.encode()).decode()
data = json.dumps({'auths': {host: {'auth': b64auth}}}) data = json.dumps({'auths': {host: {'auth': b64auth}}})
with open(REGISTRY_AUTH_FILE, 'w') as f: with open(REGISTRY_AUTH_FILE, 'w', encoding='utf-8') as f:
f.write(data) f.write(data)
REGISTRY_HOST = host REGISTRY_HOST = host
REGISTRY_AUTH = (username, password) REGISTRY_AUTH = (username, password)

View File

@ -11,7 +11,7 @@ class CircularDependencyError(Exception):
return '\n'.join(result) return '\n'.join(result)
class MissingDependencyError(Exception): class MissingDependencyError(Exception):
# Dependecy solver has found an items depents on a nonexistent item # Dependecy solver has found an item that depends on a nonexistent item
def __init__(self, deps, missing): def __init__(self, deps, missing):
super().__init__(deps, missing) super().__init__(deps, missing)
self.deps = deps self.deps = deps

View File

@ -8,18 +8,18 @@ from contextlib import contextmanager
from . import config from . import config
def print_lock(pid): def print_lock(pid):
with open(os.path.join('/proc', pid, 'cmdline')) as f: with open(os.path.join('/proc', pid, 'cmdline'), 'rb') as f:
cmdline = f.read().replace('\0', ' ').strip() cmdline = f.read().decode().replace('\0', ' ').strip()
print(f'Waiting for lock currently held by process {pid} - {cmdline}', file=sys.stderr) print(f'Waiting for lock currently held by process {pid} - {cmdline}', file=sys.stderr)
@contextmanager @contextmanager
def locked(): def locked():
with open(config.LOCK_FILE, 'a'): with open(config.LOCK_FILE, 'a', encoding='utf-8'):
# Open the lock file in append mode first to ensure its existence # Open the lock file in append mode first to ensure its existence
# but not modify any data if it already exists # but not modify any data if it already exists
pass pass
# Open the lock file in read + write mode without truncation # Open the lock file in read + write mode without truncation
with open(config.LOCK_FILE, 'r+') as f: with open(config.LOCK_FILE, 'r+', encoding='utf-8') as f:
lock_printed = False lock_printed = False
while True: while True:
try: try:

View File

@ -54,17 +54,21 @@ def remove_pod(app_name):
stop_pod(app_name) stop_pod(app_name)
run(['podman', 'pod', 'rm', '--ignore', app_name]) run(['podman', 'pod', 'rm', '--ignore', app_name])
def create_container(app_name, cnt_name, image, env_file=None, volumes=None, def create_container(app_name, cnt_name, image, **kwargs):
requires=None, hosts=None):
cmd = ['podman', 'container', 'create', '--name', cnt_name, '--pod', app_name, cmd = ['podman', 'container', 'create', '--name', cnt_name, '--pod', app_name,
'--subgidname', 'spoc', '--subgidname', 'spoc',
'--restart', 'unless-stopped'] '--restart', 'unless-stopped']
env_file = kwargs.get('env_file')
if env_file: if env_file:
cmd.extend(['--env-file', env_file]) cmd.extend(['--env-file', env_file])
requires = kwargs.get('requires')
if requires: if requires:
cmd.extend(['--requires', ','.join(sorted(requires))]) cmd.extend(['--requires', ','.join(sorted(requires))])
volumes = kwargs.get('volumes')
if volumes: if volumes:
for volume,mount in sorted(volumes.items(), key=lambda x: x[1]): for volume,mount in sorted(volumes.items(), key=lambda x: x[1]):
cmd.extend(['--volume', f'{volume}:{mount}']) cmd.extend(['--volume', f'{volume}:{mount}'])
hosts = kwargs.get('hosts')
if hosts: if hosts:
for host in sorted(hosts): for host in sorted(hosts):
cmd.extend(['--add-host', f'{host}:127.0.0.1']) cmd.extend(['--add-host', f'{host}:127.0.0.1'])
@ -72,5 +76,4 @@ def create_container(app_name, cnt_name, image, env_file=None, volumes=None,
run(cmd) run(cmd)
def prune(): def prune():
run(['podman', 'image', 'prune', '--all', '--force']) run(['podman', 'image', 'prune', '--all', '--force', '--volumes'])
run(['podman', 'volume', 'prune', '--force'])

View File

@ -2,16 +2,16 @@ import requests
from . import config from . import config
_data = {} _DATA = {}
def load(force=False): def load(force=False):
global _data # pylint: disable=global-statement global _DATA # pylint: disable=global-statement
if not _data or force: if not _DATA or force:
_data = {} _DATA = {}
response = requests.get(config.REPO_FILE_URL, auth=config.REGISTRY_AUTH, timeout=5) response = requests.get(config.REPO_FILE_URL, auth=config.REGISTRY_AUTH, timeout=5)
response.raise_for_status() response.raise_for_status()
_data = response.json() _DATA = response.json()
def get_apps(): def get_apps():
load() load()
return _data return _DATA

View File

@ -1,8 +1,9 @@
import argparse import argparse
import getpass import getpass
import requests
import sys import sys
import requests
import spoc import spoc
APP_ERROR_STRINGS = { APP_ERROR_STRINGS = {
@ -18,11 +19,11 @@ def handle_app_error(exception):
def handle_repo_error(exception): def handle_repo_error(exception):
if isinstance(exception, requests.HTTPError): if isinstance(exception, requests.HTTPError):
status = exception.response.status_code status_code = exception.response.status_code
if status == 401: if status_code == 401:
reason = 'Invalid username/password' reason = 'Invalid username/password'
else: else:
reason = f'{status} {exception.response.reason}' reason = f'{status_code} {exception.response.reason}'
else: else:
ex_type = type(exception) ex_type = type(exception)
reason = f'{ex_type.__module__}.{ex_type.__name__}' reason = f'{ex_type.__module__}.{ex_type.__name__}'
@ -136,7 +137,7 @@ def parse_args(args=None):
return parser.parse_args(args) return parser.parse_args(args)
def main(): def main(): # pylint: disable=too-many-branches
args = parse_args() args = parse_args()
try: try:
if args.action is listing: if args.action is listing:

View File

@ -7,7 +7,7 @@ from spoc import config
TEST_DATA_DIR = os.path.join(os.path.dirname(__file__), 'test_data') TEST_DATA_DIR = os.path.join(os.path.dirname(__file__), 'test_data')
with open(os.path.join(TEST_DATA_DIR, 'repository.json')) as f: with open(os.path.join(TEST_DATA_DIR, 'repository.json'), encoding='utf-8') as f:
MOCK_REPODATA = json.load(f) MOCK_REPODATA = json.load(f)
MOCK_ENV = 'RAILS_ENV=test\n' \ MOCK_ENV = 'RAILS_ENV=test\n' \
@ -35,8 +35,9 @@ def test_init():
@patch('spoc.app.App.write_env_vars') @patch('spoc.app.App.write_env_vars')
@patch('spoc.app.App.create_pod') @patch('spoc.app.App.create_pod')
@patch('spoc.app.App.create_containers') @patch('spoc.app.App.create_containers')
def test_install(create_containers, create_pod, write_env_vars, read_env_vars, create_volumes, def test_install(create_containers, create_pod, write_env_vars, #pylint: disable=too-many-arguments
remove_volumes, get_existing_volumes, repo_get_apps): read_env_vars, create_volumes, remove_volumes,
get_existing_volumes, repo_get_apps):
instance = app.App('someapp') instance = app.App('someapp')
instance.install() instance.install()
@ -57,10 +58,11 @@ def test_install(create_containers, create_pod, write_env_vars, read_env_vars, c
@patch('spoc.app.App.write_env_vars') @patch('spoc.app.App.write_env_vars')
@patch('spoc.app.App.create_pod') @patch('spoc.app.App.create_pod')
@patch('spoc.app.App.create_containers') @patch('spoc.app.App.create_containers')
def test_update(create_containers, create_pod, write_env_vars, read_env_vars, create_volumes, def test_update(create_containers, create_pod, write_env_vars, #pylint: disable=too-many-arguments
remove_volumes, get_existing_volumes, repo_get_apps): read_env_vars, create_volumes, remove_volumes,
get_existing_volumes, repo_get_apps):
instance = app.App('someapp') instance = app.App('someapp')
instance.install(update=True) instance.update()
repo_get_apps.assert_called_once() repo_get_apps.assert_called_once()
get_existing_volumes.assert_called_once() get_existing_volumes.assert_called_once()
@ -111,7 +113,7 @@ def test_read_env_vars(env_open):
env_vars = instance.read_env_vars() env_vars = instance.read_env_vars()
env_file = os.path.join(config.DATA_DIR, 'someapp.env') env_file = os.path.join(config.DATA_DIR, 'someapp.env')
env_open.assert_called_once_with(env_file) env_open.assert_called_once_with(env_file, encoding='utf-8')
assert env_vars == MOCK_ENV_DATA assert env_vars == MOCK_ENV_DATA
@patch('builtins.open', side_effect=FileNotFoundError('someapp.env')) @patch('builtins.open', side_effect=FileNotFoundError('someapp.env'))
@ -120,8 +122,8 @@ def test_read_env_vars_filenotfound(env_open):
env_vars = instance.read_env_vars() env_vars = instance.read_env_vars()
env_file = os.path.join(config.DATA_DIR, 'someapp.env') env_file = os.path.join(config.DATA_DIR, 'someapp.env')
env_open.assert_called_once_with(env_file) env_open.assert_called_once_with(env_file, encoding='utf-8')
assert env_vars == {} assert not env_vars
@patch('os.makedirs') @patch('os.makedirs')
@patch('builtins.open', new_callable=mock_open) @patch('builtins.open', new_callable=mock_open)
@ -131,7 +133,7 @@ def test_write_env_vars(env_open, makedirs):
makedirs.assert_called_once_with(config.DATA_DIR, exist_ok=True) makedirs.assert_called_once_with(config.DATA_DIR, exist_ok=True)
env_file = os.path.join(config.DATA_DIR, 'someapp.env') env_file = os.path.join(config.DATA_DIR, 'someapp.env')
env_open.assert_called_once_with(env_file, 'w') env_open.assert_called_once_with(env_file, 'w', encoding='utf-8')
expected_writes = [call(line) for line in MOCK_ENV.splitlines(True)] expected_writes = [call(line) for line in MOCK_ENV.splitlines(True)]
env_open().write.assert_has_calls(expected_writes, any_order=True) env_open().write.assert_has_calls(expected_writes, any_order=True)
@ -234,7 +236,7 @@ def test_module_update(instance):
app.update('someapp') app.update('someapp')
instance.assert_called_once_with('someapp') instance.assert_called_once_with('someapp')
instance.return_value.install.assert_called_once_with(update=True) instance.return_value.update.assert_called_once_with()
@patch('spoc.app.App') @patch('spoc.app.App')
def test_module_uninstall(instance): def test_module_uninstall(instance):

View File

@ -7,14 +7,14 @@ from spoc import config
def test_get_apps(file_open): def test_get_apps(file_open):
apps = autostart.get_apps() apps = autostart.get_apps()
file_open.assert_called_once_with(config.AUTOSTART_FILE) file_open.assert_called_once_with(config.AUTOSTART_FILE, encoding='utf-8')
assert apps == {'someapp', 'anotherapp'} assert apps == {'someapp', 'anotherapp'}
@patch('builtins.open', side_effect=FileNotFoundError('someapp.env')) @patch('builtins.open', side_effect=FileNotFoundError('someapp.env'))
def test_get_apps_filenotfounderror(file_open): def test_get_apps_filenotfounderror(file_open):
apps = autostart.get_apps() apps = autostart.get_apps()
file_open.assert_called_once_with(config.AUTOSTART_FILE) file_open.assert_called_once_with(config.AUTOSTART_FILE, encoding='utf-8')
assert apps == set() assert apps == set()
@patch('os.makedirs') @patch('os.makedirs')
@ -25,7 +25,7 @@ def test_set_app_enable(file_open, get_apps, makedirs):
get_apps.assert_called_once() get_apps.assert_called_once()
makedirs.assert_called_once_with(config.DATA_DIR, exist_ok=True) makedirs.assert_called_once_with(config.DATA_DIR, exist_ok=True)
file_open.assert_called_once_with(config.AUTOSTART_FILE, 'w') file_open.assert_called_once_with(config.AUTOSTART_FILE, 'w', encoding='utf-8')
file_open().write.assert_has_calls([ file_open().write.assert_has_calls([
call('someapp\n'), call('someapp\n'),
call('anotherapp\n'), call('anotherapp\n'),
@ -39,7 +39,7 @@ def test_set_app_disable(file_open, get_apps, makedirs):
get_apps.assert_called_once() get_apps.assert_called_once()
makedirs.assert_called_once_with(config.DATA_DIR, exist_ok=True) makedirs.assert_called_once_with(config.DATA_DIR, exist_ok=True)
file_open.assert_called_once_with(config.AUTOSTART_FILE, 'w') file_open.assert_called_once_with(config.AUTOSTART_FILE, 'w', encoding='utf-8')
file_open().write.assert_has_calls([ file_open().write.assert_has_calls([
call('someapp\n'), call('someapp\n'),
]) ])

View File

@ -1,13 +1,13 @@
from argparse import Namespace
from unittest.mock import patch
import pytest import pytest
import requests import requests
from argparse import Namespace
from unittest.mock import patch
import spoc import spoc
import spoc_cli import spoc_cli
class MockResponse: class MockResponse: # pylint: disable=too-few-public-methods
def __init__(self, status_code, reason): def __init__(self, status_code, reason):
self.status_code = status_code self.status_code = status_code
self.reason = reason self.reason = reason
@ -304,8 +304,8 @@ def test_main_prune(prune):
@patch('spoc_cli.stop_all') @patch('spoc_cli.stop_all')
@patch('spoc_cli.login') @patch('spoc_cli.login')
@patch('spoc_cli.prune') @patch('spoc_cli.prune')
def test_main_invalid(prune, login, stop_all, start_autostarted, status, stop, start, def test_main_invalid(prune, login, stop_all, start_autostarted, # pylint: disable=too-many-arguments
uninstall, update, install, listing, parse_args): status, stop, start, uninstall, update, install, listing, parse_args):
spoc_cli.main() spoc_cli.main()
parse_args.assert_called_once() parse_args.assert_called_once()

View File

@ -7,7 +7,7 @@ from spoc import config
def test_read_auth(auth_open): def test_read_auth(auth_open):
config.read_auth() config.read_auth()
auth_open.assert_called_once_with(config.REGISTRY_AUTH_FILE) auth_open.assert_called_once_with(config.REGISTRY_AUTH_FILE, encoding='utf-8')
assert config.REGISTRY_HOST == 'example.com' assert config.REGISTRY_HOST == 'example.com'
assert config.REGISTRY_AUTH == ('someuser', 'somepassword') assert config.REGISTRY_AUTH == ('someuser', 'somepassword')
assert config.REPO_FILE_URL == 'https://example.com/repository.json' assert config.REPO_FILE_URL == 'https://example.com/repository.json'
@ -16,7 +16,7 @@ def test_read_auth(auth_open):
def test_read_auth_fallback(auth_open): def test_read_auth_fallback(auth_open):
config.read_auth() config.read_auth()
auth_open.assert_called_once_with(config.REGISTRY_AUTH_FILE) auth_open.assert_called_once_with(config.REGISTRY_AUTH_FILE, encoding='utf-8')
assert config.REGISTRY_HOST == 'localhost' assert config.REGISTRY_HOST == 'localhost'
assert config.REGISTRY_AUTH is None assert config.REGISTRY_AUTH is None
assert config.REPO_FILE_URL == 'https://localhost/repository.json' assert config.REPO_FILE_URL == 'https://localhost/repository.json'
@ -25,7 +25,7 @@ def test_read_auth_fallback(auth_open):
def test_write_auth(auth_open): def test_write_auth(auth_open):
config.write_auth('example.org', 'user', 'anotherpwd') config.write_auth('example.org', 'user', 'anotherpwd')
auth_open.assert_called_once_with(config.REGISTRY_AUTH_FILE, 'w') auth_open.assert_called_once_with(config.REGISTRY_AUTH_FILE, 'w', encoding='utf-8')
expected_data = '{"auths": {"example.org": {"auth": "dXNlcjphbm90aGVycHdk"}}}' expected_data = '{"auths": {"example.org": {"auth": "dXNlcjphbm90aGVycHdk"}}}'
auth_open().write.assert_called_once_with(expected_data) auth_open().write.assert_called_once_with(expected_data)

View File

@ -27,7 +27,7 @@ def test_missingdependencyerror():
def test_depsolver(): def test_depsolver():
solver = depsolver.DepSolver() solver = depsolver.DepSolver()
assert solver.unresolved == {} assert not solver.unresolved
solver.add('dep1', ['dep2', 'dep3']) solver.add('dep1', ['dep2', 'dep3'])
solver.add('dep2', ['dep3', 'dep3']) solver.add('dep2', ['dep3', 'dep3'])

View File

@ -1,8 +1,9 @@
import errno import errno
import fcntl import fcntl
import pytest
from unittest.mock import call, patch, mock_open from unittest.mock import call, patch, mock_open
import pytest
from spoc import config from spoc import config
from spoc import flock from spoc import flock
@ -10,11 +11,11 @@ from spoc import flock
def mock_func(): def mock_func():
pass pass
@patch('builtins.open', new_callable=mock_open, read_data='foo\0arg1\0arg2\n') @patch('builtins.open', new_callable=mock_open, read_data='foo\0arg1\0arg2\n'.encode())
def test_print_lock(cmdline_open, capsys): def test_print_lock(cmdline_open, capsys):
flock.print_lock('123') flock.print_lock('123')
cmdline_open.assert_called_once_with('/proc/123/cmdline') cmdline_open.assert_called_once_with('/proc/123/cmdline', 'rb')
captured = capsys.readouterr() captured = capsys.readouterr()
assert captured.err == 'Waiting for lock currently held by process 123 - foo arg1 arg2\n' assert captured.err == 'Waiting for lock currently held by process 123 - foo arg1 arg2\n'
@ -27,10 +28,10 @@ def test_locked_success(lock_open, getpid, sleep, fcntl_flock, print_lock):
mock_func() mock_func()
lock_open.assert_has_calls([ lock_open.assert_has_calls([
call(config.LOCK_FILE, 'a'), call(config.LOCK_FILE, 'a', encoding='utf-8'),
call().__enter__(), call().__enter__(),
call().__exit__(None, None, None), call().__exit__(None, None, None),
call(config.LOCK_FILE, 'r+'), call(config.LOCK_FILE, 'r+', encoding='utf-8'),
call().__enter__(), call().__enter__(),
call().truncate(), call().truncate(),
call().write('1234'), call().write('1234'),
@ -58,10 +59,10 @@ def test_locked_fail(lock_open, getpid, sleep, fcntl_flock, print_lock):
mock_func() mock_func()
lock_open.assert_has_calls([ lock_open.assert_has_calls([
call(config.LOCK_FILE, 'a'), call(config.LOCK_FILE, 'a', encoding='utf-8'),
call().__enter__(), call().__enter__(),
call().__exit__(None, None, None), call().__exit__(None, None, None),
call(config.LOCK_FILE, 'r+'), call(config.LOCK_FILE, 'r+', encoding='utf-8'),
call().__enter__(), call().__enter__(),
call().read(), call().read(),
call().seek(0), call().seek(0),
@ -95,10 +96,10 @@ def test_locked_error(lock_open, getpid, sleep, fcntl_flock, print_lock):
# hence checking just the method and comparing the args with themselves # hence checking just the method and comparing the args with themselves
last_exit_call_args = lock_open().__exit__.call_args_list[-1][0] last_exit_call_args = lock_open().__exit__.call_args_list[-1][0]
lock_open.assert_has_calls([ lock_open.assert_has_calls([
call(config.LOCK_FILE, 'a'), call(config.LOCK_FILE, 'a', encoding='utf-8'),
call().__enter__(), call().__enter__(),
call().__exit__(None, None, None), call().__exit__(None, None, None),
call(config.LOCK_FILE, 'r+'), call(config.LOCK_FILE, 'r+', encoding='utf-8'),
call().__enter__(), call().__enter__(),
call().__exit__(*last_exit_call_args), call().__exit__(*last_exit_call_args),
]) ])

View File

@ -24,7 +24,7 @@ def test_out(run):
@patch('spoc.podman.out') @patch('spoc.podman.out')
def test_get_apps(out): def test_get_apps(out):
with open(os.path.join(TEST_DATA_DIR, 'podman_pod_ps.json')) as f: with open(os.path.join(TEST_DATA_DIR, 'podman_pod_ps.json'), encoding='utf-8') as f:
out.return_value = f.read() out.return_value = f.read()
pods = podman.get_apps() pods = podman.get_apps()
@ -35,14 +35,14 @@ def test_get_apps(out):
@patch('spoc.podman.out') @patch('spoc.podman.out')
def test_get_volumes_for_app(out): def test_get_volumes_for_app(out):
with open(os.path.join(TEST_DATA_DIR, 'podman_volume_ls.json')) as f: with open(os.path.join(TEST_DATA_DIR, 'podman_volume_ls.json'), encoding='utf-8') as f:
out.return_value = f.read() out.return_value = f.read()
volumes = podman.get_volumes_for_app('someapp') volumes = podman.get_volumes_for_app('someapp')
expected_cmd = ['podman', 'volume', 'ls', '--filter', 'label=spoc.app=someapp', expected_cmd = ['podman', 'volume', 'ls', '--filter', 'label=spoc.app=someapp',
'--format', 'json'] '--format', 'json']
out.asert_called_once_with(expected_cmd) out.assert_called_once_with(expected_cmd)
assert volumes == {'someapp-conf', 'someapp-data'} assert volumes == {'someapp-conf', 'someapp-data'}
@patch('spoc.podman.run') @patch('spoc.podman.run')
@ -117,6 +117,7 @@ def test_create_container(run):
hosts={'cnt2', 'cnt3', 'cnt'}) hosts={'cnt2', 'cnt3', 'cnt'})
expected_cmd = ['podman', 'container', 'create', '--name', 'someapp-cnt', '--pod', 'someapp', expected_cmd = ['podman', 'container', 'create', '--name', 'someapp-cnt', '--pod', 'someapp',
'--subgidname', 'spoc', '--subgidname', 'spoc',
'--restart', 'unless-stopped', '--env-file', '/var/lib/spoc/someapp.env', '--restart', 'unless-stopped', '--env-file', '/var/lib/spoc/someapp.env',
'--requires', 'someapp-cnt2,someapp-cnt3', '--volume', 'someapp-mnt:/mnt', '--requires', 'someapp-cnt2,someapp-cnt3', '--volume', 'someapp-mnt:/mnt',
'--volume', 'someapp-srv:/srv', '--add-host', 'cnt:127.0.0.1', '--volume', 'someapp-srv:/srv', '--add-host', 'cnt:127.0.0.1',
@ -129,6 +130,7 @@ def test_create_container_minimal(run):
podman.create_container('someapp', 'someapp-cnt', 'example.com/someapp:0.23.6-210515') podman.create_container('someapp', 'someapp-cnt', 'example.com/someapp:0.23.6-210515')
expected_cmd = ['podman', 'container', 'create', '--name', 'someapp-cnt', '--pod', 'someapp', expected_cmd = ['podman', 'container', 'create', '--name', 'someapp-cnt', '--pod', 'someapp',
'--subgidname', 'spoc', '--subgidname', 'spoc',
'--restart', 'unless-stopped', 'example.com/someapp:0.23.6-210515'] '--restart', 'unless-stopped', 'example.com/someapp:0.23.6-210515']
run.assert_called_once_with(expected_cmd) run.assert_called_once_with(expected_cmd)
@ -137,6 +139,5 @@ def test_prune(run):
podman.prune() podman.prune()
run.assert_has_calls([ run.assert_has_calls([
call(['podman', 'image', 'prune', '--all', '--force']), call(['podman', 'image', 'prune', '--all', '--force', '--volumes']),
call(['podman', 'volume', 'prune', '--force']),
]) ])

View File

@ -1,29 +1,30 @@
import pytest
from unittest.mock import patch, call from unittest.mock import patch, call
import pytest
from spoc import config from spoc import config
from spoc import repo from spoc import repo
@patch('spoc.repo._data', {}) @patch('spoc.repo._DATA', {})
@patch('requests.get') @patch('requests.get')
def test_load(req_get): def test_load(req_get):
repo.load() repo.load()
req_get.assert_called_once_with(config.REPO_FILE_URL, auth=config.REGISTRY_AUTH, timeout=5) req_get.assert_called_once_with(config.REPO_FILE_URL, auth=config.REGISTRY_AUTH, timeout=5)
req_get.return_value.raise_for_status.asert_called_once() req_get.return_value.raise_for_status.assert_called_once()
req_get.return_value.json.asert_called_once() req_get.return_value.json.assert_called_once()
@patch('spoc.repo._data', {}) @patch('spoc.repo._DATA', {})
@patch('requests.get') @patch('requests.get')
def test_load_twice_no_force(req_get): def test_load_twice_no_force(req_get):
repo.load() repo.load()
repo.load() repo.load()
req_get.assert_called_once_with(config.REPO_FILE_URL, auth=config.REGISTRY_AUTH, timeout=5) req_get.assert_called_once_with(config.REPO_FILE_URL, auth=config.REGISTRY_AUTH, timeout=5)
req_get.return_value.raise_for_status.asert_called_once() req_get.return_value.raise_for_status.assert_called_once()
req_get.return_value.json.asert_called_once() req_get.return_value.json.assert_called_once()
@patch('spoc.repo._data', {}) @patch('spoc.repo._DATA', {})
@patch('requests.get') @patch('requests.get')
def test_load_twice_force(req_get): def test_load_twice_force(req_get):
repo.load() repo.load()
@ -34,16 +35,16 @@ def test_load_twice_force(req_get):
assert req_get.return_value.raise_for_status.call_count == 2 assert req_get.return_value.raise_for_status.call_count == 2
assert req_get.return_value.json.call_count == 2 assert req_get.return_value.json.call_count == 2
@patch('spoc.repo._data', {'someapp': {'version': '0.1'}}) @patch('spoc.repo._DATA', {'someapp': {'version': '0.1'}})
@patch('requests.get', side_effect=IOError()) @patch('requests.get', side_effect=IOError())
def test_load_empty_on_fail(req_get): def test_load_empty_on_fail(req_get):
with pytest.raises(IOError): with pytest.raises(IOError):
repo.load(force=True) repo.load(force=True)
req_get.assert_called_once_with(config.REPO_FILE_URL, auth=config.REGISTRY_AUTH, timeout=5) req_get.assert_called_once_with(config.REPO_FILE_URL, auth=config.REGISTRY_AUTH, timeout=5)
assert repo._data == {} # pylint: disable=protected-access assert repo._DATA == {} # pylint: disable=protected-access
@patch('spoc.repo._data', {'someapp': {'version': '0.1'}}) @patch('spoc.repo._DATA', {'someapp': {'version': '0.1'}})
@patch('spoc.repo.load') @patch('spoc.repo.load')
def test_get_apps(repo_load): def test_get_apps(repo_load):
apps = repo.get_apps() apps = repo.get_apps()

View File

@ -1,6 +1,7 @@
import pytest
from unittest.mock import call, patch from unittest.mock import call, patch
import pytest
import spoc import spoc
def test_apperror(): def test_apperror():