-
Notifications
You must be signed in to change notification settings - Fork 2.5k
feat(metadata): introduce HoodieMetadataTableLayout SPI with sub-directory bucketing for global RLI #19045
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
feat(metadata): introduce HoodieMetadataTableLayout SPI with sub-directory bucketing for global RLI #19045
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -48,6 +48,7 @@ | |
| import org.apache.hudi.common.model.HoodieFileGroupId; | ||
| import org.apache.hudi.common.model.HoodieIndexDefinition; | ||
| import org.apache.hudi.common.model.HoodieLogFile; | ||
| import org.apache.hudi.common.model.HoodiePartitionMetadata; | ||
| import org.apache.hudi.common.model.HoodieRecord; | ||
| import org.apache.hudi.common.model.HoodieRecordLocation; | ||
| import org.apache.hudi.common.model.HoodieReplaceCommitMetadata; | ||
|
|
@@ -57,6 +58,7 @@ | |
| import org.apache.hudi.common.schema.HoodieSchema; | ||
| import org.apache.hudi.common.schema.HoodieSchemaCache; | ||
| import org.apache.hudi.common.schema.HoodieSchemaUtils; | ||
| import org.apache.hudi.common.table.HoodieTableConfig; | ||
| import org.apache.hudi.common.table.HoodieTableMetaClient; | ||
| import org.apache.hudi.common.table.TableSchemaResolver; | ||
| import org.apache.hudi.common.table.log.HoodieLogFormat; | ||
|
|
@@ -1163,14 +1165,14 @@ private List<DirectoryInfo> listAllPartitionsFromMDT(String initializationTime, | |
| } | ||
|
|
||
| /** | ||
| * Initialize file groups for a partition. For file listing, we just have one file group. | ||
| * <p> | ||
| * All FileGroups for a given metadata partition has a fixed prefix as per the {@link MetadataPartitionType#getFileIdPrefix()}. | ||
| * Each file group is suffixed with 4 digits with increments of 1 starting with 0000. | ||
| * <p> | ||
| * Let's say we configure 10 file groups for record level index partition, and prefix as "record-index-bucket-" | ||
| * File groups will be named as : | ||
| * record-index-bucket-0000, .... -> ..., record-index-bucket-0009 | ||
| * Initialize file groups for an MDT partition under the configured layout. | ||
| * | ||
| * <p>FileIds are prefixed by {@link MetadataPartitionType#getFileIdPrefix()} and suffixed with a 4-digit | ||
| * file-group index starting at 0000. The on-disk directory for each file group is supplied by the | ||
| * configured {@link HoodieMetadataTableLayout}: the flat layout writes everything under the partition | ||
| * root; sub-directory bucketing writes them into bucket sub-directories. The persisted layout state | ||
| * (class + per-partition fileGroupCount + bucket size) is later used by readers to enumerate the same | ||
| * sub-paths without needing a filesystem listing. | ||
| */ | ||
| private void initializeFileGroups(HoodieTableMetaClient dataMetaClient, MetadataPartitionType metadataPartition, String instantTime, | ||
| int fileGroupCount, String relativePartitionPath, Option<String> dataPartitionName) throws IOException { | ||
|
|
@@ -1184,22 +1186,30 @@ private void initializeFileGroups(HoodieTableMetaClient dataMetaClient, Metadata | |
| // during initial commit, then the fileGroup would still be recognized (as a FileSlice with no baseFiles but a | ||
| // valid logFile). Since these log files being created have no content, it is safe to add them here before | ||
| // the bulkInsert. | ||
| final String msg = String.format("Creating %d file groups for partition %s with base fileId %s at instant time %s", | ||
| fileGroupCount, relativePartitionPath, metadataPartition.getFileIdPrefix(), instantTime); | ||
| HoodieMetadataTableLayout layout = resolveLayoutForMDTInit(); | ||
| final String msg = String.format("Creating %d file groups for partition %s with base fileId %s at instant time %s (layout=%s)", | ||
| fileGroupCount, relativePartitionPath, metadataPartition.getFileIdPrefix(), instantTime, layout.getLayoutId()); | ||
| LOG.info(msg); | ||
| final List<String> fileGroupFileIds = IntStream.range(0, fileGroupCount) | ||
| .mapToObj(i -> HoodieTableMetadataUtil.getFileIDForFileGroup(metadataPartition, i, relativePartitionPath, dataPartitionName)) | ||
|
|
||
| // Pair each file group's fileId with the relative directory it should live in, computed via the layout. | ||
| final List<Pair<String, String>> fileGroupIdAndPath = IntStream.range(0, fileGroupCount) | ||
| .mapToObj(i -> { | ||
| LayoutContext ctx = new LayoutContext(metadataPartition, i, fileGroupCount, dataPartitionName); | ||
| return Pair.of(layout.getFileId(ctx), layout.getFileGroupRelativePath(ctx)); | ||
| }) | ||
| .collect(Collectors.toList()); | ||
| ValidationUtils.checkArgument(fileGroupFileIds.size() == fileGroupCount); | ||
| ValidationUtils.checkArgument(fileGroupIdAndPath.size() == fileGroupCount); | ||
| engineContext.setJobStatus(this.getClass().getSimpleName(), msg); | ||
| engineContext.foreach(fileGroupFileIds, fileGroupFileId -> { | ||
| engineContext.foreach(fileGroupIdAndPath, p -> { | ||
| final String fileGroupFileId = p.getLeft(); | ||
| final String relativePath = p.getRight(); | ||
| try { | ||
| final Map<HeaderMetadataType, String> blockHeader = Collections.singletonMap(HeaderMetadataType.INSTANT_TIME, instantTime); | ||
|
|
||
| final HoodieDeleteBlock block = new HoodieDeleteBlock(Collections.emptyList(), blockHeader); | ||
|
|
||
| try (HoodieLogFormat.Writer writer = HoodieLogFormatWriter.builder() | ||
| .withParentPath(FSUtils.constructAbsolutePath(metadataWriteConfig.getBasePath(), relativePartitionPath)) | ||
| .withParentPath(FSUtils.constructAbsolutePath(metadataWriteConfig.getBasePath(), relativePath)) | ||
| .withLogFileId(fileGroupFileId) | ||
| .withInstantTime(instantTime) | ||
| .withLogVersion(HoodieLogFile.LOGFILE_BASE_VERSION) | ||
|
|
@@ -1212,9 +1222,110 @@ private void initializeFileGroups(HoodieTableMetaClient dataMetaClient, Metadata | |
| writer.appendBlock(block); | ||
| } | ||
| } catch (InterruptedException e) { | ||
| throw new HoodieException(String.format("Failed to created fileGroup %s for partition %s", fileGroupFileId, relativePartitionPath), e); | ||
| throw new HoodieException(String.format("Failed to created fileGroup %s for partition %s", fileGroupFileId, relativePath), e); | ||
| } | ||
| }, fileGroupIdAndPath.size()); | ||
|
|
||
| // For non-flat layouts, write .hoodie_partition_metadata at the logical partition root now, | ||
| // before the HoodieAppendHandle path (which would otherwise create one inside the bucket | ||
| // sub-dir). The AppendHandle skips marker creation at layout sub-paths via | ||
| // HoodieAppendHandle#isMDTLayoutSubPath, so this is the sole marker write under bucketing. | ||
| // For the flat default we leave marker creation to the AppendHandle and write nothing here — | ||
| // existing tables keep bit-identical behavior. | ||
| if (!FlatMDTLayout.LAYOUT_ID.equals(layout.getLayoutId())) { | ||
| for (String markerPath : layout.getPartitionMarkerPaths(relativePartitionPath, fileGroupCount)) { | ||
| HoodiePartitionMetadata marker = new HoodiePartitionMetadata( | ||
| dataMetaClient.getStorage(), | ||
| instantTime, | ||
| new StoragePath(metadataWriteConfig.getBasePath()), | ||
| FSUtils.constructAbsolutePath(metadataWriteConfig.getBasePath(), markerPath), | ||
| Option.empty()); | ||
| marker.trySave(); | ||
| } | ||
| }, fileGroupFileIds.size()); | ||
| } | ||
|
|
||
| // Persist layout state for this partition (skipped entirely for the flat default so existing | ||
| // tables get the identical on-disk and properties layout as before). When a non-flat layout is | ||
| // in use, readers consult this property to enumerate physical sub-paths without an FS listing. | ||
| if (metadataMetaClient != null && !FlatMDTLayout.LAYOUT_ID.equals(layout.getLayoutId())) { | ||
| maybePersistLayoutOnMDTInit(layout); | ||
| metadataMetaClient.getTableConfig().addMetadataLayoutPartitionFileGroupCounts( | ||
| metadataMetaClient, Collections.singletonMap(relativePartitionPath, fileGroupCount)); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Resolve the layout class to use for MDT initialization. If the MDT's table config already records a | ||
| * layout (set on a prior partition's init within this MDT), use it — the MDT is uniform. Otherwise, | ||
| * honor the writer's {@code hoodie.metadata.layout.class} config; absent that, default to flat. | ||
| * | ||
| * <p>Throws {@link HoodieMetadataException} if the user requested a non-flat layout while the RLI is | ||
| * configured in the partitioned mode. The first-patch implementation of | ||
| * {@link SubDirBucketedMDTLayout} does not handle partitioned RLI; the user must either disable the | ||
| * layout config or switch the RLI to the global (non-partitioned) mode. | ||
| */ | ||
| private HoodieMetadataTableLayout resolveLayoutForMDTInit() { | ||
| HoodieMetadataTableLayout layout = resolveLayoutForMDTInitInternal(); | ||
| if (!FlatMDTLayout.LAYOUT_ID.equals(layout.getLayoutId()) | ||
| && dataWriteConfig.isRecordLevelIndexEnabled()) { | ||
| throw new HoodieMetadataException( | ||
| "MDT layout " + layout.getClass().getName() + " (id=" + layout.getLayoutId() | ||
| + ") cannot be used together with partitioned RLI. Either disable " | ||
| + "hoodie.metadata.layout.class or switch the RLI to the global (non-partitioned) " | ||
| + "mode. Partitioned-RLI support for sub-directory bucketing will be addressed in a " | ||
| + "follow-up patch / RFC."); | ||
| } | ||
| return layout; | ||
| } | ||
|
|
||
| /** | ||
| * Resolves the layout instance for MDT initialization without any compatibility validation. | ||
| * | ||
| * <p>Resolution order: | ||
| * <ol> | ||
| * <li>a non-empty layout class already present in the MDT table config, loaded via | ||
| * {@link HoodieMetadataTableLayouts#load};</li> | ||
| * <li>the class requested on the writer's metadata config: {@link FlatMDTLayout} and | ||
| * {@link SubDirBucketedMDTLayout} are constructed directly, the latter with the configured | ||
| * bucket size;</li> | ||
| * <li>any other requested class is handed to {@link HoodieMetadataTableLayouts#load};</li> | ||
| * <li>{@link FlatMDTLayout} when nothing is requested.</li> | ||
| * </ol> | ||
| * | ||
| * <p>Side effect: for a user-defined class, the class name and bucket size are set on the in-memory | ||
| * MDT table config object so the factory can read them. | ||
| * | ||
| * @return the resolved layout | ||
| * @throws HoodieMetadataException if a user-defined layout class is requested while | ||
| * {@code metadataMetaClient} is null, since the factory needs the MDT table config | ||
| */ | ||
| private HoodieMetadataTableLayout resolveLayoutForMDTInitInternal() { | ||
| if (metadataMetaClient != null) { | ||
| Option<String> existing = metadataMetaClient.getTableConfig().getMetadataLayoutClass(); | ||
| if (existing.isPresent() && !existing.get().isEmpty()) { | ||
| return HoodieMetadataTableLayouts.load(metadataMetaClient.getTableConfig()); | ||
| } | ||
| } | ||
| Option<String> requested = dataWriteConfig.getMetadataConfig().getMetadataLayoutClass(); | ||
| if (!requested.isPresent()) { | ||
| return new FlatMDTLayout(); | ||
| } | ||
| String cls = requested.get(); | ||
| if (FlatMDTLayout.class.getName().equals(cls)) { | ||
| return new FlatMDTLayout(); | ||
| } | ||
| if (SubDirBucketedMDTLayout.class.getName().equals(cls)) { | ||
| return new SubDirBucketedMDTLayout(dataWriteConfig.getMetadataConfig().getMetadataLayoutBucketSize()); | ||
| } | ||
| // Fall through to the central factory for any user-defined layout class. The factory reads the | ||
| // MDT table config, so fail with a descriptive error instead of a NullPointerException when the | ||
| // MDT meta client is not available yet. | ||
| if (metadataMetaClient == null) { | ||
| throw new HoodieMetadataException( | ||
| "Cannot resolve MDT layout class " + cls + ": the metadata table meta client is not initialized"); | ||
| } | ||
| // Stash the class string on a transient view of the MDT config so the factory can find it. | ||
| metadataMetaClient.getTableConfig().setValue(HoodieTableConfig.METADATA_LAYOUT_CLASS, cls); | ||
| metadataMetaClient.getTableConfig().setValue(HoodieTableConfig.METADATA_LAYOUT_BUCKET_SIZE, | ||
| String.valueOf(dataWriteConfig.getMetadataConfig().getMetadataLayoutBucketSize())); | ||
| return HoodieMetadataTableLayouts.load(metadataMetaClient.getTableConfig()); | ||
| } | ||
|
|
||
| /** | ||
| * Persist the layout class + bucket size onto the MDT's hoodie.properties on the first init that | ||
| * involves a non-flat layout. Idempotent for subsequent partition inits within the same MDT. | ||
| */ | ||
| private void maybePersistLayoutOnMDTInit(HoodieMetadataTableLayout layout) { | ||
| if (FlatMDTLayout.LAYOUT_ID.equals(layout.getLayoutId())) { | ||
| // Flat is the implicit default; persist nothing to keep on-disk layout bit-identical for | ||
| // tables that did not opt in. | ||
| return; | ||
| } | ||
| if (metadataMetaClient.getTableConfig().getMetadataLayoutClass().isPresent()) { | ||
| // Already persisted by an earlier partition's init. | ||
| return; | ||
| } | ||
| int bucketSize = dataWriteConfig.getMetadataConfig().getMetadataLayoutBucketSize(); | ||
| metadataMetaClient.getTableConfig().setMetadataLayout( | ||
| metadataMetaClient, | ||
| layout.getClass().getName(), | ||
| bucketSize, | ||
| Collections.emptyMap()); | ||
| } | ||
|
|
||
| void clearExistingMetadataPartition(String relativePartitionPath) throws IOException { | ||
|
|
@@ -2081,6 +2192,14 @@ private SerializableFunction<HoodieRecord, HoodieRecord> getRecordTagger(String | |
| FileSlice slice = fileSlices.get(mappingFunction.apply(r.getRecordKey(), fileGroupCount)); | ||
| r.unseal(); | ||
| r.setCurrentLocation(new HoodieRecordLocation(slice.getBaseInstantTime(), slice.getFileId())); | ||
| // Under layouts that place file groups in sub-directories (e.g. bucketing), the file slice's | ||
| // physical partition path may differ from the logical MDT partition name carried on the | ||
| // record. Realign the record's partition path with where the file slice actually lives so | ||
| // downstream HoodieAppendHandle's partition-path consistency check passes. | ||
| String physical = slice.getPartitionPath(); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. 🤖 Realigning the record to the physical bucket path makes MDT delta-commit write stats record |
||
| if (physical != null && !physical.equals(r.getPartitionPath())) { | ||
| r.getKey().setPartitionPath(physical); | ||
| } | ||
| r.seal(); | ||
| return r; | ||
| }; | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -349,15 +349,27 @@ private Map<PartitionPath, List<FileSlice>> filterFiles(List<PartitionPath> part | |
| // API. Note that for COW table, the merging logic of two slices does not happen as there | ||
| // is no compaction, thus there is no performance impact. | ||
| HoodieTableFileSystemView finalFileSystemView = fileSystemView; | ||
| // For an MDT under a non-flat layout, the FS view indexes file slices by their physical | ||
| // partition (bucket sub-paths), but the partition list here uses logical partition names. | ||
| // Resolve via HoodieTableMetadataUtil which fans out across the layout's physical sub-paths. | ||
| final boolean isMdt = HoodieTableMetadata.isMetadataTable(basePath); | ||
| return partitions.stream().collect( | ||
| Collectors.toMap( | ||
| Function.identity(), | ||
| partitionPath -> | ||
| queryInstant.map(instant -> | ||
| finalFileSystemView.getLatestMergedFileSlicesBeforeOrOn(partitionPath.path, queryInstant.get()) | ||
| ) | ||
| .orElseGet(() -> finalFileSystemView.getLatestFileSlices(partitionPath.path)) | ||
| .collect(Collectors.toList()) | ||
| partitionPath -> { | ||
| if (isMdt) { | ||
| return queryInstant.isPresent() | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. 🤖 For an MDT time-travel query ( |
||
| ? HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(metaClient, | ||
| finalFileSystemView, partitionPath.path) | ||
| : HoodieTableMetadataUtil.getPartitionLatestFileSlices(metaClient, | ||
| Option.of(finalFileSystemView), partitionPath.path); | ||
| } | ||
| return queryInstant.map(instant -> | ||
| finalFileSystemView.getLatestMergedFileSlicesBeforeOrOn(partitionPath.path, queryInstant.get()) | ||
| ) | ||
| .orElseGet(() -> finalFileSystemView.getLatestFileSlices(partitionPath.path)) | ||
| .collect(Collectors.toList()); | ||
| } | ||
| )); | ||
| } finally { | ||
| long elapsedMs = timer.endTimer(); | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🤖 This couples the engine-agnostic append handle to one layout's directory-naming convention through a string heuristic. Two concerns: (1) brittleness — `SubDirBucketedMDTLayout.bucketRelativePath` uses `%04d`, so once the bucket index reaches 10000 the suffix becomes 5 digits, `isMDTLayoutSubPath` returns false, and a `.hoodie_partition_metadata` marker gets written inside the bucket dir — which would then surface `record_index/10000` as a logical partition and break the central invariant; (2) layering — could the marker-skip decision be driven by the resolved `HoodieMetadataTableLayout` (it already knows its scheme via `getPartitionMarkerPaths`) instead of re-deriving "is this a sub-path" heuristically here?