feat(reader): Adapt the HoodieFileGroupReader to read the native form… #19072
cshuo wants to merge 1 commit into
hudi-agent
left a comment
Thanks for working on this! This PR adapts the HoodieFileGroupReader stack to transparently read RFC-103 native-format data and delete log files alongside legacy inline logs, adding a synthetic NATIVE_DATA_BLOCK type and a shared NativeLogFooterMetadata contract. The native-delete ordering-value handling is well-reasoned and is backed by solid tests (the double-conversion guard in particular). One edge case around native CDC log files is worth double-checking in the inline comment. Please take a look at any inline comments, and this should be ready for a Hudi committer or PMC member to take it from here. A couple of small readability nits — magic regex group numbers and a double accessor call — otherwise the code is well-structured and the new classes follow established Hudi conventions cleanly.
| String fileName = pathInfo.getPath().getName(); | ||
| return FSUtils.isLogFile(pathInfo.getPath()) && fileName.contains(logFileExtension); | ||
| return FSUtils.isLogFile(pathInfo.getPath()) | ||
| && (FSUtils.matchNativeLogFile(fileName).isPresent() || fileName.contains(logFileExtension)); |
🤖 This now surfaces native .cdc.parquet files in a file slice's log files (the new test asserts this). But the read path excludes CDC logs in InputSplit via getFileName().endsWith(CDC_LOGFILE_SUFFIX) (".cdc"), which won't match .cdc.parquet. Since HoodieNativeLogFileReader treats any non-delete native file as a data block, could native CDC logs leak into snapshot/merge reads here? @yihua
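The suffix mismatch raised in this comment can be reproduced in isolation. The sketch below is not Hudi code: the class name, helper names, and file names are hypothetical, and the only values taken from the comment are the ".cdc" suffix and the ".cdc.parquet" example. It shows that a plain endsWith(".cdc") check does not recognize a native CDC file name, and one possible way a combined check could look.

```java
import java.util.List;

// Hypothetical stand-in for the exclusion check discussed above; not part of Hudi.
final class CdcSuffixCheck {
  // Suffix value quoted in the review comment.
  static final String CDC_LOGFILE_SUFFIX = ".cdc";
  // Assumed native CDC suffix, taken from the ".cdc.parquet" example in the comment.
  static final String NATIVE_CDC_SUFFIX = ".cdc.parquet";

  // Mirrors the existing style of check: only legacy CDC names end with ".cdc".
  static boolean isCdcBySuffixOnly(String fileName) {
    return fileName.endsWith(CDC_LOGFILE_SUFFIX);
  }

  // One possible combined check that also recognizes the native form.
  static boolean isCdcLegacyOrNative(String fileName) {
    return fileName.endsWith(CDC_LOGFILE_SUFFIX) || fileName.endsWith(NATIVE_CDC_SUFFIX);
  }

  public static void main(String[] args) {
    // Illustrative file names only; real Hudi log names carry more components.
    for (String name : List.of("f1.log.1_0-1-0.cdc", "f1_001.cdc.parquet", "f1_001.log.parquet")) {
      System.out.println(name
          + " suffixOnly=" + isCdcBySuffixOnly(name)
          + " legacyOrNative=" + isCdcLegacyOrNative(name));
    }
  }
}
```

Whether the right fix is a broader exclusion at the split level or a guard inside the native reader is a design question for the PR; the sketch only illustrates why the current string check lets .cdc.parquet through.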
| */ | ||
| public static HoodieSchema readSchemaFromNativeLogFile( | ||
| HoodieStorage storage, StoragePath path, Matcher nativeLogMatcher) { | ||
| String nativeLogType = nativeLogMatcher.group(8); |
🤖 nit: group(8) and group(9) below are opaque without cross-referencing the regex in FSUtils. Could you extract named constants (e.g. NATIVE_LOG_TYPE_GROUP = 8, NATIVE_LOG_EXTENSION_GROUP = 9) here or in FSUtils, so a reader can understand what each group captures at a glance?
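A minimal sketch of the named-constant suggestion follows. The pattern here is a simplified, hypothetical stand-in and not the regex in FSUtils, so the group indices are 1 and 2 rather than 8 and 9; only the constant names come from the comment above.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical illustration; the real native-log regex lives in FSUtils and has more groups.
final class NativeLogNameGroups {
  // Simplified pattern: <prefix>.<log type>.<file extension>
  static final Pattern NATIVE_LOG_PATTERN =
      Pattern.compile("^.+\\.(log|deletes|cdc)\\.(parquet)$");

  // Named indices document what each capture group holds.
  static final int NATIVE_LOG_TYPE_GROUP = 1;
  static final int NATIVE_LOG_EXTENSION_GROUP = 2;

  // Returns a matcher positioned on a successful full match, or empty if the name is not native.
  static Optional<Matcher> match(String fileName) {
    Matcher matcher = NATIVE_LOG_PATTERN.matcher(fileName);
    return matcher.matches() ? Optional.of(matcher) : Optional.empty();
  }

  static String logType(Matcher matcher) {
    return matcher.group(NATIVE_LOG_TYPE_GROUP);
  }

  static String extension(Matcher matcher) {
    return matcher.group(NATIVE_LOG_EXTENSION_GROUP);
  }
}
```

Call sites then read as logType(matcher) instead of matcher.group(8), and a change to the regex only requires updating the constants in one place.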
| */ | ||
| public static long getFileSize(HoodieStorage storage, HoodieLogFile logFile) { | ||
| try { | ||
| return logFile.getFileSize() >= 0 ? logFile.getFileSize() : storage.getPathInfo(logFile.getPath()).getLength(); |
🤖 nit: logFile.getFileSize() is called twice in the same expression — could you extract it to a local variable (long fileSize = logFile.getFileSize()) so a reader doesn't have to wonder whether the double call is intentional?
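The local-variable form could look roughly like the sketch below. SizedFile and the LongSupplier parameter are hypothetical stand-ins for HoodieLogFile and the storage.getPathInfo(...).getLength() call, so that the snippet compiles without Hudi on the classpath.

```java
import java.util.function.LongSupplier;

// Hypothetical, dependency-free illustration of the suggested refactor.
final class FileSizeLookup {
  // Stand-in for HoodieLogFile; a negative size means "not cached".
  interface SizedFile {
    long getFileSize();
  }

  // storageLookup stands in for the storage call that fetches the length from the file system.
  static long getFileSize(SizedFile logFile, LongSupplier storageLookup) {
    long fileSize = logFile.getFileSize(); // read the accessor once
    return fileSize >= 0 ? fileSize : storageLookup.getAsLong();
  }
}
```

The storage lookup is only evaluated when the cached size is negative, which matches the behavior of the original ternary.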
| } | ||
|
|
||
| @Override | ||
| protected <T> ClosableIterator<T> readRecordsFromBlockPayload(HoodieReaderContext<T> readerContext) throws IOException { |
Key-filtered scans will fail for native data logs here. FileGroupRecordBuffer.getRecordsIterator calls dataBlock.getEngineRecordIterator(readerContext, keys, fullKey) whenever HoodieMergedLogRecordReader.createKeySpec(...) builds an IN/STARTS_WITH filter. The base HoodieDataBlock implementation routes that call to lookupEngineRecords, which this block does not override, so .log.parquet files throw "Point lookups are not supported by this Data block type (NATIVE_DATA_BLOCK)" instead of returning the matching rows. Since native logs are meant to be transparent to the file-group reader, please add a scan-and-filter fallback (or native predicate pushdown) for lookupEngineRecords here, and add a test that covers an IN/prefix-filtered read.
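A scan-and-filter fallback of the kind requested here could be shaped like the following sketch. It is engine-agnostic and does not use Hudi types: the class name, the keyOf extractor, and the use of plain java.util.Iterator are assumptions made for illustration, while the keys/fullKey pair mirrors the parameters named in the comment (exact match when fullKey is true, prefix match otherwise).

```java
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.function.Function;

// Hypothetical sketch of a scan-and-filter fallback; not the Hudi implementation.
final class ScanAndFilter {
  // Wraps a full scan and lazily yields only records whose key matches the filter.
  static <T> Iterator<T> filterByKeys(
      Iterator<T> records, Function<T, String> keyOf, Collection<String> keys, boolean fullKey) {
    // Exact lookups use a hash set; prefix lookups check each requested prefix.
    Set<String> exactKeys = fullKey ? new HashSet<>(keys) : Collections.emptySet();
    return new Iterator<T>() {
      private T pending;
      private boolean ready;

      @Override
      public boolean hasNext() {
        // Advance the underlying scan until a matching record is buffered or input ends.
        while (!ready && records.hasNext()) {
          T candidate = records.next();
          String key = keyOf.apply(candidate);
          boolean matches = fullKey
              ? exactKeys.contains(key)
              : keys.stream().anyMatch(key::startsWith);
          if (matches) {
            pending = candidate;
            ready = true;
          }
        }
        return ready;
      }

      @Override
      public T next() {
        if (!hasNext()) {
          throw new NoSuchElementException();
        }
        ready = false;
        T result = pending;
        pending = null;
        return result;
      }
    };
  }
}
```

This trades lookup cost for simplicity: every record in the native file is still read, so predicate pushdown into the columnar reader would likely be preferable where the engine supports it.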
…at log files
Describe the issue this Pull Request addresses
Hudi is introducing RFC-103 native-format log files (e.g. .log.parquet, .deletes.parquet), where the log file is itself a native columnar file carrying records plus footer metadata, rather than legacy inline log blocks. For compatibility and migration, the existing HoodieFileGroupReader read path must be able to consume both legacy inline logs and the new native logs within the same file slice, automatically and with no user-facing config or API change.
This PR adapts the log-reading stack so native data and delete log files are detected and read transparently through the existing block-processing and merge machinery.
Fixes #19057.
Summary and Changelog
Native log reading path
- HoodieNativeLogFileReader (implements HoodieLogFormat.Reader) that opens a native log file and exposes its records as a single synthetic block, reusing the standard reader plumbing.
- HoodieNativeDataBlock (extends HoodieDataBlock) and HoodieNativeDeleteBlock (extends HoodieDeleteBlock) to surface native records/deletes through the existing data/delete-block processing.
- NATIVE_DATA_BLOCK("native", v9) in HoodieLogBlock; wired it into the case handling in BaseHoodieLogRecordReader (both forward and last-block paths) alongside the other data block types.
- HoodieLogFormatReader now takes HoodieReaderContext and HoodieTableMetaClient, holds a HoodieLogFormat.Reader (instead of a concrete HoodieLogFileReader), and dispatches to the native reader for native log files; the legacy constructor is preserved as an overload.
Footer metadata
- NativeLogFooterMetadata to centralize encoding/decoding of log-block header metadata to/from the native file footer (key hudi.log.format.metadata, format version 2), including readSchemaFromNativeLogFile and format-version validation.
- HoodieNativeLogFormatWriter to delegate footer construction to NativeLogFooterMetadata.toFooterMetadata, removing the duplicated inline metadata-building logic.
- TableSchemaResolver.readSchemaFromLogFile now detects native log files via FSUtils.matchNativeLogFile and reads the schema from the footer instead of a log-block header.
Ordering-value (DeleteRecord) correctness
- RecordContext.getOrderingValueAsJava/getValueAsJava, returning the engine-agnostic Java type expected by DeleteRecord (the merge path later converts back to the engine type via convertOrderingValueToEngineType).
- getValueAsJava in FlinkRecordContext and BaseSparkInternalRecordContext; the Avro/Hive defaults inherit the base behavior.
- HoodieNativeDeleteBlock.readRecordsToDelete uses the Java-typed ordering value when constructing DeleteRecords to prevent a double conversion (e.g. ClassCastException for Flink, wrong values for Spark).
Impact
HoodieDataBlock/delete-block processing and merge paths via a synthetic block type, keeping the compaction/merge machinery untouched. Adds a small, well-scoped getValueAsJava extension point on RecordContext.
Risk Level
Low. The change is additive and gated on native-file detection, so legacy read paths are unaffected and fail-fast guards cover unsupported contexts. Mitigation: unit tests cover footer-header parsing/precedence/validation and per-engine getValueAsJava conversions, and an end-to-end test validates reading a mixed legacy + native parquet data/delete file slice through the HoodieFileGroupReader.
Documentation Update
None. This is an internal read-path adaptation with no new user-facing config, API, or storage-format change introduced by this PR (the native log format itself is covered by RFC-103).
Contributor's checklist