import errno import fcntl from unittest.mock import call, patch, mock_open import pytest from spoc import config from spoc import flock @flock.locked() def mock_func(): pass @patch('builtins.open', new_callable=mock_open, read_data='foo\0arg1\0arg2\n'.encode()) def test_print_lock(cmdline_open, capsys): flock.print_lock('123') cmdline_open.assert_called_once_with('/proc/123/cmdline', 'rb') captured = capsys.readouterr() assert captured.err == 'Waiting for lock currently held by process 123 - foo arg1 arg2\n' @patch('spoc.flock.print_lock') @patch('fcntl.flock') @patch('time.sleep') @patch('os.getpid', return_value=1234) @patch('builtins.open', new_callable=mock_open) def test_locked_success(lock_open, getpid, sleep, fcntl_flock, print_lock): mock_func() lock_open.assert_has_calls([ call(config.LOCK_FILE, 'a', encoding='utf-8'), call().__enter__(), call().__exit__(None, None, None), call(config.LOCK_FILE, 'r+', encoding='utf-8'), call().__enter__(), call().truncate(), call().write('1234'), call().flush(), call().__exit__(None, None, None), ]) fcntl_flock.assert_called_once_with(lock_open(), fcntl.LOCK_EX | fcntl.LOCK_NB) sleep.assert_not_called() getpid.assert_called_once() print_lock.assert_not_called() @patch('spoc.flock.print_lock') @patch('fcntl.flock') @patch('time.sleep') @patch('os.getpid', return_value=5678) @patch('builtins.open', new_callable=mock_open, read_data='1234') def test_locked_fail(lock_open, getpid, sleep, fcntl_flock, print_lock): fcntl_flock.side_effect = [ OSError(errno.EAGAIN, 'in use'), OSError(errno.EAGAIN, 'in use'), None, ] mock_func() lock_open.assert_has_calls([ call(config.LOCK_FILE, 'a', encoding='utf-8'), call().__enter__(), call().__exit__(None, None, None), call(config.LOCK_FILE, 'r+', encoding='utf-8'), call().__enter__(), call().read(), call().seek(0), call().truncate(), call().write('5678'), call().flush(), call().__exit__(None, None, None), ]) expected_fcntl_flock_call = call(lock_open(), fcntl.LOCK_EX | fcntl.LOCK_NB) assert fcntl_flock.call_args_list.count(expected_fcntl_flock_call) == 3 expected_sleep_call = call(0.1) assert sleep.call_args_list.count(expected_sleep_call) == 2 getpid.assert_called_once() print_lock.assert_called_once_with('1234') @patch('spoc.flock.print_lock') @patch('fcntl.flock', side_effect=OSError(errno.EBADF, 'nope')) @patch('time.sleep') @patch('os.getpid', return_value=5678) @patch('builtins.open', new_callable=mock_open, read_data='1234') def test_locked_error(lock_open, getpid, sleep, fcntl_flock, print_lock): with pytest.raises(OSError): mock_func() # Last call is # call().__exit__(, OSError(9, 'nope'), ) # The exception can be passed by the context manager above and checked as follows # call().__exit__(ex.type, ex.value, ex.tb.tb_next.tb_next.tb_next) # but it may by CPython specific, and frankly, that tb_next chain looks horrible. # hence checking just the method and comparing the args with themselves last_exit_call_args = lock_open().__exit__.call_args_list[-1][0] lock_open.assert_has_calls([ call(config.LOCK_FILE, 'a', encoding='utf-8'), call().__enter__(), call().__exit__(None, None, None), call(config.LOCK_FILE, 'r+', encoding='utf-8'), call().__enter__(), call().__exit__(*last_exit_call_args), ]) fcntl_flock.assert_called_once_with(lock_open(), fcntl.LOCK_EX | fcntl.LOCK_NB) sleep.assert_not_called() getpid.assert_not_called() print_lock.assert_not_called()