Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
209 changes: 206 additions & 3 deletions crates/iceberg/src/transaction/append.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,12 @@ impl SnapshotProduceOperation for FastAppendOperation {
Ok(manifest_list
.entries()
.iter()
.filter(|entry| entry.has_added_files() || entry.has_existing_files())
.filter(|entry| {
// Keep delete-only manifests too: they record which files were removed and
// must persist across snapshots until `expire_snapshots` cleans them up.
// Dropping them lets the removed files reappear as live data (see #2148).
entry.has_added_files() || entry.has_existing_files() || entry.has_deleted_files()
})
.cloned()
.collect())
}
Expand All @@ -147,14 +152,212 @@ impl SnapshotProduceOperation for FastAppendOperation {
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::fs;
use std::sync::Arc;

use minijinja::{AutoEscape, Environment, Value, context};
use tempfile::TempDir;
use uuid::Uuid;

use crate::io::FileIO;
use crate::spec::{
DataContentType, DataFileBuilder, DataFileFormat, Literal, MAIN_BRANCH, Struct,
DataContentType, DataFileBuilder, DataFileFormat, Literal, MAIN_BRANCH, ManifestEntry,
ManifestListWriter, ManifestStatus, ManifestWriterBuilder, Struct, TableMetadata,
};
use crate::table::Table;
use crate::test_utils::test_runtime;
use crate::transaction::tests::make_v2_minimal_table;
use crate::transaction::{Transaction, TransactionAction};
use crate::{TableRequirement, TableUpdate};
use crate::{TableIdent, TableRequirement, TableUpdate};

fn render_template(template: &str, ctx: Value) -> String {
let mut env = Environment::new();
env.set_auto_escape_callback(|_| AutoEscape::None);
env.render_str(template, ctx).unwrap()
}

/// Builds a table whose current snapshot's manifest list contains a data manifest
/// followed by a delete-only manifest (one entry with `ManifestStatus::Deleted`,
/// so `deleted_files_count > 0` while `added_files_count == existing_files_count == 0`).
///
/// Returns the table plus the `manifest_path` of the delete-only manifest so callers
/// can assert whether a subsequent append carries it forward.
async fn make_table_with_delete_only_manifest() -> (Table, TempDir, String) {
let tmp_dir = TempDir::new().unwrap();
let table_location = tmp_dir.path().join("table1");
let manifest_list_location = table_location.join("metadata/manifests_list_1.avro");
let table_metadata_location = table_location.join("metadata/v1.json");

let file_io = FileIO::new_with_fs();

let template = fs::read_to_string(format!(
"{}/testdata/example_table_metadata_v2.json",
env!("CARGO_MANIFEST_DIR")
))
.unwrap();
// The template has two snapshots; point the current one at our manifest list.
let metadata_json = render_template(&template, context! {
table_location => &table_location,
manifest_list_1_location => &manifest_list_location,
manifest_list_2_location => &manifest_list_location,
table_metadata_1_location => &table_metadata_location,
});
let table_metadata = serde_json::from_str::<TableMetadata>(&metadata_json).unwrap();

let table = Table::builder()
.metadata(table_metadata)
.identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
.file_io(file_io)
.metadata_location(table_metadata_location.to_str().unwrap())
.runtime(test_runtime())
.build()
.unwrap();

let current_snapshot = table.metadata().current_snapshot().unwrap();
let schema = current_snapshot.schema(table.metadata()).unwrap();
let partition_spec = table.metadata().default_partition_spec();

let next_manifest_file = |location: &str| {
table
.file_io()
.new_output(format!(
"{}/metadata/manifest_{}.avro",
location,
Uuid::new_v4()
))
.unwrap()
};
let table_location_str = table_location.to_str().unwrap().to_string();

// Data manifest: one Added data file.
let mut data_writer = ManifestWriterBuilder::new(
next_manifest_file(&table_location_str),
Some(current_snapshot.snapshot_id()),
None,
schema.clone(),
partition_spec.as_ref().clone(),
)
.build_v2_data();
data_writer
.add_entry(
ManifestEntry::builder()
.status(ManifestStatus::Added)
.data_file(
DataFileBuilder::default()
.partition_spec_id(0)
.content(DataContentType::Data)
.file_path(format!("{table_location_str}/data.parquet"))
.file_format(DataFileFormat::Parquet)
.file_size_in_bytes(100)
.record_count(1)
.partition(Struct::from_iter([Some(Literal::long(100))]))
.build()
.unwrap(),
)
.build(),
)
.unwrap();
let data_manifest = data_writer.write_manifest_file().await.unwrap();

// Delete-only manifest: a single Deleted entry, nothing added or existing.
let mut delete_writer = ManifestWriterBuilder::new(
next_manifest_file(&table_location_str),
Some(current_snapshot.snapshot_id()),
None,
schema.clone(),
partition_spec.as_ref().clone(),
)
.build_v2_data();
delete_writer
.add_delete_entry(
ManifestEntry::builder()
.status(ManifestStatus::Deleted)
.sequence_number(0)
.file_sequence_number(0)
.data_file(
DataFileBuilder::default()
.partition_spec_id(0)
.content(DataContentType::Data)
.file_path(format!("{table_location_str}/removed.parquet"))
.file_format(DataFileFormat::Parquet)
.file_size_in_bytes(100)
.record_count(1)
.partition(Struct::from_iter([Some(Literal::long(100))]))
.build()
.unwrap(),
)
.build(),
)
.unwrap();
let delete_manifest = delete_writer.write_manifest_file().await.unwrap();
let delete_manifest_path = delete_manifest.manifest_path.clone();

// Sanity: the delete manifest really is delete-only.
assert!(delete_manifest.has_deleted_files());
assert!(!delete_manifest.has_added_files());
assert!(!delete_manifest.has_existing_files());

let mut manifest_list_write = ManifestListWriter::v2(
table
.file_io()
.new_output(current_snapshot.manifest_list())
.unwrap(),
current_snapshot.snapshot_id(),
current_snapshot.parent_snapshot_id(),
current_snapshot.sequence_number(),
);
manifest_list_write
.add_manifests(vec![data_manifest, delete_manifest].into_iter())
.unwrap();
manifest_list_write.close().await.unwrap();

(table, tmp_dir, delete_manifest_path)
}

/// Regression test for #2148: a `fast_append` must carry delete-only manifests
/// forward into the new snapshot. Dropping them lets the files they mark as
/// removed reappear as live data on the next append.
#[tokio::test]
async fn test_fast_append_preserves_delete_only_manifest() {
let (table, _tmp_dir, delete_manifest_path) = make_table_with_delete_only_manifest().await;

// Append a new data file via the public transaction API.
let new_file = DataFileBuilder::default()
.content(DataContentType::Data)
.file_path(format!("{}/appended.parquet", table.metadata().location()))
.file_format(DataFileFormat::Parquet)
.file_size_in_bytes(100)
.record_count(1)
.partition_spec_id(table.metadata().default_partition_spec_id())
.partition(Struct::from_iter([Some(Literal::long(100))]))
.build()
.unwrap();

let tx = Transaction::new(&table);
let action = tx.fast_append().add_data_files(vec![new_file]);
let mut action_commit = Arc::new(action).commit(&table).await.unwrap();
let updates = action_commit.take_updates();

let new_snapshot = if let TableUpdate::AddSnapshot { snapshot } = &updates[0] {
snapshot
} else {
unreachable!("first update of a fast append should be AddSnapshot")
};

let manifest_list = new_snapshot
.load_manifest_list(table.file_io(), table.metadata())
.await
.unwrap();

assert!(
manifest_list
.entries()
.iter()
.any(|m| m.manifest_path == delete_manifest_path),
"delete-only manifest {delete_manifest_path} was dropped from the new snapshot's \
manifest list; the files it removed would reappear as live data"
);
}

#[tokio::test]
async fn test_empty_data_append_action() {
Expand Down
Loading