From 5392829f9043ba635a0aea2298905c18a1ef9317 Mon Sep 17 00:00:00 2001
From: liujiwen-up
Date: Thu, 14 May 2026 16:01:13 +0800
Subject: [PATCH] test(datafusion): add vortex SQL e2e

---
Reviewer notes (this section is ignored by `git am`):
  * CI gains a step running
    `cargo test -p paimon-datafusion --features vortex --test vortex_tables`.
  * `paimon-datafusion` gets a `vortex` feature that forwards to `paimon/vortex`.
  * tests/common: `collect_id_name` still returns rows sorted by id; the new
    `collect_id_name_in_batch_order` / `collect_id_name_from_batches_in_order`
    helpers return rows in the order the batches yield them.
  * `vortex_array_to_record_batch` now converts to the Arrow struct type built
    from the target schema's fields instead of `into_arrow_preferred()`, and
    rejects a column-count mismatch before building the `RecordBatch`.

 .github/workflows/ci.yml                      |  6 ++
 crates/integrations/datafusion/Cargo.toml     |  1 +
 .../datafusion/tests/common/mod.rs            | 18 +++-
 .../datafusion/tests/vortex_tables.rs         | 80 +++++++++++++++++
 crates/paimon/src/arrow/format/vortex.rs      | 90 ++++++++++++++++++-
 5 files changed, 190 insertions(+), 5 deletions(-)
 create mode 100644 crates/integrations/datafusion/tests/vortex_tables.rs

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index b19d49fe..123e9c9f 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -121,6 +121,12 @@ jobs:
           RUST_LOG: DEBUG
           RUST_BACKTRACE: full
 
+      - name: DataFusion Vortex Integration Test
+        run: cargo test -p paimon-datafusion --features vortex --test vortex_tables
+        env:
+          RUST_LOG: DEBUG
+          RUST_BACKTRACE: full
+
       - name: Install uv
         uses: astral-sh/setup-uv@37802adc94f370d6bfd71619e3f0bf239e1f3b78
         with:
diff --git a/crates/integrations/datafusion/Cargo.toml b/crates/integrations/datafusion/Cargo.toml
index 30b5100e..3e7392de 100644
--- a/crates/integrations/datafusion/Cargo.toml
+++ b/crates/integrations/datafusion/Cargo.toml
@@ -29,6 +29,7 @@ keywords = ["paimon", "datafusion", "integrations"]
 
 [features]
 fulltext = ["paimon/fulltext"]
+vortex = ["paimon/vortex"]
 
 [dependencies]
 async-trait = "0.1"
