Skip to content

Add TP=2 support for KVP2P#5

Open
aranadive wants to merge 9 commits into
oandreeva-nv:oandreeva/remote-g2-beta-registryfrom
aranadive:aranadive/kv_p2p_multi_tp_support
Open

Add TP=2 support for KVP2P#5
aranadive wants to merge 9 commits into
oandreeva-nv:oandreeva/remote-g2-beta-registryfrom
aranadive:aranadive/kv_p2p_multi_tp_support

Conversation

@aranadive

Copy link
Copy Markdown

Why

TP>1 deployments need all ranks to participate in cross-worker KV cache reuse, but each TP rank owns an independent slice of the KV cache and its own NIXL agent. The original MPI-based gather design collided with the connector framework's own mpi_broadcast in _run_on_leader, and dynamo's client.direct() response stream drops responses under TP>1 — both had to be worked around.

What

At startup, all TP ranks exchange NIXL metadata via mpi_allgather (S1). On resolve, rank 0 fans out resolve_hashes to sibling ranks over intra-pod ZMQ IPC, gathers per-rank descriptors and source metadata, and returns a per_rank_descriptors + per_rank_source_metadata response. Each target TP rank indexes by mpi_rank() to load the correct source peer and issue its own parallel NIXL transfer. On release, rank 0 fans out release_hashes to siblings so pinned blocks on non-rank-0 source ranks are properly freed (without this, the host-pinned secondary pool would leak under sustained TP>1 traffic). The C++ layer adds findAndPinSecondaryBlockByHash for atomic hash-keyed block lookup with pin, pinBlocksById/unpinBlocksById symmetry, and secondary pool exposure via BaseKVCacheManager.

How

ZMQ IPC replaces MPI for all intra-pod communication — each TP rank binds a per-rank REP socket (_tp{rank}.sock), and rank 0 queries siblings via fresh REQ sockets. A sibling_pin_tracker dict on each sibling maps block_hash → [block_id] to track pins from resolve and drain them on release. New files: remote_g2_source_setup.py (registry bootstrap, ZMQ REP, per-rank fan-out), remote_g2_target_setup.py (target IPC bridge, b64 decode, metadata RPC), remote_g2_raw_nixl_adapter.py (rank-aware NIXL transfers with [NIXL-XFER] lifecycle logging), remote_g2_source_adapter.py (pin callbacks). Unit tests cover MPI gather assembly (UT1), per-rank data flow round-trip (UT2), and pin/unpin callbacks.

aranadive added 9 commits June 8, 2026 23:19
- Fix ppid <= 1 to ppid < 1 for containers where dynamo is PID 1
- Add TP=1 inline mode detection in proc-tree walk
- Add _get_secondary_pool() shim for builds without
  get_unique_secondary_pool
- Unwrap kv.impl before feature-detecting C++ methods on the KV cache
  manager
- Add pin_blocks_by_id fallback for builds without
  get_slot_idx_by_block_id

Signed-off-by: Adit Ranadive <aranadive@nvidia.com>
… model (S1-S5)

Add TP>1 support to the source side of the remote-G2 KV cache connector.
With tensor parallelism, each TP rank holds its own slice of every KV
block in a separate host-pinned secondary pool with its own NIXL agent.
The target needs per-rank metadata and descriptors to issue parallel
RDMA READs to each source rank independently.

Source-side changes (S1-S5):
  S1: _gather_per_rank_nixl_metadata() — at startup, every rank builds
      its own NIXL agent, then all ranks participate in an MPI allgather
      to collect (agent_name, agent_desc, pool_base_ptr, pool_size_bytes)
      from all TP siblings. Rank 0 caches the bundle.
  S2: _gather_per_rank_descriptors() — on resolve RPC, rank 0 broadcasts
      block hashes to TP siblings via MPI, each calls local
      findAndPinSecondaryBlockByHash, rank 0 gathers offsets.
  S3: _start_sibling_rank_resolve_loop() — non-rank-0 TP siblings run
      a daemon thread that blocks on MPI broadcast, does local resolve,
      and participates in the MPI allgather back to rank 0.
  S4: _result_to_dict serializes per_rank_descriptors and
      per_rank_source_metadata alongside flat descriptors (backward-
      compatible — TP=1 omits the per-rank fields).
  S5: get_metadata RPC returns per_rank_metadata list from the S1 bundle.

Data model changes:
  RemoteG2ResolveResult gains per_rank_descriptors (dict[int, ...]) and
  per_rank_source_metadata (dict[int, dict]) fields. Both default to
  empty dicts for TP=1 backward compatibility.

Orchestration:
  py_executor.py passes tp_rank and tp_size from self.dist to both
  maybe_start_remote_g2_service and maybe_start_remote_g2_target_client.
  NIXL agent names are rank-qualified for TP>1:
    remote-g2-source-{worker_id}-tp{rank}

DP mode note:
  Uses mpi_allgather/mpi_broadcast from tensorrt_llm._utils (MPI world
  communicator). This is correct for attention-DP OFF where MPI world =
  TP group. For attention-DP ON, the MPI world includes multiple DP
  groups — a TP-group-scoped communicator is needed to avoid cross-DP
  contamination. That is deferred to a follow-up.

