spoc/tests/test_flock.py

111 lines
3.8 KiB
Python

import errno
import fcntl
from unittest.mock import call, patch, mock_open
import pytest
from spoc import config
from spoc import flock
@flock.locked()
def mock_func():
    """Do nothing; a dummy target wrapped by ``flock.locked`` for the tests below."""
@patch('builtins.open', new_callable=mock_open, read_data='foo\0arg1\0arg2\n'.encode())
def test_print_lock(cmdline_open, capsys):
    """Check that ``print_lock`` reads the holder's cmdline and reports it on stderr."""
    flock.print_lock('123')

    # The command line of the given PID must be read from procfs in binary mode.
    cmdline_open.assert_called_once_with('/proc/123/cmdline', 'rb')

    # The NUL-separated arguments fed in via read_data are expected to show
    # up space-separated in the message written to stderr.
    stderr_text = capsys.readouterr().err
    assert stderr_text == 'Waiting for lock currently held by process 123 - foo arg1 arg2\n'
@patch('spoc.flock.print_lock')
@patch('fcntl.flock')
@patch('time.sleep')
@patch('os.getpid', return_value=1234)
@patch('builtins.open', new_callable=mock_open)
def test_locked_success(lock_open, getpid, sleep, fcntl_flock, print_lock):
    """Check the uncontended case: the lock is obtained on the very first attempt."""
    mock_func()

    # The lock file is first opened in append mode and immediately closed,
    # then reopened for update so the (mocked) PID 1234 can be written to it.
    expected_open_calls = [
        call(config.LOCK_FILE, 'a', encoding='utf-8'),
        call().__enter__(),
        call().__exit__(None, None, None),
        call(config.LOCK_FILE, 'r+', encoding='utf-8'),
        call().__enter__(),
        call().truncate(),
        call().write('1234'),
        call().flush(),
        call().__exit__(None, None, None),
    ]
    lock_open.assert_has_calls(expected_open_calls)

    # A single non-blocking exclusive lock request on the opened file.
    lock_flags = fcntl.LOCK_EX | fcntl.LOCK_NB
    fcntl_flock.assert_called_once_with(lock_open(), lock_flags)

    # No waiting and no "lock held by" message when the lock is free.
    sleep.assert_not_called()
    getpid.assert_called_once()
    print_lock.assert_not_called()
@patch('spoc.flock.print_lock')
@patch('fcntl.flock')
@patch('time.sleep')
@patch('os.getpid', return_value=5678)
@patch('builtins.open', new_callable=mock_open, read_data='1234')
def test_locked_fail(lock_open, getpid, sleep, fcntl_flock, print_lock):
    """Check the contended case: two EAGAIN failures, then the lock is obtained."""
    # Two separate "in use" errors followed by a successful (None) attempt.
    contended_attempts = [OSError(errno.EAGAIN, 'in use') for _ in range(2)]
    fcntl_flock.side_effect = contended_attempts + [None]

    mock_func()

    # Same open sequence as the uncontended case, except that the lock file
    # content (the mocked holder PID '1234') is read and the file rewound
    # before the own (mocked) PID 5678 is written.
    expected_open_calls = [
        call(config.LOCK_FILE, 'a', encoding='utf-8'),
        call().__enter__(),
        call().__exit__(None, None, None),
        call(config.LOCK_FILE, 'r+', encoding='utf-8'),
        call().__enter__(),
        call().read(),
        call().seek(0),
        call().truncate(),
        call().write('5678'),
        call().flush(),
        call().__exit__(None, None, None),
    ]
    lock_open.assert_has_calls(expected_open_calls)

    # Three identical non-blocking lock attempts ...
    lock_attempt = call(lock_open(), fcntl.LOCK_EX | fcntl.LOCK_NB)
    assert fcntl_flock.call_args_list.count(lock_attempt) == 3

    # ... with a 0.1 s pause after each of the two failed ones.
    pause = call(0.1)
    assert sleep.call_args_list.count(pause) == 2

    getpid.assert_called_once()
    # The holder is reported exactly once, using the PID read from the file.
    print_lock.assert_called_once_with('1234')
@patch('spoc.flock.print_lock')
@patch('fcntl.flock', side_effect=OSError(errno.EBADF, 'nope'))
@patch('time.sleep')
@patch('os.getpid', return_value=5678)
@patch('builtins.open', new_callable=mock_open, read_data='1234')
def test_locked_error(lock_open, getpid, sleep, fcntl_flock, print_lock):
    """Check that an ``OSError`` other than EAGAIN propagates without any retry."""
    with pytest.raises(OSError):
        mock_func()

    # The final recorded call looks like
    #   call().__exit__(<class 'OSError'>, OSError(9, 'nope'), <traceback object at 0xaddress>)
    # The exception info could be captured from the pytest.raises context
    # manager above and matched as
    #   call().__exit__(ex.type, ex.value, ex.tb.tb_next.tb_next.tb_next)
    # but that may be CPython specific and the tb_next chain is ugly, so only
    # the fact that __exit__ was called is verified, by comparing its
    # arguments with themselves.
    final_exit_call = lock_open().__exit__.call_args_list[-1]
    final_exit_args = final_exit_call[0]

    expected_open_calls = [
        call(config.LOCK_FILE, 'a', encoding='utf-8'),
        call().__enter__(),
        call().__exit__(None, None, None),
        call(config.LOCK_FILE, 'r+', encoding='utf-8'),
        call().__enter__(),
        call().__exit__(*final_exit_args),
    ]
    lock_open.assert_has_calls(expected_open_calls)

    # Exactly one lock attempt; the error is not followed by sleeping,
    # fetching the PID or reporting a lock holder.
    lock_flags = fcntl.LOCK_EX | fcntl.LOCK_NB
    fcntl_flock.assert_called_once_with(lock_open(), lock_flags)
    sleep.assert_not_called()
    getpid.assert_not_called()
    print_lock.assert_not_called()