Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion crates/integrations/datafusion/public-api.txt
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,11 @@ pub fn iceberg_datafusion::physical_plan::IcebergTableScan::fmt_as(&self, _t: da
impl datafusion_physical_plan::execution_plan::ExecutionPlan for iceberg_datafusion::physical_plan::IcebergTableScan
pub fn iceberg_datafusion::physical_plan::IcebergTableScan::as_any(&self) -> &dyn core::any::Any
pub fn iceberg_datafusion::physical_plan::IcebergTableScan::children(&self) -> alloc::vec::Vec<&alloc::sync::Arc<(dyn datafusion_physical_plan::execution_plan::ExecutionPlan + 'static)>>
pub fn iceberg_datafusion::physical_plan::IcebergTableScan::execute(&self, _partition: usize, _context: alloc::sync::Arc<datafusion_execution::task::TaskContext>) -> datafusion_common::error::Result<datafusion_execution::stream::SendableRecordBatchStream>
pub fn iceberg_datafusion::physical_plan::IcebergTableScan::execute(&self, partition: usize, _context: alloc::sync::Arc<datafusion_execution::task::TaskContext>) -> datafusion_common::error::Result<datafusion_execution::stream::SendableRecordBatchStream>
pub fn iceberg_datafusion::physical_plan::IcebergTableScan::metrics(&self) -> core::option::Option<datafusion_physical_expr_common::metrics::MetricsSet>
pub fn iceberg_datafusion::physical_plan::IcebergTableScan::name(&self) -> &str
pub fn iceberg_datafusion::physical_plan::IcebergTableScan::properties(&self) -> &alloc::sync::Arc<datafusion_physical_plan::execution_plan::PlanProperties>
pub fn iceberg_datafusion::physical_plan::IcebergTableScan::reset_state(self: alloc::sync::Arc<Self>) -> datafusion_common::error::Result<alloc::sync::Arc<dyn datafusion_physical_plan::execution_plan::ExecutionPlan>>
pub fn iceberg_datafusion::physical_plan::IcebergTableScan::with_new_children(self: alloc::sync::Arc<Self>, _children: alloc::vec::Vec<alloc::sync::Arc<dyn datafusion_physical_plan::execution_plan::ExecutionPlan>>) -> datafusion_common::error::Result<alloc::sync::Arc<dyn datafusion_physical_plan::execution_plan::ExecutionPlan>>
impl core::marker::Freeze for iceberg_datafusion::physical_plan::IcebergTableScan
impl core::marker::Send for iceberg_datafusion::physical_plan::IcebergTableScan
Expand Down
143 changes: 140 additions & 3 deletions crates/integrations/datafusion/src/physical_plan/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,11 @@ use datafusion::error::Result as DFResult;
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
use datafusion::physical_expr::EquivalenceProperties;
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::{DisplayAs, ExecutionPlan, Partitioning, PlanProperties};
use datafusion::prelude::Expr;
use futures::{Stream, TryStreamExt};
use futures::{Stream, StreamExt, TryStreamExt};
use iceberg::expr::Predicate;
use iceberg::table::Table;

Expand All @@ -53,6 +54,8 @@ pub struct IcebergTableScan {
predicates: Option<Predicate>,
/// Optional limit on the number of rows to return
limit: Option<usize>,
/// Execution metrics for this scan.
metrics: ExecutionPlanMetricsSet,
}

impl IcebergTableScan {
Expand Down Expand Up @@ -80,6 +83,7 @@ impl IcebergTableScan {
projection,
predicates,
limit,
metrics: ExecutionPlanMetricsSet::new(),
}
}

Expand Down Expand Up @@ -143,7 +147,7 @@ impl ExecutionPlan for IcebergTableScan {

fn execute(
&self,
_partition: usize,
partition: usize,
_context: Arc<TaskContext>,
) -> DFResult<SendableRecordBatchStream> {
let fut = get_batch_stream(
Expand Down Expand Up @@ -174,11 +178,30 @@ impl ExecutionPlan for IcebergTableScan {
Box::pin(stream)
};

let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
let measured_stream = stream_with_baseline_metrics(limited_stream, baseline_metrics);

Ok(Box::pin(RecordBatchStreamAdapter::new(
self.schema(),
limited_stream,
measured_stream,
)))
}

fn metrics(&self) -> Option<MetricsSet> {
Some(self.metrics.clone_inner())
}

fn reset_state(self: Arc<Self>) -> DFResult<Arc<dyn ExecutionPlan>> {
Ok(Arc::new(Self {
table: self.table.clone(),
snapshot_id: self.snapshot_id,
plan_properties: Arc::clone(&self.plan_properties),
projection: self.projection.clone(),
predicates: self.predicates.clone(),
limit: self.limit,
metrics: ExecutionPlanMetricsSet::new(),
}))
}
}

impl DisplayAs for IcebergTableScan {
Expand Down Expand Up @@ -237,6 +260,18 @@ async fn get_batch_stream(
Ok(Box::pin(stream))
}

fn stream_with_baseline_metrics(
mut stream: Pin<Box<dyn Stream<Item = DFResult<RecordBatch>> + Send>>,
baseline_metrics: BaselineMetrics,
) -> Pin<Box<dyn Stream<Item = DFResult<RecordBatch>> + Send>> {
futures::stream::poll_fn(move |cx| {
let _timer = baseline_metrics.elapsed_compute().timer();
let poll = stream.as_mut().poll_next(cx);
baseline_metrics.record_poll(poll)
})
.boxed()
}

fn get_column_names(
schema: ArrowSchemaRef,
projection: Option<&Vec<usize>>,
Expand All @@ -247,3 +282,105 @@ fn get_column_names(
.collect::<Vec<String>>()
})
}

#[cfg(test)]
mod tests {
use std::sync::Arc;

use datafusion::arrow::array::Int64Array;
use datafusion::arrow::datatypes::{
DataType, Field, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef,
};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::common::utils::memory::get_record_batch_memory_size;
use datafusion::physical_plan::metrics::{
BaselineMetrics, ExecutionPlanMetricsSet, MetricValue, MetricsSet,
};
use futures::StreamExt;

use super::stream_with_baseline_metrics;

#[test]
fn stream_with_baseline_metrics_records_rows_and_compute() {
let metrics = ExecutionPlanMetricsSet::new();
let baseline_metrics = BaselineMetrics::new(&metrics, 0);
let batch = make_batch();
let expected_output_bytes = get_record_batch_memory_size(&batch);
let stream = Box::pin(futures::stream::iter([Ok(batch)]));
let mut stream = stream_with_baseline_metrics(stream, baseline_metrics);

futures::executor::block_on(async {
let batch = stream
.next()
.await
.expect("stream should return one item")
.expect("stream item should be valid");
assert_eq!(batch.num_rows(), 3);
assert!(stream.next().await.is_none());
});

let metrics = metrics.clone_inner();
assert_eq!(metrics.output_rows(), Some(3));
assert_eq!(output_batches(&metrics), Some(1));
assert_eq!(output_bytes(&metrics), Some(expected_output_bytes));
assert!(
metrics.elapsed_compute().is_some_and(|elapsed| elapsed > 0),
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

stream_with_baseline_metrics_records_rows_and_compute asserts output_rows and elapsed_compute. The PR description also lists output_batches, output_bytes, and completion timestamps as exposed. Those are in fact recorded — BaselineMetrics::record_poll → batch.record_output(...) updates output_batches and output_bytes (see datafusion/physical-expr-common/src/metrics/baseline.rs:331) — so adding assert!(metrics.output_batches() == Some(1)) and assert!(metrics.output_bytes().is_some_and(|b| b > 0)) is a cheap regression guard that matches the documented contract.

Copy link
Copy Markdown
Author

@geoffreyclaude geoffreyclaude Jun 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed on covering the rest of the baseline metrics. I added assertions for output_batches, output_bytes, start_timestamp, and end_timestamp in the focused stream_with_baseline_metrics test in 19db703.

I adapted this through MetricsSet::sum(...) + MetricValue matching because DataFusion 53.1 does not expose MetricsSet::output_batches() / output_bytes() convenience methods.

"elapsed_compute should be recorded"
);
assert!(
start_timestamp(&metrics).is_some_and(|timestamp| timestamp > 0),
"start_timestamp should be recorded"
);
assert!(
end_timestamp(&metrics).is_some_and(|timestamp| timestamp > 0),
"end_timestamp should be recorded"
);
}

fn make_batch() -> RecordBatch {
let schema = make_arrow_schema();
let values = Arc::new(Int64Array::from(vec![1, 2, 3]));
RecordBatch::try_new(schema, vec![values]).unwrap()
}

fn make_arrow_schema() -> ArrowSchemaRef {
Arc::new(ArrowSchema::new(vec![Field::new(
"id",
DataType::Int64,
false,
)]))
}

fn metric_value_as_usize(
metrics: &MetricsSet,
matches_metric: impl Fn(&MetricValue) -> bool,
) -> Option<usize> {
metrics
.sum(|metric| matches_metric(metric.value()))
.map(|metric| metric.as_usize())
}

fn output_batches(metrics: &MetricsSet) -> Option<usize> {
metric_value_as_usize(metrics, |value| {
matches!(value, MetricValue::OutputBatches(_))
})
}

fn output_bytes(metrics: &MetricsSet) -> Option<usize> {
metric_value_as_usize(metrics, |value| {
matches!(value, MetricValue::OutputBytes(_))
})
}

fn start_timestamp(metrics: &MetricsSet) -> Option<usize> {
metric_value_as_usize(metrics, |value| {
matches!(value, MetricValue::StartTimestamp(_))
})
}

fn end_timestamp(metrics: &MetricsSet) -> Option<usize> {
metric_value_as_usize(metrics, |value| {
matches!(value, MetricValue::EndTimestamp(_))
})
}
}
35 changes: 35 additions & 0 deletions crates/integrations/datafusion/src/table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,41 @@ mod tests {
assert!(physical_plan.is_ok());
}

#[tokio::test]
async fn test_catalog_backed_provider_scan_reports_metrics() {
use datafusion::datasource::TableProvider;

let (catalog, namespace, table_name, _temp_dir) = get_test_catalog_and_table().await;
let table_provider =
IcebergTableProvider::try_new(catalog.clone(), namespace.clone(), table_name.clone())
.await
.unwrap();

let ctx = SessionContext::new();
let scan_plan = table_provider
.scan(&ctx.state(), None, &[], None)
.await
.unwrap();
let batches = datafusion::physical_plan::collect(Arc::clone(&scan_plan), ctx.task_ctx())
.await
.unwrap();
let output_rows = batches.iter().map(|batch| batch.num_rows()).sum();

let metrics = scan_plan.metrics().expect("scan should expose metrics");
assert_eq!(metrics.output_rows(), Some(output_rows));
assert!(
metrics.elapsed_compute().is_some_and(|elapsed| elapsed > 0),
"elapsed_compute should be recorded"
);

let reset_plan = scan_plan.reset_state().unwrap();
let metrics = reset_plan
.metrics()
.expect("reset scan should expose metrics");
assert_eq!(metrics.output_rows(), None);
assert_eq!(metrics.elapsed_compute(), None);
}

// Tests for IcebergTableProvider

#[tokio::test]
Expand Down
Loading