From be8d1ffaf009fecd96e33aa9cfd90526d8b2f9b3 Mon Sep 17 00:00:00 2001 From: Geoffrey Claude Date: Thu, 28 May 2026 10:02:40 +0200 Subject: [PATCH 1/4] feat(datafusion): report IcebergTableScan metrics --- .../datafusion/src/physical_plan/scan.rs | 97 ++++++++++++++++++- .../integrations/datafusion/src/table/mod.rs | 35 +++++++ 2 files changed, 129 insertions(+), 3 deletions(-) diff --git a/crates/integrations/datafusion/src/physical_plan/scan.rs b/crates/integrations/datafusion/src/physical_plan/scan.rs index 36539ae503..f5e309aa6e 100644 --- a/crates/integrations/datafusion/src/physical_plan/scan.rs +++ b/crates/integrations/datafusion/src/physical_plan/scan.rs @@ -26,10 +26,11 @@ use datafusion::error::Result as DFResult; use datafusion::execution::{SendableRecordBatchStream, TaskContext}; use datafusion::physical_expr::EquivalenceProperties; use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType}; +use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; use datafusion::physical_plan::stream::RecordBatchStreamAdapter; use datafusion::physical_plan::{DisplayAs, ExecutionPlan, Partitioning, PlanProperties}; use datafusion::prelude::Expr; -use futures::{Stream, TryStreamExt}; +use futures::{Stream, StreamExt, TryStreamExt}; use iceberg::expr::Predicate; use iceberg::table::Table; @@ -53,6 +54,8 @@ pub struct IcebergTableScan { predicates: Option, /// Optional limit on the number of rows to return limit: Option, + /// Execution metrics for this scan. + metrics: ExecutionPlanMetricsSet, } impl IcebergTableScan { @@ -80,6 +83,7 @@ impl IcebergTableScan { projection, predicates, limit, + metrics: ExecutionPlanMetricsSet::new(), } } @@ -143,7 +147,7 @@ impl ExecutionPlan for IcebergTableScan { fn execute( &self, - _partition: usize, + partition: usize, _context: Arc, ) -> DFResult { let fut = get_batch_stream( @@ -174,11 +178,30 @@ impl ExecutionPlan for IcebergTableScan { Box::pin(stream) }; + let baseline_metrics = BaselineMetrics::new(&self.metrics, partition); + let measured_stream = stream_with_baseline_metrics(limited_stream, baseline_metrics); + Ok(Box::pin(RecordBatchStreamAdapter::new( self.schema(), - limited_stream, + measured_stream, ))) } + + fn metrics(&self) -> Option { + Some(self.metrics.clone_inner()) + } + + fn reset_state(self: Arc) -> DFResult> { + Ok(Arc::new(Self { + table: self.table.clone(), + snapshot_id: self.snapshot_id, + plan_properties: Arc::clone(&self.plan_properties), + projection: self.projection.clone(), + predicates: self.predicates.clone(), + limit: self.limit, + metrics: ExecutionPlanMetricsSet::new(), + })) + } } impl DisplayAs for IcebergTableScan { @@ -237,6 +260,19 @@ async fn get_batch_stream( Ok(Box::pin(stream)) } +fn stream_with_baseline_metrics( + mut stream: Pin> + Send>>, + baseline_metrics: BaselineMetrics, +) -> Pin> + Send>> { + futures::stream::poll_fn(move |cx| { + let baseline_metrics = baseline_metrics.clone(); + let _timer = baseline_metrics.elapsed_compute().timer(); + let poll = stream.as_mut().poll_next(cx); + baseline_metrics.record_poll(poll) + }) + .boxed() +} + fn get_column_names( schema: ArrowSchemaRef, projection: Option<&Vec>, @@ -247,3 +283,58 @@ fn get_column_names( .collect::>() }) } + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use datafusion::arrow::array::Int64Array; + use datafusion::arrow::datatypes::{ + DataType, Field, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, + }; + use datafusion::arrow::record_batch::RecordBatch; + use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet}; + use futures::StreamExt; + + use super::stream_with_baseline_metrics; + + #[test] + fn stream_with_baseline_metrics_records_rows_and_compute() { + let metrics = ExecutionPlanMetricsSet::new(); + let baseline_metrics = BaselineMetrics::new(&metrics, 0); + let batch = make_batch(); + let stream = Box::pin(futures::stream::iter([Ok(batch)])); + let mut stream = stream_with_baseline_metrics(stream, baseline_metrics); + + futures::executor::block_on(async { + let batch = stream + .next() + .await + .expect("stream should return one item") + .expect("stream item should be valid"); + assert_eq!(batch.num_rows(), 3); + assert!(stream.next().await.is_none()); + }); + + let metrics = metrics.clone_inner(); + assert_eq!(metrics.output_rows(), Some(3)); + assert!( + metrics.elapsed_compute().is_some_and(|elapsed| elapsed > 0), + "elapsed_compute should be recorded" + ); + } + + fn make_batch() -> RecordBatch { + let schema = make_arrow_schema(); + let values = Arc::new(Int64Array::from(vec![1, 2, 3])); + RecordBatch::try_new(schema, vec![values]).unwrap() + } + + fn make_arrow_schema() -> ArrowSchemaRef { + Arc::new(ArrowSchema::new(vec![Field::new( + "id", + DataType::Int64, + false, + )])) + } +} diff --git a/crates/integrations/datafusion/src/table/mod.rs b/crates/integrations/datafusion/src/table/mod.rs index 75b7988d8d..f72556a06a 100644 --- a/crates/integrations/datafusion/src/table/mod.rs +++ b/crates/integrations/datafusion/src/table/mod.rs @@ -519,6 +519,41 @@ mod tests { assert!(physical_plan.is_ok()); } + #[tokio::test] + async fn test_catalog_backed_provider_scan_reports_metrics() { + use datafusion::datasource::TableProvider; + + let (catalog, namespace, table_name, _temp_dir) = get_test_catalog_and_table().await; + let table_provider = + IcebergTableProvider::try_new(catalog.clone(), namespace.clone(), table_name.clone()) + .await + .unwrap(); + + let ctx = SessionContext::new(); + let scan_plan = table_provider + .scan(&ctx.state(), None, &[], None) + .await + .unwrap(); + let batches = datafusion::physical_plan::collect(Arc::clone(&scan_plan), ctx.task_ctx()) + .await + .unwrap(); + let output_rows = batches.iter().map(|batch| batch.num_rows()).sum(); + + let metrics = scan_plan.metrics().expect("scan should expose metrics"); + assert_eq!(metrics.output_rows(), Some(output_rows)); + assert!( + metrics.elapsed_compute().is_some_and(|elapsed| elapsed > 0), + "elapsed_compute should be recorded" + ); + + let reset_plan = scan_plan.reset_state().unwrap(); + let metrics = reset_plan + .metrics() + .expect("reset scan should expose metrics"); + assert_eq!(metrics.output_rows(), None); + assert_eq!(metrics.elapsed_compute(), None); + } + // Tests for IcebergTableProvider #[tokio::test] From 19db7035ef14e626835973814e6104b9c34261f0 Mon Sep 17 00:00:00 2001 From: Geoffrey Claude Date: Mon, 1 Jun 2026 10:04:00 +0200 Subject: [PATCH 2/4] fix(datafusion): avoid cloning scan metrics per poll --- crates/integrations/datafusion/src/physical_plan/scan.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/integrations/datafusion/src/physical_plan/scan.rs b/crates/integrations/datafusion/src/physical_plan/scan.rs index f5e309aa6e..a6367790f5 100644 --- a/crates/integrations/datafusion/src/physical_plan/scan.rs +++ b/crates/integrations/datafusion/src/physical_plan/scan.rs @@ -265,7 +265,6 @@ fn stream_with_baseline_metrics( baseline_metrics: BaselineMetrics, ) -> Pin> + Send>> { futures::stream::poll_fn(move |cx| { - let baseline_metrics = baseline_metrics.clone(); let _timer = baseline_metrics.elapsed_compute().timer(); let poll = stream.as_mut().poll_next(cx); baseline_metrics.record_poll(poll) From aaaa615f588df767a918bb202bef0efa289f2d90 Mon Sep 17 00:00:00 2001 From: Geoffrey Claude Date: Mon, 1 Jun 2026 10:08:31 +0200 Subject: [PATCH 3/4] test(datafusion): cover all scan baseline metrics --- .../datafusion/src/physical_plan/scan.rs | 49 ++++++++++++++++++- 1 file changed, 48 insertions(+), 1 deletion(-) diff --git a/crates/integrations/datafusion/src/physical_plan/scan.rs b/crates/integrations/datafusion/src/physical_plan/scan.rs index a6367790f5..df0b91e0ad 100644 --- a/crates/integrations/datafusion/src/physical_plan/scan.rs +++ b/crates/integrations/datafusion/src/physical_plan/scan.rs @@ -292,7 +292,10 @@ mod tests { DataType, Field, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, }; use datafusion::arrow::record_batch::RecordBatch; - use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet}; + use datafusion::common::utils::memory::get_record_batch_memory_size; + use datafusion::physical_plan::metrics::{ + BaselineMetrics, ExecutionPlanMetricsSet, MetricValue, MetricsSet, + }; use futures::StreamExt; use super::stream_with_baseline_metrics; @@ -302,6 +305,7 @@ mod tests { let metrics = ExecutionPlanMetricsSet::new(); let baseline_metrics = BaselineMetrics::new(&metrics, 0); let batch = make_batch(); + let expected_output_bytes = get_record_batch_memory_size(&batch); let stream = Box::pin(futures::stream::iter([Ok(batch)])); let mut stream = stream_with_baseline_metrics(stream, baseline_metrics); @@ -317,10 +321,20 @@ mod tests { let metrics = metrics.clone_inner(); assert_eq!(metrics.output_rows(), Some(3)); + assert_eq!(output_batches(&metrics), Some(1)); + assert_eq!(output_bytes(&metrics), Some(expected_output_bytes)); assert!( metrics.elapsed_compute().is_some_and(|elapsed| elapsed > 0), "elapsed_compute should be recorded" ); + assert!( + start_timestamp(&metrics).is_some_and(|timestamp| timestamp > 0), + "start_timestamp should be recorded" + ); + assert!( + end_timestamp(&metrics).is_some_and(|timestamp| timestamp > 0), + "end_timestamp should be recorded" + ); } fn make_batch() -> RecordBatch { @@ -336,4 +350,37 @@ mod tests { false, )])) } + + fn metric_value_as_usize( + metrics: &MetricsSet, + matches_metric: impl Fn(&MetricValue) -> bool, + ) -> Option { + metrics + .sum(|metric| matches_metric(metric.value())) + .map(|metric| metric.as_usize()) + } + + fn output_batches(metrics: &MetricsSet) -> Option { + metric_value_as_usize(metrics, |value| { + matches!(value, MetricValue::OutputBatches(_)) + }) + } + + fn output_bytes(metrics: &MetricsSet) -> Option { + metric_value_as_usize(metrics, |value| { + matches!(value, MetricValue::OutputBytes(_)) + }) + } + + fn start_timestamp(metrics: &MetricsSet) -> Option { + metric_value_as_usize(metrics, |value| { + matches!(value, MetricValue::StartTimestamp(_)) + }) + } + + fn end_timestamp(metrics: &MetricsSet) -> Option { + metric_value_as_usize(metrics, |value| { + matches!(value, MetricValue::EndTimestamp(_)) + }) + } } From f94d61fb56c3503524e0b302cb9420e6d225dec0 Mon Sep 17 00:00:00 2001 From: Geoffrey Claude Date: Mon, 1 Jun 2026 13:32:38 +0200 Subject: [PATCH 4/4] chore(datafusion): update public api snapshot --- crates/integrations/datafusion/public-api.txt | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/crates/integrations/datafusion/public-api.txt b/crates/integrations/datafusion/public-api.txt index d717b1d00d..0f5cbc1097 100644 --- a/crates/integrations/datafusion/public-api.txt +++ b/crates/integrations/datafusion/public-api.txt @@ -79,9 +79,11 @@ pub fn iceberg_datafusion::physical_plan::IcebergTableScan::fmt_as(&self, _t: da impl datafusion_physical_plan::execution_plan::ExecutionPlan for iceberg_datafusion::physical_plan::IcebergTableScan pub fn iceberg_datafusion::physical_plan::IcebergTableScan::as_any(&self) -> &dyn core::any::Any pub fn iceberg_datafusion::physical_plan::IcebergTableScan::children(&self) -> alloc::vec::Vec<&alloc::sync::Arc<(dyn datafusion_physical_plan::execution_plan::ExecutionPlan + 'static)>> -pub fn iceberg_datafusion::physical_plan::IcebergTableScan::execute(&self, _partition: usize, _context: alloc::sync::Arc) -> datafusion_common::error::Result +pub fn iceberg_datafusion::physical_plan::IcebergTableScan::execute(&self, partition: usize, _context: alloc::sync::Arc) -> datafusion_common::error::Result +pub fn iceberg_datafusion::physical_plan::IcebergTableScan::metrics(&self) -> core::option::Option pub fn iceberg_datafusion::physical_plan::IcebergTableScan::name(&self) -> &str pub fn iceberg_datafusion::physical_plan::IcebergTableScan::properties(&self) -> &alloc::sync::Arc +pub fn iceberg_datafusion::physical_plan::IcebergTableScan::reset_state(self: alloc::sync::Arc) -> datafusion_common::error::Result> pub fn iceberg_datafusion::physical_plan::IcebergTableScan::with_new_children(self: alloc::sync::Arc, _children: alloc::vec::Vec>) -> datafusion_common::error::Result> impl core::marker::Freeze for iceberg_datafusion::physical_plan::IcebergTableScan impl core::marker::Send for iceberg_datafusion::physical_plan::IcebergTableScan