Add TP=2 support for KVP2P #5
Open
aranadive wants to merge 9 commits into
- Fix ppid <= 1 to ppid < 1 for containers where dynamo is PID 1
- Add TP=1 inline mode detection in proc-tree walk
- Add _get_secondary_pool() shim for builds without get_unique_secondary_pool
- Unwrap kv.impl before feature-detecting C++ methods on the KV cache manager
- Add pin_blocks_by_id fallback for builds without get_slot_idx_by_block_id

Signed-off-by: Adit Ranadive <aranadive@nvidia.com>
… model (S1-S5)
Add TP>1 support to the source side of the remote-G2 KV cache connector.
With tensor parallelism, each TP rank holds its own slice of every KV
block in a separate host-pinned secondary pool with its own NIXL agent.
The target needs per-rank metadata and descriptors to issue parallel
RDMA READs to each source rank independently.
Source-side changes (S1-S5):
S1: _gather_per_rank_nixl_metadata() — at startup, every rank builds
its own NIXL agent, then all ranks participate in an MPI allgather
to collect (agent_name, agent_desc, pool_base_ptr, pool_size_bytes)
from all TP siblings. Rank 0 caches the bundle.
S2: _gather_per_rank_descriptors() — on resolve RPC, rank 0 broadcasts
block hashes to TP siblings via MPI, each calls local
findAndPinSecondaryBlockByHash, rank 0 gathers offsets.
S3: _start_sibling_rank_resolve_loop() — non-rank-0 TP siblings run
a daemon thread that blocks on MPI broadcast, does local resolve,
and participates in the MPI allgather back to rank 0.
S4: _result_to_dict serializes per_rank_descriptors and
per_rank_source_metadata alongside flat descriptors (backward-
compatible — TP=1 omits the per-rank fields).
S5: get_metadata RPC returns per_rank_metadata list from the S1 bundle.
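To make the S1/S5 bundle concrete, here is a minimal sketch of how an allgather result could be turned into the per_rank_metadata list. The tuple fields come from S1 and `agent_metadata_b64` from the later b64 fix; `RankNixlInfo`, `build_per_rank_metadata`, the `tp_rank` key, and the sample values are hypothetical, and the hard-coded list stands in for what mpi_allgather would return.

```python
import base64
from typing import NamedTuple


class RankNixlInfo(NamedTuple):
    # Field names follow the tuple listed in S1; the class itself is hypothetical.
    agent_name: str
    agent_desc: bytes
    pool_base_ptr: int
    pool_size_bytes: int


def build_per_rank_metadata(gathered: list[RankNixlInfo]) -> list[dict]:
    """Turn an allgather result (list index == TP rank) into a JSON-safe list."""
    return [
        {
            "tp_rank": rank,
            "agent_name": info.agent_name,
            # Raw agent bytes are base64-encoded so they survive JSON transport.
            "agent_metadata_b64": base64.b64encode(info.agent_desc).decode("ascii"),
            "pool_base_ptr": info.pool_base_ptr,
            "pool_size_bytes": info.pool_size_bytes,
        }
        for rank, info in enumerate(gathered)
    ]


# Stand-in for the allgather output with TP=2 (values are made up).
gathered = [
    RankNixlInfo("remote-g2-source-w0-tp0", b"\x00meta0", 0x1000, 4096),
    RankNixlInfo("remote-g2-source-w0-tp1", b"\x01meta1", 0x2000, 4096),
]
bundle = build_per_rank_metadata(gathered)
```

Keeping the list index equal to the TP rank means the target can look up its source peer by rank without a separate mapping.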
Data model changes:
RemoteG2ResolveResult gains per_rank_descriptors (dict[int, ...]) and
per_rank_source_metadata (dict[int, dict]) fields. Both default to
empty dicts for TP=1 backward compatibility.
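The backward-compatible shape described above might look roughly like the following. This is a sketch, not the actual RemoteG2ResolveResult: the class name `ResolveResultSketch`, the flat `descriptors` field name, the tuple layout of a descriptor, and the string-keyed wire format are all assumptions.

```python
from dataclasses import dataclass, field


@dataclass
class ResolveResultSketch:
    # "descriptors" is a hypothetical name for the existing flat field.
    descriptors: list = field(default_factory=list)
    # Both per-rank fields default to empty dicts, as described for TP=1.
    per_rank_descriptors: dict = field(default_factory=dict)
    per_rank_source_metadata: dict = field(default_factory=dict)


def result_to_dict(result: ResolveResultSketch) -> dict:
    out = {"descriptors": list(result.descriptors)}
    # Emit per-rank fields only when populated, so TP=1 payloads are unchanged.
    if result.per_rank_descriptors:
        # JSON object keys must be strings, so integer ranks are stringified.
        out["per_rank_descriptors"] = {
            str(rank): desc for rank, desc in result.per_rank_descriptors.items()
        }
    if result.per_rank_source_metadata:
        out["per_rank_source_metadata"] = {
            str(rank): meta for rank, meta in result.per_rank_source_metadata.items()
        }
    return out


tp1 = result_to_dict(ResolveResultSketch(descriptors=[(0, 4096)]))
tp2 = result_to_dict(
    ResolveResultSketch(
        descriptors=[(0, 4096)],
        per_rank_descriptors={0: [(0, 4096)], 1: [(8192, 4096)]},
        per_rank_source_metadata={0: {"agent_name": "a-tp0"}, 1: {"agent_name": "a-tp1"}},
    )
)
```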
Orchestration:
py_executor.py passes tp_rank and tp_size from self.dist to both
maybe_start_remote_g2_service and maybe_start_remote_g2_target_client.
NIXL agent names are rank-qualified for TP>1:
remote-g2-source-{worker_id}-tp{rank}
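A small sketch of the naming rule, for illustration only. The helper name `source_agent_name` is hypothetical, and the unqualified TP=1 form is an assumption inferred from "rank-qualified for TP>1".

```python
def source_agent_name(worker_id: str, tp_rank: int, tp_size: int) -> str:
    base = f"remote-g2-source-{worker_id}"
    # Assumption: TP=1 keeps the unqualified name so existing peers still match.
    return f"{base}-tp{tp_rank}" if tp_size > 1 else base
```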
DP mode note:
Uses mpi_allgather/mpi_broadcast from tensorrt_llm._utils (MPI world
communicator). This is correct for attention-DP OFF where MPI world =
TP group. For attention-DP ON, the MPI world includes multiple DP
groups — a TP-group-scoped communicator is needed to avoid cross-DP
contamination. That is deferred to a follow-up.
Known gap: S2/S3 gather is defined but not yet wired into the ZMQ
resolve handler — rank 0 currently resolves only locally. Integration
requires the ZMQ handler to trigger the MPI broadcast/gather when
tp_size > 1.
Signed-off-by: Adit Ranadive <aranadive@nvidia.com>
Add TP>1 support to the target side of the remote-G2 connector.
Each target TP rank loads the corresponding source TP rank's NIXL
agent and issues its own make_prepped_xfer independently. The
framework's existing allgather intersection in get_finished ensures
a request completes only when ALL ranks' transfers are done.
Target-side changes (T1-T4):
T1/T2: per_rank_descriptors and per_rank_source_metadata flow
through RemoteG2ResolveResult (added in S1-S5 commit) into the
binding record and out to the connector worker. No new fields
on RemoteG2BindingRecord — it holds a resolve_result reference.
T3: _start_transfer_impl in remote_g2_raw_nixl_adapter.py indexes
per_rank_descriptors[mpi_rank()] for source byte offsets and
per_rank_source_metadata[mpi_rank()] for _ensure_peer_loaded.
Falls back to flat descriptors/metadata when per-rank dicts are
empty (TP=1 backward compatibility).
T4: _result_from_dict in remote_g2_target_setup.py extracts
per_rank_descriptors and per_rank_source_metadata from the
resolve response dict. _make_source_metadata_fetcher stashes
per_rank_metadata from the S5 metadata RPC on the response
object so the adapter can read it at transfer time.
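The T3 selection logic can be sketched as below. The function name `select_source_view` and the flat keys `descriptors` and `source_metadata` are hypothetical; only per_rank_descriptors and per_rank_source_metadata are named in this PR. The rank is passed in explicitly here rather than read from mpi_rank().

```python
def select_source_view(result: dict, rank: int):
    """Return (descriptors, source_metadata) for this target rank."""
    per_rank_desc = result.get("per_rank_descriptors") or {}
    per_rank_meta = result.get("per_rank_source_metadata") or {}
    if per_rank_desc and per_rank_meta:
        # TP>1: each target rank reads only its matching source rank's slice.
        # A missing rank raises KeyError rather than silently using another rank.
        return per_rank_desc[rank], per_rank_meta[rank]
    # TP=1 (or an older source): fall back to the flat fields.
    return result["descriptors"], result["source_metadata"]


flat_only = {"descriptors": [(0, 4096)], "source_metadata": {"agent_name": "a"}}
per_rank = {
    "descriptors": [(0, 4096)],
    "source_metadata": {"agent_name": "a-tp0"},
    "per_rank_descriptors": {0: [(0, 4096)], 1: [(8192, 4096)]},
    "per_rank_source_metadata": {0: {"agent_name": "a-tp0"}, 1: {"agent_name": "a-tp1"}},
}
```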
maybe_start_remote_g2_target_client now accepts tp_rank and tp_size
kwargs (passed from py_executor.py in the S1-S5 commit).
DP mode note:
With attention-DP OFF, mpi_rank() maps 1:1 to source TP rank
since MPI world = TP group. With attention-DP ON, mpi_rank()
returns the world rank which spans multiple DP groups — a mapping
from world rank to TP-local rank is needed to index the correct
source peer. Deferred to a follow-up alongside the TP-group-scoped
communicator change on the source side.
Signed-off-by: Adit Ranadive <aranadive@nvidia.com>
Wire _gather_per_rank_descriptors into ZMQ resolve handler when tp_size>1.
Attach per_rank_descriptors and per_rank_source_metadata to resolve response.
UT1: source-side MPI gather (TP=1 single entry, TP=2 mock allgather, b64).
UT2: per-rank serde round-trip, TP=1 backward compat, binding store flow.
Run: python3 tests/unittest/_torch/test_remote_g2_tp_multi.py
Signed-off-by: Adit Ranadive <aranadive@nvidia.com>
When py_executor.py doesn't pass tp_rank/tp_size (e.g. patched connectors deployed without patching py_executor), fall back to mpi_rank() and mpi_world_size() at the top of maybe_start_remote_g2_service. This ensures TP>1 agent names get rank-qualified (-tp0, -tp1) and S1 gather runs even when the caller doesn't have the TP>1 py_executor changes.
Signed-off-by: Adit Ranadive <aranadive@nvidia.com>
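A minimal sketch of that fallback, assuming the caller signals "not passed" with None. The helper name `resolve_tp_topology` is hypothetical, and the MPI helpers are injected as callables so the snippet runs without tensorrt_llm installed.

```python
from typing import Callable, Optional, Tuple


def resolve_tp_topology(
    tp_rank: Optional[int],
    tp_size: Optional[int],
    mpi_rank: Callable[[], int],
    mpi_world_size: Callable[[], int],
) -> Tuple[int, int]:
    # Prefer explicit values from the caller; otherwise ask MPI.
    if tp_rank is None:
        tp_rank = mpi_rank()
    if tp_size is None:
        tp_size = mpi_world_size()
    return tp_rank, tp_size
```

As with the rest of this PR, equating the MPI world with the TP group only holds with attention-DP OFF.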
With TP=2, the process tree is dynamo (PID 1) -> orted -> engine. The ppid<=1 check bailed out on reaching PID 1 without checking its cmdline. Changed to ppid<1 so PID 1 gets checked. Also added a dynamo/trtllm pattern match.
Signed-off-by: Adit Ranadive <aranadive@nvidia.com>
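The off-by-one can be illustrated with a toy process table. This is a sketch only: `find_launcher_pid`, the `procs` dict, and the sample cmdlines are hypothetical stand-ins for the real walk over /proc.

```python
from typing import Dict, Optional, Tuple

# pid -> (ppid, cmdline); stands in for reading /proc/<pid>/stat and cmdline.
ProcTable = Dict[int, Tuple[int, str]]
PATTERNS = ("dynamo", "trtllm")


def find_launcher_pid(
    start_pid: int, procs: ProcTable, old_check: bool = False
) -> Optional[int]:
    pid = start_pid
    while pid in procs:
        ppid, _ = procs[pid]
        # Old check (ppid <= 1) stops before PID 1 is ever inspected;
        # new check (ppid < 1) only stops once there is no parent left.
        if (ppid <= 1) if old_check else (ppid < 1):
            return None
        parent_cmdline = procs.get(ppid, (0, ""))[1]
        if any(pattern in parent_cmdline for pattern in PATTERNS):
            return ppid
        pid = ppid
    return None


# Container layout from the commit message: dynamo (PID 1) -> orted -> engine.
procs = {
    1: (0, "python3 -m dynamo.trtllm"),
    20: (1, "orted"),
    35: (20, "engine_worker"),
}
```

With this table, the old check returns None from the engine process, while the new check finds PID 1.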
…framework)
Signed-off-by: Adit Ranadive <aranadive@nvidia.com>
…ata stream port conflict
With TP>1, multiple NIXL agents per pod (2 source + 2 target + dynamo's own agent) all try to bind the metadata stream listener on the same default port. The first succeeds, and the rest get 'Address already in use', then continuously spam 'Cannot accept client connection: Socket operation on non-socket'. The resulting 32K+ errors corrupt the TCP transport layer and cause dynamo RPC resolve timeouts.
Fix: set enable_listen_thread=False on both source and target agents. We exchange metadata explicitly via add_remote_agent (through the dynamo RPC bridge), not via NIXL's auto-discovery metadata stream. The prepped-flow RDMA READ should work without the listen thread.
Signed-off-by: Adit Ranadive <aranadive@nvidia.com>
…n up debug logging
The per-rank NIXL metadata (agent_metadata_b64) carried in the resolve response was not being decoded to raw bytes before _ensure_peer_loaded called add_remote_agent, causing "metadata missing from source response" errors on the target side. Add the b64decode step in _result_from_dict, matching the existing conversion in the metadata RPC path.
Also remove all temporary PROBE diagnostic logging added during TP>1 development.
Signed-off-by: Adit Ranadive <aranadive@nvidia.com>
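The decode step might look like the sketch below. Only `agent_metadata_b64` is named in this commit; the helper name `decode_per_rank_metadata`, the decoded key `agent_metadata`, and the string-keyed wire format are assumptions.

```python
import base64


def decode_per_rank_metadata(per_rank: dict) -> dict:
    """Convert wire-format per-rank metadata into what add_remote_agent needs."""
    decoded = {}
    for rank, meta in per_rank.items():
        entry = dict(meta)  # copy so the response dict is left untouched
        b64 = entry.pop("agent_metadata_b64", None)
        if b64 is not None:
            # add_remote_agent expects raw bytes, not the base64 text.
            entry["agent_metadata"] = base64.b64decode(b64)
        # JSON keys arrive as strings; index by integer rank on the target.
        decoded[int(rank)] = entry
    return decoded


wire = {
    "0": {"agent_name": "a-tp0", "agent_metadata_b64": base64.b64encode(b"raw-0").decode("ascii")},
    "1": {"agent_name": "a-tp1", "agent_metadata_b64": base64.b64encode(b"raw-1").decode("ascii")},
}
decoded = decode_per_rank_metadata(wire)
```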
4eed564 to
7c1aa09
    )
    except (TypeError, ValueError):
        return None
    # PROBE: confirm the target worker received the plan and parsed the
Owner
Is it OK if we keep this for now?
Why
TP>1 deployments need all ranks to participate in cross-worker KV cache reuse, but each TP rank owns an independent slice of the KV cache and its own NIXL agent. The original MPI-based gather design collided with the connector framework's own mpi_broadcast in _run_on_leader, and dynamo's client.direct() response stream drops responses under TP>1 — both had to be worked around.
What
At startup, all TP ranks exchange NIXL metadata via mpi_allgather (S1). On resolve, rank 0 fans out resolve_hashes to sibling ranks over intra-pod ZMQ IPC, gathers per-rank descriptors and source metadata, and returns a per_rank_descriptors + per_rank_source_metadata response. Each target TP rank indexes by mpi_rank() to load the correct source peer and issue its own parallel NIXL transfer. On release, rank 0 fans out release_hashes to siblings so pinned blocks on non-rank-0 source ranks are properly freed (without this, the host-pinned secondary pool would leak under sustained TP>1 traffic). The C++ layer adds findAndPinSecondaryBlockByHash for atomic hash-keyed block lookup with pin, pinBlocksById/unpinBlocksById symmetry, and secondary pool exposure via BaseKVCacheManager.
How
ZMQ IPC replaces MPI for all intra-pod communication: each TP rank binds a per-rank REP socket (_tp{rank}.sock), and rank 0 queries siblings via fresh REQ sockets. A sibling_pin_tracker dict on each sibling maps block_hash → [block_id] to track pins from resolve and drain them on release.

New files:
- remote_g2_source_setup.py (registry bootstrap, ZMQ REP, per-rank fan-out)
- remote_g2_target_setup.py (target IPC bridge, b64 decode, metadata RPC)
- remote_g2_raw_nixl_adapter.py (rank-aware NIXL transfers with [NIXL-XFER] lifecycle logging)
- remote_g2_source_adapter.py (pin callbacks)

Unit tests cover MPI gather assembly (UT1), per-rank data flow round-trip (UT2), and pin/unpin callbacks.
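For reviewers, a minimal sketch of the sibling_pin_tracker bookkeeping described above. The class and method names are hypothetical, and the unpin callback stands in for whatever wraps unpinBlocksById in the real adapter.

```python
from collections import defaultdict
from typing import Callable, Iterable, List


class SiblingPinTracker:
    """Tracks block_hash -> [block_id] pins taken during resolve."""

    def __init__(self, unpin_blocks_by_id: Callable[[List[int]], None]):
        self._unpin = unpin_blocks_by_id
        self._pins = defaultdict(list)

    def record_pin(self, block_hash: int, block_id: int) -> None:
        # Called after a successful hash lookup that pinned the block.
        self._pins[block_hash].append(block_id)

    def release(self, block_hashes: Iterable[int]) -> List[int]:
        # Drain every pin recorded for these hashes and unpin them in one call.
        drained: List[int] = []
        for block_hash in block_hashes:
            drained.extend(self._pins.pop(block_hash, []))
        if drained:
            self._unpin(drained)
        return drained


unpinned: List[int] = []
tracker = SiblingPinTracker(unpinned.extend)
tracker.record_pin(0xABC, 7)
tracker.record_pin(0xABC, 9)  # same hash resolved twice -> two pins
tracker.record_pin(0xDEF, 3)
released = tracker.release([0xABC, 0x123])  # unknown hashes are ignored
```

Popping the entry on release keeps a repeated release_hashes for the same hash from unpinning a block twice.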