From 1ae2b6268c7120eb65cc47ba7cd8f7a4ea5c018e Mon Sep 17 00:00:00 2001 From: Nathan Clack Date: Thu, 18 Jun 2026 02:08:36 +0000 Subject: [PATCH] gpu: snapshot per-batch LOD timing #154 --- src/defs.limits.h | 7 +++++ src/gpu/flush.compress_agg.c | 1 + src/gpu/flush.d2h_deliver.c | 2 +- src/gpu/flush.handoff.h | 1 + src/gpu/schedule.c | 11 ++++++++ src/gpu/schedule.h | 1 + src/gpu/stream.engine.h | 18 ++++++++++--- src/gpu/stream.init.c | 4 +-- src/gpu/stream.lod.c | 51 ++++++++++++++++++++++-------------- src/gpu/stream.lod.h | 16 ++++++----- 10 files changed, 81 insertions(+), 31 deletions(-) diff --git a/src/defs.limits.h b/src/defs.limits.h index ff1ae83b..3c7257a9 100644 --- a/src/defs.limits.h +++ b/src/defs.limits.h @@ -6,6 +6,13 @@ #define HALF_MAX_RANK (MAX_RANK / 2) #define LOD_MAX_NDIM HALF_MAX_RANK #define LOD_MAX_LEVELS 32 + +// Generations of per-fc LOD timing events. The delivery worker reads a +// drained batch's timing while the producer re-records the next same-fc +// batch's events; at the worst case three batches are live at once (one +// draining, one kicked-and-pending, one filling), so each owns its own +// generation for its whole lifetime (#154). +#define LOD_TIMING_SLOTS 3 #define MAX_ZARR_RANK (HALF_MAX_RANK) // S3 diff --git a/src/gpu/flush.compress_agg.c b/src/gpu/flush.compress_agg.c index 55e9b738..744dac0b 100644 --- a/src/gpu/flush.compress_agg.c +++ b/src/gpu/flush.compress_agg.c @@ -709,6 +709,7 @@ compress_agg_fill_handoff(struct compress_agg_stage* stage, const uint8_t nlod = stage->ar.nlod; out->fc = fc; out->n_epochs = in->n_epochs; + out->lod_timing_slot = in->lod_timing_slot; out->active_levels_mask = in->active_levels_mask; out->batch_active_masks = in->batch_active_masks; out->nlod = nlod; diff --git a/src/gpu/flush.d2h_deliver.c b/src/gpu/flush.d2h_deliver.c index e01479e3..ad0956e5 100644 --- a/src/gpu/flush.d2h_deliver.c +++ b/src/gpu/flush.d2h_deliver.c @@ -72,7 +72,7 @@ record_flush_metrics(const struct flush_handoff* handoff, const int fc = handoff->fc; const uint32_t n_epochs = handoff->n_epochs; - const struct lod_timing* t = &lod_shared->timing[fc]; + const struct lod_timing* t = &lod_shared->timing[handoff->lod_timing_slot]; if (levels->enable_multiscale && t->t_start) { const size_t bytes_per_element = dtype_bpe(config->dtype); const size_t scatter_bytes = layout->epoch_elements * bytes_per_element; diff --git a/src/gpu/flush.handoff.h b/src/gpu/flush.handoff.h index 0ac7ae1a..f09276dd 100644 --- a/src/gpu/flush.handoff.h +++ b/src/gpu/flush.handoff.h @@ -30,6 +30,7 @@ struct flush_handoff const uint32_t* batch_active_masks; // borrowed [K] per-epoch masks uint32_t per_lod_n_active[LOD_MAX_LEVELS]; // owned, for delivery sizing uint8_t nlod; + int lod_timing_slot; // LOD timing generation this batch owns (#154) CUevent t_aggregate_end; // D2H waits on this CUevent t_compress_start; // for metrics diff --git a/src/gpu/schedule.c b/src/gpu/schedule.c index de65febe..c2a6d158 100644 --- a/src/gpu/schedule.c +++ b/src/gpu/schedule.c @@ -450,6 +450,7 @@ make_compress_input(struct stream_engine* e, int fc, uint32_t n_epochs) .active_levels_mask = s->active_levels_mask, .batch_active_masks = s->batch_active_masks, .epochs_per_batch = e->sched.epochs_per_batch, + .lod_timing_slot = s->lod_timing_slot, }; } @@ -470,6 +471,15 @@ run_epoch_lod(struct stream_engine* e, struct stream_context* ctx) struct schedule_slot* s = &e->sched.slot[e->sched.fill]; uint32_t active_mask; + // The batch owns one timing generation for its whole lifetime; pick a fresh + // one at the first epoch so the worker's drain read can't collide with the + // next same-fc batch's re-record (#154). + if (e->sched.accumulated == 0) { + s->lod_timing_slot = e->lod_shared.next_timing_slot; + e->lod_shared.next_timing_slot = + (e->lod_shared.next_timing_slot + 1) % LOD_TIMING_SLOTS; + } + if (!e->sched.lod_active) { active_mask = 1; } else { @@ -478,6 +488,7 @@ run_epoch_lod(struct stream_engine* e, struct stream_context* ctx) &e->lod_shared, &e->ord, e->sched.fill, + s->lod_timing_slot, &ctx->levels, stream_engine_pool_epoch(e, ctx, e->sched.accumulated), ctx->config.dtype, diff --git a/src/gpu/schedule.h b/src/gpu/schedule.h index e0d7d8e4..0a7ccf57 100644 --- a/src/gpu/schedule.h +++ b/src/gpu/schedule.h @@ -71,6 +71,7 @@ struct schedule_slot { uint32_t active_levels_mask; // union of per-epoch active masks uint32_t* batch_active_masks; // [epochs_per_batch]; per-array allocation + int lod_timing_slot; // generation owned by the batch being filled int kicked; uint64_t kick_seq; struct flush_handoff handoff; diff --git a/src/gpu/stream.engine.h b/src/gpu/stream.engine.h index cece8672..ed96a16e 100644 --- a/src/gpu/stream.engine.h +++ b/src/gpu/stream.engine.h @@ -61,9 +61,20 @@ struct lod_timing // owned by the engine and sized for that one array. struct lod_shared_state { - CUdeviceptr d_linear; // linear epoch buffer (device) - CUdeviceptr d_morton; // morton-ordered LOD output (all levels packed) - struct lod_timing timing[2]; // double-buffered pipeline timing + CUdeviceptr d_linear; // linear epoch buffer (device) + CUdeviceptr d_morton; // morton-ordered LOD output (all levels packed) + + // Per-fc ordering edge (GPU_EDGE_LOD_DONE). Stable per fc — recorded every + // epoch, never rotates — so the bound edge handle stays valid for waits. + CUevent lod_done[2]; + + // Pipeline timing read at drain. Rotated across batches so a batch's + // generation stays stable while the worker reads it and the producer + // re-records the next same-fc batch into a different one (#154). Each + // batch picks a generation at its first epoch and owns it through drain; + // the chosen index rides the flush handoff. + struct lod_timing timing[LOD_TIMING_SLOTS]; + int next_timing_slot; // producer-only round-robin picker }; // Per-array LOD state. In multiarray, one instance per array; the active @@ -109,6 +120,7 @@ struct compress_agg_input uint32_t active_levels_mask; const uint32_t* batch_active_masks; // borrowed from schedule_slot [K] uint32_t epochs_per_batch; + int lod_timing_slot; // the batch's owned LOD timing generation (#154) }; // Per-shard layout tables shared across arrays, sized to the max diff --git a/src/gpu/stream.init.c b/src/gpu/stream.init.c index 14bdaff1..34f059db 100644 --- a/src/gpu/stream.init.c +++ b/src/gpu/stream.init.c @@ -171,10 +171,10 @@ stream_engine_init(struct stream_engine* e, lim->lod_linear_bytes, lim->lod_morton_bytes, e->streams.compute) == 0); - // t_end doubles as GPU_EDGE_LOD_DONE; already seeded by the init above. + // lod_done is the stable per-fc ordering edge; already seeded above. for (int fc = 0; fc < 2; ++fc) gpu_ordering_bind( - &e->ord, GPU_EDGE_LOD_DONE, fc, e->lod_shared.timing[fc].t_end); + &e->ord, GPU_EDGE_LOD_DONE, fc, e->lod_shared.lod_done[fc]); } CU(Fail, cuStreamSynchronize(e->streams.compute)); diff --git a/src/gpu/stream.lod.c b/src/gpu/stream.lod.c index da394d8f..c9f7467e 100644 --- a/src/gpu/stream.lod.c +++ b/src/gpu/stream.lod.c @@ -394,18 +394,26 @@ lod_shared_state_init(struct lod_shared_state* sh, CU(Fail, cuMemAlloc(&sh->d_linear, linear_bytes)); CU(Fail, cuMemAlloc(&sh->d_morton, morton_bytes)); + sh->next_timing_slot = 0; + + // Ordering-edge events, one per fc (GPU_EDGE_LOD_DONE binds these). for (int fc = 0; fc < 2; ++fc) { - CU(Fail, cuEventCreate(&sh->timing[fc].t_start, CU_EVENT_DEFAULT)); - CU(Fail, cuEventCreate(&sh->timing[fc].t_scatter_end, CU_EVENT_DEFAULT)); - CU(Fail, cuEventCreate(&sh->timing[fc].t_reduce_end, CU_EVENT_DEFAULT)); - CU(Fail, cuEventCreate(&sh->timing[fc].t_append_end, CU_EVENT_DEFAULT)); - CU(Fail, cuEventCreate(&sh->timing[fc].t_end, CU_EVENT_DEFAULT)); - // Seed events so initial waits complete immediately. - CU(Fail, cuEventRecord(sh->timing[fc].t_start, seed_stream)); - CU(Fail, cuEventRecord(sh->timing[fc].t_scatter_end, seed_stream)); - CU(Fail, cuEventRecord(sh->timing[fc].t_reduce_end, seed_stream)); - CU(Fail, cuEventRecord(sh->timing[fc].t_append_end, seed_stream)); - CU(Fail, cuEventRecord(sh->timing[fc].t_end, seed_stream)); + CU(Fail, cuEventCreate(&sh->lod_done[fc], CU_EVENT_DEFAULT)); + CU(Fail, cuEventRecord(sh->lod_done[fc], seed_stream)); + } + + // Timing generations. Seed so initial metric reads see a valid interval. + for (int g = 0; g < LOD_TIMING_SLOTS; ++g) { + CU(Fail, cuEventCreate(&sh->timing[g].t_start, CU_EVENT_DEFAULT)); + CU(Fail, cuEventCreate(&sh->timing[g].t_scatter_end, CU_EVENT_DEFAULT)); + CU(Fail, cuEventCreate(&sh->timing[g].t_reduce_end, CU_EVENT_DEFAULT)); + CU(Fail, cuEventCreate(&sh->timing[g].t_append_end, CU_EVENT_DEFAULT)); + CU(Fail, cuEventCreate(&sh->timing[g].t_end, CU_EVENT_DEFAULT)); + CU(Fail, cuEventRecord(sh->timing[g].t_start, seed_stream)); + CU(Fail, cuEventRecord(sh->timing[g].t_scatter_end, seed_stream)); + CU(Fail, cuEventRecord(sh->timing[g].t_reduce_end, seed_stream)); + CU(Fail, cuEventRecord(sh->timing[g].t_append_end, seed_stream)); + CU(Fail, cuEventRecord(sh->timing[g].t_end, seed_stream)); } return 0; @@ -424,12 +432,14 @@ lod_shared_state_destroy(struct lod_shared_state* sh) // Null-check each event individually: a partial init (e.g., cleanup after // lod_shared_state_init fails halfway through event creation) leaves some // handles zero, and cuEventDestroy on NULL returns an error. - for (int fc = 0; fc < 2; ++fc) { - cu_event_destroy(sh->timing[fc].t_start); - cu_event_destroy(sh->timing[fc].t_scatter_end); - cu_event_destroy(sh->timing[fc].t_reduce_end); - cu_event_destroy(sh->timing[fc].t_append_end); - cu_event_destroy(sh->timing[fc].t_end); + for (int fc = 0; fc < 2; ++fc) + cu_event_destroy(sh->lod_done[fc]); + for (int g = 0; g < LOD_TIMING_SLOTS; ++g) { + cu_event_destroy(sh->timing[g].t_start); + cu_event_destroy(sh->timing[g].t_scatter_end); + cu_event_destroy(sh->timing[g].t_reduce_end); + cu_event_destroy(sh->timing[g].t_append_end); + cu_event_destroy(sh->timing[g].t_end); } memset(sh, 0, sizeof(*sh)); } @@ -731,6 +741,7 @@ lod_run_epoch(struct lod_state* lod, struct lod_shared_state* sh, struct gpu_ordering* ord, int fc, + int timing_slot, const struct level_geometry* levels, struct gpu_pool_view pool_epoch, enum dtype dtype, @@ -741,7 +752,7 @@ lod_run_epoch(struct lod_state* lod, uint32_t* out_active_mask) { struct lod_plan* p = &lod->plan; - struct lod_timing* t = &sh->timing[fc]; + struct lod_timing* t = &sh->timing[timing_slot]; CU(Error, cuEventRecord(t->t_start, compute)); @@ -804,7 +815,9 @@ lod_run_epoch(struct lod_state* lod, lod, sh, levels, pool_epoch, dtype, active_levels_mask, compute) == 0); - // t_end doubles as GPU_EDGE_LOD_DONE (bound at engine init). + // Timing endpoint for the batch's generation; the ordering edge records + // separately on the stable per-fc sh->lod_done[fc] (bound at engine init). + CU(Error, cuEventRecord(t->t_end, compute)); CHECK(Error, gpu_edge_record(ord, GPU_EDGE_LOD_DONE, fc, compute) == 0); *out_active_mask = active_levels_mask; diff --git a/src/gpu/stream.lod.h b/src/gpu/stream.lod.h index ba18874e..5d3a7a7d 100644 --- a/src/gpu/stream.lod.h +++ b/src/gpu/stream.lod.h @@ -17,10 +17,11 @@ lod_state_init(struct lod_state* lod, // linear_bytes / morton_bytes are the buffer sizes (multiarray passes the max // across arrays; single-array passes the array's own sizes). // -// The timing events are created AND seeded on `seed_stream` so the first -// downstream wait completes immediately. `seed_stream` must be the same -// compute stream that later runs lod_run_epoch — seeding on an unrelated -// stream leaves the initial waits unsatisfied. +// The lod_done ordering events and the timing generations are created AND +// seeded on `seed_stream` so the first downstream wait (and metric read) +// completes immediately. `seed_stream` must be the same compute stream that +// later runs lod_run_epoch — seeding on an unrelated stream leaves the +// initial waits unsatisfied. // // Returns 0 on success; on failure, the struct is left safe to pass to // lod_shared_state_destroy. @@ -55,13 +56,16 @@ lod_state_device_bytes(const struct computed_stream_layouts* cl, // Run LOD pipeline for one epoch: gather -> reduce -> append fold -> // morton-to-chunks. pool_epoch: acquired view of this epoch's chunk pool -// region (all levels). *out_active_mask: set to bitmask of active LOD levels -// for this epoch. Returns 0 on success, non-zero on error. +// region (all levels). timing_slot: the batch's owned timing generation +// (sh->timing[timing_slot]); the ordering edge records on the stable +// per-fc sh->lod_done[fc]. *out_active_mask: set to bitmask of active LOD +// levels for this epoch. Returns 0 on success, non-zero on error. int lod_run_epoch(struct lod_state* lod, struct lod_shared_state* sh, struct gpu_ordering* ord, int fc, + int timing_slot, const struct level_geometry* levels, struct gpu_pool_view pool_epoch, enum dtype dtype,