diff --git a/crates/integrations/datafusion/tests/common/mod.rs b/crates/integrations/datafusion/tests/common/mod.rs
index d32734c3..bcd0cd75 100644
--- a/crates/integrations/datafusion/tests/common/mod.rs
+++ b/crates/integrations/datafusion/tests/common/mod.rs
@@ -54,9 +54,24 @@ pub async fn setup_sql_context() -> (TempDir, SQLContext) {
 
 #[allow(dead_code)]
 pub async fn collect_id_name(sql_context: &SQLContext, sql: &str) -> Vec<(i32, String)> {
+    let mut rows = collect_id_name_in_batch_order(sql_context, sql).await;
+    rows.sort_by_key(|(id, _)| *id);
+    rows
+}
+
+#[allow(dead_code)]
+pub async fn collect_id_name_in_batch_order(
+    sql_context: &SQLContext,
+    sql: &str,
+) -> Vec<(i32, String)> {
     let batches = sql_context.sql(sql).await.unwrap().collect().await.unwrap();
+    collect_id_name_from_batches_in_order(&batches)
+}
+
+#[allow(dead_code)]
+pub fn collect_id_name_from_batches_in_order(batches: &[RecordBatch]) -> Vec<(i32, String)> {
     let mut rows = Vec::new();
-    for batch in &batches {
+    for batch in batches {
         let ids = batch
             .column_by_name("id")
             .and_then(|c| c.as_any().downcast_ref::<Int32Array>())
@@ -69,7 +84,6 @@ pub async fn collect_id_name(sql_context: &SQLContext, sql: &str) -> Vec<(i32, S
             rows.push((ids.value(i), names.value(i).to_string()));
         }
     }
-    rows.sort_by_key(|(id, _)| *id);
     rows
 }
 
diff --git a/crates/integrations/datafusion/tests/vortex_tables.rs b/crates/integrations/datafusion/tests/vortex_tables.rs
new file mode 100644
index 00000000..9492429f
--- /dev/null
+++ b/crates/integrations/datafusion/tests/vortex_tables.rs
@@ -0,0 +1,80 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#![cfg(feature = "vortex")]
+
+//! Vortex file format SQL end-to-end tests.
+
+mod common;
+
+use std::path::Path;
+
+#[tokio::test]
+async fn test_vortex_file_format_sql_e2e() {
+    let (tmp, sql_context) = common::setup_sql_context().await;
+
+    common::exec(
+        &sql_context,
+        "CREATE TABLE paimon.test_db.t (
+            id INT,
+            name STRING
+        ) WITH (
+            'file.format' = 'vortex'
+        )",
+    )
+    .await;
+
+    common::exec(
+        &sql_context,
+        "INSERT INTO paimon.test_db.t VALUES (1, 'Alice'), (2, 'Bob')",
+    )
+    .await;
+
+    assert!(
+        contains_vortex_file(tmp.path()),
+        "expected Vortex data file"
+    );
+
+    let rows = common::collect_id_name_in_batch_order(
+        &sql_context,
+        "SELECT id, name FROM paimon.test_db.t ORDER BY id",
+    )
+    .await;
+    assert_eq!(rows, vec![(1, "Alice".to_string()), (2, "Bob".to_string())]);
+
+    let filtered = common::collect_id_name_in_batch_order(
+        &sql_context,
+        "SELECT id, name FROM paimon.test_db.t WHERE id = 2",
+    )
+    .await;
+    assert_eq!(filtered, vec![(2, "Bob".to_string())]);
+}
+
+fn contains_vortex_file(path: &Path) -> bool {
+    let entries = std::fs::read_dir(path).expect("read warehouse dir");
+    for entry in entries {
+        let path = entry.expect("read dir entry").path();
+        if path.is_dir() {
+            if contains_vortex_file(&path) {
+                return true;
+            }
+        } else if path.extension().is_some_and(|ext| ext == "vortex") {
+            return true;
+        }
+    }
+    false
+}
diff --git a/crates/paimon/src/arrow/format/vortex.rs b/crates/paimon/src/arrow/format/vortex.rs
index 3b7ba275..f0a3a48e 100644
--- a/crates/paimon/src/arrow/format/vortex.rs
+++ b/crates/paimon/src/arrow/format/vortex.rs
@@ -21,7 +21,7 @@ use crate::spec::{DataField, Datum, Predicate, PredicateOperator};
 use crate::table::{ArrowRecordBatchStream, RowRange};
 use crate::Error;
 use arrow_array::RecordBatch;
-use arrow_schema::SchemaRef;
+use arrow_schema::{DataType as ArrowDataType, SchemaRef};
 use async_trait::async_trait;
 use futures::future::BoxFuture;
 use futures::StreamExt;
@@ -472,7 +472,7 @@ fn vortex_array_to_record_batch(
     schema: &SchemaRef,
 ) -> crate::Result<RecordBatch> {
     let arrow_array = vortex_array
-        .into_arrow_preferred()
+        .into_arrow(&ArrowDataType::Struct(schema.fields().clone()))
         .map_err(|e| Error::DataInvalid {
             message: format!("Failed to convert Vortex array to Arrow: {e}"),
             source: None,
@@ -486,6 +486,17 @@ fn vortex_array_to_record_batch(
             source: None,
         })?;
 
+    if struct_array.columns().len() != schema.fields().len() {
+        return Err(Error::DataInvalid {
+            message: format!(
+                "Vortex column count {} does not match target schema column count {}",
+                struct_array.columns().len(),
+                schema.fields().len()
+            ),
+            source: None,
+        });
+    }
+
     RecordBatch::try_new(schema.clone(), struct_array.columns().to_vec()).map_err(|e| {
         Error::DataInvalid {
             message: format!("Failed to build RecordBatch from Vortex data: {e}"),
@@ -680,7 +691,8 @@ mod tests {
     use super::*;
     use crate::arrow::format::FormatFileWriter;
     use crate::io::FileIOBuilder;
-    use arrow_array::Int32Array;
+    use crate::spec::{DataField, DataType, VarCharType};
+    use arrow_array::{Int32Array, StringArray};
     use arrow_schema::{DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema};
 
     fn test_arrow_schema() -> Arc<ArrowSchema> {
@@ -758,6 +770,78 @@ mod tests {
         assert_eq!(total_rows, 3);
     }
 
+    #[tokio::test]
+    async fn test_vortex_reader_returns_utf8_for_string_schema() {
+        let file_io = FileIOBuilder::new("memory").build().unwrap();
+        let path = "memory:/test_vortex_utf8_schema.vortex";
+        let output = file_io.new_output(path).unwrap();
+        let schema = Arc::new(ArrowSchema::new(vec![
+            ArrowField::new("id", ArrowDataType::Int32, false),
+            ArrowField::new("name", ArrowDataType::Utf8, true),
+        ]));
+
+        let mut writer: Box<dyn FormatFileWriter> = Box::new(
+            VortexFormatWriter::new(&output, schema.clone())
+                .await
+                .unwrap(),
+        );
+        let batch = RecordBatch::try_new(
+            schema,
+            vec![
+                Arc::new(Int32Array::from(vec![1, 2])),
+                Arc::new(StringArray::from(vec![Some("Alice"), Some("Bob")])),
+            ],
+        )
+        .unwrap();
+        writer.write(&batch).await.unwrap();
+        writer.close().await.unwrap();
+
+        let input = file_io.new_input(path).unwrap();
+        let file_reader = input.reader().await.unwrap();
+        let metadata = input.metadata().await.unwrap();
+        let read_fields = vec![
+            DataField::new(
+                0,
+                "id".to_string(),
+                DataType::Int(crate::spec::IntType::new()),
+            ),
+            DataField::new(
+                1,
+                "name".to_string(),
+                DataType::VarChar(VarCharType::string_type()),
+            ),
+        ];
+
+        let reader = VortexFormatReader;
+        let mut stream = reader
+            .read_batch_stream(
+                Box::new(file_reader),
+                metadata.size,
+                &read_fields,
+                None,
+                None,
+                None,
+            )
+            .await
+            .unwrap();
+
+        let mut names = Vec::new();
+        while let Some(result) = stream.next().await {
+            let batch = result.unwrap();
+            assert_eq!(batch.schema().field(1).data_type(), &ArrowDataType::Utf8);
+            assert_eq!(batch.column(1).data_type(), &ArrowDataType::Utf8);
+            let name_col = batch
+                .column(1)
+                .as_any()
+                .downcast_ref::<StringArray>()
+                .unwrap();
+            for i in 0..batch.num_rows() {
+                names.push(name_col.value(i).to_string());
+            }
+        }
+        assert_eq!(names, vec!["Alice".to_string(), "Bob".to_string()]);
+    }
+
     #[tokio::test]
     async fn test_vortex_writer_multiple_batches() {
         let file_io = FileIOBuilder::new("memory").build().unwrap();