feat(datafusion): report IcebergTableScan metrics #2521
    baseline_metrics: BaselineMetrics,
) -> Pin<Box<dyn Stream<Item = DFResult<RecordBatch>> + Send>> {
    futures::stream::poll_fn(move |cx| {
        let baseline_metrics = baseline_metrics.clone();
I don't think baseline_metrics.clone() is needed. BaselineMetrics::elapsed_compute() and record_poll() both take &self, so the captured value can be used directly. The clone is cheap (Arc/Count copies) but it's dead weight on every poll and reads as if there's a borrow problem to work around. Suggest:
    futures::stream::poll_fn(move |cx| {
        let _timer = baseline_metrics.elapsed_compute().timer();
        let poll = stream.as_mut().poll_next(cx);
        baseline_metrics.record_poll(poll)
    })
Good catch. Removed the per-poll clone() in 19db703.
This is cleaner and also avoids dropping a cloned BaselineMetrics on every poll, which could record end_timestamp earlier than intended.
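The early end_timestamp concern can be illustrated with a std-only toy. This is a sketch under stated assumptions: ToyMetrics, poll_with_clone, poll_by_reference, and end_recorded_after_first_poll are hypothetical names, not DataFusion APIs. It only assumes, as the reply above describes, that dropping a metrics handle records the end time if none is set yet.

```rust
use std::sync::{Arc, Mutex};
use std::time::Instant;

/// Hypothetical stand-in for a metrics handle (NOT DataFusion's BaselineMetrics).
/// Clones share the same end-time slot, like an Arc-backed metric.
#[derive(Clone)]
struct ToyMetrics {
    end_time: Arc<Mutex<Option<Instant>>>,
}

impl ToyMetrics {
    fn new() -> Self {
        Self { end_time: Arc::new(Mutex::new(None)) }
    }

    /// True once any handle (original or clone) has recorded an end time.
    fn is_done(&self) -> bool {
        self.end_time.lock().unwrap().is_some()
    }
}

impl Drop for ToyMetrics {
    /// Assumed behavior: dropping a handle records the end time if unset.
    fn drop(&mut self) {
        let mut end = self.end_time.lock().unwrap();
        if end.is_none() {
            *end = Some(Instant::now());
        }
    }
}

/// Simulates one poll that clones the handle; the clone is dropped on return,
/// which marks the shared end time even though the stream is not finished.
fn poll_with_clone(metrics: &ToyMetrics) {
    let _per_poll = metrics.clone();
}

/// Simulates one poll that only borrows the handle; nothing is dropped.
fn poll_by_reference(metrics: &ToyMetrics) {
    let _borrowed = metrics;
}

/// Reports whether the end time is already set after a single poll.
fn end_recorded_after_first_poll(clone_per_poll: bool) -> bool {
    let metrics = ToyMetrics::new();
    if clone_per_poll {
        poll_with_clone(&metrics);
    } else {
        poll_by_reference(&metrics);
    }
    metrics.is_done()
}
```

In this toy, end_recorded_after_first_poll(true) reports the end time as already set after the first poll, while end_recorded_after_first_poll(false) does not. That is the behavior difference the removal of the per-poll clone avoids.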
    let metrics = metrics.clone_inner();
    assert_eq!(metrics.output_rows(), Some(3));
    assert!(
        metrics.elapsed_compute().is_some_and(|elapsed| elapsed > 0),
stream_with_baseline_metrics_records_rows_and_compute asserts output_rows and elapsed_compute. The PR description also lists output_batches, output_bytes, and completion timestamps as exposed. Those are in fact recorded — BaselineMetrics::record_poll → batch.record_output(...) updates output_batches and output_bytes (see datafusion/physical-expr-common/src/metrics/baseline.rs:331) — so adding assert!(metrics.output_batches() == Some(1)) and assert!(metrics.output_bytes().is_some_and(|b| b > 0)) is a cheap regression guard that matches the documented contract.
Agreed on covering the rest of the baseline metrics. I added assertions for output_batches, output_bytes, start_timestamp, and end_timestamp in the focused stream_with_baseline_metrics test in 19db703.
I adapted this through MetricsSet::sum(...) + MetricValue matching because DataFusion 53.1 does not expose MetricsSet::output_batches() / output_bytes() convenience methods.
mbutrovich left a comment
Minor suggestions, thanks for tackling this @geoffreyclaude!
@mbutrovich Thanks for the quick review! PR should now be updated with your suggestions, as two additional commits. I also had to add a small public API snapshot update in f94d61f after rebasing onto current |
Which issue does this PR close?
What changes are included in this PR?
IcebergTableScan now owns a DataFusion ExecutionPlanMetricsSet, returns it from ExecutionPlan::metrics(), and resets it from ExecutionPlan::reset_state().
The scan output stream is wrapped in a small poll_fn adapter that records BaselineMetrics while the Iceberg stream is polled. This exposes the standard DataFusion operator metrics such as elapsed_compute, output_rows, output_batches, output_bytes, and completion timestamps in EXPLAIN ANALYZE.
Focused tests cover the metrics wrapper directly and the catalog-backed provider execution path, including reset-state behavior.
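The shape of the adapter can be sketched without any dependencies by using std::iter::from_fn as a synchronous stand-in for futures::stream::poll_fn. This is an analogue under stated assumptions, not the PR's code: ToyBaseline and drain_with_metrics are hypothetical names, and a Vec<i64> stands in for a RecordBatch.

```rust
use std::time::{Duration, Instant};

/// Hypothetical, simplified counterpart of the baseline operator metrics.
#[derive(Debug, Default)]
struct ToyBaseline {
    output_rows: usize,
    output_batches: usize,
    elapsed_compute: Duration,
    finished: bool,
}

/// Pulls every batch through a closure-based adapter that records metrics
/// on each pull, then returns the batches together with the metrics.
fn drain_with_metrics(batches: Vec<Vec<i64>>) -> (Vec<Vec<i64>>, ToyBaseline) {
    let mut metrics = ToyBaseline::default();
    let mut inner = batches.into_iter();

    let collected: Vec<Vec<i64>> = std::iter::from_fn(|| {
        // Time only the work of pulling from the inner source.
        let start = Instant::now();
        let next = inner.next();
        metrics.elapsed_compute += start.elapsed();

        // Record output counters, or mark completion on exhaustion.
        match &next {
            Some(batch) => {
                metrics.output_batches += 1;
                metrics.output_rows += batch.len();
            }
            None => metrics.finished = true,
        }
        next
    })
    .collect();

    (collected, metrics)
}
```

The closure borrows the metrics directly rather than cloning them per pull, mirroring the review suggestion for the real adapter. In the asynchronous version, a pending poll would also need to pass through without touching the output counters, which this synchronous analogue does not model.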
Are these changes tested?
cargo test -p iceberg-datafusion stream_with_baseline_metrics_records_rows_and_compute --locked
cargo test -p iceberg-datafusion test_catalog_backed_provider_scan_reports_metrics --locked
cargo check -p iceberg-datafusion --locked
cargo test -p iceberg-datafusion --locked
cargo clippy -p iceberg-datafusion --all-targets --locked