Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/defs.limits.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,13 @@
#define HALF_MAX_RANK (MAX_RANK / 2)
#define LOD_MAX_NDIM HALF_MAX_RANK
#define LOD_MAX_LEVELS 32

// Generations of per-batch LOD timing events. The delivery worker reads a
// drained batch's timing while the producer re-records the next same-fc
// batch's events; in the worst case three batches are live at once (one
// draining, one kicked-and-pending, one filling), so each owns its own
// generation for its whole lifetime (#154).
#define LOD_TIMING_SLOTS 3
#define MAX_ZARR_RANK (HALF_MAX_RANK)

// S3
Expand Down
1 change: 1 addition & 0 deletions src/gpu/flush.compress_agg.c
Original file line number Diff line number Diff line change
Expand Up @@ -709,6 +709,7 @@ compress_agg_fill_handoff(struct compress_agg_stage* stage,
const uint8_t nlod = stage->ar.nlod;
out->fc = fc;
out->n_epochs = in->n_epochs;
out->lod_timing_slot = in->lod_timing_slot;
out->active_levels_mask = in->active_levels_mask;
out->batch_active_masks = in->batch_active_masks;
out->nlod = nlod;
Expand Down
2 changes: 1 addition & 1 deletion src/gpu/flush.d2h_deliver.c
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ record_flush_metrics(const struct flush_handoff* handoff,
const int fc = handoff->fc;
const uint32_t n_epochs = handoff->n_epochs;

const struct lod_timing* t = &lod_shared->timing[fc];
const struct lod_timing* t = &lod_shared->timing[handoff->lod_timing_slot];
if (levels->enable_multiscale && t->t_start) {
const size_t bytes_per_element = dtype_bpe(config->dtype);
const size_t scatter_bytes = layout->epoch_elements * bytes_per_element;
Expand Down
1 change: 1 addition & 0 deletions src/gpu/flush.handoff.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ struct flush_handoff
const uint32_t* batch_active_masks; // borrowed [K] per-epoch masks
uint32_t per_lod_n_active[LOD_MAX_LEVELS]; // owned, for delivery sizing
uint8_t nlod;
int lod_timing_slot; // LOD timing generation this batch owns (#154)

CUevent t_aggregate_end; // D2H waits on this
CUevent t_compress_start; // for metrics
Expand Down
11 changes: 11 additions & 0 deletions src/gpu/schedule.c
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,7 @@ make_compress_input(struct stream_engine* e, int fc, uint32_t n_epochs)
.active_levels_mask = s->active_levels_mask,
.batch_active_masks = s->batch_active_masks,
.epochs_per_batch = e->sched.epochs_per_batch,
.lod_timing_slot = s->lod_timing_slot,
};
}

Expand All @@ -470,6 +471,15 @@ run_epoch_lod(struct stream_engine* e, struct stream_context* ctx)
struct schedule_slot* s = &e->sched.slot[e->sched.fill];
uint32_t active_mask;

// The batch owns one timing generation for its whole lifetime; pick a fresh
// one at the first epoch so the worker's drain read can't collide with the
// next same-fc batch's re-record (#154).
if (e->sched.accumulated == 0) {
s->lod_timing_slot = e->lod_shared.next_timing_slot;
e->lod_shared.next_timing_slot =
(e->lod_shared.next_timing_slot + 1) % LOD_TIMING_SLOTS;
}

if (!e->sched.lod_active) {
active_mask = 1;
} else {
Expand All @@ -478,6 +488,7 @@ run_epoch_lod(struct stream_engine* e, struct stream_context* ctx)
&e->lod_shared,
&e->ord,
e->sched.fill,
s->lod_timing_slot,
&ctx->levels,
stream_engine_pool_epoch(e, ctx, e->sched.accumulated),
ctx->config.dtype,
Expand Down
1 change: 1 addition & 0 deletions src/gpu/schedule.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ struct schedule_slot
{
uint32_t active_levels_mask; // union of per-epoch active masks
uint32_t* batch_active_masks; // [epochs_per_batch]; per-array allocation
int lod_timing_slot; // generation owned by the batch being filled
int kicked;
uint64_t kick_seq;
struct flush_handoff handoff;
Expand Down
18 changes: 15 additions & 3 deletions src/gpu/stream.engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,20 @@ struct lod_timing
// owned by the engine and sized for that one array.
struct lod_shared_state
{
CUdeviceptr d_linear; // linear epoch buffer (device)
CUdeviceptr d_morton; // morton-ordered LOD output (all levels packed)
struct lod_timing timing[2]; // double-buffered pipeline timing
CUdeviceptr d_linear; // linear epoch buffer (device)
CUdeviceptr d_morton; // morton-ordered LOD output (all levels packed)

// Per-fc ordering edge (GPU_EDGE_LOD_DONE). Stable per fc — recorded every
// epoch, never rotates — so the bound edge handle stays valid for waits.
CUevent lod_done[2];

// Pipeline timing read at drain. Rotated across batches so a batch's
// generation stays stable while the worker reads it and the producer
// re-records the next same-fc batch into a different one (#154). Each
// batch picks a generation at its first epoch and owns it through drain;
// the chosen index rides the flush handoff.
struct lod_timing timing[LOD_TIMING_SLOTS];
int next_timing_slot; // producer-only round-robin picker
};

// Per-array LOD state. In multiarray, one instance per array; the active
Expand Down Expand Up @@ -109,6 +120,7 @@ struct compress_agg_input
uint32_t active_levels_mask;
const uint32_t* batch_active_masks; // borrowed from schedule_slot [K]
uint32_t epochs_per_batch;
int lod_timing_slot; // the batch's owned LOD timing generation (#154)
};

// Per-shard layout tables shared across arrays, sized to the max
Expand Down
4 changes: 2 additions & 2 deletions src/gpu/stream.init.c
Original file line number Diff line number Diff line change
Expand Up @@ -171,10 +171,10 @@ stream_engine_init(struct stream_engine* e,
lim->lod_linear_bytes,
lim->lod_morton_bytes,
e->streams.compute) == 0);
// t_end doubles as GPU_EDGE_LOD_DONE; already seeded by the init above.
// lod_done is the stable per-fc ordering edge; already seeded above.
for (int fc = 0; fc < 2; ++fc)
gpu_ordering_bind(
&e->ord, GPU_EDGE_LOD_DONE, fc, e->lod_shared.timing[fc].t_end);
&e->ord, GPU_EDGE_LOD_DONE, fc, e->lod_shared.lod_done[fc]);
}

