From 6d3427b8f4fe8b4458526b3ee50e18fb5405a5ee Mon Sep 17 00:00:00 2001 From: Peter Toth Date: Wed, 27 May 2026 12:32:58 +0200 Subject: [PATCH] GH-3598: Expose getRowRanges(int) and add getCompressedBytesForRowRanges ### Rationale for this change Opening up APIs needed by a later materialization feature in Spark. External readers (e.g. a Spark-side scanner) need (a) the column-index-derived row ranges that may pass the configured filter for a row group, and (b) a metadata-only estimate of the on-disk compressed bytes those ranges correspond to for the currently requested columns, so they can plan I/O without reading column data. ### What changes are included in this PR? - `getRowRanges(int blockIndex)`: made public; returns row ranges that may pass the configured filter. With no filter, shortcuts to all rows of the row group. - `getCompressedBytesForRowRanges(int blockIndex, RowRanges rowRanges)`: metadata-only sum of compressed page sizes for the reader's currently requested columns whose pages overlap the given row ranges. Dictionary pages are not represented in OffsetIndex and are therefore excluded. ### Are these changes tested? Yes. `TestParquetFileReaderRowRanges` covers: no-filter row ranges cover all rows, empty ranges short-circuit to 0, full ranges equal the per-page OffsetIndex sum and are strictly less than the column-chunk total (proving dictionary-page exclusion), and partial ranges fall between 0 and the full total. ### Are there any user-facing changes? No. 
Closes #3598 Co-authored-by: Matt Butrovich --- .../parquet/hadoop/ParquetFileReader.java | 60 ++++++- .../TestParquetFileReaderRowRanges.java | 159 ++++++++++++++++++ 2 files changed, 216 insertions(+), 3 deletions(-) create mode 100644 parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileReaderRowRanges.java diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java index e0b0d76e0e..bd2bcaf225 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java @@ -1489,9 +1489,23 @@ public ColumnIndexStore getColumnIndexStore(int blockIndex) { return ciStore; } - private RowRanges getRowRanges(int blockIndex) { - assert FilterCompat.isFilteringRequired(options.getRecordFilter()) - : "Should not be invoked if filter is null or NOOP"; + /** + * Computes the {@link RowRanges} within the given row group that may pass the configured filter + * (set via {@link ParquetReadOptions} or {@link ParquetInputFormat#setFilterPredicate}). If no + * filter is configured, returns a {@link RowRanges} covering all rows in the row group. + * + *
<p>
This computation is metadata-only: it consults each filter-referenced column's column + * index from the file footer; no column data is read from disk. The result can be passed to + * {@link #readFilteredRowGroup(int, RowRanges)} (intersected with any caller-supplied row + * ranges if desired) to read only the matching pages. + * + * @param blockIndex the row group (block) index + * @return row ranges within the block that may pass the configured filter + */ + public RowRanges getRowRanges(int blockIndex) { + if (!FilterCompat.isFilteringRequired(options.getRecordFilter())) { + return RowRanges.createSingle(blocks.get(blockIndex).getRowCount()); + } RowRanges rowRanges = blockRowRanges.get(blockIndex); if (rowRanges == null) { rowRanges = ColumnIndexFilter.calculateRowRanges( @@ -1504,6 +1518,46 @@ private RowRanges getRowRanges(int blockIndex) { return rowRanges; } + /** + * Returns the total compressed byte count of this reader's requested columns' pages whose + * row ranges intersect {@code rowRanges} within the given row group. The set of columns is + * taken from the reader's currently configured requested schema (see + * {@link #setRequestedSchema}). Metadata-only: consults each column's {@link OffsetIndex} + * from the file footer; no column data is read. + * + *
<p>
Page size here is {@link OffsetIndex#getCompressedPageSize} (includes page header). + * Dictionary pages are not represented in {@link OffsetIndex} and are therefore excluded + * from the sum. + * + * @param blockIndex row group index + * @param rowRanges row ranges to intersect against pages + * @return sum of compressed page sizes across requested columns for pages overlapping + * {@code rowRanges} + * @throws ColumnIndexStore.MissingOffsetIndexException if any requested column lacks an + * offset index + */ + public long getCompressedBytesForRowRanges(int blockIndex, RowRanges rowRanges) { + if (rowRanges.rowCount() == 0 || paths.isEmpty()) { + return 0L; + } + BlockMetaData block = blocks.get(blockIndex); + long blockRowCount = block.getRowCount(); + ColumnIndexStore ciStore = getColumnIndexStore(blockIndex); + long total = 0L; + for (ColumnPath path : paths.keySet()) { + OffsetIndex offsetIndex = ciStore.getOffsetIndex(path); + int pageCount = offsetIndex.getPageCount(); + for (int i = 0; i < pageCount; i++) { + long from = offsetIndex.getFirstRowIndex(i); + long to = offsetIndex.getLastRowIndex(i, blockRowCount); + if (rowRanges.isOverlapping(from, to)) { + total += offsetIndex.getCompressedPageSize(i); + } + } + } + return total; + } + public boolean skipNextRowGroup() { return advanceToNextBlock(); } diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileReaderRowRanges.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileReaderRowRanges.java new file mode 100644 index 0000000000..48ff4321d0 --- /dev/null +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileReaderRowRanges.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.hadoop; + +import static org.apache.parquet.hadoop.ParquetFileWriter.Mode.OVERWRITE; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.io.IOException; +import java.util.PrimitiveIterator; +import java.util.stream.IntStream; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.HadoopReadOptions; +import org.apache.parquet.ParquetReadOptions; +import org.apache.parquet.example.data.Group; +import org.apache.parquet.example.data.simple.SimpleGroupFactory; +import org.apache.parquet.hadoop.example.ExampleParquetWriter; +import org.apache.parquet.hadoop.metadata.BlockMetaData; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.hadoop.util.HadoopInputFile; +import org.apache.parquet.internal.column.columnindex.OffsetIndex; +import org.apache.parquet.internal.filter2.columnindex.RowRanges; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.MessageTypeParser; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +/** + * Tests {@link ParquetFileReader#getRowRanges(int)} and + * {@link ParquetFileReader#getCompressedBytesForRowRanges(int, RowRanges)}. 
+ */ +public class TestParquetFileReaderRowRanges { + + private static final int ROW_COUNT = 10_000; + private static final MessageType SCHEMA = + MessageTypeParser.parseMessageType("message test { required int64 id; required int64 grp; }"); + + @Rule + public final TemporaryFolder temp = new TemporaryFolder(); + + private Path file; + + @Before + public void writeFile() throws IOException { + File f = temp.newFile(); + f.delete(); + file = new Path(f.toURI()); + + // Small page size produces many pages per column chunk; low-cardinality `grp` + // ensures dictionary encoding kicks in so we can verify dictionary-page exclusion. + try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(file) + .withType(SCHEMA) + .withWriteMode(OVERWRITE) + .withRowGroupSize(64L * 1024 * 1024) + .withPageSize(4 * 1024) + .withDictionaryEncoding(true) + .build()) { + SimpleGroupFactory factory = new SimpleGroupFactory(SCHEMA); + for (int i = 0; i < ROW_COUNT; i++) { + writer.write(factory.newGroup().append("id", (long) i).append("grp", (long) (i % 8))); + } + } + } + + private ParquetFileReader openReader() throws IOException { + Configuration conf = new Configuration(); + ParquetReadOptions options = HadoopReadOptions.builder(conf).build(); + return ParquetFileReader.open(HadoopInputFile.fromPath(file, conf), options); + } + + @Test + public void getRowRangesWithoutFilterCoversAllRows() throws IOException { + try (ParquetFileReader reader = openReader()) { + assertEquals(1, reader.getRowGroups().size()); + BlockMetaData block = reader.getRowGroups().get(0); + + RowRanges ranges = reader.getRowRanges(0); + + assertEquals(block.getRowCount(), ranges.rowCount()); + assertTrue(ranges.isOverlapping(0L, block.getRowCount() - 1)); + } + } + + @Test + public void getCompressedBytesForEmptyRangesIsZero() throws IOException { + try (ParquetFileReader reader = openReader()) { + assertEquals(0L, reader.getCompressedBytesForRowRanges(0, RowRanges.EMPTY)); + } + } + + @Test + public void 
getCompressedBytesForFullRangesEqualsOffsetIndexSum() throws IOException { + try (ParquetFileReader reader = openReader()) { + BlockMetaData block = reader.getRowGroups().get(0); + RowRanges full = reader.getRowRanges(0); + + long expected = 0L; + long columnChunkTotal = 0L; + for (ColumnChunkMetaData col : block.getColumns()) { + OffsetIndex oi = reader.readOffsetIndex(col); + for (int p = 0; p < oi.getPageCount(); p++) { + expected += oi.getCompressedPageSize(p); + } + columnChunkTotal += col.getTotalSize(); + } + + assertEquals(expected, reader.getCompressedBytesForRowRanges(0, full)); + + // Dictionary pages aren't represented in OffsetIndex, so the per-page sum + // must be strictly smaller than the column-chunk totals (which include them). + assertTrue( + "expected dictionary-page exclusion: " + expected + " < " + columnChunkTotal, + expected < columnChunkTotal); + } + } + + @Test + public void getCompressedBytesForPartialRangesIsBetweenZeroAndFull() throws IOException { + try (ParquetFileReader reader = openReader()) { + BlockMetaData block = reader.getRowGroups().get(0); + RowRanges full = reader.getRowRanges(0); + long fullBytes = reader.getCompressedBytesForRowRanges(0, full); + + // Build a partial RowRanges from the first half of the pages of an arbitrary column; + // since all columns share row counts, the resulting range applies to every column. + OffsetIndex anyOi = reader.readOffsetIndex(block.getColumns().get(0)); + int halfPageCount = Math.max(1, anyOi.getPageCount() / 2); + PrimitiveIterator.OfInt pages = IntStream.range(0, halfPageCount).iterator(); + RowRanges partial = RowRanges.create(block.getRowCount(), pages, anyOi); + + long partialBytes = reader.getCompressedBytesForRowRanges(0, partial); + + assertTrue("partial bytes should be > 0: " + partialBytes, partialBytes > 0); + assertTrue( + "partial bytes should be < full bytes: " + partialBytes + " < " + fullBytes, + partialBytes < fullBytes); + } + } +}