diff --git a/manifest.go b/manifest.go index 91e1a1791..35e4115ba 100644 --- a/manifest.go +++ b/manifest.go @@ -1274,6 +1274,16 @@ func WithManifestFileContent(content ManifestContent) ManifestFileOption { } } +// WithManifestFileFirstRowID returns an option that sets first_row_id on any v3+ manifest file; it checks only the format version, not the manifest content. +// It silently no-ops on v1/v2 manifests since the field does not exist in those formats. +func WithManifestFileFirstRowID(firstRowID int64) ManifestFileOption { + return func(mf *manifestFile) { + if mf.version >= 3 { + mf.FirstRowIDValue = &firstRowID + } + } +} + func (w *ManifestWriter) ToManifestFile(location string, length int64, opts ...ManifestFileOption) (ManifestFile, error) { if err := w.Close(); err != nil { return nil, err @@ -1620,6 +1630,10 @@ func WriteManifestList(version int, out io.Writer, snapshotID int64, parentSnaps return writer.AddManifests(files) } +// WriteManifest writes a manifest file (a list of manifest entries) as Avro. +// opts are forwarded to ToManifestFile and may include ManifestFileOption values +// such as WithManifestFileContent or WithManifestFileFirstRowID (v3+ only) to set +// descriptor fields on the returned ManifestFile. func WriteManifest( filename string, out io.Writer, @@ -1628,6 +1642,7 @@ func WriteManifest( schema *Schema, snapshotID int64, entries []ManifestEntry, + opts ...ManifestFileOption, ) (mf ManifestFile, err error) { cnt := &internal.CountingWriter{W: out} @@ -1648,7 +1663,7 @@ func WriteManifest( return nil, err } - return w.ToManifestFile(filename, cnt.Count) + return w.ToManifestFile(filename, cnt.Count, opts...) 
} // ManifestEntryStatus defines constants for the entry status of diff --git a/manifest_test.go b/manifest_test.go index 228d891c0..8058c768f 100644 --- a/manifest_test.go +++ b/manifest_test.go @@ -1158,6 +1158,110 @@ func (m *ManifestTestSuite) TestV3DataManifestFirstRowIDInheritanceSkipsDeletedE m.EqualValues(1000+liveCount, *read[2].DataFile().FirstRowID()) } +func (m *ManifestTestSuite) TestWriteManifestWithFirstRowIDOption() { + partitionSpec := NewPartitionSpecID(1, + PartitionField{FieldID: 1000, SourceIDs: []int{1}, Name: "x", Transform: IdentityTransform{}}) + count := int64(10) + entries := []ManifestEntry{ + &manifestEntry{ + EntryStatus: EntryStatusADDED, + Snapshot: &entrySnapshotID, + Data: &dataFile{ + Content: EntryContentData, + Path: "/data/file1.parquet", + Format: ParquetFile, + PartitionData: map[string]any{"x": int(1)}, + RecordCount: count, + FileSize: 1000, + BlockSizeInBytes: 64 * 1024, + FirstRowIDField: nil, + }, + }, + &manifestEntry{ + EntryStatus: EntryStatusADDED, + Snapshot: &entrySnapshotID, + Data: &dataFile{ + Content: EntryContentData, + Path: "/data/file2.parquet", + Format: ParquetFile, + PartitionData: map[string]any{"x": int(2)}, + RecordCount: count, + FileSize: 2000, + BlockSizeInBytes: 64 * 1024, + FirstRowIDField: nil, + }, + }, + } + + // Case 1: on a v3 manifest, WithManifestFileFirstRowID sets first_row_id on the returned ManifestFile. + var bufWithID bytes.Buffer + firstRowID := int64(500) + mf, err := WriteManifest("/manifest.avro", &bufWithID, 3, partitionSpec, testSchema, entrySnapshotID, entries, + WithManifestFileFirstRowID(firstRowID)) + m.Require().NoError(err) + m.Require().NotNil(mf.FirstRowID()) + m.Equal(firstRowID, *mf.FirstRowID()) + + // Reading back, entry 0 gets the manifest's first_row_id and entry 1 gets it plus entry 0's record count. 
+ read, err := ReadManifest(mf, bytes.NewReader(bufWithID.Bytes()), false) + m.Require().NoError(err) + m.Require().Len(read, 2) + m.Require().NotNil(read[0].DataFile().FirstRowID()) + m.EqualValues(firstRowID, *read[0].DataFile().FirstRowID()) + m.Require().NotNil(read[1].DataFile().FirstRowID()) + m.EqualValues(firstRowID+count, *read[1].DataFile().FirstRowID()) + + // Case 2: without the option, a v3 WriteManifest leaves FirstRowID nil (backward compat). + var bufNoID bytes.Buffer + mf2, err := WriteManifest("/manifest.avro", &bufNoID, 3, partitionSpec, testSchema, entrySnapshotID, entries) + m.Require().NoError(err) + m.Nil(mf2.FirstRowID()) + + // Case 3: on a v1 manifest the option is a no-op (version < 3), so FirstRowID stays nil. + var bufV1 bytes.Buffer + mf3, err := WriteManifest("/manifest.avro", &bufV1, 1, partitionSpec, testSchema, entrySnapshotID, entries, + WithManifestFileFirstRowID(999)) + m.Require().NoError(err) + m.Nil(mf3.FirstRowID()) + + // Case 4: on a v3 delete manifest the option still sets the field because it + // only checks version >= 3; the assertions below expect ReadManifest not to + // propagate first_row_id into the entries of a delete manifest. 
+ deleteEntry := &manifestEntry{ + EntryStatus: EntryStatusADDED, + Snapshot: &entrySnapshotID, + Data: &dataFile{ + Content: EntryContentPosDeletes, + Path: "/data/deletes.avro", + Format: AvroFile, + PartitionData: map[string]any{"x": int(1)}, + RecordCount: count, + FileSize: 1000, + BlockSizeInBytes: 64 * 1024, + FirstRowIDField: nil, + }, + } + var bufDelete bytes.Buffer + cnt := &internal.CountingWriter{W: &bufDelete} + w, err := NewManifestWriter(3, cnt, partitionSpec, testSchema, entrySnapshotID, + WithManifestWriterContent(ManifestContentDeletes)) + m.Require().NoError(err) + m.Require().NoError(w.Add(deleteEntry)) + m.Require().NoError(w.Close()) + mf4, err := w.ToManifestFile("/manifest.avro", cnt.Count, + WithManifestFileContent(ManifestContentDeletes), + WithManifestFileFirstRowID(777)) + m.Require().NoError(err) + m.Require().NotNil(mf4.FirstRowID()) + m.EqualValues(777, *mf4.FirstRowID()) + + // Reading back the delete manifest: its single entry must have a nil FirstRowID. + readDelete, err := ReadManifest(mf4, bytes.NewReader(bufDelete.Bytes()), false) + m.Require().NoError(err) + m.Require().Len(readDelete, 1) + m.Nil(readDelete[0].DataFile().FirstRowID()) +} + func (m *ManifestTestSuite) TestReadManifestListIncompleteSchema() { // Verify that reading a manifest list whose embedded schema references // an undefined named type ("field_summary" without its definition)