Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions crates/iceberg/public-api.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2030,9 +2030,9 @@ impl iceberg::spec::ManifestListWriter
pub fn iceberg::spec::ManifestListWriter::add_manifests(&mut self, manifests: impl core::iter::traits::iterator::Iterator<Item = iceberg::spec::ManifestFile>) -> iceberg::Result<()>
pub async fn iceberg::spec::ManifestListWriter::close(self) -> iceberg::Result<()>
pub fn iceberg::spec::ManifestListWriter::next_row_id(&self) -> core::option::Option<u64>
pub fn iceberg::spec::ManifestListWriter::v1(output_file: iceberg::io::OutputFile, snapshot_id: i64, parent_snapshot_id: core::option::Option<i64>) -> Self
pub fn iceberg::spec::ManifestListWriter::v2(output_file: iceberg::io::OutputFile, snapshot_id: i64, parent_snapshot_id: core::option::Option<i64>, sequence_number: i64) -> Self
pub fn iceberg::spec::ManifestListWriter::v3(output_file: iceberg::io::OutputFile, snapshot_id: i64, parent_snapshot_id: core::option::Option<i64>, sequence_number: i64, first_row_id: core::option::Option<u64>) -> Self
pub fn iceberg::spec::ManifestListWriter::v1(writer: alloc::boxed::Box<dyn iceberg::io::FileWrite>, snapshot_id: i64, parent_snapshot_id: core::option::Option<i64>) -> Self
pub fn iceberg::spec::ManifestListWriter::v2(writer: alloc::boxed::Box<dyn iceberg::io::FileWrite>, snapshot_id: i64, parent_snapshot_id: core::option::Option<i64>, sequence_number: i64) -> Self
pub fn iceberg::spec::ManifestListWriter::v3(writer: alloc::boxed::Box<dyn iceberg::io::FileWrite>, snapshot_id: i64, parent_snapshot_id: core::option::Option<i64>, sequence_number: i64, first_row_id: core::option::Option<u64>) -> Self
impl core::fmt::Debug for iceberg::spec::ManifestListWriter
pub fn iceberg::spec::ManifestListWriter::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result
pub struct iceberg::spec::ManifestMetadata
Expand Down
13 changes: 9 additions & 4 deletions crates/iceberg/src/io/object_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,11 +298,16 @@ mod tests {
let data_file_manifest = writer.write_manifest_file().await.unwrap();

// Write to manifest list
let manifest_list_writer = self
.table
.file_io()
.new_output(current_snapshot.manifest_list())
.unwrap()
.writer()
.await
.unwrap();
let mut manifest_list_write = ManifestListWriter::v2(
self.table
.file_io()
.new_output(current_snapshot.manifest_list())
.unwrap(),
manifest_list_writer,
current_snapshot.snapshot_id(),
current_snapshot.parent_snapshot_id(),
current_snapshot.sequence_number(),
Expand Down
39 changes: 27 additions & 12 deletions crates/iceberg/src/scan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -897,11 +897,16 @@ pub mod tests {
let data_file_manifest = writer.write_manifest_file().await.unwrap();

// Write to manifest list
let manifest_list_writer = self
.table
.file_io()
.new_output(current_snapshot.manifest_list())
.unwrap()
.writer()
.await
.unwrap();
let mut manifest_list_write = ManifestListWriter::v2(
self.table
.file_io()
.new_output(current_snapshot.manifest_list())
.unwrap(),
manifest_list_writer,
current_snapshot.snapshot_id(),
current_snapshot.parent_snapshot_id(),
current_snapshot.sequence_number(),
Expand Down Expand Up @@ -1128,11 +1133,16 @@ pub mod tests {
let data_file_manifest = writer.write_manifest_file().await.unwrap();

// Write to manifest list
let manifest_list_writer = self
.table
.file_io()
.new_output(current_snapshot.manifest_list())
.unwrap()
.writer()
.await
.unwrap();
let mut manifest_list_write = ManifestListWriter::v2(
self.table
.file_io()
.new_output(current_snapshot.manifest_list())
.unwrap(),
manifest_list_writer,
current_snapshot.snapshot_id(),
current_snapshot.parent_snapshot_id(),
current_snapshot.sequence_number(),
Expand Down Expand Up @@ -1219,11 +1229,16 @@ pub mod tests {

// Write to manifest list - DATA FIRST then DELETE
// This order is crucial for reproduction
let manifest_list_writer = self
.table
.file_io()
.new_output(current_snapshot.manifest_list())
.unwrap()
.writer()
.await
.unwrap();
let mut manifest_list_write = ManifestListWriter::v2(
self.table
.file_io()
.new_output(current_snapshot.manifest_list())
.unwrap(),
manifest_list_writer,
current_snapshot.snapshot_id(),
current_snapshot.parent_snapshot_id(),
current_snapshot.sequence_number(),
Expand Down
104 changes: 55 additions & 49 deletions crates/iceberg/src/spec/manifest_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use self::_const_schema::{MANIFEST_LIST_AVRO_SCHEMA_V1, MANIFEST_LIST_AVRO_SCHEM
use self::_serde::{ManifestFileV1, ManifestFileV2};
use super::{FormatVersion, Manifest};
use crate::error::Result;
use crate::io::{FileIO, OutputFile};
use crate::io::{FileIO, FileWrite};
use crate::spec::manifest_list::_const_schema::MANIFEST_LIST_AVRO_SCHEMA_V3;
use crate::spec::manifest_list::_serde::ManifestFileV3;
use crate::{Error, ErrorKind};
Expand Down Expand Up @@ -93,7 +93,7 @@ impl ManifestList {
/// A manifest list writer.
pub struct ManifestListWriter {
format_version: FormatVersion,
output_file: OutputFile,
writer: Box<dyn FileWrite>,
avro_writer: Writer<'static, Vec<u8>>,
sequence_number: i64,
snapshot_id: i64,
Expand All @@ -104,7 +104,6 @@ impl std::fmt::Debug for ManifestListWriter {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ManifestListWriter")
.field("format_version", &self.format_version)
.field("output_file", &self.output_file)
.field("avro_writer", &self.avro_writer.schema())
.finish_non_exhaustive()
}
Expand All @@ -116,8 +115,12 @@ impl ManifestListWriter {
self.next_row_id
}

/// Construct a v1 [`ManifestListWriter`] that writes to a provided [`OutputFile`].
pub fn v1(output_file: OutputFile, snapshot_id: i64, parent_snapshot_id: Option<i64>) -> Self {
/// Construct a v1 [`ManifestListWriter`] that writes to a provided [`FileWrite`].
pub fn v1(
writer: Box<dyn FileWrite>,
snapshot_id: i64,
parent_snapshot_id: Option<i64>,
) -> Self {
let mut metadata = HashMap::from_iter([
("snapshot-id".to_string(), snapshot_id.to_string()),
("format-version".to_string(), "1".to_string()),
Expand All @@ -128,19 +131,12 @@ impl ManifestListWriter {
parent_snapshot_id.to_string(),
);
}
Self::new(
FormatVersion::V1,
output_file,
metadata,
0,
snapshot_id,
None,
)
Self::new(FormatVersion::V1, writer, metadata, 0, snapshot_id, None)
}

/// Construct a v2 [`ManifestListWriter`] that writes to a provided [`OutputFile`].
/// Construct a v2 [`ManifestListWriter`] that writes to a provided [`FileWrite`].
pub fn v2(
output_file: OutputFile,
writer: Box<dyn FileWrite>,
snapshot_id: i64,
parent_snapshot_id: Option<i64>,
sequence_number: i64,
Expand All @@ -158,17 +154,17 @@ impl ManifestListWriter {
);
Self::new(
FormatVersion::V2,
output_file,
writer,
metadata,
sequence_number,
snapshot_id,
None,
)
}

/// Construct a v3 [`ManifestListWriter`] that writes to a provided [`OutputFile`].
/// Construct a v3 [`ManifestListWriter`] that writes to a provided [`FileWrite`].
pub fn v3(
output_file: OutputFile,
writer: Box<dyn FileWrite>,
snapshot_id: i64,
parent_snapshot_id: Option<i64>,
sequence_number: i64,
Expand All @@ -193,7 +189,7 @@ impl ManifestListWriter {
);
Self::new(
FormatVersion::V3,
output_file,
writer,
metadata,
sequence_number,
snapshot_id,
Expand All @@ -203,7 +199,7 @@ impl ManifestListWriter {

fn new(
format_version: FormatVersion,
output_file: OutputFile,
writer: Box<dyn FileWrite>,
metadata: HashMap<String, String>,
sequence_number: i64,
snapshot_id: i64,
Expand All @@ -222,7 +218,7 @@ impl ManifestListWriter {
}
Self {
format_version,
output_file,
writer,
avro_writer,
sequence_number,
snapshot_id,
Expand Down Expand Up @@ -262,11 +258,10 @@ impl ManifestListWriter {
}

/// Write the manifest list to the output file.
pub async fn close(self) -> Result<()> {
pub async fn close(mut self) -> Result<()> {
let data = self.avro_writer.into_inner()?;
let mut writer = self.output_file.writer().await?;
writer.write(Bytes::from(data)).await?;
writer.close().await?;
self.writer.write(Bytes::from(data)).await?;
self.writer.close().await?;
Ok(())
}

Expand Down Expand Up @@ -1360,12 +1355,13 @@ pub(super) mod _serde {
#[cfg(test)]
mod test {
use std::fs;
use std::path::Path;

use apache_avro::{Reader, Schema};
use tempfile::TempDir;

use super::_serde::ManifestListV2;
use crate::io::FileIO;
use crate::io::{FileIO, FileWrite};
use crate::spec::manifest_list::_serde::{ManifestListV1, ManifestListV3};
use crate::spec::{
Datum, FieldSummary, ManifestContentType, ManifestFile, ManifestList, ManifestListWriter,
Expand Down Expand Up @@ -1400,14 +1396,11 @@ mod test {
let file_io = FileIO::new_with_fs();

let tmp_dir = TempDir::new().unwrap();
let file_name = "simple_manifest_list_v1.avro";
let full_path = format!("{}/{}", tmp_dir.path().to_str().unwrap(), file_name);
let full_path = tmp_dir.path().join("simple_manifest_list_v1.avro");
let file_writer = file_writer(&full_path, file_io).await;

let mut writer = ManifestListWriter::v1(
file_io.new_output(full_path.clone()).unwrap(),
1646658105718557341,
Some(1646658105718557341),
);
let mut writer =
ManifestListWriter::v1(file_writer, 1646658105718557341, Some(1646658105718557341));

writer
.add_manifests(manifest_list.entries.clone().into_iter())
Expand Down Expand Up @@ -1472,11 +1465,11 @@ mod test {
let file_io = FileIO::new_with_fs();

let tmp_dir = TempDir::new().unwrap();
let file_name = "simple_manifest_list_v1.avro";
let full_path = format!("{}/{}", tmp_dir.path().to_str().unwrap(), file_name);
let full_path = tmp_dir.path().join("simple_manifest_list_v1.avro");
let file_writer = file_writer(&full_path, file_io).await;

let mut writer = ManifestListWriter::v2(
file_io.new_output(full_path.clone()).unwrap(),
file_writer,
1646658105718557341,
Some(1646658105718557341),
1,
Expand Down Expand Up @@ -1549,7 +1542,12 @@ mod test {
let full_path = format!("{}/{}", tmp_dir.path().to_str().unwrap(), file_name);

let mut writer = ManifestListWriter::v3(
file_io.new_output(full_path.clone()).unwrap(),
file_io
.new_output(full_path.clone())
.unwrap()
.writer()
.await
.unwrap(),
377075049360453639,
Some(377075049360453639),
1,
Expand Down Expand Up @@ -1688,9 +1686,9 @@ mod test {
let temp_dir = TempDir::new().unwrap();
let path = temp_dir.path().join("manifest_list_v1.avro");
let io = FileIO::new_with_fs();
let output_file = io.new_output(path.to_str().unwrap()).unwrap();
let file_writer = file_writer(&path, io).await;

let mut writer = ManifestListWriter::v1(output_file, 1646658105718557341, Some(0));
let mut writer = ManifestListWriter::v1(file_writer, 1646658105718557341, Some(0));
writer
.add_manifests(expected_manifest_list.entries.clone().into_iter())
.unwrap();
Expand Down Expand Up @@ -1735,9 +1733,9 @@ mod test {
let temp_dir = TempDir::new().unwrap();
let path = temp_dir.path().join("manifest_list_v2.avro");
let io = FileIO::new_with_fs();
let output_file = io.new_output(path.to_str().unwrap()).unwrap();
let file_writer = file_writer(&path, io).await;

let mut writer = ManifestListWriter::v2(output_file, snapshot_id, Some(0), seq_num);
let mut writer = ManifestListWriter::v2(file_writer, snapshot_id, Some(0), seq_num);
writer
.add_manifests(expected_manifest_list.entries.clone().into_iter())
.unwrap();
Expand Down Expand Up @@ -1783,10 +1781,10 @@ mod test {
let temp_dir = TempDir::new().unwrap();
let path = temp_dir.path().join("manifest_list_v2.avro");
let io = FileIO::new_with_fs();
let output_file = io.new_output(path.to_str().unwrap()).unwrap();
let file_writer = file_writer(&path, io).await;

let mut writer =
ManifestListWriter::v3(output_file, snapshot_id, Some(0), seq_num, Some(10));
ManifestListWriter::v3(file_writer, snapshot_id, Some(0), seq_num, Some(10));
writer
.add_manifests(expected_manifest_list.entries.clone().into_iter())
.unwrap();
Expand Down Expand Up @@ -1831,9 +1829,9 @@ mod test {
let temp_dir = TempDir::new().unwrap();
let path = temp_dir.path().join("manifest_list_v1.avro");
let io = FileIO::new_with_fs();
let output_file = io.new_output(path.to_str().unwrap()).unwrap();
let file_writer = file_writer(&path, io).await;

let mut writer = ManifestListWriter::v1(output_file, 1646658105718557341, Some(0));
let mut writer = ManifestListWriter::v1(file_writer, 1646658105718557341, Some(0));
writer
.add_manifests(expected_manifest_list.entries.clone().into_iter())
.unwrap();
Expand Down Expand Up @@ -1876,9 +1874,9 @@ mod test {
let temp_dir = TempDir::new().unwrap();
let path = temp_dir.path().join("manifest_list_v1.avro");
let io = FileIO::new_with_fs();
let output_file = io.new_output(path.to_str().unwrap()).unwrap();
let file_writer = file_writer(&path, io).await;

let mut writer = ManifestListWriter::v1(output_file, 1646658105718557341, Some(0));
let mut writer = ManifestListWriter::v1(file_writer, 1646658105718557341, Some(0));
writer
.add_manifests(expected_manifest_list.entries.clone().into_iter())
.unwrap();
Expand Down Expand Up @@ -1923,9 +1921,9 @@ mod test {
let temp_dir = TempDir::new().unwrap();
let path = temp_dir.path().join("manifest_list_v2.avro");
let io = FileIO::new_with_fs();
let output_file = io.new_output(path.to_str().unwrap()).unwrap();
let file_writer = file_writer(&path, io).await;

let mut writer = ManifestListWriter::v2(output_file, snapshot_id, Some(0), seq_num);
let mut writer = ManifestListWriter::v2(file_writer, snapshot_id, Some(0), seq_num);
writer
.add_manifests(expected_manifest_list.entries.clone().into_iter())
.unwrap();
Expand All @@ -1942,6 +1940,14 @@ mod test {
temp_dir.close().unwrap();
}

async fn file_writer(path: &Path, io: FileIO) -> Box<dyn FileWrite> {
io.new_output(path.to_str().unwrap())
.unwrap()
.writer()
.await
.unwrap()
}

#[tokio::test]
async fn test_manifest_list_v2_deserializer_aliases() {
// reading avro manifest file generated by iceberg 1.4.0
Expand Down
3 changes: 3 additions & 0 deletions crates/iceberg/src/transaction/append.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,9 @@ mod tests {
table
.file_io()
.new_output(current_snapshot.manifest_list())
.unwrap()
.writer()
.await
.unwrap(),
current_snapshot.snapshot_id(),
current_snapshot.parent_snapshot_id(),
Expand Down
Loading
Loading