adds Valkey Storage Implementation #3

Open
MatthiasHowellYopp wants to merge 1 commit into main from feat/valkey-storage

Conversation

@MatthiasHowellYopp (Owner) commented Apr 21, 2026

Adds Valkey as a storage backend for CrewAI's unified memory system, using the valkey-glide client. Valkey is a high-performance, Redis-compatible key-value store that provides a distributed, production-ready alternative to the existing LanceDB and Qdrant storage options.

What's included
ValkeyStorage (valkey_storage.py) — full StorageBackend implementation:

  • CRUD operations (save, get, update, delete) with both sync and async APIs
  • Server-side vector search via Valkey Search module (FT.SEARCH with KNN)
  • Scope-based record organization with hierarchical scope queries
  • Category indexing and filtering
  • Metadata filtering with AND logic
  • Pagination support (limit/offset) for list_records
  • Scope introspection (get_scope_info, list_scopes, list_categories, count)
  • Bulk delete with scope, category, metadata, and age-based filters
  • Connection retry with exponential backoff for transient errors
  • Lazy client initialization and async context manager support
  • reset for clearing all or scoped records
ValkeyCache (valkey_cache.py) — lightweight cache interface:

  • get/set/delete/exists with optional TTL
  • JSON serialization for complex values
  • Connection timeout handling
  • Used by A2A agent card caching and file upload caching as an alternative to Redis
Integration points:

  • unified_memory.py — wired as a storage backend option
  • encoding_flow.py — encoding flow support for Valkey storage
  • memory_tools.py — memory tool descriptions updated with clearer parameter docs
  • upload_cache.py — added ValkeyCache as a cache backend option (alongside memory/Redis)
  • agent_card.py / task.py — added VALKEY_URL environment variable support for aiocache configuration, with priority over REDIS_URL
Optional dependency:

  • Added valkey = ["valkey-glide>=1.3.0"] to [project.optional-dependencies] in pyproject.toml
  • Install with pip install crewai[valkey]
Tests
~5,300 lines of tests across 5 test files covering:

  • Core CRUD and index operations (test_valkey_storage.py)
  • Vector search with filters, scoring, pagination (test_valkey_storage_search.py)
  • Scope operations, listing, categories, count, reset (test_valkey_storage_scope.py)
  • Error handling, retry, serialization/deserialization (test_valkey_storage_errors.py)
  • Cache operations with TTL (test_valkey_cache.py)
All tests use mocked Valkey clients — no running Valkey instance required.
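The mocked-client pattern the tests rely on can be sketched like this (the record key and hash fields are invented for illustration, not taken from the actual test files):

```python
# Sketch: an AsyncMock stands in for the GLIDE client, so storage code can be
# exercised without a running Valkey server.
import asyncio
from unittest.mock import AsyncMock

mock_client = AsyncMock()
mock_client.hgetall.return_value = {b"id": b"r1", b"content": b"hello"}

async def fetch_record(client, key):
    # Storage code awaits the mock exactly as it would a real connection
    return await client.hgetall(key)

result = asyncio.run(fetch_record(mock_client, "record:r1"))
```

The mock also records its calls, so tests can assert which commands the storage layer issued.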

fixes crewAIInc#5578

@jbrinkman left a comment

Code Review: Valkey Storage Implementation

Thanks for this substantial contribution — adding Valkey as a distributed storage backend is a valuable addition to CrewAI's memory system. The overall architecture is sound: ValkeyStorage for the full StorageBackend protocol, ValkeyCache for lightweight caching, proper optional dependency handling in pyproject.toml, and comprehensive test coverage (~5300 lines of tests with mocked clients).

What works well

  • Clean separation between ValkeyStorage (full storage) and ValkeyCache (simple cache)
  • Proper use of Valkey Search module for server-side vector similarity (FT.SEARCH with KNN)
  • Good error handling with descriptive error messages and graceful degradation
  • Comprehensive test suite covering CRUD, search, scopes, errors, and edge cases
  • Optional dependency pattern (pip install crewai[valkey]) follows existing conventions
  • Lazy client initialization and connection timeout handling

Issues requiring changes

Bugs:

  1. _run_async destroys cached client on every sync call — clears self._client = None then creates a new thread pool + event loop + TCP connection per call. This defeats lazy initialization and will be extremely slow under load.
  2. _retry_operation can't actually retry — accepts a Coroutine object which can only be awaited once. Subsequent retry iterations will raise RuntimeError. (Currently dead code, but the interface is broken.)

Performance:
3. _alist_records has N+1 query pattern — fetches ALL record IDs across scopes, issues individual HGETALL per record, sorts in Python, then applies pagination. For large scopes this loads everything into memory. Same pattern in _aget_scope_info and _alist_categories.

Design:
4. asave/adelete/_aupdate claim atomicity but aren't atomic — individual commands without MULTI/EXEC or pipeline. Docstrings should reflect actual behavior.
5. Massive code duplication in upload_cache.py — every method has if self._use_valkey: ... else: ... with repeated null checks. A strategy/adapter pattern would be cleaner.
6. Duplicate URL parsing — VALKEY_URL/REDIS_URL parsing is copy-pasted across agent_card.py, task.py, and upload_cache.py. Extract a shared helper.
7. Module-level side effects — agent_card.py and task.py both call caches.set_config() at import time. Import order determines which config wins. task.py also does a conditional from crewai.memory.storage.valkey_cache import ValkeyCache at module level when VALKEY_URL is set, which will fail if valkey-glide isn't installed.

Missing integration:
8. No way to actually select ValkeyStorage — unified_memory.py changes only modify drain_writes(). There's no configuration path for users to choose Valkey as their storage backend.

Other:
9. uv.lock includes unrelated lockfile regeneration changes — consider splitting.
10. embed_texts creates a new ThreadPoolExecutor(max_workers=1) on every call in async context — should reuse a pool.

Recommendations

  • Fix bugs #1 and #2 before merging
  • Address the missing integration (item 8 above) — without it, ValkeyStorage is unreachable
  • The performance and design issues (#3-7) are acceptable for a first version but should be tracked as follow-up work
  • Consider adding an integration test that runs against a real Valkey instance (even if skipped in CI by default)
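The opt-in integration test could be gated on an environment variable so CI skips it by default (the variable name here is invented):

```python
# Sketch: the test only runs when VALKEY_TEST_URL (an assumed name) points at
# a live Valkey instance; otherwise pytest reports it as skipped.
import os
import pytest

requires_real_valkey = pytest.mark.skipif(
    not os.environ.get("VALKEY_TEST_URL"),
    reason="set VALKEY_TEST_URL to run against a live Valkey instance",
)

@requires_real_valkey
def test_round_trip_against_real_valkey() -> None:
    # Would connect to os.environ["VALKEY_TEST_URL"] and exercise real
    # save/search/delete paths here.
    ...
```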

Comment on lines +178 to +196
        so this lock is primarily for API compatibility with other storage backends.
        """
        return self._write_lock

    def _run_async(self, coro: Coroutine[Any, Any, Any]) -> Any:
        """Bridge async operations to sync context.

        If in async context, runs coroutine in a new thread with its own event loop.
        Otherwise creates event loop in current thread.

        Args:
            coro: Coroutine to execute.

        Returns:
            Result of the coroutine execution.
        """
        try:
            asyncio.get_running_loop()
            # We're in async context - run in thread with new event loop

Bug/Performance: _run_async clears self._client = None on every sync call, then creates a brand-new ThreadPoolExecutor, event loop, and Valkey connection. This means every sync operation (save, get_record, search, etc.) opens and closes a TCP connection to Valkey, which will be extremely slow under load and defeats the purpose of lazy client initialization.

Consider caching the thread-pool and reusing the event loop, or using asyncio.run_coroutine_threadsafe with a long-lived background loop. At minimum, avoid clearing self._client unless the event loop has actually changed.
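The long-lived background loop suggestion could be sketched like this (the AsyncBridge class and its member names are illustrative, not from the PR):

```python
# Sketch only: all sync calls funnel coroutines onto one background event
# loop, so a client created on that loop stays valid across calls and
# self._client never needs to be cleared.
import asyncio
import threading
from typing import Any, Coroutine

class AsyncBridge:
    """Run coroutines from sync code on a single long-lived event loop."""

    def __init__(self) -> None:
        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self._loop.run_forever, daemon=True)
        self._thread.start()

    def run(self, coro: Coroutine[Any, Any, Any]) -> Any:
        # Thread-safe handoff to the background loop; blocks until done.
        future = asyncio.run_coroutine_threadsafe(coro, self._loop)
        return future.result()

    def close(self) -> None:
        self._loop.call_soon_threadsafe(self._loop.stop)
        self._thread.join()
```

Because every coroutine runs on the same loop, the cached GLIDE client would see a consistent event loop for its entire lifetime.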

Comment on lines +198 to +225

            def _run_in_new_loop() -> Any:
                # Clear cached client to avoid event loop conflicts
                self._client = None
                return asyncio.run(coro)

            with concurrent.futures.ThreadPoolExecutor() as pool:
                future = pool.submit(_run_in_new_loop)
                return future.result()
        except RuntimeError as e:
            if "no running event loop" in str(e).lower():
                # Clear cached client to avoid event loop conflicts
                self._client = None
                return asyncio.run(coro)
            raise

    async def _retry_operation(
        self,
        operation: Coroutine[Any, Any, Any],
        max_retries: int = 5,
    ) -> Any:
        """Retry operation with exponential backoff on connection errors.

        Retries operations that fail due to connection errors using exponential
        backoff starting at 0.2 seconds. Logs connection errors at debug level.

        Args:
            operation: Coroutine to execute.

Bug: _retry_operation accepts a Coroutine object, but a coroutine can only be awaited once. On the first attempt the coroutine is consumed; subsequent retry iterations will raise RuntimeError: cannot reuse already awaited coroutine. The retry loop will never actually retry.

This needs to accept a callable (factory) that produces a new coroutine on each attempt:

async def _retry_operation(self, operation_fn: Callable[[], Coroutine], max_retries: int = 5) -> Any:
    for attempt in range(max_retries + 1):
        try:
            return await operation_fn()
        except (ClosingError, ConnectionError) as e:
            ...

Note: _retry_operation doesn't appear to be called anywhere in the PR, so this is currently dead code. If it's intended for future use, the interface needs fixing first.
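A runnable version of the factory-based fix (ConnectionError stands in here for glide's ClosingError; the 0.2 s base delay matches the docstring):

```python
# Sketch: the factory builds a fresh coroutine on every attempt, so nothing
# is ever awaited twice.
import asyncio
from typing import Any, Awaitable, Callable

async def retry_operation(
    operation_fn: Callable[[], Awaitable[Any]],
    max_retries: int = 5,
    base_delay: float = 0.2,
) -> Any:
    for attempt in range(max_retries + 1):
        try:
            return await operation_fn()
        except ConnectionError:
            if attempt == max_retries:
                raise
            # Exponential backoff: 0.2s, 0.4s, 0.8s, ...
            await asyncio.sleep(base_delay * (2 ** attempt))
```

Call sites then pass `lambda: client.hgetall(key)` rather than the coroutine itself.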

Comment on lines +49 to +99
# Configure aiocache to use Valkey if VALKEY_URL is set, otherwise use Redis or memory
_valkey_url = os.environ.get("VALKEY_URL")
_redis_url = os.environ.get("REDIS_URL")

if _valkey_url:
    # Parse Valkey URL for aiocache Redis backend (Valkey is Redis-compatible)
    parsed = urlparse(_valkey_url)
    caches.set_config(
        {
            "default": {
                "cache": "aiocache.RedisCache",
                "endpoint": parsed.hostname or "localhost",
                "port": parsed.port or 6379,
                "db": (
                    int(parsed.path.lstrip("/"))
                    if parsed.path and parsed.path != "/"
                    else 0
                ),
                "password": parsed.password,
            }
        }
    )
elif _redis_url:
    # Use existing Redis configuration
    parsed = urlparse(_redis_url)
    caches.set_config(
        {
            "default": {
                "cache": "aiocache.RedisCache",
                "endpoint": parsed.hostname or "localhost",
                "port": parsed.port or 6379,
                "db": (
                    int(parsed.path.lstrip("/"))
                    if parsed.path and parsed.path != "/"
                    else 0
                ),
                "password": parsed.password,
            }
        }
    )
else:
    # Use memory cache (default)
    caches.set_config(
        {
            "default": {
                "cache": "aiocache.SimpleMemoryCache",
            }
        }
    )



Duplication: The VALKEY_URL and REDIS_URL parsing blocks are nearly identical (the same urlparse -> caches.set_config pattern repeated 3 times across agent_card.py, task.py, and upload_cache.py). Consider extracting a shared helper like _parse_valkey_or_redis_url() to reduce duplication and ensure consistent behavior.

Also, this module-level code runs at import time and has a side effect (calling caches.set_config). If both agent_card.py and task.py are imported, they'll each call caches.set_config with potentially different configurations, and the last one wins. This could lead to subtle ordering bugs.

Comment on lines +449 to +476
        return None

    async def _ensure_vector_index(self) -> None:
        """Create Valkey Search vector index if it doesn't exist.

        Creates an index named 'memory_index' on record:* hashes with:
        - Vector field for embeddings (HNSW or FLAT algorithm)
        - TAG fields for scope and categories
        - NUMERIC fields for created_at and importance

        Raises:
            RuntimeError: If Valkey Search module is not available.
        """
        if self._index_created:
            return

        client = await self._get_client()

        try:
            # Check if index exists using FT.INFO
            cmd: list[str | bytes] = ["FT.INFO", "memory_index"]
            await client.custom_command(cmd)
            _logger.debug("Vector index 'memory_index' already exists")
            self._index_created = True
            return
        except Exception as e:
            # Index doesn't exist, create it
            _logger.debug(f"Index does not exist, will create: {e}")

Correctness: The docstring says "Save multiple records atomically" but the implementation issues separate HSET, ZADD, and SADD commands per record without using a Valkey transaction (MULTI/EXEC) or pipeline. If the process crashes mid-batch, some records will be saved with incomplete indexes.

For a first version this is probably acceptable, but the docstring should be updated to reflect the actual behavior, or a pipeline should be used for true atomicity. Same applies to adelete and _aupdate.
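The grouping idea can be illustrated with a stand-in pipeline object (FakePipeline is purely illustrative; the real names and signatures should be checked against valkey-glide's Transaction/Batch API):

```python
# Illustration only: queue all per-record writes and send them as one unit,
# so a crash cannot leave the hash saved but its indexes missing.
class FakePipeline:
    """Stand-in for a Valkey transaction/pipeline that buffers commands."""

    def __init__(self):
        self.commands = []

    def hset(self, key, mapping):
        self.commands.append(("HSET", key, mapping))

    def zadd(self, key, mapping):
        self.commands.append(("ZADD", key, mapping))

    def sadd(self, key, *members):
        self.commands.append(("SADD", key, members))

def queue_record_writes(pipe, record_id, fields, scope, categories, created_at):
    # All writes for one record travel together in a single exec
    pipe.hset(f"record:{record_id}", fields)
    pipe.zadd(f"scope:{scope}", {record_id: created_at})
    for cat in categories:
        pipe.sadd(f"category:{cat}", record_id)
    return pipe
```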

Comment on lines 364 to +379
        if not valid:
            return [[] for _ in texts]

        # Check if we're in an async context
        result: Any
        try:
            asyncio.get_running_loop()
            # We're in an async context, but this is a sync function
            # Run embedder in thread pool to avoid blocking the event loop
            with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
                sync_future = pool.submit(embedder, [t for _, t in valid])
                result = sync_future.result(timeout=30)
        except RuntimeError:
            # Not in async context, run directly
            result = embedder([t for _, t in valid])


Concern: This creates a new ThreadPoolExecutor(max_workers=1) on every call to embed_texts when in an async context. Thread pool creation has overhead and this function may be called frequently during batch encoding. Consider using a module-level or class-level thread pool.

Also, sync_future.result(timeout=30) will raise concurrent.futures.TimeoutError (not TimeoutError), so the error handling may not behave as expected. The 30s timeout is hardcoded — consider making it configurable or at least documenting it.
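A shared-pool version could be sketched as follows (the _EMBED_POOL and EMBED_TIMEOUT names are assumptions, not from the PR):

```python
# Sketch: one module-level executor reused across calls, with the timeout
# named instead of hardcoded and the timeout exception caught explicitly.
import concurrent.futures

_EMBED_POOL = concurrent.futures.ThreadPoolExecutor(max_workers=1)
EMBED_TIMEOUT = 30.0  # documents the previously hardcoded 30 s

def run_embedder_in_pool(embedder, texts):
    """Submit the sync embedder to the shared pool instead of a fresh one per call."""
    future = _EMBED_POOL.submit(embedder, texts)
    try:
        return future.result(timeout=EMBED_TIMEOUT)
    except concurrent.futures.TimeoutError:
        raise RuntimeError(f"embedder timed out after {EMBED_TIMEOUT}s") from None
```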

Comment thread lib/crewai/src/crewai/a2a/utils/task.py Outdated
Comment on lines +102 to +131
# Initialize cache based on configuration
# Priority: VALKEY_URL > REDIS_URL > SimpleMemoryCache
_valkey_url = os.environ.get("VALKEY_URL")
_redis_url = os.environ.get("REDIS_URL")

if _valkey_url:
    # Use ValkeyCache for caching
    from crewai.memory.storage.valkey_cache import ValkeyCache

    parsed = urlparse(_valkey_url)
    _task_cache = ValkeyCache(
        host=parsed.hostname or "localhost",
        port=parsed.port or 6379,
        db=int(parsed.path.lstrip("/")) if parsed.path and parsed.path != "/" else 0,
        password=parsed.password,
        default_ttl=3600,  # 1 hour default TTL
    )
    _use_valkey_cache = True
else:
    # Fallback to existing aiocache configuration
    caches.set_config(
        {
            "default": _parse_redis_url(_redis_url)
            if _redis_url
            else {
                "cache": "aiocache.SimpleMemoryCache",
            }
        }
    )
    _use_valkey_cache = False

Design concern: When VALKEY_URL is set, this creates a ValkeyCache instance at module import time (_task_cache). This means:

  1. Importing this module triggers a from crewai.memory.storage.valkey_cache import ValkeyCache which requires valkey-glide to be installed — but it's an optional dependency. Users who have VALKEY_URL set in their environment but haven't installed crewai[valkey] will get an ImportError at import time.
  2. The _task_cache is a module-level global with no cleanup path. The ValkeyCache.close() method is never called.

Consider lazy initialization or guarding the import with a try/except.
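The lazy-init-plus-guarded-import suggestion could look like this (get_task_cache is an illustrative name, not from the PR):

```python
# Sketch: defer both the import and the connection to first use, and fall
# back gracefully if the optional dependency is missing.
import os

_task_cache = None

def get_task_cache():
    """Create the cache on first use instead of at import time."""
    global _task_cache
    if _task_cache is None and os.environ.get("VALKEY_URL"):
        try:
            from crewai.memory.storage.valkey_cache import ValkeyCache
        except ImportError:
            # VALKEY_URL set but crewai[valkey] not installed: fall back
            # instead of raising at import time.
            return None
        _task_cache = ValkeyCache(...)  # constructor args elided in this sketch
    return _task_cache
```

A close() hook (e.g. via atexit or an app-shutdown handler) would also give the cache a cleanup path.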


Missing integration: ValkeyStorage is mentioned in the PR description as being "wired as a storage backend option" in unified_memory.py, but looking at the diff for unified_memory.py, there's no code that actually instantiates ValkeyStorage based on configuration. The unified_memory.py changes only modify drain_writes() behavior.

How does a user actually select Valkey as their storage backend? Is there a configuration option or environment variable? This seems like a missing piece of the integration.

Comment thread uv.lock

Nit: The uv.lock diff includes unrelated changes — exclude-newer timestamp changed, unstructured-inference version resolution simplified, onnxruntime/coloredlogs/humanfriendly marker changes, etc. These appear to be lockfile regeneration artifacts. Consider splitting these into a separate commit to keep the Valkey changes isolated and reviewable.

@MatthiasHowellYopp force-pushed the feat/valkey-storage branch 22 times, most recently from 4b3f15f to 50ff341 on April 30, 2026 at 14:55
@edlng (Collaborator) left a comment

Have not looked too deeply at the tests yet. Will take another look later. Mostly comments about using native GLIDE features instead of custom command

Comment on lines +175 to +182
# ---------------------------------------------------------------------------
# Helper: parse Valkey/Redis URL from environment
# ---------------------------------------------------------------------------


# ---------------------------------------------------------------------------
# UploadCache
# ---------------------------------------------------------------------------

Do we need these sections? Otherwise I think we can just remove them:

Suggested change
# ---------------------------------------------------------------------------
# Helper: parse Valkey/Redis URL from environment
# ---------------------------------------------------------------------------
# ---------------------------------------------------------------------------
# UploadCache
# ---------------------------------------------------------------------------

Comment on lines +233 to 239
return ValkeyCacheBackend(
host=cache_kwargs.get("host", conn.get("host", "localhost")),
port=cache_kwargs.get("port", conn.get("port", 6379)),
db=cache_kwargs.get("db", conn.get("db", 0)),
password=cache_kwargs.get("password", conn.get("password")),
default_ttl=ttl,
)

The other two options seem to return AiocacheBackend(Cache(...)). Not sure if we need to follow this

Comment on lines -153 to -157
        """Evict oldest entries if limit exceeded.

        Returns:
            Number of entries evicted.
        """

Don't think these docs were meant to be deleted? Provides good details regarding args and return. Applies to the rest of the file too.

Comment on lines +70 to +73
TLS Support: The current version of valkey-glide Python client has limited
TLS configuration options. Custom CA certificates and client certificates
are not yet supported in the Python binding. Use use_tls=True for basic
TLS encryption with system CA certificates.

Is this true? I think there's support for custom certs

Comment on lines +81 to +83
tls_ca_cert_path: Reserved for future use (not yet supported by GLIDE).
tls_client_cert_path: Reserved for future use (not yet supported by GLIDE).
tls_client_key_path: Reserved for future use (not yet supported by GLIDE).

Comment on lines +1305 to +1333
        search_cmd: list[str | bytes] = [
            "FT.SEARCH",
            "memory_index",
            query,
            "PARAMS",
            "2",
            "BLOB",
            embedding_blob,
            "RETURN",
            "11",  # Increased from 10 to include score
            "id",
            "content",
            "scope",
            "categories",
            "metadata",
            "importance",
            "created_at",
            "last_accessed",
            "source",
            "private",
            "score",  # Add score field
            "LIMIT",
            "0",
            str(limit),
        ]

        try:
            # Execute FT.SEARCH
            result = await client.custom_command(search_cmd)

Replace custom command with native method

Comment on lines +1335 to +1340
        # Parse results
        # Result format can be either:
        # 1. Flat list: [total_count, doc1_key, [field1, value1, ...], doc2_key, ...]
        # 2. Dict format: [total_count, {doc1_key: {field: value, ...}, doc2_key: {...}}]
        if not result or not isinstance(result, list) or len(result) < 1:
            return []

it's kind of weird that it would return two formats... maybe it's because of custom command? If we use the native method, i think we can just remove it to just the dict format.
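Until the native method lands, normalizing both reply shapes can be done in one place (a sketch, assuming field lists alternate name/value as in the quoted comment):

```python
# Sketch: collapse both FT.SEARCH reply shapes into one dict-of-dicts form.
def normalize_search_reply(result):
    """[count, key, [f, v, ...], ...] or [count, {key: {f: v}}] -> {key: {f: v}}."""
    if not result or len(result) < 2:
        return {}
    if isinstance(result[1], dict):
        # Dict format: the payload is already keyed by document
        return result[1]
    # Flat format: alternating doc keys and field lists
    docs = {}
    rest = result[1:]
    for key, fields in zip(rest[0::2], rest[1::2]):
        docs[key] = dict(zip(fields[0::2], fields[1::2]))
    return docs
```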

Comment on lines +1609 to +1616
            key_for_cmd = (
                key_bytes
                if isinstance(key_bytes, bytes)
                else key_bytes.encode("utf-8")
            )
            members_result = await client.custom_command(
                [b"ZRANGE", key_for_cmd, b"0", b"-1"]
            )

replace with native method

Comment thread lib/crewai/src/crewai/memory/types.py Outdated
Comment on lines +79 to +82
        if isinstance(v, list):
            return [float(x) for x in v]
        # Fallback: assume it's already a valid list[float]
        return v  # type: ignore[no-any-return]

I think we can collapse it down to just

Suggested change
if isinstance(v, list):
return [float(x) for x in v]
# Fallback: assume it's already a valid list[float]
return v # type: ignore[no-any-return]
return [float(x) for x in v]

Comment on lines +64 to +68
        # Mock FT.INFO to simulate index exists
        mock_glide_client.custom_command.side_effect = [
            {"index_name": "memory_index"},  # FT.INFO response
            None,  # HSET response
        ]

use native method, applies to the rest of the file too

@MatthiasHowellYopp force-pushed the feat/valkey-storage branch 2 times, most recently from 2e55991 to c282ab2 on April 30, 2026 at 20:06
@edlng (Collaborator) left a comment

Review 2: Test Files

Comment on lines +1692 to +1693
# Mock sinter to return intersection (records with ANY category)
mock_glide_client.sunion.return_value = {"record-1", "record-2", "record-3", "record-4"}

I don't think sunion is ever called though right?

    """Integration tests demonstrating retry behavior patterns."""

    @pytest.mark.asyncio
    async def test_mock_client_operation_with_retry_pattern(

Should this test be using _retry_operation?

Comment on lines +916 to +931
    @pytest.mark.asyncio
    @patch("crewai.memory.storage.valkey_storage.ft.search")
    @patch("crewai.memory.storage.valkey_storage.ft.info")
    async def test_search_with_zero_limit_returns_empty(
        self, mock_ft_info: AsyncMock, mock_ft_search: AsyncMock,
        valkey_storage: ValkeyStorage, mock_glide_client: AsyncMock
    ) -> None:
        """Test search with limit=0 returns empty results."""
        mock_ft_info.return_value = {"index_name": "memory_index"}
        mock_ft_search.return_value = [0]

        query_embedding = [0.1, 0.2, 0.3, 0.4]
        results = await valkey_storage.asearch(query_embedding, limit=0)

        # Verify empty results
        assert len(results) == 0

This might be redundant. We're trying to test that limit=0 gives us 0 results but this logic can also give 0 when we have limit=100 or something...


Note for all test files. I feel like there's room for significant trimming. Most of these can be combined into 1 unit test instead of multiple tests (for example, test_set_and_get_string_value, test_set_and_get_dict_value, test_set_and_get_list_value). Doing this can help reduce a bunch of lines and make it easier for the reviewer.
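The consolidation could look like this (DictCache is a stand-in for the mocked ValkeyCache used in the real suite):

```python
# Sketch: one round-trip test over several value types replaces three
# near-identical tests (string / dict / list variants).
class DictCache:
    """Minimal stand-in cache with the same set/get shape as ValkeyCache."""

    def __init__(self):
        self._data = {}

    def set(self, key, value):
        self._data[key] = value

    def get(self, key):
        return self._data.get(key)

def test_set_and_get_round_trip():
    cache = DictCache()
    # Was: test_set_and_get_string_value / _dict_value / _list_value
    for value in ["hello", {"a": 1}, [1, 2, 3]]:
        cache.set("k", value)
        assert cache.get("k") == value
```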

fields["private"] = "true" if record.private else "false"

# Add score (Valkey Search returns cosine distance, not similarity)
# Convert similarity to distance: distance = 2 * (1 - similarity)

Can be a tiny bit clearer

Suggested change
# Convert similarity to distance: distance = 2 * (1 - similarity)
# Convert similarity to cosine distance: distance = 2 * (1 - similarity)
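For unit-normalized embeddings, the squared Euclidean distance between two vectors equals 2 * (1 - cosine similarity), which is the formula in the comment; the two directions of the conversion:

```python
# Helpers for the score conversion discussed above; assumes vectors are
# normalized so that distance = 2 * (1 - similarity) holds.
def similarity_to_distance(sim: float) -> float:
    return 2.0 * (1.0 - sim)

def distance_to_similarity(dist: float) -> float:
    return 1.0 - dist / 2.0
```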

@MatthiasHowellYopp force-pushed the feat/valkey-storage branch 2 times, most recently from 148515c to 3a5ecce on May 1, 2026 at 20:37

Development

Successfully merging this pull request may close these issues.

[FEATURE] Add Valkey as a storage backend for the unified memory system

3 participants