diff --git a/.github/workflows/pytest.yml b/.github/workflows/pytest.yml index 92fdf67..ab8eaae 100644 --- a/.github/workflows/pytest.yml +++ b/.github/workflows/pytest.yml @@ -31,7 +31,7 @@ jobs: - name: Install pip dependencies run: | python -m pip install --upgrade pip wheel setuptools - pip install -e .[server] + pip install -e .[server,tests] pip install pytest - name: Run tests diff --git a/filetracker/servers/storage.py b/filetracker/servers/storage.py index 8ef5e7c..7614413 100644 --- a/filetracker/servers/storage.py +++ b/filetracker/servers/storage.py @@ -47,6 +47,7 @@ _LOCK_RETRIES = 20 _LOCK_SLEEP_TIME_S = 1 +_DB_DEADLOCK_RETRIES = 50 logger = logging.getLogger(__name__) @@ -81,6 +82,8 @@ def __init__(self, base_dir): # https://docs.oracle.com/cd/E17076_05/html/programmer_reference/transapp_env_open.html self.db_env = bsddb3.db.DBEnv() + # Enable running the deadlock detector when lock conflicts are automatically detected. + self.db_env.set_lk_detect(bsddb3.db.DB_LOCK_DEFAULT) try: self.db_env.open( self.db_dir, @@ -182,9 +185,8 @@ def store( logger.debug('Acquired lock for blob %s.', digest) digest_bytes = digest.encode() - with self._db_transaction() as txn: - logger.debug('Started DB transaction (adding link).') - link_count = int(self.db.get(digest_bytes, 0, txn=txn)) + def transaction_contents(txn): + link_count = int(self.db.get(digest_bytes, 0, txn=txn, flags=bsddb3.db.DB_RMW)) new_count = str(link_count + 1).encode() self.db.put(digest_bytes, new_count, txn=txn) @@ -194,9 +196,11 @@ def store( str(logical_size).encode(), txn=txn, ) - logger.debug('Commiting DB transaction (adding link).') + return link_count - logger.debug('Committed DB transaction (adding link).') + link_count = self._call_in_transaction_with_retries( + transaction_contents, "adding link" + ) # Create a new blob if this isn't a duplicate. if link_count == 0: @@ -268,12 +272,11 @@ def delete(self, name, version, _lock=True): with _exclusive_lock(self._lock_path('blobs', digest)): logger.debug('Acquired lock for blob %s.', digest) - should_delete_blob = False - with self._db_transaction() as txn: - logger.debug('Started DB transaction (deleting link).') + def transaction_contents(txn): + should_delete_blob = False digest_bytes = digest.encode() - link_count = self.db.get(digest_bytes, txn=txn) + link_count = self.db.get(digest_bytes, txn=txn, flags=bsddb3.db.DB_RMW) if link_count is None: raise RuntimeError("File exists but has no key in db") @@ -288,9 +291,11 @@ def delete(self, name, version, _lock=True): else: new_count = str(link_count - 1).encode() self.db.put(digest_bytes, new_count, txn=txn) - logger.debug('Committing DB transaction (deleting link).') + return should_delete_blob - logger.debug('Committed DB transaction (deleting link).') + should_delete_blob = self._call_in_transaction_with_retries( + transaction_contents, "deleting link" + ) os.unlink(link_path) logger.debug('Deleted link %s.', name) @@ -339,6 +344,30 @@ def _db_transaction(self): else: txn.commit() + def _call_in_transaction_with_retries(self, func, description): + retries = 0 + result = None + while True: + try: + with self._db_transaction() as txn: + logger.debug('Started DB transaction ({}).'.format(description)) + result = func(txn) + logger.debug('Commiting DB transaction ({}).'.format(description)) + except bsddb3.db.DBLockDeadlockError: + retries += 1 + if retries > _DB_DEADLOCK_RETRIES: + logger.error('BSDDB deadlock detected, retry limit exceeded ({}).'.format(description)) + raise + logger.warning( + 'BSDDB deadlock detected in transaction ({}), retry no {}.'.format( + description, retries, + ) + ) + continue + break + logger.debug('Committed DB transaction ({}).'.format(description)) + return result + def _digest_for_link(self, name): link = self._link_path(name) blob_path = os.readlink(link) diff --git a/filetracker/tests/parallel_test.py b/filetracker/tests/parallel_test.py index 7b6f274..8107659 100644 --- a/filetracker/tests/parallel_test.py +++ b/filetracker/tests/parallel_test.py @@ -4,8 +4,9 @@ from __future__ import division from __future__ import print_function -from multiprocessing import Process +from multiprocessing import Barrier, Process import os +import psutil import shutil import tempfile import time @@ -25,6 +26,17 @@ _FILE_SIZE = 6 * 1024 * 1024 _PARALLEL_CLIENTS = 5 _TEST_PORT_NUMBER = 45745 +_COPIES_TO_UPLOAD = 500 +_SUBPROCESS_TIMEOUT_S = 20 + + +def kill_process_tree(pid): + parent = psutil.Process(pid) + for p in parent.children(recursive=True) + [parent]: + try: + p.kill() + except psutil.NoSuchProcess: + pass class ParallelTest(unittest.TestCase): @@ -52,19 +64,23 @@ def setUpClass(cls): @classmethod def tearDownClass(cls): - cls.server_process.terminate() + kill_process_tree(cls.server_process.pid) + cls.server_process.join() shutil.rmtree(cls.server_dir) shutil.rmtree(cls.temp_dir) def setUp(self): # Shortcuts for convenience - self.server_dir = ParallelTest.server_dir - self.temp_dir = ParallelTest.temp_dir - self.clients = ParallelTest.clients + cls = self.__class__ + self.server_dir = cls.server_dir + self.temp_dir = cls.temp_dir + self.clients = cls.clients # For non-parallel requests - self.client = ParallelTest.clients[0] + self.client = cls.clients[0] + +class ParallelTestSameFile(ParallelTest): def test_only_last_parallel_upload_of_same_file_should_succeed(self): processes = [] @@ -98,7 +114,43 @@ def test_only_last_parallel_upload_of_same_file_should_succeed(self): self.assertEqual(f.read(), lf.read()) +class ParallelTestDeadlocks(ParallelTest): + def test_bsddb_deadlocks(self): + processes = [] + + # Initialize different files for every client. + for i in range(len(self.clients)): + temp_file = os.path.join(self.temp_dir, 'foo{}.txt'.format(i)) + text = str(i).encode() + with open(temp_file, 'wb') as tf: + tf.write(text) + + # The deadlocks are visible even without this barrier. + barrier = Barrier(len(self.clients)) + + def job(id, barrier): + temp_file = os.path.join(self.temp_dir, 'foo{}.txt'.format(id)) + for i in range(0, _COPIES_TO_UPLOAD): + ft_name = '/foo{}.{}.txt'.format(id, i) + barrier.wait() + client.put_file(ft_name, temp_file, compress_hint=False) + + for i, client in enumerate(self.clients): + process = Process(target=lambda: job(i, barrier)) + process.start() + processes.append(process) + + for process in processes: + process.join(timeout=_SUBPROCESS_TIMEOUT_S) + self.assertFalse(process.is_alive()) + self.assertEqual(process.exitcode, 0) + process.join() + + # Put one final file to check for e.g. corruption. + client.put_file('/foo_last', temp_file, compress_hint=False) + + def _start_server(server_dir): server_main( - ['-p', str(_TEST_PORT_NUMBER), '-d', server_dir, '-D', '--workers', '6'] + ['-p', str(_TEST_PORT_NUMBER), '-d', server_dir, '-D', '--workers', '3'] ) diff --git a/pyproject.toml b/pyproject.toml index 2ad6e49..3571b79 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -40,6 +40,7 @@ server = [ ] tests = [ "pytest", + "psutil", ] [project.scripts] @@ -47,4 +48,4 @@ filetracker = "filetracker.client.shell:main" filetracker-server = "filetracker.servers.run:main" filetracker-cache-cleaner = "filetracker.scripts.cachecleaner:main" filetracker-migrate = "filetracker.scripts.migrate:main" -filetracker-recover = "filetracker.scripts.recover:main" \ No newline at end of file +filetracker-recover = "filetracker.scripts.recover:main" diff --git a/tox.ini b/tox.ini index b51cf0a..fc07c09 100644 --- a/tox.ini +++ b/tox.ini @@ -3,6 +3,7 @@ envlist = py313 [testenv] extras = server + tests deps = pytest pytest-cov setenv =