101 lines
3.2 KiB
Python
101 lines
3.2 KiB
Python
|
import errno
|
||
|
import fcntl
|
||
|
import pytest
|
||
|
from unittest.mock import patch, call, mock_open
|
||
|
|
||
|
from spoc import flock
|
||
|
|
||
|
def fail_callback(pid):
    """Print a one-line notice naming the PID that currently holds the lock.

    Passed as ``fail_callback`` to ``flock.locked`` when decorating
    ``mock_func``; ``test_lock_fail`` checks the printed text via ``capsys``.
    """
    notice = f'Lock held by {pid}'
    print(notice)
|
||
|
|
||
|
@flock.locked('test.lock', fail_callback=fail_callback)
def mock_func():
    """Do nothing; exists only so the tests can call a ``flock.locked`` function."""
|
||
|
|
||
|
@patch('fcntl.flock')
@patch('time.sleep')
@patch('os.getpid', return_value=1234)
@patch('builtins.open', new_callable=mock_open)
def test_lock_success(lock_open, getpid, sleep, fcntl_flock):
    """Check the calls made when the lock is obtained on the first attempt.

    With ``fcntl.flock`` succeeding immediately, the test expects the lock
    file to be opened in append mode and closed, then reopened in ``r+``
    mode, truncated, and written with the (mocked) current PID ``1234``.
    No sleeping should occur and the PID should be queried exactly once.
    """
    mock_func()

    # File operations expected on the mocked ``open`` and its handle, in order.
    expected_file_calls = [
        call('test.lock', 'a'),
        call().__enter__(),
        call().__exit__(None, None, None),
        call('test.lock', 'r+'),
        call().__enter__(),
        call().truncate(),
        call().write('1234'),
        call().flush(),
        call().__exit__(None, None, None),
    ]
    lock_open.assert_has_calls(expected_file_calls)

    # Exclusive, non-blocking lock request on the mocked file handle.
    lock_flags = fcntl.LOCK_EX | fcntl.LOCK_NB
    fcntl_flock.assert_called_once_with(lock_open(), lock_flags)
    sleep.assert_not_called()
    getpid.assert_called_once()
|
||
|
|
||
|
@patch('fcntl.flock')
@patch('time.sleep')
@patch('os.getpid', return_value=5678)
@patch('builtins.open', new_callable=mock_open, read_data='1234')
def test_lock_fail(lock_open, getpid, sleep, fcntl_flock, capsys):
    """Check the retry path when the lock is busy for the first two attempts.

    ``fcntl.flock`` is made to raise ``EAGAIN`` twice and then succeed. The
    test expects three lock attempts, two ``sleep(0.1)`` calls, the holder
    PID ``1234`` (the mocked file content) reported through ``fail_callback``,
    and finally the mocked own PID ``5678`` written to the lock file.
    """
    # Two separate "in use" errors followed by a successful lock.
    flock_outcomes = [OSError(errno.EAGAIN, 'in use') for _ in range(2)]
    flock_outcomes.append(None)
    fcntl_flock.side_effect = flock_outcomes

    mock_func()

    # File operations expected on the mocked ``open`` and its handle, in order.
    expected_file_calls = [
        call('test.lock', 'a'),
        call().__enter__(),
        call().__exit__(None, None, None),
        call('test.lock', 'r+'),
        call().__enter__(),
        call().read(),
        call().seek(0),
        call().truncate(),
        call().write('5678'),
        call().flush(),
        call().__exit__(None, None, None),
    ]
    lock_open.assert_has_calls(expected_file_calls)

    # Every lock attempt must be an exclusive, non-blocking request.
    flock_attempts = fcntl_flock.call_args_list.count(
        call(lock_open(), fcntl.LOCK_EX | fcntl.LOCK_NB)
    )
    assert flock_attempts == 3
    # One 0.1 s pause is expected after each failed attempt.
    assert sleep.call_args_list.count(call(0.1)) == 2
    getpid.assert_called_once()

    # fail_callback prints the PID read from the lock file.
    assert capsys.readouterr().out == 'Lock held by 1234\n'
|
||
|
|
||
|
@patch('fcntl.flock', side_effect=OSError(errno.EBADF, 'nope'))
@patch('time.sleep')
@patch('os.getpid', return_value=5678)
@patch('builtins.open', new_callable=mock_open, read_data='1234')
def test_lock_error(lock_open, getpid, sleep, fcntl_flock):
    """Check that an ``OSError`` other than "lock busy" is not retried.

    ``fcntl.flock`` is made to raise ``EBADF``. The test expects the error to
    reach the caller after a single lock attempt, with no sleeping and no
    PID lookup, and the lock file's context manager exited.
    """
    with pytest.raises(OSError):
        mock_func()

    # The final recorded call looks like
    #   call().__exit__(<class 'OSError'>, OSError(9, 'nope'), <traceback object at 0xaddress>)
    # The exception could be captured by pytest.raises above and spelled out as
    #   call().__exit__(ex.type, ex.value, ex.tb.tb_next.tb_next.tb_next)
    # but that may be CPython-specific, and the tb_next chain is ugly.
    # So only the method is checked: its recorded args are fed back into the
    # expectation, which means they are compared with themselves.
    last_exit_call = lock_open().__exit__.call_args_list[-1]
    exit_args = last_exit_call[0]
    expected_file_calls = [
        call('test.lock', 'a'),
        call().__enter__(),
        call().__exit__(None, None, None),
        call('test.lock', 'r+'),
        call().__enter__(),
        call().__exit__(*exit_args),
    ]
    lock_open.assert_has_calls(expected_file_calls)

    # Exactly one exclusive, non-blocking lock attempt.
    lock_flags = fcntl.LOCK_EX | fcntl.LOCK_NB
    fcntl_flock.assert_called_once_with(lock_open(), lock_flags)
    sleep.assert_not_called()
    getpid.assert_not_called()
|