69 changes: 54 additions & 15 deletions README.md
@@ -385,6 +385,40 @@ slow or metered links, `--layers 20:42` is also supported: the coordinator will
load the output head and compute logits locally, trading extra coordinator work
for smaller per-token replies.

Reverse topology is also supported when the coordinator owns a final suffix
through the output head. In that layout workers cover the lower layers,
return hidden state upstream, and the coordinator runs the higher layers plus
output locally. For example:

```sh
# Machine A: worker, owns lower layers.
./ds4 \
-m gguf/DeepSeek-V4-Pro-Q4K-Layers00-30.gguf \
--role worker \
--layers 0:30 \
--coordinator 169.254.43.68 1234

# Machine B: coordinator, owns the upper suffix and output.
./ds4 \
-m gguf/DeepSeek-V4-Pro-Q4K-Layers-31-output.gguf \
--role coordinator \
--layers 31:output \
--listen 169.254.43.68 1234
```

Reverse `K:42` is intentionally unsupported. Reverse mode only supports
`K:output`, because the coordinator must own the output head.
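The rule above can be sketched as a small check on the coordinator's `--layers` value. This is a minimal illustration, not the project's parser: `layer_slice`, `parse_layers`, and `classify_coordinator` are hypothetical names, the struct only mirrors the `set`, `has_output`, and `start` fields that the `ds4.c` change tests, and treating a slice that starts at layer 0 as forward topology is an assumption.

```c
#include <assert.h>
#include <ctype.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical mirror of the layer-slice fields checked by the coordinator. */
typedef struct {
    bool set;
    bool has_output;  /* true for "K:output" */
    uint32_t start;
    uint32_t end;     /* last layer for "A:B"; unused when has_output */
} layer_slice;

typedef enum { COORD_FORWARD, COORD_REVERSE, COORD_UNSUPPORTED } coord_topology;

/* Parse "A:B" or "A:output". Returns false on malformed input. */
static bool parse_layers(const char *spec, layer_slice *out) {
    assert(spec != NULL && out != NULL);
    memset(out, 0, sizeof(*out));
    if (!isdigit((unsigned char)spec[0])) return false;
    char *endp = NULL;
    unsigned long a = strtoul(spec, &endp, 10);
    if (*endp != ':') return false;
    const char *rhs = endp + 1;
    if (strcmp(rhs, "output") == 0) {
        out->has_output = true;
    } else {
        if (!isdigit((unsigned char)rhs[0])) return false;
        unsigned long b = strtoul(rhs, &endp, 10);
        if (*endp != '\0' || b < a) return false;
        out->end = (uint32_t)b;
    }
    out->start = (uint32_t)a;
    out->set = true;
    return true;
}

/* Reverse mode needs start > 0 and ownership of the output head. */
static coord_topology classify_coordinator(const layer_slice *s) {
    assert(s != NULL);
    if (!s->set || s->start == 0u) return COORD_FORWARD;  /* assumption */
    return s->has_output ? COORD_REVERSE : COORD_UNSUPPORTED;
}
```

Under these assumptions `31:output` classifies as reverse, while a coordinator slice such as `22:42` is rejected because it starts above layer 0 without owning the output head.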

You can also start a coordinator that owns `K:output` with `--local-decode`.
In that mode the route still performs distributed prefill, but once prefill
finishes the worker pushes its KV shard to the coordinator, and the coordinator
finishes generation locally with the full model resident. This hand-off keeps the
distributed prefill speedup while moving decode back onto one machine, which
improves decode speed.

For example, an M5 Max 128GB coordinator running `--layers 22:output --local-decode`
with a DGX Spark worker running `--layers 0:21` over a direct 2.5GbE link reaches
`prefill: 602.78 t/s, generation: 30.10 t/s`.

### Network Link Comparison

The table below shows the same two M5 Max hosts, the same 91 GB Flash quant,
@@ -468,21 +502,26 @@ control TCP connection open to the coordinator and send a `HELLO` with their
model ID, model family, quant profile, layer slice, context capacity, and data
port. The coordinator uses these registrations to build a route that covers all
layers. Work then moves over low-latency TCP data connections: the coordinator
computes the first slice, sends a `WORK` frame with session ID, token positions,
rolling token-prefix hashes before and after the span, route information, and
hidden-state payload, and each worker computes its slice. Middle workers can
forward directly to the next worker. The final worker returns logits to the
coordinator, or ACKs for non-final prefill chunks so the prefill pipeline can
stay full. `RESULT` frames echo the request ID and the post-span hash. A worker
status error is handled differently from a socket failure: KV/hash mismatch can
be recovered by replaying the token history on the same route, while transport
failure drops the route and waits for a replacement worker. For persistent KV,
the coordinator opens worker data connections and sends snapshot save/load
messages for each worker-owned layer range; the disk payload remains a single
agent/server cache file. The protocol has no
encryption or authentication, and is not release-stable yet; coordinator and
workers should be built from the same commit and used on trusted machines and
trusted networks.
computes the local prefix first in forward topology, sends a `WORK` frame with
session ID, token positions, rolling token-prefix hashes before and after the
span, route information, and hidden-state payload, and each worker computes its
slice. In reverse topology the first worker starts from layer 0 with token
input only, returns hidden state upstream, and the coordinator finishes the
higher layers plus output locally. Middle workers can forward directly to the
next worker. The final worker returns logits in the usual forward path, or
returns hidden state when the coordinator owns the output path. Forward
non-final prefill chunks may use ACK-only replies so the prefill pipeline can
stay full; reverse pipelined prefill returns hidden state for every chunk
because the coordinator must finish each chunk locally. `RESULT` frames echo
the request ID and the post-span hash. A worker status error is handled
differently from a socket failure: KV/hash mismatch can be recovered by
replaying the token history on the same route, while transport failure drops
the route and waits for a replacement worker. For persistent KV, the
coordinator opens worker data connections and sends snapshot save/load messages
for each worker-owned layer range; the disk payload remains a single
agent/server cache file. The protocol has no encryption or authentication, and
is not release-stable yet; coordinator and workers should be built from the
same commit and used on trusted machines and trusted networks.

## Reducing heat, power usage and fan noise

30 changes: 29 additions & 1 deletion ds4.c
@@ -25554,6 +25554,15 @@ int ds4_engine_open(ds4_engine **out, const ds4_engine_options *opt) {
    e->distributed = opt->distributed;
    e->power_percent = opt->power_percent > 0 ? opt->power_percent : 100;
    e->prefill_chunk = opt->prefill_chunk;
    const bool distributed_reverse_coordinator =
        e->distributed.role == DS4_DISTRIBUTED_COORDINATOR &&
        e->distributed.layers.set &&
        e->distributed.layers.has_output &&
        e->distributed.layers.start > 0u;
    if (e->prefill_chunk == 0 && distributed_reverse_coordinator) {
        e->prefill_chunk = 2048u;
        e->distributed.prefill_chunk = 2048u;
    }
    e->ssd_streaming_cache_experts = opt->ssd_streaming_cache_experts;
    e->ssd_streaming_cache_bytes = opt->ssd_streaming_cache_bytes;
    e->ssd_streaming_preload_experts = opt->ssd_streaming_preload_experts;
@@ -25590,8 +25599,15 @@ int ds4_engine_open(ds4_engine **out, const ds4_engine_options *opt) {
uint32_t load_layer_end = opt->load_layer_end;
bool load_output = opt->load_output;
bool load_output_optional = false;
const bool distributed_reverse_coordinator_full_resident =
opt->distributed.role == DS4_DISTRIBUTED_COORDINATOR &&
opt->distributed.local_decode &&
opt->distributed.layers.set &&
opt->distributed.layers.has_output &&
opt->distributed.layers.start > 0u;
if (opt->distributed.role != DS4_DISTRIBUTED_NONE &&
opt->distributed.layers.set)
opt->distributed.layers.set &&
!distributed_reverse_coordinator_full_resident)
{
load_slice = true;
load_layer_start = opt->distributed.layers.start;
@@ -27153,6 +27169,18 @@ static int ds4_session_eval_internal(ds4_session *s, int token, bool probe_mtp,
#endif
}

int ds4_session_eval_local_only(ds4_session *s, int token, char *err, size_t errlen) {
    if (!s) {
        if (errlen) snprintf(err, errlen, "invalid session");
        return 1;
    }
    ds4_dist_session *saved = s->distributed;
    s->distributed = NULL;
    const int rc = ds4_session_eval_internal(s, token, true, err, errlen);
    s->distributed = saved;
    return rc;
}

int ds4_session_eval(ds4_session *s, int token, char *err, size_t errlen) {
    return ds4_session_eval_internal(s, token, true, err, errlen);
}
6 changes: 6 additions & 0 deletions ds4.h
@@ -87,6 +87,7 @@ typedef struct {
uint32_t prefill_chunk;
uint32_t prefill_window;
uint32_t activation_bits;
bool local_decode;
bool replay_check;
bool debug;
} ds4_distributed_options;
@@ -261,6 +262,11 @@ int ds4_session_top_logprobs(ds4_session *s, ds4_token_score *out, int k);
int ds4_session_token_logprob(ds4_session *s, int token, ds4_token_score *out);
int ds4_session_copy_logits(ds4_session *s, float *out, int cap);
int ds4_session_set_logits(ds4_session *s, const float *logits, int n);
/* Internal runtime helper: run the normal local eval path even if the session
* still carries a distributed coordinator attachment. Frontends should keep
* using ds4_session_eval(); ds4_distributed.c uses this helper once
* coordinator-owned reverse-topology local decode is active. */
int ds4_session_eval_local_only(ds4_session *s, int token, char *err, size_t errlen);
int ds4_session_eval(ds4_session *s, int token, char *err, size_t errlen);
int ds4_session_eval_speculative_argmax(ds4_session *s, int first_token,
int max_tokens, int eos_token,