CU(Fail, cuStreamSynchronize(e->streams.compute));
Expand Down
51 changes: 32 additions & 19 deletions src/gpu/stream.lod.c
Original file line number Diff line number Diff line change
Expand Up @@ -394,18 +394,26 @@ lod_shared_state_init(struct lod_shared_state* sh,
CU(Fail, cuMemAlloc(&sh->d_linear, linear_bytes));
CU(Fail, cuMemAlloc(&sh->d_morton, morton_bytes));

sh->next_timing_slot = 0;

// Ordering-edge events, one per fc (GPU_EDGE_LOD_DONE binds these).
for (int fc = 0; fc < 2; ++fc) {
CU(Fail, cuEventCreate(&sh->timing[fc].t_start, CU_EVENT_DEFAULT));
CU(Fail, cuEventCreate(&sh->timing[fc].t_scatter_end, CU_EVENT_DEFAULT));
CU(Fail, cuEventCreate(&sh->timing[fc].t_reduce_end, CU_EVENT_DEFAULT));
CU(Fail, cuEventCreate(&sh->timing[fc].t_append_end, CU_EVENT_DEFAULT));
CU(Fail, cuEventCreate(&sh->timing[fc].t_end, CU_EVENT_DEFAULT));
// Seed events so initial waits complete immediately.
CU(Fail, cuEventRecord(sh->timing[fc].t_start, seed_stream));
CU(Fail, cuEventRecord(sh->timing[fc].t_scatter_end, seed_stream));
CU(Fail, cuEventRecord(sh->timing[fc].t_reduce_end, seed_stream));
CU(Fail, cuEventRecord(sh->timing[fc].t_append_end, seed_stream));
CU(Fail, cuEventRecord(sh->timing[fc].t_end, seed_stream));
CU(Fail, cuEventCreate(&sh->lod_done[fc], CU_EVENT_DEFAULT));
CU(Fail, cuEventRecord(sh->lod_done[fc], seed_stream));
}

// Timing generations. Seed so initial metric reads see a valid interval.
for (int g = 0; g < LOD_TIMING_SLOTS; ++g) {
CU(Fail, cuEventCreate(&sh->timing[g].t_start, CU_EVENT_DEFAULT));
CU(Fail, cuEventCreate(&sh->timing[g].t_scatter_end, CU_EVENT_DEFAULT));
CU(Fail, cuEventCreate(&sh->timing[g].t_reduce_end, CU_EVENT_DEFAULT));
CU(Fail, cuEventCreate(&sh->timing[g].t_append_end, CU_EVENT_DEFAULT));
CU(Fail, cuEventCreate(&sh->timing[g].t_end, CU_EVENT_DEFAULT));
CU(Fail, cuEventRecord(sh->timing[g].t_start, seed_stream));
CU(Fail, cuEventRecord(sh->timing[g].t_scatter_end, seed_stream));
CU(Fail, cuEventRecord(sh->timing[g].t_reduce_end, seed_stream));
CU(Fail, cuEventRecord(sh->timing[g].t_append_end, seed_stream));
CU(Fail, cuEventRecord(sh->timing[g].t_end, seed_stream));
}

return 0;
Expand All @@ -424,12 +432,14 @@ lod_shared_state_destroy(struct lod_shared_state* sh)
// Null-check each event individually: a partial init (e.g., cleanup after
// lod_shared_state_init fails halfway through event creation) leaves some
// handles zero, and cuEventDestroy on NULL returns an error.
for (int fc = 0; fc < 2; ++fc) {
cu_event_destroy(sh->timing[fc].t_start);
cu_event_destroy(sh->timing[fc].t_scatter_end);
cu_event_destroy(sh->timing[fc].t_reduce_end);
cu_event_destroy(sh->timing[fc].t_append_end);
cu_event_destroy(sh->timing[fc].t_end);
for (int fc = 0; fc < 2; ++fc)
cu_event_destroy(sh->lod_done[fc]);
for (int g = 0; g < LOD_TIMING_SLOTS; ++g) {
cu_event_destroy(sh->timing[g].t_start);
cu_event_destroy(sh->timing[g].t_scatter_end);
cu_event_destroy(sh->timing[g].t_reduce_end);
cu_event_destroy(sh->timing[g].t_append_end);
cu_event_destroy(sh->timing[g].t_end);
}
memset(sh, 0, sizeof(*sh));
}
Expand Down Expand Up @@ -731,6 +741,7 @@ lod_run_epoch(struct lod_state* lod,
struct lod_shared_state* sh,
struct gpu_ordering* ord,
int fc,
int timing_slot,
const struct level_geometry* levels,
struct gpu_pool_view pool_epoch,
enum dtype dtype,
Expand All @@ -741,7 +752,7 @@ lod_run_epoch(struct lod_state* lod,
uint32_t* out_active_mask)
{
struct lod_plan* p = &lod->plan;
struct lod_timing* t = &sh->timing[fc];
struct lod_timing* t = &sh->timing[timing_slot];

CU(Error, cuEventRecord(t->t_start, compute));

Expand Down Expand Up @@ -804,7 +815,9 @@ lod_run_epoch(struct lod_state* lod,
lod, sh, levels, pool_epoch, dtype, active_levels_mask, compute) ==
0);

// t_end doubles as GPU_EDGE_LOD_DONE (bound at engine init).
// Timing endpoint for the batch's generation; the ordering edge records
// separately on the stable per-fc sh->lod_done[fc] (bound at engine init).
CU(Error, cuEventRecord(t->t_end, compute));
CHECK(Error, gpu_edge_record(ord, GPU_EDGE_LOD_DONE, fc, compute) == 0);

*out_active_mask = active_levels_mask;
Expand Down
16 changes: 10 additions & 6 deletions src/gpu/stream.lod.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@ lod_state_init(struct lod_state* lod,
// linear_bytes / morton_bytes are the buffer sizes (multiarray passes the max
// across arrays; single-array passes the array's own sizes).
//
// The timing events are created AND seeded on `seed_stream` so the first
// downstream wait completes immediately. `seed_stream` must be the same
// compute stream that later runs lod_run_epoch — seeding on an unrelated
// stream leaves the initial waits unsatisfied.
// The lod_done ordering events and the timing generations are created AND
// seeded on `seed_stream` so the first downstream wait (and metric read)
// completes immediately. `seed_stream` must be the same compute stream that
// later runs lod_run_epoch — seeding on an unrelated stream leaves the
// initial waits unsatisfied.
//
// Returns 0 on success; on failure, the struct is left safe to pass to
// lod_shared_state_destroy.
Expand Down Expand Up @@ -55,13 +56,16 @@ lod_state_device_bytes(const struct computed_stream_layouts* cl,

// Run LOD pipeline for one epoch: gather -> reduce -> append fold ->
// morton-to-chunks. pool_epoch: acquired view of this epoch's chunk pool
// region (all levels). *out_active_mask: set to bitmask of active LOD levels
// for this epoch. Returns 0 on success, non-zero on error.
// region (all levels). timing_slot: the batch's owned timing generation
// (sh->timing[timing_slot]); the ordering edge records on the stable
// per-fc sh->lod_done[fc]. *out_active_mask: set to bitmask of active LOD
// levels for this epoch. Returns 0 on success, non-zero on error.
int
lod_run_epoch(struct lod_state* lod,
struct lod_shared_state* sh,
struct gpu_ordering* ord,
int fc,
int timing_slot,
const struct level_geometry* levels,
struct gpu_pool_view pool_epoch,
enum dtype dtype,
Expand Down
Loading