Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 19 additions & 13 deletions crates/integrations/datafusion/tests/pk_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1460,9 +1460,9 @@ async fn test_pk_partitioned_multi_bucket() {

// ======================= Error Cases =======================

/// PK table with changelog-producer=input should be rejected.
/// PK table with changelog-producer=input should write through DataFusion SQL.
#[tokio::test]
async fn test_pk_reject_changelog_producer_input() {
async fn test_pk_input_changelog_write_read() {
let (_tmp, sql_context) = setup_sql_context().await;

sql_context
Expand All @@ -1475,18 +1475,24 @@ async fn test_pk_reject_changelog_producer_input() {
.await
.unwrap();

let result = sql_context
.sql("INSERT INTO paimon.test_db.t_changelog VALUES (1, 'alice')")
.await;
sql_context
.sql(
"INSERT INTO paimon.test_db.t_changelog VALUES
(1, 'alice'), (1, 'bob'), (2, 'carol')",
)
.await
.unwrap()
.collect()
.await
.unwrap();

let is_err = match result {
Err(_) => true,
Ok(df) => df.collect().await.is_err(),
};
assert!(
is_err,
"PK table with changelog-producer=input should reject writes"
);
let rows = collect_id_name(
&sql_context,
"SELECT id, name FROM paimon.test_db.t_changelog ORDER BY id",
)
.await;

assert_eq!(rows, vec![(1, "bob".to_string()), (2, "carol".to_string())]);
}

// ======================= String Primary Key =======================
Expand Down
181 changes: 180 additions & 1 deletion crates/paimon/src/spec/core_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ const COMMIT_MAX_RETRY_WAIT_OPTION: &str = "commit.max-retry-wait";
const FILE_COMPRESSION_OPTION: &str = "file.compression";
const FILE_COMPRESSION_ZSTD_LEVEL_OPTION: &str = "file.compression.zstd-level";
const FILE_FORMAT_OPTION: &str = "file.format";
// Option keys specific to changelog files: file name prefix, file format,
// compression codec and metadata stats mode. They are read by the
// `changelog_file_*` accessors on `CoreOptions`.
const CHANGELOG_FILE_PREFIX_OPTION: &str = "changelog-file.prefix";
const CHANGELOG_FILE_FORMAT_OPTION: &str = "changelog-file.format";
const CHANGELOG_FILE_COMPRESSION_OPTION: &str = "changelog-file.compression";
const CHANGELOG_FILE_STATS_MODE_OPTION: &str = "changelog-file.stats-mode";
const ROW_TRACKING_ENABLED_OPTION: &str = "row-tracking.enabled";
const WRITE_PARQUET_BUFFER_SIZE_OPTION: &str = "write.parquet-buffer-size";
const SEQUENCE_FIELD_OPTION: &str = "sequence.field";
Expand All @@ -55,6 +59,7 @@ pub const SCAN_VERSION_OPTION: &str = "scan.version";
const DEFAULT_SOURCE_SPLIT_TARGET_SIZE: i64 = 128 * 1024 * 1024;
const DEFAULT_SOURCE_SPLIT_OPEN_FILE_COST: i64 = 4 * 1024 * 1024;
const DEFAULT_PARTITION_DEFAULT_NAME: &str = "__DEFAULT_PARTITION__";
// Returned by `changelog_file_prefix()` when `changelog-file.prefix` is not configured.
const DEFAULT_CHANGELOG_FILE_PREFIX: &str = "changelog-";
const DEFAULT_TARGET_FILE_SIZE: i64 = 256 * 1024 * 1024;
// 256MB: documented default of `write_parquet_buffer_size()`.
const DEFAULT_WRITE_PARQUET_BUFFER_SIZE: i64 = 256 * 1024 * 1024;
const DYNAMIC_BUCKET_TARGET_ROW_NUM_OPTION: &str = "dynamic-bucket.target-row-num";
Expand All @@ -75,6 +80,32 @@ pub enum MergeEngine {
FirstRow,
}

/// Strategy a table write uses to produce changelog files.
///
/// Mirrors Java `CoreOptions.ChangelogProducer`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ChangelogProducer {
    /// Changelog files are not produced.
    None,
    /// Input rows are double-written into changelog files.
    Input,
    /// Changelog files are generated during full compaction.
    FullCompaction,
    /// Changelog files are generated through lookup compaction.
    Lookup,
}

impl ChangelogProducer {
    /// Returns the option string for this producer: `"none"`, `"input"`,
    /// `"full-compaction"` or `"lookup"`.
    pub fn as_str(&self) -> &'static str {
        match *self {
            ChangelogProducer::None => "none",
            ChangelogProducer::Input => "input",
            ChangelogProducer::FullCompaction => "full-compaction",
            ChangelogProducer::Lookup => "lookup",
        }
    }
}

/// Format the bucket directory name for a given bucket number.
/// Returns `"bucket-postpone"` for `POSTPONE_BUCKET` (-2), otherwise `"bucket-{N}"`.
pub fn bucket_dir_name(bucket: i32) -> String {
Expand Down Expand Up @@ -138,14 +169,30 @@ impl<'a> CoreOptions<'a> {
}
}

/// Returns the changelog producer option exactly as configured, or `"none"`
/// when the option is absent.
pub fn changelog_producer(&self) -> &str {
    match self.options.get(CHANGELOG_PRODUCER_OPTION) {
        Some(raw) => raw.as_str(),
        None => "none",
    }
}

/// Typed changelog producer setting. Default is `None`.
pub fn try_changelog_producer(&self) -> crate::Result<ChangelogProducer> {
match self.options.get(CHANGELOG_PRODUCER_OPTION) {
None => Ok(ChangelogProducer::None),
Some(v) => match v.to_ascii_lowercase().as_str() {
"none" => Ok(ChangelogProducer::None),
"input" => Ok(ChangelogProducer::Input),
"full-compaction" => Ok(ChangelogProducer::FullCompaction),
"lookup" => Ok(ChangelogProducer::Lookup),
other => Err(crate::Error::Unsupported {
message: format!("Unsupported changelog-producer: '{other}'"),
}),
},
}
}

/// The `rowkind.field` option: a user column whose value encodes the row kind.
pub fn rowkind_field(&self) -> Option<&str> {
self.options.get(ROWKIND_FIELD_OPTION).map(String::as_str)
Expand Down Expand Up @@ -363,6 +410,55 @@ impl<'a> CoreOptions<'a> {
.unwrap_or(1)
}

/// Prefix used when naming changelog files; `"changelog-"` unless
/// `changelog-file.prefix` is configured.
pub fn changelog_file_prefix(&self) -> &str {
    let configured = self.options.get(CHANGELOG_FILE_PREFIX_OPTION);
    configured.map_or(DEFAULT_CHANGELOG_FILE_PREFIX, String::as_str)
}

/// File format that applies to changelog files.
///
/// An explicit `changelog-file.format` wins; otherwise the table-level
/// `file.format` is used, matching the fallback in Java Paimon.
pub fn changelog_file_format(&self) -> &str {
    if let Some(format) = self.options.get(CHANGELOG_FILE_FORMAT_OPTION) {
        return format.as_str();
    }
    self.file_format()
}

/// Compression codec that applies to changelog files.
///
/// An explicit `changelog-file.compression` wins; otherwise the table-level
/// `file.compression` is used, matching the fallback in Java Paimon.
pub fn changelog_file_compression(&self) -> &str {
    match self.options.get(CHANGELOG_FILE_COMPRESSION_OPTION) {
        Some(codec) => codec.as_str(),
        None => self.file_compression(),
    }
}

/// Stats collection mode configured for changelog file metadata, or `None`
/// when `changelog-file.stats-mode` is not set.
pub fn changelog_file_stats_mode(&self) -> Option<&str> {
    let mode = self.options.get(CHANGELOG_FILE_STATS_MODE_OPTION)?;
    Some(mode.as_str())
}

/// Whether `changelog-file.format` is explicitly present in the options.
pub(crate) fn changelog_file_format_configured(&self) -> bool {
    self.options.get(CHANGELOG_FILE_FORMAT_OPTION).is_some()
}

/// Whether `changelog-file.compression` is explicitly present in the options.
pub(crate) fn changelog_file_compression_configured(&self) -> bool {
    self.options
        .get(CHANGELOG_FILE_COMPRESSION_OPTION)
        .is_some()
}

/// Whether `changelog-file.stats-mode` is explicitly present in the options.
pub(crate) fn changelog_file_stats_mode_configured(&self) -> bool {
    self.options
        .get(CHANGELOG_FILE_STATS_MODE_OPTION)
        .is_some()
}

/// Parquet writer in-progress buffer size limit. Default is 256MB.
/// When the buffered data exceeds this, the writer flushes the current row group.
pub fn write_parquet_buffer_size(&self) -> i64 {
Expand Down Expand Up @@ -546,6 +642,89 @@ mod tests {
assert_eq!(core.merge_engine().unwrap(), MergeEngine::PartialUpdate);
}

#[test]
fn test_changelog_producer_defaults_to_none() {
    // With no options at all, both the typed and the raw accessor fall back to "none".
    let empty = HashMap::new();
    let core_options = CoreOptions::new(&empty);

    let typed = core_options.try_changelog_producer().unwrap();
    assert_eq!(typed, ChangelogProducer::None);
    assert_eq!(core_options.changelog_producer(), "none");
}

#[test]
fn test_changelog_producer_accepts_known_values() {
    // Canonical spellings plus upper- and mixed-case variants, since parsing
    // is ASCII case-insensitive.
    let cases = [
        ("none", ChangelogProducer::None),
        ("input", ChangelogProducer::Input),
        ("full-compaction", ChangelogProducer::FullCompaction),
        ("lookup", ChangelogProducer::Lookup),
        ("INPUT", ChangelogProducer::Input),
        ("Full-Compaction", ChangelogProducer::FullCompaction),
    ];

    for (value, expected) in cases {
        let options = HashMap::from([(CHANGELOG_PRODUCER_OPTION.to_string(), value.into())]);
        let core = CoreOptions::new(&options);

        // Name the offending input so a failure inside the loop is actionable.
        assert_eq!(
            core.try_changelog_producer().unwrap(),
            expected,
            "unexpected producer for option value {value:?}"
        );
    }
}

#[test]
fn test_changelog_producer_rejects_unknown_values() {
    let mut options = HashMap::new();
    options.insert(CHANGELOG_PRODUCER_OPTION.to_string(), "other".to_string());
    let core = CoreOptions::new(&options);

    let err = core
        .try_changelog_producer()
        .expect_err("unknown producer should fail");

    // Only an `Unsupported` error carrying the expected message counts as a pass.
    let is_expected = match err {
        crate::Error::Unsupported { message } => {
            message.contains("Unsupported changelog-producer")
        }
        _ => false,
    };
    assert!(is_expected);
}

#[test]
fn test_changelog_file_options_defaults_and_overrides() {
    // Only table-level format/compression are set: the changelog accessors
    // must fall back to them and report that nothing changelog-specific is configured.
    let default_options = HashMap::from([
        (FILE_FORMAT_OPTION.to_string(), "avro".to_string()),
        (FILE_COMPRESSION_OPTION.to_string(), "snappy".to_string()),
    ]);
    let default_core = CoreOptions::new(&default_options);

    assert_eq!(default_core.changelog_file_prefix(), "changelog-");
    assert_eq!(default_core.changelog_file_format(), "avro");
    assert_eq!(default_core.changelog_file_compression(), "snappy");
    assert_eq!(default_core.changelog_file_stats_mode(), None);
    assert!(!default_core.changelog_file_format_configured());
    assert!(!default_core.changelog_file_compression_configured());
    assert!(!default_core.changelog_file_stats_mode_configured());

    // Every changelog-specific option is set explicitly: each accessor must
    // return the override and the `*_configured` helpers must report true.
    let custom_options = HashMap::from([
        (
            CHANGELOG_FILE_PREFIX_OPTION.to_string(),
            "custom-".to_string(),
        ),
        (
            CHANGELOG_FILE_FORMAT_OPTION.to_string(),
            "parquet".to_string(),
        ),
        (
            CHANGELOG_FILE_COMPRESSION_OPTION.to_string(),
            "zstd".to_string(),
        ),
        (
            CHANGELOG_FILE_STATS_MODE_OPTION.to_string(),
            "counts".to_string(),
        ),
    ]);
    let custom_core = CoreOptions::new(&custom_options);

    assert_eq!(custom_core.changelog_file_prefix(), "custom-");
    assert_eq!(custom_core.changelog_file_format(), "parquet");
    assert_eq!(custom_core.changelog_file_compression(), "zstd");
    assert_eq!(custom_core.changelog_file_stats_mode(), Some("counts"));
    assert!(custom_core.changelog_file_format_configured());
    assert!(custom_core.changelog_file_compression_configured());
    assert!(custom_core.changelog_file_stats_mode_configured());
}

#[test]
fn test_commit_options_defaults() {
let options = HashMap::new();
Expand Down
3 changes: 3 additions & 0 deletions crates/paimon/src/table/commit_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ pub struct CommitMessage {
pub bucket: i32,
/// New data files to be added.
pub new_files: Vec<DataFileMeta>,
/// New changelog files to be added.
pub new_changelog_files: Vec<DataFileMeta>,
/// New index files to be added (used by dynamic bucket mode).
pub new_index_files: Vec<IndexFileMeta>,
/// Files to be deleted (copy-on-write rewrite: old files replaced by new_files).
Expand All @@ -41,6 +43,7 @@ impl CommitMessage {
partition,
bucket,
new_files,
new_changelog_files: Vec::new(),
new_index_files: Vec::new(),
deleted_files: Vec::new(),
}
Expand Down
Loading
Loading