Known gap: S2/S3 gather is defined but not yet wired into the ZMQ
resolve handler — rank 0 currently resolves only locally. Integration
requires the ZMQ handler to trigger the MPI broadcast/gather when
tp_size > 1.

Signed-off-by: Adit Ranadive <aranadive@nvidia.com>
Add TP>1 support to the target side of the remote-G2 connector.
Each target TP rank loads the corresponding source TP rank's NIXL
agent and issues its own make_prepped_xfer independently. The
framework's existing allgather intersection in get_finished ensures
a request completes only when ALL ranks' transfers are done.

Target-side changes (T1-T4):
  T1/T2: per_rank_descriptors and per_rank_source_metadata flow
      through RemoteG2ResolveResult (added in S1-S5 commit) into the
      binding record and out to the connector worker. No new fields
      on RemoteG2BindingRecord — it holds a resolve_result reference.
  T3: _start_transfer_impl in remote_g2_raw_nixl_adapter.py indexes
      per_rank_descriptors[mpi_rank()] for source byte offsets and
      per_rank_source_metadata[mpi_rank()] for _ensure_peer_loaded.
      Falls back to flat descriptors/metadata when per-rank dicts are
      empty (TP=1 backward compatibility).
  T4: _result_from_dict in remote_g2_target_setup.py extracts
      per_rank_descriptors and per_rank_source_metadata from the
      resolve response dict. _make_source_metadata_fetcher stashes
      per_rank_metadata from the S5 metadata RPC on the response
      object so the adapter can read it at transfer time.

maybe_start_remote_g2_target_client now accepts tp_rank and tp_size
kwargs (passed from py_executor.py in the S1-S5 commit).

DP mode note:
  With attention-DP OFF, mpi_rank() maps 1:1 to source TP rank
  since MPI world = TP group. With attention-DP ON, mpi_rank()
  returns the world rank which spans multiple DP groups — a mapping
  from world rank to TP-local rank is needed to index the correct
  source peer. Deferred to a follow-up alongside the TP-group-scoped
  communicator change on the source side.

Signed-off-by: Adit Ranadive <aranadive@nvidia.com>
Wire _gather_per_rank_descriptors into ZMQ resolve handler when tp_size>1.
Attach per_rank_descriptors and per_rank_source_metadata to resolve response.

UT1: source-side MPI gather (TP=1 single entry, TP=2 mock allgather, b64).
UT2: per-rank serde round-trip, TP=1 backward compat, binding store flow.

Run: python3 tests/unittest/_torch/test_remote_g2_tp_multi.py
Signed-off-by: Adit Ranadive <aranadive@nvidia.com>
When py_executor.py doesn't pass tp_rank/tp_size (e.g. patched
connectors deployed without patching py_executor), fall back to
mpi_rank() and mpi_world_size() at the top of
maybe_start_remote_g2_service. This ensures TP>1 agent names get
rank-qualified (-tp0, -tp1) and S1 gather runs even when the
caller doesn't have the TP>1 py_executor changes.

Signed-off-by: Adit Ranadive <aranadive@nvidia.com>
With TP=2, dynamo(PID 1) -> orted -> engine. The ppid<=1 check bailed
at PID 1 without checking its cmdline. Changed to ppid<1 so PID 1 gets
checked. Also added dynamo/trtllm pattern match.

Signed-off-by: Adit Ranadive <aranadive@nvidia.com>
…framework)

Signed-off-by: Adit Ranadive <aranadive@nvidia.com>
…ata stream port conflict

With TP>1, multiple NIXL agents per pod (2 source + 2 target + dynamo's
own agent) all try to bind the metadata stream listener on the same
default port. The first succeeds, the rest get 'Address already in use',
then continuously spam 'Cannot accept client connection: Socket
operation on non-socket' — 32K+ errors that corrupt the TCP transport
layer and cause dynamo RPC resolve timeouts.

Fix: set enable_listen_thread=False on both source and target agents.
We exchange metadata explicitly via add_remote_agent (through the
dynamo RPC bridge), not via NIXL's auto-discovery metadata stream.
The prepped-flow RDMA READ should work without the listen thread.

Signed-off-by: Adit Ranadive <aranadive@nvidia.com>
…n up debug logging

The per-rank NIXL metadata (agent_metadata_b64) carried in the resolve
response was not being decoded to raw bytes before _ensure_peer_loaded
called add_remote_agent, causing "metadata missing from source response"
errors on the target side. Add the b64decode step in _result_from_dict,
matching the existing conversion in the metadata RPC path.

Also remove all temporary PROBE diagnostic logging added during TP>1
development.

Signed-off-by: Adit Ranadive <aranadive@nvidia.com>
@aranadive aranadive force-pushed the aranadive/kv_p2p_multi_tp_support branch from 4eed564 to 7c1aa09 Compare June 9, 2026 07:00
)
except (TypeError, ValueError):
return None
# PROBE: confirm the target worker received the plan and parsed the

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it ok, if we keep it for now?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok will keep it.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants