Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -1274,6 +1274,16 @@ func WithManifestFileContent(content ManifestContent) ManifestFileOption {
}
}

// WithManifestFileFirstRowID sets the first_row_id on a v3+ data manifest.
// Silently no-ops on v1/v2 manifests since the field does not exist in those formats.
func WithManifestFileFirstRowID(firstRowID int64) ManifestFileOption {
return func(mf *manifestFile) {
if mf.version >= 3 {
mf.FirstRowIDValue = &firstRowID
}
}
}

func (w *ManifestWriter) ToManifestFile(location string, length int64, opts ...ManifestFileOption) (ManifestFile, error) {
if err := w.Close(); err != nil {
return nil, err
Expand Down Expand Up @@ -1620,6 +1630,10 @@ func WriteManifestList(version int, out io.Writer, snapshotID int64, parentSnaps
return writer.AddManifests(files)
}

// WriteManifest writes a manifest file (a list of manifest entries) as Avro.
// opts may include ManifestFileOption values such as WithManifestFileContent or
// WithManifestFileFirstRowID (v3+ only) to set descriptor fields on the returned
// ManifestFile.
func WriteManifest(
filename string,
out io.Writer,
Expand All @@ -1628,6 +1642,7 @@ func WriteManifest(
schema *Schema,
snapshotID int64,
entries []ManifestEntry,
opts ...ManifestFileOption,

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit (pre-existing): WriteManifest is exported but has no doc comment. Since you're already touching the signature, a one-liner noting that opts set v3-specific descriptor fields (e.g. WithManifestFileFirstRowID) would be a welcome add.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a doc comment on WriteManifest in b6f3bbb, calling out that opts can include v3-specific options like WithManifestFileFirstRowID.

) (mf ManifestFile, err error) {
cnt := &internal.CountingWriter{W: out}

Expand All @@ -1648,7 +1663,7 @@ func WriteManifest(
return nil, err
}

return w.ToManifestFile(filename, cnt.Count)
return w.ToManifestFile(filename, cnt.Count, opts...)
}

// ManifestEntryStatus defines constants for the entry status of
Expand Down
104 changes: 104 additions & 0 deletions manifest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1158,6 +1158,110 @@ func (m *ManifestTestSuite) TestV3DataManifestFirstRowIDInheritanceSkipsDeletedE
m.EqualValues(1000+liveCount, *read[2].DataFile().FirstRowID())
}

func (m *ManifestTestSuite) TestWriteManifestWithFirstRowIDOption() {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: consider adding (a) a v1/v2 + WithManifestFileFirstRowID case to pin the intended behavior (no-op vs non-nil FirstRowID()), and (b) a delete-manifest case, since the spec restricts first_row_id to data manifests. The read side already gates inheritance on formatVersion >= 3 && content == data, so this just locks down the write side.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added both in b6f3bbb:

a v1 manifest + WithManifestFileFirstRowID case confirming it's a no-op (FirstRowID() is nil)
a v3 delete-manifest case confirming the field can be set on the manifest itself (the guard only checks version), but entries don't inherit first_row_id on read since that's separately gated on content == data in the reader

partitionSpec := NewPartitionSpecID(1,
PartitionField{FieldID: 1000, SourceIDs: []int{1}, Name: "x", Transform: IdentityTransform{}})
count := int64(10)
entries := []ManifestEntry{
&manifestEntry{
EntryStatus: EntryStatusADDED,
Snapshot: &entrySnapshotID,
Data: &dataFile{
Content: EntryContentData,
Path: "/data/file1.parquet",
Format: ParquetFile,
PartitionData: map[string]any{"x": int(1)},
RecordCount: count,
FileSize: 1000,
BlockSizeInBytes: 64 * 1024,
FirstRowIDField: nil,
},
},
&manifestEntry{
EntryStatus: EntryStatusADDED,
Snapshot: &entrySnapshotID,
Data: &dataFile{
Content: EntryContentData,
Path: "/data/file2.parquet",
Format: ParquetFile,
PartitionData: map[string]any{"x": int(2)},
RecordCount: count,
FileSize: 2000,
BlockSizeInBytes: 64 * 1024,
FirstRowIDField: nil,
},
},
}

// Test 1: WriteManifest with WithManifestFileFirstRowID sets the field.
var bufWithID bytes.Buffer
firstRowID := int64(500)
mf, err := WriteManifest("/manifest.avro", &bufWithID, 3, partitionSpec, testSchema, entrySnapshotID, entries,
WithManifestFileFirstRowID(firstRowID))
m.Require().NoError(err)
m.Require().NotNil(mf.FirstRowID())
m.Equal(firstRowID, *mf.FirstRowID())

// Reading back, entries inherit first_row_id from the manifest file.
read, err := ReadManifest(mf, bytes.NewReader(bufWithID.Bytes()), false)
m.Require().NoError(err)
m.Require().Len(read, 2)
m.Require().NotNil(read[0].DataFile().FirstRowID())
m.EqualValues(firstRowID, *read[0].DataFile().FirstRowID())
m.Require().NotNil(read[1].DataFile().FirstRowID())
m.EqualValues(firstRowID+count, *read[1].DataFile().FirstRowID())

// Test 2: WriteManifest without option leaves FirstRowID nil (backward compat).
var bufNoID bytes.Buffer
mf2, err := WriteManifest("/manifest.avro", &bufNoID, 3, partitionSpec, testSchema, entrySnapshotID, entries)
m.Require().NoError(err)
m.Nil(mf2.FirstRowID())

// Test 3: v1 + WithManifestFileFirstRowID is a no-op (version < 3).
var bufV1 bytes.Buffer
mf3, err := WriteManifest("/manifest.avro", &bufV1, 1, partitionSpec, testSchema, entrySnapshotID, entries,
WithManifestFileFirstRowID(999))
m.Require().NoError(err)
m.Nil(mf3.FirstRowID())

// Test 4: v3 delete-manifest + WithManifestFileFirstRowID sets the field
// (version >= 3), but the reader does not inherit first_row_id into entries
// for delete manifests.
deleteEntry := &manifestEntry{
EntryStatus: EntryStatusADDED,
Snapshot: &entrySnapshotID,
Data: &dataFile{
Content: EntryContentPosDeletes,
Path: "/data/deletes.avro",
Format: AvroFile,
PartitionData: map[string]any{"x": int(1)},
RecordCount: count,
FileSize: 1000,
BlockSizeInBytes: 64 * 1024,
FirstRowIDField: nil,
},
}
var bufDelete bytes.Buffer
cnt := &internal.CountingWriter{W: &bufDelete}
w, err := NewManifestWriter(3, cnt, partitionSpec, testSchema, entrySnapshotID,
WithManifestWriterContent(ManifestContentDeletes))
m.Require().NoError(err)
m.Require().NoError(w.Add(deleteEntry))
m.Require().NoError(w.Close())
mf4, err := w.ToManifestFile("/manifest.avro", cnt.Count,
WithManifestFileContent(ManifestContentDeletes),
WithManifestFileFirstRowID(777))
m.Require().NoError(err)
m.Require().NotNil(mf4.FirstRowID())
m.EqualValues(777, *mf4.FirstRowID())

// Reading back a delete manifest — entries should not inherit first_row_id.
readDelete, err := ReadManifest(mf4, bytes.NewReader(bufDelete.Bytes()), false)
m.Require().NoError(err)
m.Require().Len(readDelete, 1)
m.Nil(readDelete[0].DataFile().FirstRowID())
}

func (m *ManifestTestSuite) TestReadManifestListIncompleteSchema() {
// Verify that reading a manifest list whose embedded schema references
// an undefined named type ("field_summary" without its definition)
Expand Down
Loading