Adds Valkey Storage Implementation #3
Conversation
jbrinkman
left a comment
Code Review: Valkey Storage Implementation
Thanks for this substantial contribution — adding Valkey as a distributed storage backend is a valuable addition to CrewAI's memory system. The overall architecture is sound: ValkeyStorage for the full StorageBackend protocol, ValkeyCache for lightweight caching, proper optional dependency handling in pyproject.toml, and comprehensive test coverage (~5300 lines of tests with mocked clients).
What works well
- Clean separation between `ValkeyStorage` (full storage) and `ValkeyCache` (simple cache)
- Proper use of Valkey Search module for server-side vector similarity (FT.SEARCH with KNN)
- Good error handling with descriptive error messages and graceful degradation
- Comprehensive test suite covering CRUD, search, scopes, errors, and edge cases
- Optional dependency pattern (`pip install crewai[valkey]`) follows existing conventions
- Lazy client initialization and connection timeout handling
Issues requiring changes
Bugs:
1. `_run_async` destroys the cached client on every sync call — it clears `self._client = None`, then creates a new thread pool + event loop + TCP connection per call. This defeats lazy initialization and will be extremely slow under load.
2. `_retry_operation` can't actually retry — it accepts a `Coroutine` object, which can only be awaited once. Subsequent retry iterations will raise `RuntimeError`. (Currently dead code, but the interface is broken.)
Performance:
3. _alist_records has N+1 query pattern — fetches ALL record IDs across scopes, issues individual HGETALL per record, sorts in Python, then applies pagination. For large scopes this loads everything into memory. Same pattern in _aget_scope_info and _alist_categories.
Design:
4. asave/adelete/_aupdate claim atomicity but aren't atomic — individual commands without MULTI/EXEC or pipeline. Docstrings should reflect actual behavior.
5. Massive code duplication in upload_cache.py — every method has if self._use_valkey: ... else: ... with repeated null checks. A strategy/adapter pattern would be cleaner.
6. Duplicate URL parsing — VALKEY_URL/REDIS_URL parsing is copy-pasted across agent_card.py, task.py, and upload_cache.py. Extract a shared helper.
7. Module-level side effects — agent_card.py and task.py both call caches.set_config() at import time. Import order determines which config wins. task.py also does a conditional from crewai.memory.storage.valkey_cache import ValkeyCache at module level when VALKEY_URL is set, which will fail if valkey-glide isn't installed.
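For item 5, a minimal sketch of the strategy idea. All class names here are illustrative stand-ins, not existing CrewAI types; the point is that the backend is chosen once and every method simply delegates.

```python
from typing import Any, Protocol


class CacheBackend(Protocol):
    """Common interface; each backend implements it exactly once."""

    def get(self, key: str) -> Any: ...
    def set(self, key: str, value: Any) -> None: ...


class MemoryBackend:
    """Trivial in-process backend; a Valkey-backed class would sit beside it."""

    def __init__(self) -> None:
        self._data: dict[str, Any] = {}

    def get(self, key: str) -> Any:
        return self._data.get(key)

    def set(self, key: str, value: Any) -> None:
        self._data[key] = value


class UploadCacheFacade:
    """Facade: the backend is injected once, so no method needs a
    repeated `if self._use_valkey:` branch or null check."""

    def __init__(self, backend: CacheBackend) -> None:
        self._backend = backend

    def get(self, key: str) -> Any:
        return self._backend.get(key)

    def set(self, key: str, value: Any) -> None:
        self._backend.set(key, value)
```

Swapping Valkey in then means constructing the facade with a different backend, not editing every method body.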
Missing integration:
8. No way to actually select ValkeyStorage — unified_memory.py changes only modify drain_writes(). There's no configuration path for users to choose Valkey as their storage backend.
Other:
9. uv.lock includes unrelated lockfile regeneration changes — consider splitting.
10. embed_texts creates a new ThreadPoolExecutor(max_workers=1) on every call in async context — should reuse a pool.
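Item 3's N+1 pattern can be avoided by asking the sorted set for one page of IDs before any per-record fetches. A minimal sketch with an in-memory stand-in for the client (method names follow the Valkey commands, not necessarily glide's exact API):

```python
import asyncio


class FakeClient:
    """In-memory stand-in for a Valkey client, for illustration only."""

    def __init__(self, zset: list[str], hashes: dict[str, dict]) -> None:
        self._zset = zset  # record IDs already ordered by score (created_at)
        self._hashes = hashes

    async def zrange(self, key: str, start: int, stop: int) -> list[str]:
        # Valkey's ZRANGE stop index is inclusive
        return self._zset[start : stop + 1]

    async def hgetall(self, key: str) -> dict:
        return self._hashes[key]


async def list_records_page(client, scope_key: str, offset: int, limit: int) -> list[dict]:
    # Ask the server for exactly one page of IDs: sorting and pagination
    # happen inside the sorted set, not by loading every ID into Python.
    ids = await client.zrange(scope_key, offset, offset + limit - 1)
    # Only the paged records are fetched (a pipeline could batch these further).
    return [await client.hgetall(f"record:{rid}") for rid in ids]
```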
Recommendations
- Fix bugs #1 and #2 before merging
- Address the missing integration (crewAIInc#8) — without it, `ValkeyStorage` is unreachable
- The performance and design issues (#3–7) are acceptable for a first version but should be tracked as follow-up work
- Consider adding an integration test that runs against a real Valkey instance (even if skipped in CI by default)
```python
        so this lock is primarily for API compatibility with other storage backends.
        """
        return self._write_lock

    def _run_async(self, coro: Coroutine[Any, Any, Any]) -> Any:
        """Bridge async operations to sync context.

        If in async context, runs coroutine in a new thread with its own event loop.
        Otherwise creates event loop in current thread.

        Args:
            coro: Coroutine to execute.

        Returns:
            Result of the coroutine execution.
        """
        try:
            asyncio.get_running_loop()
            # We're in async context - run in thread with new event loop
```
Bug/Performance: _run_async clears self._client = None on every sync call, then creates a brand-new ThreadPoolExecutor, event loop, and Valkey connection. This means every sync operation (save, get_record, search, etc.) opens and closes a TCP connection to Valkey, which will be extremely slow under load and defeats the purpose of lazy client initialization.
Consider caching the thread-pool and reusing the event loop, or using asyncio.run_coroutine_threadsafe with a long-lived background loop. At minimum, avoid clearing self._client unless the event loop has actually changed.
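One way to realize the long-lived background loop suggested here, as a hedged sketch rather than the PR's code (class and method names are invented for illustration):

```python
import asyncio
import threading
from typing import Any, Coroutine


class AsyncBridge:
    """Runs one background event loop for the object's lifetime, so every
    sync call reuses the same loop (and thus the same cached client and
    connection) instead of building a fresh pool + loop per call."""

    def __init__(self) -> None:
        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self._loop.run_forever, daemon=True)
        self._thread.start()

    def run(self, coro: Coroutine[Any, Any, Any]) -> Any:
        # The coroutine always executes on the dedicated loop; note that a
        # caller that is itself inside a running loop will block while waiting.
        return asyncio.run_coroutine_threadsafe(coro, self._loop).result()

    def close(self) -> None:
        self._loop.call_soon_threadsafe(self._loop.stop)
        self._thread.join()
        self._loop.close()
```

With this shape, `self._client` never needs to be cleared: the client is always bound to the one long-lived loop.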
```python
            def _run_in_new_loop() -> Any:
                # Clear cached client to avoid event loop conflicts
                self._client = None
                return asyncio.run(coro)

            with concurrent.futures.ThreadPoolExecutor() as pool:
                future = pool.submit(_run_in_new_loop)
                return future.result()
        except RuntimeError as e:
            if "no running event loop" in str(e).lower():
                # Clear cached client to avoid event loop conflicts
                self._client = None
                return asyncio.run(coro)
            raise
```
```python
    async def _retry_operation(
        self,
        operation: Coroutine[Any, Any, Any],
        max_retries: int = 5,
    ) -> Any:
        """Retry operation with exponential backoff on connection errors.

        Retries operations that fail due to connection errors using exponential
        backoff starting at 0.2 seconds. Logs connection errors at debug level.

        Args:
            operation: Coroutine to execute.
```
Bug: _retry_operation accepts a Coroutine object, but a coroutine can only be awaited once. On the first attempt the coroutine is consumed; subsequent retry iterations will raise RuntimeError: cannot reuse already awaited coroutine. The retry loop will never actually retry.
This needs to accept a callable (factory) that produces a new coroutine on each attempt:
```python
async def _retry_operation(
    self,
    operation_fn: Callable[[], Coroutine[Any, Any, Any]],
    max_retries: int = 5,
) -> Any:
    for attempt in range(max_retries + 1):
        try:
            return await operation_fn()
        except (ClosingError, ConnectionError) as e:
            ...
```

Note: `_retry_operation` doesn't appear to be called anywhere in the PR, so this is currently dead code. If it's intended for future use, the interface needs fixing first.
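Expanding that suggestion into a runnable form. The generic `ConnectionError` stands in for glide's `ClosingError`, and the backoff constants are illustrative:

```python
import asyncio
from typing import Any, Callable, Coroutine


async def retry_operation(
    operation_fn: Callable[[], Coroutine[Any, Any, Any]],
    max_retries: int = 5,
    base_delay: float = 0.2,
) -> Any:
    """Retry with exponential backoff; the factory yields a fresh coroutine
    per attempt, which is exactly why a bare coroutine argument cannot work."""
    for attempt in range(max_retries + 1):
        try:
            return await operation_fn()
        except ConnectionError:
            if attempt == max_retries:
                raise
            await asyncio.sleep(base_delay * (2 ** attempt))
```

A call site passes `lambda: client.get(key)` rather than `client.get(key)`, so each retry awaits a new coroutine object.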
```python
# Configure aiocache to use Valkey if VALKEY_URL is set, otherwise use Redis or memory
_valkey_url = os.environ.get("VALKEY_URL")
_redis_url = os.environ.get("REDIS_URL")

if _valkey_url:
    # Parse Valkey URL for aiocache Redis backend (Valkey is Redis-compatible)
    parsed = urlparse(_valkey_url)
    caches.set_config(
        {
            "default": {
                "cache": "aiocache.RedisCache",
                "endpoint": parsed.hostname or "localhost",
                "port": parsed.port or 6379,
                "db": (
                    int(parsed.path.lstrip("/"))
                    if parsed.path and parsed.path != "/"
                    else 0
                ),
                "password": parsed.password,
            }
        }
    )
elif _redis_url:
    # Use existing Redis configuration
    parsed = urlparse(_redis_url)
    caches.set_config(
        {
            "default": {
                "cache": "aiocache.RedisCache",
                "endpoint": parsed.hostname or "localhost",
                "port": parsed.port or 6379,
                "db": (
                    int(parsed.path.lstrip("/"))
                    if parsed.path and parsed.path != "/"
                    else 0
                ),
                "password": parsed.password,
            }
        }
    )
else:
    # Use memory cache (default)
    caches.set_config(
        {
            "default": {
                "cache": "aiocache.SimpleMemoryCache",
            }
        }
    )
```
Duplication: The VALKEY_URL and REDIS_URL parsing blocks are nearly identical (same urlparse → caches.set_config pattern repeated 3 times across agent_card.py, task.py, and upload_cache.py). Consider extracting a shared helper like _parse_valkey_or_redis_url() to reduce duplication and ensure consistent behavior.
Also, this module-level code runs at import time and has a side effect (calling caches.set_config). If both agent_card.py and task.py are imported, they'll each call caches.set_config with potentially different configurations, and the last one wins. This could lead to subtle ordering bugs.
```python
        return None

    async def _ensure_vector_index(self) -> None:
        """Create Valkey Search vector index if it doesn't exist.

        Creates an index named 'memory_index' on record:* hashes with:
        - Vector field for embeddings (HNSW or FLAT algorithm)
        - TAG fields for scope and categories
        - NUMERIC fields for created_at and importance

        Raises:
            RuntimeError: If Valkey Search module is not available.
        """
        if self._index_created:
            return

        client = await self._get_client()

        try:
            # Check if index exists using FT.INFO
            cmd: list[str | bytes] = ["FT.INFO", "memory_index"]
            await client.custom_command(cmd)
            _logger.debug("Vector index 'memory_index' already exists")
            self._index_created = True
            return
        except Exception as e:
            # Index doesn't exist, create it
            _logger.debug(f"Index does not exist, will create: {e}")
```
Correctness: The docstring says "Save multiple records atomically" but the implementation issues separate HSET, ZADD, and SADD commands per record without using a Valkey transaction (MULTI/EXEC) or pipeline. If the process crashes mid-batch, some records will be saved with incomplete indexes.
For a first version this is probably acceptable, but the docstring should be updated to reflect the actual behavior, or a pipeline should be used for true atomicity. Same applies to adelete and _aupdate.
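A sketch of grouping one record's writes into a single transactional unit. The transaction object below is a queue-only stand-in; valkey-glide ships its own batch/transaction type, whose exact class name and methods should be checked against the installed client version:

```python
class FakeTransaction:
    """Stand-in that only queues commands; a real client would send the
    queued commands to the server as one MULTI/EXEC unit."""

    def __init__(self) -> None:
        self.commands: list[tuple] = []

    def hset(self, key: str, mapping: dict) -> None:
        self.commands.append(("HSET", key, mapping))

    def zadd(self, key: str, mapping: dict) -> None:
        self.commands.append(("ZADD", key, mapping))

    def sadd(self, key: str, members: list) -> None:
        self.commands.append(("SADD", key, members))


def queue_record_writes(tx, record_id: str, scope: str, categories: list[str],
                        fields: dict, created_at: float):
    # The record hash and every index update travel together, so a crash
    # can no longer leave a hash saved without its scope/category indexes.
    tx.hset(f"record:{record_id}", fields)
    tx.zadd(f"scope:{scope}", {record_id: created_at})
    for cat in categories:
        tx.sadd(f"category:{cat}", [record_id])
    return tx
```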
```python
        if not valid:
            return [[] for _ in texts]

        # Check if we're in an async context
        result: Any
        try:
            asyncio.get_running_loop()
            # We're in an async context, but this is a sync function
            # Run embedder in thread pool to avoid blocking the event loop
            with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
                sync_future = pool.submit(embedder, [t for _, t in valid])
                result = sync_future.result(timeout=30)
        except RuntimeError:
            # Not in async context, run directly
            result = embedder([t for _, t in valid])
```
Concern: This creates a new ThreadPoolExecutor(max_workers=1) on every call to embed_texts when in an async context. Thread pool creation has overhead and this function may be called frequently during batch encoding. Consider using a module-level or class-level thread pool.
Also, sync_future.result(timeout=30) will raise concurrent.futures.TimeoutError (not TimeoutError), so the error handling may not behave as expected. The 30s timeout is hardcoded — consider making it configurable or at least documenting it.
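A sketch of the shared-pool suggestion; the pool size and the 30-second default are illustrative, and the helper name is invented:

```python
import concurrent.futures

# Created once at import; reused by every embed_texts call instead of
# constructing (and tearing down) a ThreadPoolExecutor per invocation.
_EMBED_POOL = concurrent.futures.ThreadPoolExecutor(max_workers=1)


def run_embedder_off_loop(embedder, texts, timeout: float = 30.0):
    """Run the sync embedder on the shared pool.

    Note: on timeout, .result() raises concurrent.futures.TimeoutError,
    which is only an alias of the builtin TimeoutError on Python 3.11+;
    older versions must catch the concurrent.futures class explicitly.
    """
    future = _EMBED_POOL.submit(embedder, texts)
    return future.result(timeout=timeout)
```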
```diff
 # Initialize cache based on configuration
 # Priority: VALKEY_URL > REDIS_URL > SimpleMemoryCache
 _valkey_url = os.environ.get("VALKEY_URL")
 _redis_url = os.environ.get("REDIS_URL")

-caches.set_config(
-    {
-        "default": _parse_redis_url(_redis_url)
-        if _redis_url
-        else {
-            "cache": "aiocache.SimpleMemoryCache",
-        }
-    }
-)
+if _valkey_url:
+    # Use ValkeyCache for caching
+    from crewai.memory.storage.valkey_cache import ValkeyCache
+
+    parsed = urlparse(_valkey_url)
+    _task_cache = ValkeyCache(
+        host=parsed.hostname or "localhost",
+        port=parsed.port or 6379,
+        db=int(parsed.path.lstrip("/")) if parsed.path and parsed.path != "/" else 0,
+        password=parsed.password,
+        default_ttl=3600,  # 1 hour default TTL
+    )
+    _use_valkey_cache = True
+else:
+    # Fallback to existing aiocache configuration
+    caches.set_config(
+        {
+            "default": _parse_redis_url(_redis_url)
+            if _redis_url
+            else {
+                "cache": "aiocache.SimpleMemoryCache",
+            }
+        }
+    )
+    _use_valkey_cache = False
```
Design concern: When VALKEY_URL is set, this creates a ValkeyCache instance at module import time (`_task_cache`). This means:
- Importing this module triggers a `from crewai.memory.storage.valkey_cache import ValkeyCache`, which requires `valkey-glide` to be installed — but it's an optional dependency. Users who have `VALKEY_URL` set in their environment but haven't installed `crewai[valkey]` will get an `ImportError` at import time.
- The `_task_cache` is a module-level global with no cleanup path. The `ValkeyCache.close()` method is never called.

Consider lazy initialization or guarding the import with a try/except.
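A sketch of guarded, lazy initialization. It assumes the import path shown in the PR; the fall-back-to-None behavior and helper name are illustrative:

```python
import os
from functools import lru_cache
from urllib.parse import urlparse


@lru_cache(maxsize=1)
def get_task_cache():
    """Build the cache on first use, not at import time, and degrade
    gracefully when the optional dependency is missing."""
    url = os.environ.get("VALKEY_URL")
    if not url:
        return None  # caller falls back to the aiocache configuration
    try:
        from crewai.memory.storage.valkey_cache import ValkeyCache
    except ImportError:
        # VALKEY_URL is set but crewai[valkey] isn't installed: fall back
        # (and ideally warn) instead of crashing at import time.
        return None
    parsed = urlparse(url)
    return ValkeyCache(
        host=parsed.hostname or "localhost",
        port=parsed.port or 6379,
        password=parsed.password,
        default_ttl=3600,
    )
```

Because the getter is cached, callers share one instance, and a single `close()` call site (e.g. an atexit hook) can clean it up.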
Missing integration: ValkeyStorage is mentioned in the PR description as being "wired as a storage backend option" in unified_memory.py, but looking at the diff for unified_memory.py, there's no code that actually instantiates ValkeyStorage based on configuration. The unified_memory.py changes only modify drain_writes() behavior.
How does a user actually select Valkey as their storage backend? Is there a configuration option or environment variable? This seems like a missing piece of the integration.
Nit: The uv.lock diff includes unrelated changes — exclude-newer timestamp changed, unstructured-inference version resolution simplified, onnxruntime/coloredlogs/humanfriendly marker changes, etc. These appear to be lockfile regeneration artifacts. Consider splitting these into a separate commit to keep the Valkey changes isolated and reviewable.
edlng
left a comment
Have not looked too deeply at the tests yet. Will take another look later. Mostly comments about using native GLIDE features instead of custom command
```python
# ---------------------------------------------------------------------------
# Helper: parse Valkey/Redis URL from environment
# ---------------------------------------------------------------------------


# ---------------------------------------------------------------------------
# UploadCache
# ---------------------------------------------------------------------------
```
Do we need these sections? Otherwise I think we can just remove them:

```diff
-# ---------------------------------------------------------------------------
-# Helper: parse Valkey/Redis URL from environment
-# ---------------------------------------------------------------------------
-# ---------------------------------------------------------------------------
-# UploadCache
-# ---------------------------------------------------------------------------
```
```python
        return ValkeyCacheBackend(
            host=cache_kwargs.get("host", conn.get("host", "localhost")),
            port=cache_kwargs.get("port", conn.get("port", 6379)),
            db=cache_kwargs.get("db", conn.get("db", 0)),
            password=cache_kwargs.get("password", conn.get("password")),
            default_ttl=ttl,
        )
```
The other two options seem to return AiocacheBackend(Cache(...)). Not sure if we need to follow this
```python
        """Evict oldest entries if limit exceeded.

        Returns:
            Number of entries evicted.
        """
```
Don't think these docs were meant to be deleted? Provides good details regarding args and return. Applies to the rest of the file too.
```
    TLS Support: The current version of valkey-glide Python client has limited
    TLS configuration options. Custom CA certificates and client certificates
    are not yet supported in the Python binding. Use use_tls=True for basic
    TLS encryption with system CA certificates.
```
Is this true? I think there's support for custom certs
```python
        tls_ca_cert_path: Reserved for future use (not yet supported by GLIDE).
        tls_client_cert_path: Reserved for future use (not yet supported by GLIDE).
        tls_client_key_path: Reserved for future use (not yet supported by GLIDE).
```
Take a look at https://github.com/valkey-io/valkey-glide/blob/24adccbfe31a607e2bb4b9dd6fd81c9605733b89/python/glide-shared/glide_shared/config.py#L1388 and TlsAdvancedConfiguration
```python
        search_cmd: list[str | bytes] = [
            "FT.SEARCH",
            "memory_index",
            query,
            "PARAMS",
            "2",
            "BLOB",
            embedding_blob,
            "RETURN",
            "11",  # Increased from 10 to include score
            "id",
            "content",
            "scope",
            "categories",
            "metadata",
            "importance",
            "created_at",
            "last_accessed",
            "source",
            "private",
            "score",  # Add score field
            "LIMIT",
            "0",
            str(limit),
        ]

        try:
            # Execute FT.SEARCH
            result = await client.custom_command(search_cmd)
```
Replace custom command with native method
```python
        # Parse results
        # Result format can be either:
        # 1. Flat list: [total_count, doc1_key, [field1, value1, ...], doc2_key, ...]
        # 2. Dict format: [total_count, {doc1_key: {field: value, ...}, doc2_key: {...}}]
        if not result or not isinstance(result, list) or len(result) < 1:
            return []
```
It's kind of weird that it would return two formats; maybe it's because of custom_command? If we use the native method, I think we can reduce it to just the dict format.
```python
                key_for_cmd = (
                    key_bytes
                    if isinstance(key_bytes, bytes)
                    else key_bytes.encode("utf-8")
                )
                members_result = await client.custom_command(
                    [b"ZRANGE", key_for_cmd, b"0", b"-1"]
                )
```
```python
        if isinstance(v, list):
            return [float(x) for x in v]
        # Fallback: assume it's already a valid list[float]
        return v  # type: ignore[no-any-return]
```
I think we can collapse it down to just:

```diff
-        if isinstance(v, list):
-            return [float(x) for x in v]
-        # Fallback: assume it's already a valid list[float]
-        return v  # type: ignore[no-any-return]
+        return [float(x) for x in v]
```
```python
        # Mock FT.INFO to simulate index exists
        mock_glide_client.custom_command.side_effect = [
            {"index_name": "memory_index"},  # FT.INFO response
            None,  # HSET response
        ]
```
Use the native method; applies to the rest of the file too.
```python
        # Mock sinter to return intersection (records with ANY category)
        mock_glide_client.sunion.return_value = {"record-1", "record-2", "record-3", "record-4"}
```
I don't think sunion is ever called though right?
```python
    """Integration tests demonstrating retry behavior patterns."""

    @pytest.mark.asyncio
    async def test_mock_client_operation_with_retry_pattern(
```
Should this test be using _retry_operation?
```python
    @pytest.mark.asyncio
    @patch("crewai.memory.storage.valkey_storage.ft.search")
    @patch("crewai.memory.storage.valkey_storage.ft.info")
    async def test_search_with_zero_limit_returns_empty(
        self,
        mock_ft_info: AsyncMock,
        mock_ft_search: AsyncMock,
        valkey_storage: ValkeyStorage,
        mock_glide_client: AsyncMock,
    ) -> None:
        """Test search with limit=0 returns empty results."""
        mock_ft_info.return_value = {"index_name": "memory_index"}
        mock_ft_search.return_value = [0]

        query_embedding = [0.1, 0.2, 0.3, 0.4]
        results = await valkey_storage.asearch(query_embedding, limit=0)

        # Verify empty results
        assert len(results) == 0
```
This might be redundant. We're trying to test that limit=0 gives 0 results, but this setup can also give 0 results with limit=100, since the mocked search returns no documents either way.
Note for all test files. I feel like there's room for significant trimming. Most of these can be combined into 1 unit test instead of multiple tests (for example, test_set_and_get_string_value, test_set_and_get_dict_value, test_set_and_get_list_value). Doing this can help reduce a bunch of lines and make it easier for the reviewer.
```python
        fields["private"] = "true" if record.private else "false"

        # Add score (Valkey Search returns cosine distance, not similarity)
        # Convert similarity to distance: distance = 2 * (1 - similarity)
```
Can be a tiny bit clearer:

```diff
-        # Convert similarity to distance: distance = 2 * (1 - similarity)
+        # Convert similarity to cosine distance: distance = 2 * (1 - similarity)
```
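For context on the `2 * (1 - similarity)` factor: for unit-normalized vectors, squared Euclidean distance equals `2 * (1 - cosine_similarity)`, which is one convention a vector index may report. A quick pure-Python check of that identity (no Valkey involved):

```python
import math


def cosine_similarity(a: list[float], b: list[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    return dot / (math.hypot(*a) * math.hypot(*b))


def squared_euclidean(a: list[float], b: list[float]) -> float:
    return sum((x - y) ** 2 for x, y in zip(a, b))


# Two unit vectors, 0.5 radians apart
a = [1.0, 0.0]
b = [math.cos(0.5), math.sin(0.5)]

# Identity for unit vectors: ||a - b||^2 == 2 * (1 - cos_sim),
# so similarity can be recovered as 1 - distance / 2.
assert math.isclose(squared_euclidean(a, b), 2 * (1 - cosine_similarity(a, b)))
```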
Adds Valkey as a storage backend for CrewAI's unified memory system, using the valkey-glide client. Valkey is a high-performance, Redis-compatible key-value store that provides a distributed, production-ready alternative to the existing LanceDB and Qdrant storage options.
What's included
ValkeyStorage (`valkey_storage.py`) — full StorageBackend implementation:
- CRUD operations (save, get, update, delete) with both sync and async APIs
- Server-side vector search via Valkey Search module (FT.SEARCH with KNN)
- Scope-based record organization with hierarchical scope queries
- Category indexing and filtering
- Metadata filtering with AND logic
- Pagination support (limit/offset) for list_records
- Scope introspection (get_scope_info, list_scopes, list_categories, count)
- Bulk delete with scope, category, metadata, and age-based filters
- Connection retry with exponential backoff for transient errors
- Lazy client initialization and async context manager support
- reset for clearing all or scoped records
ValkeyCache (`valkey_cache.py`) — lightweight cache interface:
- get/set/delete/exists with optional TTL
- JSON serialization for complex values
- Connection timeout handling
- Used by A2A agent card caching and file upload caching as an alternative to Redis
Integration points:
- `unified_memory.py` — wired as a storage backend option
- `encoding_flow.py` — encoding flow support for Valkey storage
- `memory_tools.py` — memory tool descriptions updated with clearer parameter docs
- `upload_cache.py` — added ValkeyCache as a cache backend option (alongside memory/Redis)
- `agent_card.py` / `task.py` — added VALKEY_URL environment variable support for aiocache configuration, with priority over REDIS_URL
Optional dependency:
- Added `valkey = ["valkey-glide>=1.3.0"]` to `[project.optional-dependencies]` in `pyproject.toml`
- Install with `pip install crewai[valkey]`
Tests

~5,300 lines of tests across 5 test files covering:
- Core CRUD and index operations (`test_valkey_storage.py`)
- Vector search with filters, scoring, pagination (`test_valkey_storage_search.py`)
- Scope operations, listing, categories, count, reset (`test_valkey_storage_scope.py`)
- Error handling, retry, serialization/deserialization (`test_valkey_storage_errors.py`)
- Cache operations with TTL (`test_valkey_cache.py`)

All tests use mocked Valkey clients — no running Valkey instance required.
fixes crewAIInc#5578