From 90828a27d1e3c0d7c3aa49d27493d97f2e1b4e96 Mon Sep 17 00:00:00 2001 From: QuakeWang Date: Thu, 14 May 2026 13:14:40 +0800 Subject: [PATCH 1/2] feat: support input changelog for primary key writes Signed-off-by: QuakeWang --- crates/paimon/src/spec/core_options.rs | 181 +++++- crates/paimon/src/table/commit_message.rs | 3 + crates/paimon/src/table/kv_file_writer.rs | 260 ++++++-- crates/paimon/src/table/mod.rs | 1 + crates/paimon/src/table/prepared_files.rs | 33 + crates/paimon/src/table/table_commit.rs | 84 ++- crates/paimon/src/table/table_write.rs | 744 +++++++++++++++++++++- 7 files changed, 1223 insertions(+), 83 deletions(-) create mode 100644 crates/paimon/src/table/prepared_files.rs diff --git a/crates/paimon/src/spec/core_options.rs b/crates/paimon/src/spec/core_options.rs index bafad0a1..3d15b211 100644 --- a/crates/paimon/src/spec/core_options.rs +++ b/crates/paimon/src/spec/core_options.rs @@ -40,6 +40,10 @@ const COMMIT_MAX_RETRY_WAIT_OPTION: &str = "commit.max-retry-wait"; const FILE_COMPRESSION_OPTION: &str = "file.compression"; const FILE_COMPRESSION_ZSTD_LEVEL_OPTION: &str = "file.compression.zstd-level"; const FILE_FORMAT_OPTION: &str = "file.format"; +const CHANGELOG_FILE_PREFIX_OPTION: &str = "changelog-file.prefix"; +const CHANGELOG_FILE_FORMAT_OPTION: &str = "changelog-file.format"; +const CHANGELOG_FILE_COMPRESSION_OPTION: &str = "changelog-file.compression"; +const CHANGELOG_FILE_STATS_MODE_OPTION: &str = "changelog-file.stats-mode"; const ROW_TRACKING_ENABLED_OPTION: &str = "row-tracking.enabled"; const WRITE_PARQUET_BUFFER_SIZE_OPTION: &str = "write.parquet-buffer-size"; const SEQUENCE_FIELD_OPTION: &str = "sequence.field"; @@ -55,6 +59,7 @@ pub const SCAN_VERSION_OPTION: &str = "scan.version"; const DEFAULT_SOURCE_SPLIT_TARGET_SIZE: i64 = 128 * 1024 * 1024; const DEFAULT_SOURCE_SPLIT_OPEN_FILE_COST: i64 = 4 * 1024 * 1024; const DEFAULT_PARTITION_DEFAULT_NAME: &str = "__DEFAULT_PARTITION__"; +const DEFAULT_CHANGELOG_FILE_PREFIX: &str = "changelog-"; const DEFAULT_TARGET_FILE_SIZE: i64 = 256 * 1024 * 1024; const DEFAULT_WRITE_PARQUET_BUFFER_SIZE: i64 = 256 * 1024 * 1024; const DYNAMIC_BUCKET_TARGET_ROW_NUM_OPTION: &str = "dynamic-bucket.target-row-num"; @@ -75,6 +80,32 @@ pub enum MergeEngine { FirstRow, } +/// Changelog producer for table writes. +/// +/// Reference: Java `CoreOptions.ChangelogProducer`. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ChangelogProducer { + /// No changelog file. + None, + /// Double write input rows to changelog files. + Input, + /// Generate changelog files during full compaction. + FullCompaction, + /// Generate changelog files through lookup compaction. + Lookup, +} + +impl ChangelogProducer { + pub fn as_str(&self) -> &'static str { + match self { + Self::None => "none", + Self::Input => "input", + Self::FullCompaction => "full-compaction", + Self::Lookup => "lookup", + } + } +} + /// Format the bucket directory name for a given bucket number. /// Returns `"bucket-postpone"` for `POSTPONE_BUCKET` (-2), otherwise `"bucket-{N}"`. pub fn bucket_dir_name(bucket: i32) -> String { @@ -138,7 +169,7 @@ impl<'a> CoreOptions<'a> { } } - /// Changelog producer setting. Default is "none". + /// Raw changelog producer setting. Default is `"none"`. pub fn changelog_producer(&self) -> &str { self.options .get(CHANGELOG_PRODUCER_OPTION) @@ -146,6 +177,22 @@ impl<'a> CoreOptions<'a> { .unwrap_or("none") } + /// Typed changelog producer setting. Default is `None`. + pub fn try_changelog_producer(&self) -> crate::Result { + match self.options.get(CHANGELOG_PRODUCER_OPTION) { + None => Ok(ChangelogProducer::None), + Some(v) => match v.to_ascii_lowercase().as_str() { + "none" => Ok(ChangelogProducer::None), + "input" => Ok(ChangelogProducer::Input), + "full-compaction" => Ok(ChangelogProducer::FullCompaction), + "lookup" => Ok(ChangelogProducer::Lookup), + other => Err(crate::Error::Unsupported { + message: format!("Unsupported changelog-producer: '{other}'"), + }), + }, + } + } + /// The `rowkind.field` option: a user column whose value encodes the row kind. pub fn rowkind_field(&self) -> Option<&str> { self.options.get(ROWKIND_FIELD_OPTION).map(String::as_str) @@ -363,6 +410,55 @@ impl<'a> CoreOptions<'a> { .unwrap_or(1) } + /// File name prefix for changelog files. Default is `"changelog-"`. + pub fn changelog_file_prefix(&self) -> &str { + self.options + .get(CHANGELOG_FILE_PREFIX_OPTION) + .map(String::as_str) + .unwrap_or(DEFAULT_CHANGELOG_FILE_PREFIX) + } + + /// Effective file format for changelog files. + /// + /// When `changelog-file.format` is not configured, Java Paimon falls back + /// to the table `file.format`. + pub fn changelog_file_format(&self) -> &str { + self.options + .get(CHANGELOG_FILE_FORMAT_OPTION) + .map(String::as_str) + .unwrap_or_else(|| self.file_format()) + } + + /// Effective compression codec for changelog files. + /// + /// When `changelog-file.compression` is not configured, Java Paimon falls + /// back to the table `file.compression`. + pub fn changelog_file_compression(&self) -> &str { + self.options + .get(CHANGELOG_FILE_COMPRESSION_OPTION) + .map(String::as_str) + .unwrap_or_else(|| self.file_compression()) + } + + /// Metadata stats collection mode for changelog files, if configured. + pub fn changelog_file_stats_mode(&self) -> Option<&str> { + self.options + .get(CHANGELOG_FILE_STATS_MODE_OPTION) + .map(String::as_str) + } + + pub(crate) fn changelog_file_format_configured(&self) -> bool { + self.options.contains_key(CHANGELOG_FILE_FORMAT_OPTION) + } + + pub(crate) fn changelog_file_compression_configured(&self) -> bool { + self.options.contains_key(CHANGELOG_FILE_COMPRESSION_OPTION) + } + + pub(crate) fn changelog_file_stats_mode_configured(&self) -> bool { + self.options.contains_key(CHANGELOG_FILE_STATS_MODE_OPTION) + } + /// Parquet writer in-progress buffer size limit. Default is 256MB. /// When the buffered data exceeds this, the writer flushes the current row group. pub fn write_parquet_buffer_size(&self) -> i64 { @@ -546,6 +642,89 @@ mod tests { assert_eq!(core.merge_engine().unwrap(), MergeEngine::PartialUpdate); } + #[test] + fn test_changelog_producer_defaults_to_none() { + let options = HashMap::new(); + let core = CoreOptions::new(&options); + + assert_eq!(core.changelog_producer(), "none"); + assert_eq!( + core.try_changelog_producer().unwrap(), + ChangelogProducer::None + ); + } + + #[test] + fn test_changelog_producer_accepts_known_values() { + for (value, expected) in [ + ("none", ChangelogProducer::None), + ("input", ChangelogProducer::Input), + ("full-compaction", ChangelogProducer::FullCompaction), + ("lookup", ChangelogProducer::Lookup), + ("INPUT", ChangelogProducer::Input), + ] { + let options = HashMap::from([(CHANGELOG_PRODUCER_OPTION.to_string(), value.into())]); + let core = CoreOptions::new(&options); + + assert_eq!(core.try_changelog_producer().unwrap(), expected); + } + } + + #[test] + fn test_changelog_producer_rejects_unknown_values() { + let options = HashMap::from([(CHANGELOG_PRODUCER_OPTION.to_string(), "other".into())]); + let core = CoreOptions::new(&options); + + let err = core + .try_changelog_producer() + .expect_err("unknown producer should fail"); + assert!( + matches!(err, crate::Error::Unsupported { message } if message.contains("Unsupported changelog-producer")) + ); + } + + #[test] + fn test_changelog_file_options_defaults_and_overrides() { + let default_options = HashMap::from([ + (FILE_FORMAT_OPTION.to_string(), "avro".to_string()), + (FILE_COMPRESSION_OPTION.to_string(), "snappy".to_string()), + ]); + let default_core = CoreOptions::new(&default_options); + + assert_eq!(default_core.changelog_file_prefix(), "changelog-"); + assert_eq!(default_core.changelog_file_format(), "avro"); + assert_eq!(default_core.changelog_file_compression(), "snappy"); + assert_eq!(default_core.changelog_file_stats_mode(), None); + + let custom_options = HashMap::from([ + ( + CHANGELOG_FILE_PREFIX_OPTION.to_string(), + "custom-".to_string(), + ), + ( + CHANGELOG_FILE_FORMAT_OPTION.to_string(), + "parquet".to_string(), + ), + ( + CHANGELOG_FILE_COMPRESSION_OPTION.to_string(), + "zstd".to_string(), + ), + ( + CHANGELOG_FILE_STATS_MODE_OPTION.to_string(), + "counts".to_string(), + ), + ]); + let custom_core = CoreOptions::new(&custom_options); + + assert_eq!(custom_core.changelog_file_prefix(), "custom-"); + assert_eq!(custom_core.changelog_file_format(), "parquet"); + assert_eq!(custom_core.changelog_file_compression(), "zstd"); + assert_eq!(custom_core.changelog_file_stats_mode(), Some("counts")); + assert!(custom_core.changelog_file_format_configured()); + assert!(custom_core.changelog_file_compression_configured()); + assert!(custom_core.changelog_file_stats_mode_configured()); + } + #[test] fn test_commit_options_defaults() { let options = HashMap::new(); diff --git a/crates/paimon/src/table/commit_message.rs b/crates/paimon/src/table/commit_message.rs index ac7efb96..88d35332 100644 --- a/crates/paimon/src/table/commit_message.rs +++ b/crates/paimon/src/table/commit_message.rs @@ -29,6 +29,8 @@ pub struct CommitMessage { pub bucket: i32, /// New data files to be added. pub new_files: Vec, + /// New changelog files to be added. + pub new_changelog_files: Vec, /// New index files to be added (used by dynamic bucket mode). pub new_index_files: Vec, /// Files to be deleted (copy-on-write rewrite: old files replaced by new_files). @@ -41,6 +43,7 @@ impl CommitMessage { partition, bucket, new_files, + new_changelog_files: Vec::new(), new_index_files: Vec::new(), deleted_files: Vec::new(), } diff --git a/crates/paimon/src/table/kv_file_writer.rs b/crates/paimon/src/table/kv_file_writer.rs index 6b434455..4e538c15 100644 --- a/crates/paimon/src/table/kv_file_writer.rs +++ b/crates/paimon/src/table/kv_file_writer.rs @@ -31,10 +31,12 @@ use crate::io::FileIO; use crate::spec::stats::{compute_column_stats, BinaryTableStats}; use crate::spec::{ extract_datum_from_arrow, BinaryRowBuilder, DataFileMeta, DataType, MergeEngine, - PartialUpdateConfig, EMPTY_SERIALIZED_ROW, SEQUENCE_NUMBER_FIELD_NAME, VALUE_KIND_FIELD_NAME, + PartialUpdateConfig, RowKind, EMPTY_SERIALIZED_ROW, SEQUENCE_NUMBER_FIELD_NAME, + VALUE_KIND_FIELD_NAME, }; +use crate::table::prepared_files::PreparedFiles; use crate::Result; -use arrow_array::{Int64Array, Int8Array, RecordBatch}; +use arrow_array::{Array, Int64Array, Int8Array, RecordBatch, UInt32Array}; use arrow_ord::sort::{lexsort_to_indices, SortColumn, SortOptions}; use arrow_row::{RowConverter, SortField}; use arrow_schema::{DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema}; @@ -55,6 +57,8 @@ pub(crate) struct KeyValueFileWriter { buffer_bytes: usize, /// Completed file metadata. written_files: Vec, + /// Completed changelog file metadata. + written_changelog_files: Vec, } /// Configuration for [`KeyValueFileWriter`], grouping file-location, schema, @@ -70,6 +74,10 @@ pub(crate) struct KeyValueWriteConfig { pub file_compression_zstd_level: i32, pub write_buffer_size: i64, pub file_format: String, + pub input_changelog: bool, + pub changelog_file_prefix: String, + pub changelog_file_compression: String, + pub changelog_file_format: String, /// Primary key column indices in the user schema. pub primary_key_indices: Vec, /// Paimon DataTypes for each primary key column (same order as primary_key_indices). @@ -82,6 +90,16 @@ pub(crate) struct KeyValueWriteConfig { pub deletion_vectors_enabled: bool, } +struct IndexedFileWrite<'a> { + file_prefix: &'a str, + file_ordinal: usize, + file_format: &'a str, + file_compression: &'a str, + min_sequence_number: i64, + max_sequence_number: i64, + delete_row_count: i64, +} + impl KeyValueFileWriter { pub(crate) fn new( file_io: FileIO, @@ -118,6 +136,7 @@ impl KeyValueFileWriter { buffer: Vec::new(), buffer_bytes: 0, written_files: Vec::new(), + written_changelog_files: Vec::new(), }) } @@ -218,23 +237,82 @@ impl KeyValueFileWriter { // FirstRow → keep first row per key group (lowest seq) // PartialUpdate → keep all rows for read-side field-wise merge let selected_indices = self.select_flush_indices(&combined, &sorted_indices)?; - let selected_num_rows = selected_indices.len(); + let selected_u32 = UInt32Array::from(selected_indices); - // Extract min_key / max_key from selected endpoints. - let first_row = selected_indices[0] as usize; - let last_row = selected_indices[selected_num_rows - 1] as usize; - let min_key = self.extract_key_binary_row(&combined, first_row)?; - let max_key = self.extract_key_binary_row(&combined, last_row)?; + let data_delete_row_count = Self::indexed_delete_row_count(&combined, &selected_u32)?; + let changelog_delete_row_count = if self.config.input_changelog { + Some(Self::indexed_delete_row_count(&combined, &sorted_indices)?) + } else { + None + }; - // Build physical schema and open writer. - let physical_schema = build_physical_schema(&user_schema); + let data_file = self + .write_indexed_file( + &combined, + seq_array.as_ref(), + &selected_u32, + IndexedFileWrite { + file_prefix: "data-", + file_ordinal: self.written_files.len(), + file_format: &self.config.file_format, + file_compression: &self.config.file_compression, + min_sequence_number: start_seq, + max_sequence_number: end_seq, + delete_row_count: data_delete_row_count, + }, + ) + .await?; + self.written_files.push(data_file); + + if let Some(delete_row_count) = changelog_delete_row_count { + let changelog_file = self + .write_indexed_file( + &combined, + seq_array.as_ref(), + &sorted_indices, + IndexedFileWrite { + file_prefix: &self.config.changelog_file_prefix, + file_ordinal: self.written_changelog_files.len(), + file_format: &self.config.changelog_file_format, + file_compression: &self.config.changelog_file_compression, + min_sequence_number: start_seq, + max_sequence_number: end_seq, + delete_row_count, + }, + ) + .await?; + self.written_changelog_files.push(changelog_file); + } + Ok(()) + } + + async fn write_indexed_file( + &self, + batch: &RecordBatch, + seq_array: &dyn Array, + indices: &UInt32Array, + write: IndexedFileWrite<'_>, + ) -> Result { + if indices.is_empty() { + return Err(crate::Error::DataInvalid { + message: "Cannot write an empty key-value data file".to_string(), + source: None, + }); + } - // Open file writer. + let user_schema = batch.schema(); + let first_row = indices.value(0) as usize; + let last_row = indices.value(indices.len() - 1) as usize; + let min_key = self.extract_key_binary_row(batch, first_row)?; + let max_key = self.extract_key_binary_row(batch, last_row)?; + + let physical_schema = build_physical_schema(&user_schema); let file_name = format!( - "data-{}-{}.{}", + "{}{}-{}.{}", + write.file_prefix, uuid::Uuid::new_v4(), - self.written_files.len(), - self.config.file_format, + write.file_ordinal, + write.file_format, ); let bucket_dir = if self.config.partition_path.is_empty() { format!( @@ -248,44 +326,42 @@ impl KeyValueFileWriter { ) }; self.file_io.mkdirs(&format!("{bucket_dir}/")).await?; - let file_path = format!("{}/{}", bucket_dir, file_name); + let file_path = format!("{bucket_dir}/{file_name}"); let output = self.file_io.new_output(&file_path)?; let mut writer = create_format_writer( &output, physical_schema.clone(), - &self.config.file_compression, + write.file_compression, self.config.file_compression_zstd_level, None, ) .await?; - // Chunked write using selected indices. - let selected_u32 = arrow_array::UInt32Array::from(selected_indices); - for chunk_start in (0..selected_num_rows).step_by(Self::FLUSH_CHUNK_ROWS) { - let chunk_len = Self::FLUSH_CHUNK_ROWS.min(selected_num_rows - chunk_start); - let chunk_indices = selected_u32.slice(chunk_start, chunk_len); + let vk_idx = batch + .schema() + .fields() + .iter() + .position(|f| f.name() == crate::spec::VALUE_KIND_FIELD_NAME); + + for chunk_start in (0..indices.len()).step_by(Self::FLUSH_CHUNK_ROWS) { + let chunk_len = Self::FLUSH_CHUNK_ROWS.min(indices.len() - chunk_start); + let chunk_indices = indices.slice(chunk_start, chunk_len); - let mut physical_columns: Vec> = Vec::new(); - // Sequence numbers for this chunk. + let mut physical_columns: Vec> = Vec::new(); physical_columns.push( - arrow_select::take::take(seq_array.as_ref(), &chunk_indices, None).map_err( - |e| crate::Error::DataInvalid { + arrow_select::take::take(seq_array, &chunk_indices, None).map_err(|e| { + crate::Error::DataInvalid { message: format!("Failed to reorder sequence numbers: {e}"), source: None, - }, - )?, + } + })?, ); - // Value kind column — resolve from batch schema. - let vk_idx = combined - .schema() - .fields() - .iter() - .position(|f| f.name() == crate::spec::VALUE_KIND_FIELD_NAME); + match vk_idx { Some(vk_idx) => { physical_columns.push( arrow_select::take::take( - combined.column(vk_idx).as_ref(), + batch.column(vk_idx).as_ref(), &chunk_indices, None, ) @@ -296,21 +372,20 @@ impl KeyValueFileWriter { ); } None => { - // All rows are INSERT (value_kind = 0). physical_columns.push(Arc::new(Int8Array::from(vec![0i8; chunk_len]))); } } - // All user columns (skip _VALUE_KIND if present — already handled above). - for idx in 0..combined.num_columns() { + + for idx in 0..batch.num_columns() { if Some(idx) == vk_idx { continue; } physical_columns.push( - arrow_select::take::take(combined.column(idx).as_ref(), &chunk_indices, None) + arrow_select::take::take(batch.column(idx).as_ref(), &chunk_indices, None) .map_err(|e| crate::Error::DataInvalid { - message: format!("Failed to reorder by sort indices: {e}"), - source: None, - })?, + message: format!("Failed to reorder by sort indices: {e}"), + source: None, + })?, ); } @@ -324,20 +399,20 @@ impl KeyValueFileWriter { let file_size = writer.close().await? as i64; - // Compute key_stats on selected output rows (not the raw combined batch). - let selected_key_columns: Vec> = self + let key_columns: Vec> = self .config .primary_key_indices .iter() .map(|&idx| { - arrow_select::take::take(combined.column(idx).as_ref(), &selected_u32, None) - .map_err(|e| crate::Error::DataInvalid { + arrow_select::take::take(batch.column(idx).as_ref(), indices, None).map_err(|e| { + crate::Error::DataInvalid { message: format!("Failed to take key column for stats: {e}"), source: None, - }) + } + }) }) .collect::>>()?; - let selected_key_batch = RecordBatch::try_new( + let key_batch = RecordBatch::try_new( Arc::new(ArrowSchema::new( self.config .primary_key_indices @@ -345,24 +420,23 @@ impl KeyValueFileWriter { .map(|&idx| user_schema.field(idx).clone()) .collect::>(), )), - selected_key_columns, + key_columns, ) .map_err(|e| crate::Error::DataInvalid { - message: format!("Failed to build selected key batch for stats: {e}"), + message: format!("Failed to build key batch for stats: {e}"), source: None, })?; let stats_col_indices: Vec = (0..self.config.primary_key_indices.len()).collect(); let key_stats = compute_column_stats( - &selected_key_batch, + &key_batch, &stats_col_indices, &self.config.primary_key_types, )?; - // Sequence numbers span the full assigned range. - let meta = DataFileMeta { + Ok(DataFileMeta { file_name, file_size, - row_count: selected_num_rows as i64, + row_count: indices.len() as i64, min_key, max_key, key_stats, @@ -371,22 +445,54 @@ impl KeyValueFileWriter { EMPTY_SERIALIZED_ROW.clone(), vec![], ), - min_sequence_number: start_seq, - max_sequence_number: end_seq, + min_sequence_number: write.min_sequence_number, + max_sequence_number: write.max_sequence_number, schema_id: self.config.schema_id, level: 0, extra_files: vec![], creation_time: Some(Utc::now()), - delete_row_count: Some(0), + delete_row_count: Some(write.delete_row_count), embedded_index: None, file_source: Some(0), // FileSource.APPEND value_stats_cols: Some(vec![]), external_path: None, first_row_id: None, write_cols: None, + }) + } + + fn indexed_delete_row_count(batch: &RecordBatch, indices: &UInt32Array) -> Result { + let Some(vk_idx) = batch + .schema() + .fields() + .iter() + .position(|f| f.name() == crate::spec::VALUE_KIND_FIELD_NAME) + else { + return Ok(0); }; - self.written_files.push(meta); - Ok(()) + + let column = batch.column(vk_idx); + let Some(value_kinds) = column.as_any().downcast_ref::() else { + return Err(crate::Error::DataInvalid { + message: "_VALUE_KIND column must be Int8".to_string(), + source: None, + }); + }; + + let mut delete_count = 0; + for idx in 0..indices.len() { + let row = indices.value(idx) as usize; + let value = if column.is_null(row) { + 0 + } else { + value_kinds.value(row) + }; + match RowKind::from_value(value)? { + RowKind::UpdateBefore | RowKind::Delete => delete_count += 1, + RowKind::Insert | RowKind::UpdateAfter => {} + } + } + Ok(delete_count) } /// Select output row indices from sorted inputs according to merge engine. @@ -478,9 +584,12 @@ impl KeyValueFileWriter { } /// Flush remaining buffer and return all written file metadata. - pub(crate) async fn prepare_commit(&mut self) -> Result> { + pub(crate) async fn prepare_commit(&mut self) -> Result { self.flush().await?; - Ok(std::mem::take(&mut self.written_files)) + Ok(PreparedFiles { + data_files: std::mem::take(&mut self.written_files), + changelog_files: std::mem::take(&mut self.written_changelog_files), + }) } /// Extract primary key columns from a batch at a given row index into a serialized BinaryRow. @@ -549,6 +658,10 @@ mod tests { file_compression_zstd_level: 0, write_buffer_size: 1024, file_format: "parquet".to_string(), + input_changelog: false, + changelog_file_prefix: "changelog-".to_string(), + changelog_file_compression: "none".to_string(), + changelog_file_format: "parquet".to_string(), primary_key_indices: vec![0], primary_key_types: vec![DataType::Int(IntType::new())], sequence_field_indices: vec![1], @@ -621,6 +734,33 @@ mod tests { assert_eq!(selected, vec![0, 1]); } + #[test] + fn test_indexed_delete_row_count_rejects_invalid_value_kind() { + let schema = Arc::new(ArrowSchema::new(vec![ + Arc::new(ArrowField::new("id", ArrowDataType::Int32, false)), + Arc::new(ArrowField::new( + VALUE_KIND_FIELD_NAME, + ArrowDataType::Int8, + false, + )), + ])); + let batch = RecordBatch::try_new( + schema, + vec![ + Arc::new(Int32Array::from(vec![1])) as Arc, + Arc::new(Int8Array::from(vec![4])) as Arc, + ], + ) + .unwrap(); + let indices = UInt32Array::from(vec![0]); + + let err = KeyValueFileWriter::indexed_delete_row_count(&batch, &indices).unwrap_err(); + + assert!( + matches!(err, crate::Error::DataInvalid { message, .. } if message.contains("Invalid RowKind value")) + ); + } + #[test] fn test_new_rejects_partial_update_dynamic_bucket() { let mut config = test_write_config(MergeEngine::PartialUpdate); diff --git a/crates/paimon/src/table/mod.rs b/crates/paimon/src/table/mod.rs index c84d7c8c..cbff76ab 100644 --- a/crates/paimon/src/table/mod.rs +++ b/crates/paimon/src/table/mod.rs @@ -39,6 +39,7 @@ mod kv_file_reader; mod kv_file_writer; mod partition_filter; mod postpone_file_writer; +mod prepared_files; mod read_builder; pub(crate) mod rest_env; pub(crate) mod row_id_predicate; diff --git a/crates/paimon/src/table/prepared_files.rs b/crates/paimon/src/table/prepared_files.rs new file mode 100644 index 00000000..d2c89f93 --- /dev/null +++ b/crates/paimon/src/table/prepared_files.rs @@ -0,0 +1,33 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::spec::DataFileMeta; + +/// Files produced by closing a writer. +pub(crate) struct PreparedFiles { + pub data_files: Vec, + pub changelog_files: Vec, +} + +impl PreparedFiles { + pub(crate) fn data(data_files: Vec) -> Self { + Self { + data_files, + changelog_files: Vec::new(), + } + } +} diff --git a/crates/paimon/src/table/table_commit.rs b/crates/paimon/src/table/table_commit.rs index 0de494ea..4e0b46b9 100644 --- a/crates/paimon/src/table/table_commit.rs +++ b/crates/paimon/src/table/table_commit.rs @@ -98,9 +98,11 @@ impl TableCommit { } let entries = self.messages_to_entries(&commit_messages); + let changelog_entries = self.messages_to_changelog_entries(&commit_messages); let new_index_entries = self.messages_to_index_entries(&commit_messages); self.try_commit(CommitEntriesPlan::Direct { entries, + changelog_entries, new_index_entries, }) .await @@ -126,6 +128,14 @@ impl TableCommit { if commit_messages.is_empty() && static_partitions.is_none() { return Ok(()); } + if commit_messages + .iter() + .any(|msg| !msg.new_changelog_files.is_empty()) + { + return Err(crate::Error::Unsupported { + message: "overwrite with changelog files is not supported".to_string(), + }); + } let new_entries = self.messages_to_entries(&commit_messages); let new_index_entries = self.messages_to_index_entries(&commit_messages); @@ -298,7 +308,7 @@ impl TableCommit { let latest_snapshot = self.snapshot_manager.get_latest_snapshot().await?; let resolved = self.resolve_commit(&plan, &latest_snapshot).await?; - if resolved.entries.is_empty() { + if resolved.entries.is_empty() && resolved.changelog_entries.is_empty() { break; } @@ -376,11 +386,15 @@ impl TableCommit { let unique_id = uuid::Uuid::new_v4(); let base_manifest_list_name = format!("manifest-list-{unique_id}-0"); let delta_manifest_list_name = format!("manifest-list-{unique_id}-1"); + let changelog_manifest_list_name = format!("manifest-list-{unique_id}-2"); let new_manifest_name = format!("manifest-{}-0", uuid::Uuid::new_v4()); + let changelog_manifest_name = format!("manifest-{}-1", uuid::Uuid::new_v4()); let base_manifest_list_path = format!("{manifest_dir}/{base_manifest_list_name}"); let delta_manifest_list_path = format!("{manifest_dir}/{delta_manifest_list_name}"); + let changelog_manifest_list_path = format!("{manifest_dir}/{changelog_manifest_list_name}"); let new_manifest_path = format!("{manifest_dir}/{new_manifest_name}"); + let changelog_manifest_path = format!("{manifest_dir}/{changelog_manifest_name}"); // Write manifest file let new_manifest_file_meta = self @@ -400,6 +414,32 @@ impl TableCommit { ) .await?; + let changelog_record_count = if resolved.changelog_entries.is_empty() { + None + } else { + let changelog_manifest_file_meta = self + .write_manifest_file( + file_io, + &changelog_manifest_path, + &changelog_manifest_name, + &resolved.changelog_entries, + ) + .await?; + ManifestList::write( + file_io, + &changelog_manifest_list_path, + &[changelog_manifest_file_meta], + ) + .await?; + Some( + resolved + .changelog_entries + .iter() + .map(|entry| entry.file().row_count) + .sum(), + ) + }; + // Read existing manifests (base + delta from previous snapshot) and write base manifest list let mut total_record_count: i64 = 0; let existing_manifest_files = if let Some(snap) = latest_snapshot { @@ -441,6 +481,8 @@ impl TableCommit { .time_millis(current_time_millis()) .total_record_count(Some(total_record_count)) .delta_record_count(Some(delta_record_count)) + .changelog_manifest_list(changelog_record_count.map(|_| changelog_manifest_list_name)) + .changelog_record_count(changelog_record_count) .next_row_id(next_row_id) .index_manifest(resolved.index_manifest_name) .build(); @@ -533,6 +575,7 @@ impl TableCommit { match plan { CommitEntriesPlan::Direct { entries, + changelog_entries, new_index_entries, } => { if self.row_tracking_enabled { @@ -576,6 +619,7 @@ impl TableCommit { Ok(ResolvedCommit { entries: entries.clone(), + changelog_entries: changelog_entries.clone(), kind, index_manifest_name, }) @@ -613,6 +657,7 @@ impl TableCommit { Ok(ResolvedCommit { entries, + changelog_entries: vec![], kind: CommitKind::OVERWRITE, index_manifest_name, }) @@ -1031,6 +1076,25 @@ impl TableCommit { .collect() } + /// Convert commit messages to changelog manifest entries (ADD kind only). + fn messages_to_changelog_entries(&self, messages: &[CommitMessage]) -> Vec { + messages + .iter() + .flat_map(|msg| { + msg.new_changelog_files.iter().map(|file| { + ManifestEntry::new( + FileKind::Add, + msg.partition.clone(), + msg.bucket, + self.total_buckets, + file.clone(), + 2, + ) + }) + }) + .collect() + } + /// Convert commit messages to index manifest entries (ADD kind). fn messages_to_index_entries(&self, messages: &[CommitMessage]) -> Vec { messages @@ -1056,6 +1120,7 @@ enum CommitEntriesPlan { /// rewrites, in which case `resolve_commit` auto-promotes to `CommitKind::OVERWRITE`. Direct { entries: Vec, + changelog_entries: Vec, new_index_entries: Vec, }, /// Overwrite with optional partition filter. @@ -1069,6 +1134,7 @@ enum CommitEntriesPlan { /// Fully resolved commit ready for writing. struct ResolvedCommit { entries: Vec, + changelog_entries: Vec, kind: CommitKind, index_manifest_name: Option, } @@ -1622,6 +1688,22 @@ mod tests { assert_eq!(snapshot.total_record_count(), Some(350)); } + #[tokio::test] + async fn test_overwrite_rejects_changelog_files() { + let file_io = test_file_io(); + let table_path = "memory:/test_overwrite_changelog_files"; + setup_dirs(&file_io, table_path).await; + + let commit = setup_commit(&file_io, table_path); + let mut message = CommitMessage::new(vec![], 0, vec![test_data_file("data.parquet", 1)]); + message.new_changelog_files = vec![test_data_file("changelog.parquet", 1)]; + + let err = commit.overwrite(vec![message], None).await.unwrap_err(); + assert!( + matches!(err, crate::Error::Unsupported { message } if message.contains("changelog files")) + ); + } + #[tokio::test] async fn test_delete_conflict_rejects_missing_file() { let file_io = test_file_io(); diff --git a/crates/paimon/src/table/table_write.rs b/crates/paimon/src/table/table_write.rs index 167cc850..40d7d977 100644 --- a/crates/paimon/src/table/table_write.rs +++ b/crates/paimon/src/table/table_write.rs @@ -21,10 +21,10 @@ //! and [pypaimon FileStoreWrite](https://github.com/apache/paimon/blob/master/paimon-python/pypaimon/write/file_store_write.py) use crate::arrow::build_target_arrow_schema; -use crate::spec::DataFileMeta; use crate::spec::PartitionComputer; use crate::spec::{ - BinaryRow, CoreOptions, DataType, MergeEngine, EMPTY_SERIALIZED_ROW, POSTPONE_BUCKET, + BinaryRow, ChangelogProducer, CoreOptions, DataType, MergeEngine, EMPTY_SERIALIZED_ROW, + POSTPONE_BUCKET, }; use crate::table::blob_file_writer::AppendBlobFileWriter; use crate::table::bucket_assigner::{BucketAssignerEnum, PartitionBucketKey}; @@ -37,6 +37,7 @@ use crate::table::data_file_writer::DataFileWriter; use crate::table::kv_file_writer::{KeyValueFileWriter, KeyValueWriteConfig}; use crate::table::partition_filter::PartitionFilter; use crate::table::postpone_file_writer::{PostponeFileWriter, PostponeWriteConfig}; +use crate::table::prepared_files::PreparedFiles; use crate::table::{SnapshotManager, Table, TableScan}; use crate::Result; use arrow_array::RecordBatch; @@ -61,12 +62,12 @@ impl FileWriter { } } - async fn prepare_commit(mut self) -> Result> { + async fn prepare_commit(mut self) -> Result { match self { - FileWriter::Append(ref mut w) => w.prepare_commit().await, - FileWriter::AppendBlob(ref mut w) => w.prepare_commit().await, + FileWriter::Append(ref mut w) => w.prepare_commit().await.map(PreparedFiles::data), + FileWriter::AppendBlob(ref mut w) => w.prepare_commit().await.map(PreparedFiles::data), FileWriter::KeyValue(ref mut w) => w.prepare_commit().await, - FileWriter::Postpone(ref mut w) => w.prepare_commit().await, + FileWriter::Postpone(ref mut w) => w.prepare_commit().await.map(PreparedFiles::data), } } } @@ -96,6 +97,10 @@ pub struct TableWrite { primary_key_types: Vec, sequence_field_indices: Vec, merge_engine: MergeEngine, + changelog_producer: ChangelogProducer, + changelog_file_prefix: String, + changelog_file_format: String, + changelog_file_compression: String, partition_seq_cache: HashMap, HashMap>, commit_user: String, /// Bucket assignment strategy (fixed, dynamic, or cross-partition). @@ -138,6 +143,7 @@ impl TableWrite { let total_buckets = core_options.bucket(); let has_primary_keys = !schema.primary_keys().is_empty(); let is_dynamic_bucket = has_primary_keys && total_buckets == -1; + let changelog_producer = core_options.try_changelog_producer()?; let is_cross_partition = is_dynamic_bucket && !schema.partition_keys().is_empty() && { let pk_set: HashSet<&str> = schema.primary_keys().iter().map(String::as_str).collect(); @@ -158,16 +164,14 @@ impl TableWrite { ), }); } - if has_primary_keys - && total_buckets != POSTPONE_BUCKET - && core_options - .changelog_producer() - .eq_ignore_ascii_case("input") - { - return Err(crate::Error::Unsupported { - message: "KeyValueFileWriter does not support changelog-producer=input".to_string(), - }); - } + + Self::validate_changelog_write_options( + &core_options, + changelog_producer, + has_primary_keys, + total_buckets, + is_cross_partition, + )?; if !has_primary_keys && total_buckets != -1 && core_options.bucket_key().is_none() { return Err(crate::Error::Unsupported { @@ -179,6 +183,9 @@ impl TableWrite { let file_compression = core_options.file_compression().to_string(); let file_compression_zstd_level = core_options.file_compression_zstd_level(); let file_format = core_options.file_format().to_string(); + let changelog_file_prefix = core_options.changelog_file_prefix().to_string(); + let changelog_file_format = core_options.changelog_file_format().to_string(); + let changelog_file_compression = core_options.changelog_file_compression().to_string(); let write_buffer_size = core_options.write_parquet_buffer_size(); let partition_keys: Vec = schema.partition_keys().to_vec(); let fields = schema.fields(); @@ -285,6 +292,10 @@ impl TableWrite { primary_key_types, sequence_field_indices, merge_engine, + changelog_producer, + changelog_file_prefix, + changelog_file_format, + changelog_file_compression, partition_seq_cache: HashMap::new(), commit_user, bucket_assigner, @@ -294,6 +305,88 @@ impl TableWrite { }) } + fn validate_changelog_write_options( + core_options: &CoreOptions<'_>, + producer: ChangelogProducer, + has_primary_keys: bool, + total_buckets: i32, + is_cross_partition: bool, + ) -> Result<()> { + match producer { + ChangelogProducer::None => Ok(()), + _ if !has_primary_keys => Err(crate::Error::Unsupported { + message: format!( + "changelog-producer={} is only supported for primary-key tables", + producer.as_str() + ), + }), + ChangelogProducer::FullCompaction | ChangelogProducer::Lookup => { + Err(crate::Error::Unsupported { + message: format!( + "changelog-producer={} is not supported by the write path yet", + producer.as_str() + ), + }) + } + ChangelogProducer::Input => { + let merge_engine = core_options.merge_engine()?; + if merge_engine != MergeEngine::Deduplicate { + return Err(crate::Error::Unsupported { + message: format!( + "changelog-producer=input only supports merge-engine=deduplicate, found merge-engine={}", + match merge_engine { + MergeEngine::Deduplicate => "deduplicate", + MergeEngine::PartialUpdate => "partial-update", + MergeEngine::FirstRow => "first-row", + } + ), + }); + } + if is_cross_partition { + return Err(crate::Error::Unsupported { + message: + "changelog-producer=input does not support cross-partition dynamic bucket tables" + .to_string(), + }); + } + if total_buckets == POSTPONE_BUCKET { + return Err(crate::Error::Unsupported { + message: "changelog-producer=input does not support bucket=-2".to_string(), + }); + } + if core_options.rowkind_field().is_some() { + return Err(crate::Error::Unsupported { + message: "changelog-producer=input does not support rowkind.field" + .to_string(), + }); + } + if core_options.changelog_file_format_configured() { + return Err(crate::Error::Unsupported { + message: + "changelog-file.format is not supported for changelog-producer=input yet" + .to_string(), + }); + } + if core_options.changelog_file_compression_configured() { + return Err(crate::Error::Unsupported { + message: + "changelog-file.compression is not supported for changelog-producer=input yet" + .to_string(), + }); + } + if core_options.changelog_file_stats_mode_configured() { + return Err(crate::Error::Unsupported { + message: + "changelog-file.stats-mode is not supported for changelog-producer=input yet" + .to_string(), + }); + } + + Ok(()) + } + } + } + /// Scan the latest snapshot for a specific partition and return a map of /// bucket → (max_sequence_number + 1) for each bucket in that partition. async fn scan_partition_sequence_numbers( @@ -508,6 +601,12 @@ impl TableWrite { /// Close all writers and collect CommitMessages for use with TableCommit. /// Writers are cleared after this call, allowing the TableWrite to be reused. pub async fn prepare_commit(&mut self) -> Result> { + if self.is_overwrite && self.changelog_producer == ChangelogProducer::Input { + return Err(crate::Error::Unsupported { + message: "overwrite with changelog-producer=input is not supported".to_string(), + }); + } + let writers: Vec<(PartitionBucketKey, FileWriter)> = self.partition_writers.drain().collect(); @@ -533,8 +632,12 @@ impl TableWrite { for (partition_bytes, bucket, files) in results { let key = (partition_bytes.clone(), bucket); let index_files = index_files_by_key.remove(&key).unwrap_or_default(); - if !files.is_empty() || !index_files.is_empty() { - let mut msg = CommitMessage::new(partition_bytes, bucket, files); + if !files.data_files.is_empty() + || !files.changelog_files.is_empty() + || !index_files.is_empty() + { + let mut msg = CommitMessage::new(partition_bytes, bucket, files.data_files); + msg.new_changelog_files = files.changelog_files; msg.new_index_files = index_files; messages.push(msg); } @@ -672,6 +775,10 @@ impl TableWrite { file_compression_zstd_level: self.file_compression_zstd_level, write_buffer_size: self.write_buffer_size, file_format: self.file_format.clone(), + input_changelog: self.changelog_producer == ChangelogProducer::Input, + changelog_file_prefix: self.changelog_file_prefix.clone(), + changelog_file_compression: self.changelog_file_compression.clone(), + changelog_file_format: self.changelog_file_format.clone(), primary_key_indices: self.primary_key_indices.clone(), primary_key_types: self.primary_key_types.clone(), sequence_field_indices: self.sequence_field_indices.clone(), @@ -691,15 +798,18 @@ impl TableWrite { #[cfg(test)] mod tests { use super::*; + use crate::arrow::format::create_format_reader; use crate::catalog::Identifier; use crate::io::{FileIO, FileIOBuilder}; use crate::spec::{ - BinaryRowBuilder, BlobType, DataType, DecimalType, IntType, LocalZonedTimestampType, - Schema, TableSchema, TimestampType, VarCharType, + bucket_dir_name, BigIntType, BinaryRowBuilder, BlobType, DataField, DataType, DecimalType, + FileKind, IndexManifest, IntType, LocalZonedTimestampType, Manifest, ManifestList, Schema, + TableSchema, TimestampType, TinyIntType, VarCharType, SEQUENCE_NUMBER_FIELD_ID, + SEQUENCE_NUMBER_FIELD_NAME, VALUE_KIND_FIELD_ID, VALUE_KIND_FIELD_NAME, }; use crate::table::{SnapshotManager, TableCommit}; - use arrow_array::Int32Array; use arrow_array::RecordBatchReader as _; + use arrow_array::{Int32Array, Int64Array, Int8Array}; use arrow_schema::{ DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema, TimeUnit, }; @@ -784,6 +894,147 @@ mod tests { .unwrap() } + fn make_batch_with_value_kind( + ids: Vec, + values: Vec, + value_kinds: Vec, + ) -> RecordBatch { + let schema = Arc::new(ArrowSchema::new(vec![ + ArrowField::new("id", ArrowDataType::Int32, false), + ArrowField::new("value", ArrowDataType::Int32, false), + ArrowField::new( + crate::spec::VALUE_KIND_FIELD_NAME, + ArrowDataType::Int8, + false, + ), + ])); + RecordBatch::try_new( + schema, + vec![ + Arc::new(Int32Array::from(ids)), + Arc::new(Int32Array::from(values)), + Arc::new(Int8Array::from(value_kinds)), + ], + ) + .unwrap() + } + + fn make_partitioned_batch_with_value_kind( + pts: Vec<&str>, + ids: Vec, + values: Vec, + value_kinds: Vec, + ) -> RecordBatch { + let schema = Arc::new(ArrowSchema::new(vec![ + ArrowField::new("pt", ArrowDataType::Utf8, false), + ArrowField::new("id", ArrowDataType::Int32, false), + ArrowField::new("value", ArrowDataType::Int32, false), + ArrowField::new( + crate::spec::VALUE_KIND_FIELD_NAME, + ArrowDataType::Int8, + false, + ), + ])); + RecordBatch::try_new( + schema, + vec![ + Arc::new(arrow_array::StringArray::from(pts)), + Arc::new(Int32Array::from(ids)), + Arc::new(Int32Array::from(values)), + Arc::new(Int8Array::from(value_kinds)), + ], + ) + .unwrap() + } + + fn physical_key_value_fields() -> Vec { + vec![ + DataField::new( + SEQUENCE_NUMBER_FIELD_ID, + SEQUENCE_NUMBER_FIELD_NAME.to_string(), + DataType::BigInt(BigIntType::new()), + ), + DataField::new( + VALUE_KIND_FIELD_ID, + VALUE_KIND_FIELD_NAME.to_string(), + DataType::TinyInt(TinyIntType::new()), + ), + DataField::new(0, "id".to_string(), DataType::Int(IntType::new())), + DataField::new(1, "value".to_string(), DataType::Int(IntType::new())), + ] + } + + async fn read_physical_key_value_batches( + file_io: &FileIO, + file_path: &str, + file_size: i64, + ) -> Vec { + let format_reader = create_format_reader(file_path, false).unwrap(); + let input = file_io.new_input(file_path).unwrap(); + let file_reader = input.reader().await.unwrap(); + let read_fields = physical_key_value_fields(); + let stream = format_reader + .read_batch_stream( + Box::new(file_reader), + file_size as u64, + &read_fields, + None, + None, + None, + ) + .await + .unwrap(); + futures::TryStreamExt::try_collect(stream).await.unwrap() + } + + fn collect_i32(batches: &[RecordBatch], column: usize) -> Vec { + batches + .iter() + .flat_map(|batch| { + batch + .column(column) + .as_any() + .downcast_ref::() + .unwrap() + .values() + .iter() + .copied() + }) + .collect() + } + + fn collect_i64(batches: &[RecordBatch], column: usize) -> Vec { + batches + .iter() + .flat_map(|batch| { + batch + .column(column) + .as_any() + .downcast_ref::() + .unwrap() + .values() + .iter() + .copied() + }) + .collect() + } + + fn collect_i8(batches: &[RecordBatch], column: usize) -> Vec { + batches + .iter() + .flat_map(|batch| { + batch + .column(column) + .as_any() + .downcast_ref::() + .unwrap() + .values() + .iter() + .copied() + }) + .collect() + } + fn make_partitioned_batch(pts: Vec<&str>, ids: Vec) -> RecordBatch { let schema = Arc::new(ArrowSchema::new(vec![ ArrowField::new("pt", ArrowDataType::Utf8, false), @@ -1393,6 +1644,457 @@ mod tests { ) } + fn pk_changelog_schema(options: &[(&str, &str)]) -> TableSchema { + let mut builder = Schema::builder() + .column("id", DataType::Int(IntType::new())) + .column("value", DataType::Int(IntType::new())) + .primary_key(["id"]) + .option("bucket", "1"); + for (key, value) in options { + builder = builder.option(*key, *value); + } + TableSchema::new(0, &builder.build().unwrap()) + } + + fn non_pk_changelog_schema(options: &[(&str, &str)]) -> TableSchema { + let mut builder = Schema::builder() + .column("id", DataType::Int(IntType::new())) + .column("value", DataType::Int(IntType::new())); + for (key, value) in options { + builder = builder.option(*key, *value); + } + TableSchema::new(0, &builder.build().unwrap()) + } + + fn partitioned_dynamic_pk_changelog_schema() -> TableSchema { + let schema = Schema::builder() + .column("pt", DataType::VarChar(VarCharType::string_type())) + .column("id", DataType::Int(IntType::new())) + .column("value", DataType::Int(IntType::new())) + .partition_keys(["pt"]) + .primary_key(["id"]) + .option("changelog-producer", "input") + .build() + .unwrap(); + TableSchema::new(0, &schema) + } + + fn ordinary_dynamic_pk_changelog_schema() -> TableSchema { + let schema = Schema::builder() + .column("pt", DataType::VarChar(VarCharType::string_type())) + .column("id", DataType::Int(IntType::new())) + .column("value", DataType::Int(IntType::new())) + .partition_keys(["pt"]) + .primary_key(["pt", "id"]) + .option("changelog-producer", "input") + .build() + .unwrap(); + TableSchema::new(0, &schema) + } + + fn table_write_new_unsupported(schema: TableSchema, expected_message: &str) { + let table = Table::new( + test_file_io(), + Identifier::new("default", "test_changelog_validation"), + "memory:/test_changelog_validation".to_string(), + schema, + None, + ); + + let err = match TableWrite::new(&table, "test-user".to_string()) { + Ok(_) => panic!("TableWrite::new should reject this schema"), + Err(err) => err, + }; + assert!( + matches!(err, crate::Error::Unsupported { ref message } if message.contains(expected_message)), + "expected unsupported error containing '{expected_message}', got {err:?}" + ); + } + + #[test] + fn test_rejects_unsupported_changelog_producer_configurations() { + let cases = vec![ + ( + non_pk_changelog_schema(&[("changelog-producer", "input")]), + "only supported for primary-key tables", + ), + ( + non_pk_changelog_schema(&[("changelog-producer", "lookup")]), + "changelog-producer=lookup", + ), + ( + pk_changelog_schema(&[("changelog-producer", "unknown")]), + "Unsupported changelog-producer", + ), + ( + pk_changelog_schema(&[("changelog-producer", "full-compaction")]), + "changelog-producer=full-compaction", + ), + ( + pk_changelog_schema(&[("changelog-producer", "lookup")]), + "changelog-producer=lookup", + ), + ]; + + for (schema, expected_message) in cases { + table_write_new_unsupported(schema, expected_message); + } + } + + #[test] + fn test_rejects_unsupported_input_changelog_options() { + let rowkind_schema = Schema::builder() + .column("id", DataType::Int(IntType::new())) + .column("value", DataType::Int(IntType::new())) + .column("op", DataType::Int(IntType::new())) + .primary_key(["id"]) + .option("bucket", "1") + .option("changelog-producer", "input") + .option("rowkind.field", "op") + .build() + .unwrap(); + + let cases = vec![ + ( + pk_changelog_schema(&[ + ("changelog-producer", "input"), + ("merge-engine", "partial-update"), + ]), + "merge-engine=partial-update", + ), + ( + pk_changelog_schema(&[ + ("changelog-producer", "input"), + ("merge-engine", "first-row"), + ]), + "merge-engine=first-row", + ), + ( + pk_changelog_schema(&[("changelog-producer", "input"), ("bucket", "-2")]), + "bucket=-2", + ), + ( + partitioned_dynamic_pk_changelog_schema(), + "cross-partition dynamic bucket", + ), + (TableSchema::new(0, &rowkind_schema), "rowkind.field"), + ( + pk_changelog_schema(&[ + ("changelog-producer", "input"), + ("changelog-file.format", "avro"), + ]), + "changelog-file.format", + ), + ( + pk_changelog_schema(&[ + ("changelog-producer", "input"), + ("changelog-file.compression", "snappy"), + ]), + "changelog-file.compression", + ), + ( + pk_changelog_schema(&[ + ("changelog-producer", "input"), + ("changelog-file.stats-mode", "counts"), + ]), + "changelog-file.stats-mode", + ), + ]; + + for (schema, expected_message) in cases { + table_write_new_unsupported(schema, expected_message); + } + } + + #[tokio::test] + async fn test_input_changelog_writes_raw_rows_separately_from_data_rows() { + let file_io = test_file_io(); + let table_path = "memory:/test_input_changelog_duplicate_pk"; + setup_dirs(&file_io, table_path).await; + + let table = Table::new( + file_io.clone(), + Identifier::new("default", "test_input_changelog"), + table_path.to_string(), + pk_changelog_schema(&[ + ("changelog-producer", "input"), + ("changelog-file.prefix", "custom-changelog-"), + ]), + None, + ); + + let mut table_write = TableWrite::new(&table, "test-user".to_string()).unwrap(); + table_write + .write_arrow_batch(&make_batch(vec![1, 1], vec![10, 20])) + .await + .unwrap(); + + let messages = table_write.prepare_commit().await.unwrap(); + assert_eq!(messages.len(), 1); + assert_eq!(messages[0].new_files.len(), 1); + assert_eq!(messages[0].new_files[0].row_count, 1); + assert_eq!(messages[0].new_changelog_files.len(), 1); + assert_eq!(messages[0].new_changelog_files[0].row_count, 2); + assert!(messages[0].new_files[0].file_name.starts_with("data-")); + assert!(messages[0].new_changelog_files[0] + .file_name + .starts_with("custom-changelog-")); + + let bucket_dir = bucket_dir_name(messages[0].bucket); + let data_file = &messages[0].new_files[0]; + let data_file_path = format!("{table_path}/{bucket_dir}/{}", data_file.file_name); + let data_batches = + read_physical_key_value_batches(&file_io, &data_file_path, data_file.file_size).await; + assert_eq!(collect_i64(&data_batches, 0), vec![1]); + assert_eq!(collect_i8(&data_batches, 1), vec![0]); + assert_eq!(collect_i32(&data_batches, 2), vec![1]); + assert_eq!(collect_i32(&data_batches, 3), vec![20]); + + let changelog_file = &messages[0].new_changelog_files[0]; + let changelog_file_path = format!("{table_path}/{bucket_dir}/{}", changelog_file.file_name); + let changelog_batches = read_physical_key_value_batches( + &file_io, + &changelog_file_path, + changelog_file.file_size, + ) + .await; + assert_eq!(collect_i64(&changelog_batches, 0), vec![0, 1]); + assert_eq!(collect_i8(&changelog_batches, 1), vec![0, 0]); + assert_eq!(collect_i32(&changelog_batches, 2), vec![1, 1]); + assert_eq!(collect_i32(&changelog_batches, 3), vec![10, 20]); + } + + #[tokio::test] + async fn test_input_changelog_metadata_counts_retract_rows() { + let file_io = test_file_io(); + let table_path = "memory:/test_input_changelog_retract_rows"; + setup_dirs(&file_io, table_path).await; + + let table = Table::new( + file_io, + Identifier::new("default", "test_input_changelog"), + table_path.to_string(), + pk_changelog_schema(&[("changelog-producer", "input")]), + None, + ); + + let mut table_write = TableWrite::new(&table, "test-user".to_string()).unwrap(); + table_write + .write_arrow_batch(&make_batch_with_value_kind( + vec![1, 2, 3], + vec![10, 20, 30], + vec![0, 1, 3], + )) + .await + .unwrap(); + + let messages = table_write.prepare_commit().await.unwrap(); + assert_eq!(messages[0].new_files[0].delete_row_count, Some(2)); + assert_eq!(messages[0].new_changelog_files[0].delete_row_count, Some(2)); + } + + #[tokio::test] + async fn test_input_changelog_rejects_invalid_value_kind() { + let file_io = test_file_io(); + let table_path = "memory:/test_input_changelog_invalid_value_kind"; + setup_dirs(&file_io, table_path).await; + + let table = Table::new( + file_io, + Identifier::new("default", "test_input_changelog"), + table_path.to_string(), + pk_changelog_schema(&[("changelog-producer", "input")]), + None, + ); + + let mut table_write = TableWrite::new(&table, "test-user".to_string()).unwrap(); + table_write + .write_arrow_batch(&make_batch_with_value_kind(vec![1], vec![10], vec![4])) + .await + .unwrap(); + + let err = table_write.prepare_commit().await.unwrap_err(); + assert!( + matches!(err, crate::Error::DataInvalid { message, .. } if message.contains("Invalid RowKind value")) + ); + } + + #[tokio::test] + async fn test_input_changelog_commit_writes_changelog_manifest_metadata() { + let file_io = test_file_io(); + let table_path = "memory:/test_input_changelog_commit"; + setup_dirs(&file_io, table_path).await; + + let table = Table::new( + file_io.clone(), + Identifier::new("default", "test_input_changelog"), + table_path.to_string(), + pk_changelog_schema(&[("changelog-producer", "input")]), + None, + ); + + let mut table_write = TableWrite::new(&table, "test-user".to_string()).unwrap(); + table_write + .write_arrow_batch(&make_batch(vec![1, 1], vec![10, 20])) + .await + .unwrap(); + let messages = table_write.prepare_commit().await.unwrap(); + let data_file_name = messages[0].new_files[0].file_name.clone(); + let changelog_file_name = messages[0].new_changelog_files[0].file_name.clone(); + + let commit = TableCommit::new(table, "test-user".to_string()); + commit.commit(messages).await.unwrap(); + + let snap_manager = SnapshotManager::new(file_io.clone(), table_path.to_string()); + let snapshot = snap_manager.get_latest_snapshot().await.unwrap().unwrap(); + assert_eq!(snapshot.total_record_count(), Some(1)); + assert_eq!(snapshot.delta_record_count(), Some(1)); + assert_eq!(snapshot.changelog_record_count(), Some(2)); + + let manifest_dir = format!("{table_path}/manifest"); + let delta_metas = ManifestList::read( + &file_io, + &format!("{manifest_dir}/{}", snapshot.delta_manifest_list()), + ) + .await + .unwrap(); + let delta_entries = Manifest::read( + &file_io, + &format!("{manifest_dir}/{}", delta_metas[0].file_name()), + ) + .await + .unwrap(); + assert_eq!(delta_entries.len(), 1); + assert_eq!(*delta_entries[0].kind(), FileKind::Add); + assert_eq!(delta_entries[0].file().file_name, data_file_name); + + let changelog_list = snapshot + .changelog_manifest_list() + .expect("changelog manifest list"); + let changelog_metas = + ManifestList::read(&file_io, &format!("{manifest_dir}/{changelog_list}")) + .await + .unwrap(); + let changelog_entries = Manifest::read( + &file_io, + &format!("{manifest_dir}/{}", changelog_metas[0].file_name()), + ) + .await + .unwrap(); + assert_eq!(changelog_entries.len(), 1); + assert_eq!(*changelog_entries[0].kind(), FileKind::Add); + assert_eq!(changelog_entries[0].file().file_name, changelog_file_name); + assert_eq!(changelog_entries[0].file().row_count, 2); + } + + #[tokio::test] + async fn test_input_changelog_dynamic_bucket_commits_data_changelog_and_index() { + let file_io = test_file_io(); + let table_path = "memory:/test_input_changelog_dynamic_bucket_commit"; + setup_dirs(&file_io, table_path).await; + + let table = Table::new( + file_io.clone(), + Identifier::new("default", "test_input_changelog_dynamic_bucket"), + table_path.to_string(), + ordinary_dynamic_pk_changelog_schema(), + None, + ); + + let mut table_write = TableWrite::new(&table, "test-user".to_string()).unwrap(); + assert!(matches!( + table_write.bucket_assigner, + BucketAssignerEnum::Dynamic(_) + )); + table_write + .write_arrow_batch(&make_partitioned_batch_with_value_kind( + vec!["a", "a"], + vec![1, 2], + vec![10, 20], + vec![0, 3], + )) + .await + .unwrap(); + + let messages = table_write.prepare_commit().await.unwrap(); + assert_eq!(messages.len(), 1); + assert_eq!(messages[0].new_files.len(), 1); + assert_eq!(messages[0].new_files[0].row_count, 2); + assert_eq!(messages[0].new_files[0].delete_row_count, Some(1)); + assert_eq!(messages[0].new_changelog_files.len(), 1); + assert_eq!(messages[0].new_changelog_files[0].row_count, 2); + assert_eq!(messages[0].new_changelog_files[0].delete_row_count, Some(1)); + assert_eq!(messages[0].new_index_files.len(), 1); + assert_eq!(messages[0].new_index_files[0].index_type, "HASH"); + assert_eq!(messages[0].new_index_files[0].row_count, 2); + + let index_file_name = messages[0].new_index_files[0].file_name.clone(); + + let commit = TableCommit::new(table, "test-user".to_string()); + commit.commit(messages).await.unwrap(); + + let snap_manager = SnapshotManager::new(file_io.clone(), table_path.to_string()); + let snapshot = snap_manager.get_latest_snapshot().await.unwrap().unwrap(); + assert_eq!(snapshot.total_record_count(), Some(2)); + assert_eq!(snapshot.delta_record_count(), Some(2)); + assert_eq!(snapshot.changelog_record_count(), Some(2)); + + let manifest_dir = format!("{table_path}/manifest"); + let changelog_list = snapshot + .changelog_manifest_list() + .expect("changelog manifest list"); + let changelog_metas = + ManifestList::read(&file_io, &format!("{manifest_dir}/{changelog_list}")) + .await + .unwrap(); + let changelog_partition_stats = changelog_metas[0].partition_stats(); + assert!(!changelog_partition_stats.min_values().is_empty()); + assert!(!changelog_partition_stats.max_values().is_empty()); + assert_eq!( + changelog_partition_stats.null_counts().as_slice(), + &[Some(0)] + ); + + let index_manifest = snapshot.index_manifest().expect("index manifest"); + let index_entries = + IndexManifest::read(&file_io, &format!("{manifest_dir}/{index_manifest}")) + .await + .unwrap(); + assert_eq!(index_entries.len(), 1); + assert_eq!(index_entries[0].kind, FileKind::Add); + assert_eq!(index_entries[0].index_file.file_name, index_file_name); + assert_eq!(index_entries[0].index_file.index_type, "HASH"); + assert_eq!(index_entries[0].index_file.row_count, 2); + } + + #[tokio::test] + async fn test_input_changelog_overwrite_prepare_commit_rejects_before_flush() { + let file_io = test_file_io(); + let table_path = "memory:/test_input_changelog_overwrite"; + setup_dirs(&file_io, table_path).await; + + let table = Table::new( + file_io, + Identifier::new("default", "test_input_changelog"), + table_path.to_string(), + pk_changelog_schema(&[("changelog-producer", "input")]), + None, + ); + + let mut table_write = TableWrite::new(&table, "test-user".to_string()) + .unwrap() + .with_overwrite(); + table_write + .write_arrow_batch(&make_batch(vec![1], vec![10])) + .await + .unwrap(); + + let err = table_write.prepare_commit().await.unwrap_err(); + assert!( + matches!(err, crate::Error::Unsupported { message } if message.contains("overwrite")) + ); + } + #[tokio::test] async fn test_pk_write_and_commit() { let file_io = test_file_io(); From 30312689b6970c8d7ab6d2d443490cc6c7222eab Mon Sep 17 00:00:00 2001 From: QuakeWang Date: Thu, 14 May 2026 14:19:14 +0800 Subject: [PATCH 2/2] test: cover input changelog pk write in datafusion Signed-off-by: QuakeWang --- .../datafusion/tests/pk_tables.rs | 32 +++++++++++-------- 1 file changed, 19 insertions(+), 13 deletions(-) diff --git a/crates/integrations/datafusion/tests/pk_tables.rs b/crates/integrations/datafusion/tests/pk_tables.rs index fa0bd6f1..bf1a317b 100644 --- a/crates/integrations/datafusion/tests/pk_tables.rs +++ b/crates/integrations/datafusion/tests/pk_tables.rs @@ -1460,9 +1460,9 @@ async fn test_pk_partitioned_multi_bucket() { // ======================= Error Cases ======================= -/// PK table with changelog-producer=input should be rejected. +/// PK table with changelog-producer=input should write through DataFusion SQL. #[tokio::test] -async fn test_pk_reject_changelog_producer_input() { +async fn test_pk_input_changelog_write_read() { let (_tmp, sql_context) = setup_sql_context().await; sql_context @@ -1475,18 +1475,24 @@ async fn test_pk_reject_changelog_producer_input() { .await .unwrap(); - let result = sql_context - .sql("INSERT INTO paimon.test_db.t_changelog VALUES (1, 'alice')") - .await; + sql_context + .sql( + "INSERT INTO paimon.test_db.t_changelog VALUES + (1, 'alice'), (1, 'bob'), (2, 'carol')", + ) + .await + .unwrap() + .collect() + .await + .unwrap(); - let is_err = match result { - Err(_) => true, - Ok(df) => df.collect().await.is_err(), - }; - assert!( - is_err, - "PK table with changelog-producer=input should reject writes" - ); + let rows = collect_id_name( + &sql_context, + "SELECT id, name FROM paimon.test_db.t_changelog ORDER BY id", + ) + .await; + + assert_eq!(rows, vec![(1, "bob".to_string()), (2, "carol".to_string())]); } // ======================= String Primary Key =======================