From e49bbb90bcc5de30a013d63658a5d602b435ce71 Mon Sep 17 00:00:00 2001 From: Olaf Targowski Date: Fri, 27 Feb 2026 11:33:17 +0100 Subject: [PATCH 1/3] Add a test to showcase the bsddb deadlocks --- .github/workflows/pytest.yml | 2 +- filetracker/tests/parallel_test.py | 66 ++++++++++++++++++++++++++---- pyproject.toml | 3 +- tox.ini | 1 + 4 files changed, 63 insertions(+), 9 deletions(-) diff --git a/.github/workflows/pytest.yml b/.github/workflows/pytest.yml index 92fdf67..ab8eaae 100644 --- a/.github/workflows/pytest.yml +++ b/.github/workflows/pytest.yml @@ -31,7 +31,7 @@ jobs: - name: Install pip dependencies run: | python -m pip install --upgrade pip wheel setuptools - pip install -e .[server] + pip install -e .[server,tests] pip install pytest - name: Run tests diff --git a/filetracker/tests/parallel_test.py b/filetracker/tests/parallel_test.py index 7b6f274..8107659 100644 --- a/filetracker/tests/parallel_test.py +++ b/filetracker/tests/parallel_test.py @@ -4,8 +4,9 @@ from __future__ import division from __future__ import print_function -from multiprocessing import Process +from multiprocessing import Barrier, Process import os +import psutil import shutil import tempfile import time @@ -25,6 +26,17 @@ _FILE_SIZE = 6 * 1024 * 1024 _PARALLEL_CLIENTS = 5 _TEST_PORT_NUMBER = 45745 +_COPIES_TO_UPLOAD = 500 +_SUBPROCESS_TIMEOUT_S = 20 + + +def kill_process_tree(pid): + parent = psutil.Process(pid) + for p in parent.children(recursive=True) + [parent]: + try: + p.kill() + except psutil.NoSuchProcess: + pass class ParallelTest(unittest.TestCase): @@ -52,19 +64,23 @@ def setUpClass(cls): @classmethod def tearDownClass(cls): - cls.server_process.terminate() + kill_process_tree(cls.server_process.pid) + cls.server_process.join() shutil.rmtree(cls.server_dir) shutil.rmtree(cls.temp_dir) def setUp(self): # Shortcuts for convenience - self.server_dir = ParallelTest.server_dir - self.temp_dir = ParallelTest.temp_dir - self.clients = ParallelTest.clients + cls = 
self.__class__ + self.server_dir = cls.server_dir + self.temp_dir = cls.temp_dir + self.clients = cls.clients # For non-parallel requests - self.client = ParallelTest.clients[0] + self.client = cls.clients[0] + +class ParallelTestSameFile(ParallelTest): def test_only_last_parallel_upload_of_same_file_should_succeed(self): processes = [] @@ -98,7 +114,43 @@ def test_only_last_parallel_upload_of_same_file_should_succeed(self): self.assertEqual(f.read(), lf.read()) +class ParallelTestDeadlocks(ParallelTest): + def test_bsddb_deadlocks(self): + processes = [] + + # Initialize different files for every client. + for i in range(len(self.clients)): + temp_file = os.path.join(self.temp_dir, 'foo{}.txt'.format(i)) + text = str(i).encode() + with open(temp_file, 'wb') as tf: + tf.write(text) + + # The deadlocks are visible even without this barrier. + barrier = Barrier(len(self.clients)) + + def job(id, barrier): + temp_file = os.path.join(self.temp_dir, 'foo{}.txt'.format(id)) + for i in range(0, _COPIES_TO_UPLOAD): + ft_name = '/foo{}.{}.txt'.format(id, i) + barrier.wait() + client.put_file(ft_name, temp_file, compress_hint=False) + + for i, client in enumerate(self.clients): + process = Process(target=lambda: job(i, barrier)) + process.start() + processes.append(process) + + for process in processes: + process.join(timeout=_SUBPROCESS_TIMEOUT_S) + self.assertFalse(process.is_alive()) + self.assertEqual(process.exitcode, 0) + process.join() + + # Put one final file to check for e.g. corruption. 
+ client.put_file('/foo_last', temp_file, compress_hint=False) + + def _start_server(server_dir): server_main( - ['-p', str(_TEST_PORT_NUMBER), '-d', server_dir, '-D', '--workers', '6'] + ['-p', str(_TEST_PORT_NUMBER), '-d', server_dir, '-D', '--workers', '3'] ) diff --git a/pyproject.toml b/pyproject.toml index 2ad6e49..3571b79 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -40,6 +40,7 @@ server = [ ] tests = [ "pytest", + "psutil", ] [project.scripts] @@ -47,4 +48,4 @@ filetracker = "filetracker.client.shell:main" filetracker-server = "filetracker.servers.run:main" filetracker-cache-cleaner = "filetracker.scripts.cachecleaner:main" filetracker-migrate = "filetracker.scripts.migrate:main" -filetracker-recover = "filetracker.scripts.recover:main" \ No newline at end of file +filetracker-recover = "filetracker.scripts.recover:main" diff --git a/tox.ini b/tox.ini index b51cf0a..fc07c09 100644 --- a/tox.ini +++ b/tox.ini @@ -3,6 +3,7 @@ envlist = py313 [testenv] extras = server + tests deps = pytest pytest-cov setenv = From ce3a15734c480267068a96c86a5a84347dcf6bab Mon Sep 17 00:00:00 2001 From: Olaf Targowski Date: Fri, 27 Feb 2026 17:06:04 +0100 Subject: [PATCH 2/3] Enable the deadlock detector for bsddb Also wrap transactions to retry after terminations from the deadlock detector. The reason for these changes is bsddb's use of page-level locks. According to the underlying library's documentation: "[deadlock detection] is necessary for almost all applications in which more than a single thread of control will be accessing the database at one time. Even when Berkeley DB automatically handles database locking, it is normally possible for deadlock to occur." 
Source (with more info): https://docs.oracle.com/database/bdb181/html/programmer_reference/transapp_deadlock.html https://web.archive.org/web/20260227161711/https://docs.oracle.com/database/bdb181/html/programmer_reference/transapp_deadlock.html --- filetracker/servers/storage.py | 47 +++++++++++++++++++++++++++------- 1 file changed, 38 insertions(+), 9 deletions(-) diff --git a/filetracker/servers/storage.py b/filetracker/servers/storage.py index 8ef5e7c..1d169d1 100644 --- a/filetracker/servers/storage.py +++ b/filetracker/servers/storage.py @@ -47,6 +47,7 @@ _LOCK_RETRIES = 20 _LOCK_SLEEP_TIME_S = 1 +_DB_DEADLOCK_RETRIES = 50 logger = logging.getLogger(__name__) @@ -81,6 +82,8 @@ def __init__(self, base_dir): # https://docs.oracle.com/cd/E17076_05/html/programmer_reference/transapp_env_open.html self.db_env = bsddb3.db.DBEnv() + # Enable running the deadlock detector when lock conflicts are automatically detected. + self.db_env.set_lk_detect(bsddb3.db.DB_LOCK_DEFAULT) try: self.db_env.open( self.db_dir, @@ -182,8 +185,7 @@ def store( logger.debug('Acquired lock for blob %s.', digest) digest_bytes = digest.encode() - with self._db_transaction() as txn: - logger.debug('Started DB transaction (adding link).') + def transaction_contents(txn): link_count = int(self.db.get(digest_bytes, 0, txn=txn)) new_count = str(link_count + 1).encode() self.db.put(digest_bytes, new_count, txn=txn) @@ -194,9 +196,11 @@ def store( str(logical_size).encode(), txn=txn, ) - logger.debug('Commiting DB transaction (adding link).') + return link_count - logger.debug('Committed DB transaction (adding link).') + link_count = self._call_in_transaction_with_retries( + transaction_contents, "adding link" + ) # Create a new blob if this isn't a duplicate. 
if link_count == 0: @@ -268,10 +272,9 @@ def delete(self, name, version, _lock=True): with _exclusive_lock(self._lock_path('blobs', digest)): logger.debug('Acquired lock for blob %s.', digest) - should_delete_blob = False - with self._db_transaction() as txn: - logger.debug('Started DB transaction (deleting link).') + def transaction_contents(txn): + should_delete_blob = False digest_bytes = digest.encode() link_count = self.db.get(digest_bytes, txn=txn) if link_count is None: @@ -288,9 +291,11 @@ def delete(self, name, version, _lock=True): else: new_count = str(link_count - 1).encode() self.db.put(digest_bytes, new_count, txn=txn) - logger.debug('Committing DB transaction (deleting link).') + return should_delete_blob - logger.debug('Committed DB transaction (deleting link).') + should_delete_blob = self._call_in_transaction_with_retries( + transaction_contents, "deleting link" + ) os.unlink(link_path) logger.debug('Deleted link %s.', name) @@ -339,6 +344,30 @@ def _db_transaction(self): else: txn.commit() + def _call_in_transaction_with_retries(self, func, description): + retries = 0 + result = None + while True: + try: + with self._db_transaction() as txn: + logger.debug('Started DB transaction ({}).'.format(description)) + result = func(txn) + logger.debug('Commiting DB transaction ({}).'.format(description)) + except bsddb3.db.DBLockDeadlockError: + retries += 1 + if retries > _DB_DEADLOCK_RETRIES: + logger.error('BSDDB deadlock detected, retry limit exceeded ({}).'.format(description)) + raise + logger.warning( + 'BSDDB deadlock detected in transaction ({}), retry no {}.'.format( + description, retries, + ) + ) + continue + break + logger.debug('Committed DB transaction ({}).'.format(description)) + return result + def _digest_for_link(self, name): link = self._link_path(name) blob_path = os.readlink(link) From 91f7388b7e9b793d612e3d5068ad7da5e567fb80 Mon Sep 17 00:00:00 2001 From: Olaf Targowski Date: Sun, 8 Mar 2026 13:10:37 +0100 Subject: [PATCH 3/3] 
Acquire write locks at preceding reads for bsddb --- filetracker/servers/storage.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/filetracker/servers/storage.py b/filetracker/servers/storage.py index 1d169d1..7614413 100644 --- a/filetracker/servers/storage.py +++ b/filetracker/servers/storage.py @@ -186,7 +186,7 @@ def store( digest_bytes = digest.encode() def transaction_contents(txn): - link_count = int(self.db.get(digest_bytes, 0, txn=txn)) + link_count = int(self.db.get(digest_bytes, 0, txn=txn, flags=bsddb3.db.DB_RMW)) new_count = str(link_count + 1).encode() self.db.put(digest_bytes, new_count, txn=txn) @@ -276,7 +276,7 @@ def delete(self, name, version, _lock=True): def transaction_contents(txn): should_delete_blob = False digest_bytes = digest.encode() - link_count = self.db.get(digest_bytes, txn=txn) + link_count = self.db.get(digest_bytes, txn=txn, flags=bsddb3.db.DB_RMW) if link_count is None: raise RuntimeError("File exists but has no key in db")