diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java index 3b825bbf1ade7..4cb04827c8cc0 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java @@ -249,12 +249,19 @@ private void init(HoodieRecord record) { Option fileSliceOpt = populateWriteStatAndFetchFileSlice(record, deltaWriteStat); averageRecordSize = sizeEstimator.sizeEstimate(record); try { - // Save hoodie partition meta in the partition path - HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(storage, instantTime, - new StoragePath(config.getBasePath()), - FSUtils.constructAbsolutePath(config.getBasePath(), partitionPath), - hoodieTable.getPartitionMetafileFormat()); - partitionMetadata.trySave(); + // Save hoodie partition meta in the partition path. For MDT writes under a non-flat layout + // (e.g., sub-directory bucketing), the physical partitionPath here is a layout sub-path + // (e.g. "record_index/0004") and the marker must live at the logical partition root instead. + // The marker at the logical root is written once at MDT initialization in + // HoodieBackedTableMetadataWriter.initializeFileGroups; skipping here keeps partition + // discovery returning logical names rather than per-bucket sub-paths. + if (!isMDTLayoutSubPath(partitionPath)) { + HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(storage, instantTime, + new StoragePath(config.getBasePath()), + FSUtils.constructAbsolutePath(config.getBasePath(), partitionPath), + hoodieTable.getPartitionMetafileFormat()); + partitionMetadata.trySave(); + } String instantTime = config.getWriteVersion().greaterThanOrEquals(HoodieTableVersion.EIGHT) ? 
getInstantTimeForLogFile(record) : deltaWriteStat.getPrevCommit();
@@ -268,6 +275,44 @@ private void init(HoodieRecord record) {
     doInit = false;
   }
 
+  /**
+   * Returns true when this append handle is writing to a layout sub-path of an MDT partition
+   * (e.g. {@code record_index/0004} under sub-directory bucketing). In that case, the
+   * {@code .hoodie_partition_metadata} marker must NOT be created at the bucket level; it is
+   * written once at the logical partition root by the MDT initialization path.
+   *
+   * <p>Heuristic: an MDT partition path of the form {@code <partition>/<NNNN>} where
+   * {@code NNNN} is the standard 4-digit bucket name produced by the sub-directory bucketing
+   * layout. Third-party layouts using a different sub-path naming scheme can ship their own
+   * append-handle integration; the OSS-shipped layouts use this convention.
+   *
+   * @param physicalPartitionPath relative partition path this handle writes to; may be null
+   * @return true only when the table is a metadata table and the last path segment is exactly
+   *         four ASCII digits
+   */
+  private boolean isMDTLayoutSubPath(String physicalPartitionPath) {
+    if (!hoodieTable.isMetadataTable() || physicalPartitionPath == null) {
+      return false;
+    }
+    int slash = physicalPartitionPath.lastIndexOf('/');
+    if (slash <= 0 || slash >= physicalPartitionPath.length() - 1) {
+      return false;
+    }
+    String last = physicalPartitionPath.substring(slash + 1);
+    if (last.length() != 4) {
+      return false;
+    }
+    for (int i = 0; i < 4; i++) {
+      char c = last.charAt(i);
+      // ASCII digits only: Character.isDigit also accepts other Unicode decimal digits, which
+      // would let a non-bucket segment pass; the documented bucket names (e.g. 0004) are ASCII.
+      if (c < '0' || c > '9') {
+        return false;
+      }
+    }
+    return true;
+  }
+
   /**
    * Returns the instant time to use in the log file name.
    */
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 2131080a0ac2a..723c16ae91975 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -48,6 +48,7 @@
 import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieIndexDefinition;
 import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodiePartitionMetadata;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
@@ -57,6 +58,7 @@
 import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.schema.HoodieSchemaCache;
 import 
org.apache.hudi.common.schema.HoodieSchemaUtils; +import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.TableSchemaResolver; import org.apache.hudi.common.table.log.HoodieLogFormat; @@ -1163,14 +1165,14 @@ private List listAllPartitionsFromMDT(String initializationTime, } /** - * Initialize file groups for a partition. For file listing, we just have one file group. - *

- * All FileGroups for a given metadata partition has a fixed prefix as per the {@link MetadataPartitionType#getFileIdPrefix()}. - * Each file group is suffixed with 4 digits with increments of 1 starting with 0000. - *

- * Let's say we configure 10 file groups for record level index partition, and prefix as "record-index-bucket-" - * File groups will be named as : - * record-index-bucket-0000, .... -> ..., record-index-bucket-0009 + * Initialize file groups for an MDT partition under the configured layout. + * + *

FileIds are prefixed by {@link MetadataPartitionType#getFileIdPrefix()} and suffixed with a 4-digit + * file-group index starting at 0000. The on-disk directory for each file group is supplied by the + * configured {@link HoodieMetadataTableLayout}: the flat layout writes everything under the partition + * root; sub-directory bucketing writes them into bucket sub-directories. The persisted layout state + * (class + per-partition fileGroupCount + bucket size) is later used by readers to enumerate the same + * sub-paths without needing a filesystem listing. */ private void initializeFileGroups(HoodieTableMetaClient dataMetaClient, MetadataPartitionType metadataPartition, String instantTime, int fileGroupCount, String relativePartitionPath, Option dataPartitionName) throws IOException { @@ -1184,22 +1186,30 @@ private void initializeFileGroups(HoodieTableMetaClient dataMetaClient, Metadata // during initial commit, then the fileGroup would still be recognized (as a FileSlice with no baseFiles but a // valid logFile). Since these log files being created have no content, it is safe to add them here before // the bulkInsert. - final String msg = String.format("Creating %d file groups for partition %s with base fileId %s at instant time %s", - fileGroupCount, relativePartitionPath, metadataPartition.getFileIdPrefix(), instantTime); + HoodieMetadataTableLayout layout = resolveLayoutForMDTInit(); + final String msg = String.format("Creating %d file groups for partition %s with base fileId %s at instant time %s (layout=%s)", + fileGroupCount, relativePartitionPath, metadataPartition.getFileIdPrefix(), instantTime, layout.getLayoutId()); LOG.info(msg); - final List fileGroupFileIds = IntStream.range(0, fileGroupCount) - .mapToObj(i -> HoodieTableMetadataUtil.getFileIDForFileGroup(metadataPartition, i, relativePartitionPath, dataPartitionName)) + + // Pair each file group's fileId with the relative directory it should live in, computed via the layout. 
+ final List> fileGroupIdAndPath = IntStream.range(0, fileGroupCount) + .mapToObj(i -> { + LayoutContext ctx = new LayoutContext(metadataPartition, i, fileGroupCount, dataPartitionName); + return Pair.of(layout.getFileId(ctx), layout.getFileGroupRelativePath(ctx)); + }) .collect(Collectors.toList()); - ValidationUtils.checkArgument(fileGroupFileIds.size() == fileGroupCount); + ValidationUtils.checkArgument(fileGroupIdAndPath.size() == fileGroupCount); engineContext.setJobStatus(this.getClass().getSimpleName(), msg); - engineContext.foreach(fileGroupFileIds, fileGroupFileId -> { + engineContext.foreach(fileGroupIdAndPath, p -> { + final String fileGroupFileId = p.getLeft(); + final String relativePath = p.getRight(); try { final Map blockHeader = Collections.singletonMap(HeaderMetadataType.INSTANT_TIME, instantTime); final HoodieDeleteBlock block = new HoodieDeleteBlock(Collections.emptyList(), blockHeader); try (HoodieLogFormat.Writer writer = HoodieLogFormatWriter.builder() - .withParentPath(FSUtils.constructAbsolutePath(metadataWriteConfig.getBasePath(), relativePartitionPath)) + .withParentPath(FSUtils.constructAbsolutePath(metadataWriteConfig.getBasePath(), relativePath)) .withLogFileId(fileGroupFileId) .withInstantTime(instantTime) .withLogVersion(HoodieLogFile.LOGFILE_BASE_VERSION) @@ -1212,9 +1222,110 @@ private void initializeFileGroups(HoodieTableMetaClient dataMetaClient, Metadata writer.appendBlock(block); } } catch (InterruptedException e) { - throw new HoodieException(String.format("Failed to created fileGroup %s for partition %s", fileGroupFileId, relativePartitionPath), e); + throw new HoodieException(String.format("Failed to created fileGroup %s for partition %s", fileGroupFileId, relativePath), e); + } + }, fileGroupIdAndPath.size()); + + // For non-flat layouts, write .hoodie_partition_metadata at the logical partition root now, + // before the HoodieAppendHandle path (which would otherwise create one inside the bucket + // sub-dir). 
The AppendHandle skips marker creation at layout sub-paths via + // HoodieAppendHandle#isMDTLayoutSubPath, so this is the sole marker write under bucketing. + // For the flat default we leave marker creation to the AppendHandle and write nothing here — + // existing tables keep bit-identical behavior. + if (!FlatMDTLayout.LAYOUT_ID.equals(layout.getLayoutId())) { + for (String markerPath : layout.getPartitionMarkerPaths(relativePartitionPath, fileGroupCount)) { + HoodiePartitionMetadata marker = new HoodiePartitionMetadata( + dataMetaClient.getStorage(), + instantTime, + new StoragePath(metadataWriteConfig.getBasePath()), + FSUtils.constructAbsolutePath(metadataWriteConfig.getBasePath(), markerPath), + Option.empty()); + marker.trySave(); } - }, fileGroupFileIds.size()); + } + + // Persist layout state for this partition (skipped entirely for the flat default so existing + // tables get the identical on-disk and properties layout as before). When a non-flat layout is + // in use, readers consult this property to enumerate physical sub-paths without an FS listing. + if (metadataMetaClient != null && !FlatMDTLayout.LAYOUT_ID.equals(layout.getLayoutId())) { + maybePersistLayoutOnMDTInit(layout); + metadataMetaClient.getTableConfig().addMetadataLayoutPartitionFileGroupCounts( + metadataMetaClient, Collections.singletonMap(relativePartitionPath, fileGroupCount)); + } + } + + /** + * Resolve the layout class to use for MDT initialization. If the MDT's table config already records a + * layout (set on a prior partition's init within this MDT), use it — the MDT is uniform. Otherwise, + * honor the writer's {@code hoodie.metadata.layout.class} config; absent that, default to flat. + * + *

Throws {@link HoodieMetadataException} if the user requested a non-flat layout while the RLI is
+   * configured in the partitioned mode. The first-patch implementation of
+   * {@link SubDirBucketedMDTLayout} does not handle partitioned RLI; the user must either disable the
+   * layout config or switch the RLI to the global (non-partitioned) mode.
+   */
+  private HoodieMetadataTableLayout resolveLayoutForMDTInit() {
+    HoodieMetadataTableLayout layout = resolveLayoutForMDTInitInternal();
+    // NOTE(review): the message below is about *partitioned* RLI, but the guard keys off
+    // isRecordLevelIndexEnabled() -- confirm that accessor is false for global-only RLI.
+    if (!FlatMDTLayout.LAYOUT_ID.equals(layout.getLayoutId())
+        && dataWriteConfig.isRecordLevelIndexEnabled()) {
+      throw new HoodieMetadataException(
+          "MDT layout " + layout.getClass().getName() + " (id=" + layout.getLayoutId()
+              + ") cannot be used together with partitioned RLI. Either disable "
+              + "hoodie.metadata.layout.class or switch the RLI to the global (non-partitioned) "
+              + "mode. Partitioned-RLI support for sub-directory bucketing will be addressed in a "
+              + "follow-up patch / RFC.");
+    }
+    return layout;
+  }
+
+  /**
+   * Picks the layout without applying the RLI compatibility check.
+   *
+   * <p>Order: (1) a non-empty layout class already recorded on the MDT table config, loaded through
+   * {@link HoodieMetadataTableLayouts#load}; (2) the writer's configured layout class, where the two
+   * built-in layouts are instantiated directly and any other class goes through the same factory;
+   * (3) {@link FlatMDTLayout} when nothing is configured.
+   *
+   * <p>NOTE(review): an MDT created before this feature records no layout class, so step (1) cannot
+   * tell it apart from a brand-new MDT; confirm a non-flat writer config cannot be applied to a new
+   * partition of such an existing flat MDT.
+   *
+   * @throws HoodieMetadataException if a user-defined layout class is requested while the MDT meta
+   *         client is not available, since the factory can only read the class from the MDT config
+   */
+  private HoodieMetadataTableLayout resolveLayoutForMDTInitInternal() {
+    if (metadataMetaClient != null) {
+      Option<String> existing = metadataMetaClient.getTableConfig().getMetadataLayoutClass();
+      if (existing.isPresent() && !existing.get().isEmpty()) {
+        return HoodieMetadataTableLayouts.load(metadataMetaClient.getTableConfig());
+      }
+    }
+    Option<String> requested = dataWriteConfig.getMetadataConfig().getMetadataLayoutClass();
+    if (!requested.isPresent()) {
+      return new FlatMDTLayout();
+    }
+    String cls = requested.get();
+    if (FlatMDTLayout.class.getName().equals(cls)) {
+      return new FlatMDTLayout();
+    }
+    if (SubDirBucketedMDTLayout.class.getName().equals(cls)) {
+      return new SubDirBucketedMDTLayout(dataWriteConfig.getMetadataConfig().getMetadataLayoutBucketSize());
+    }
+    // Fall through to the central factory for any user-defined layout class. The factory reads the
+    // class from the MDT table config, so fail with a clear message instead of a
+    // NullPointerException when that config is not reachable yet.
+    if (metadataMetaClient == null) {
+      throw new HoodieMetadataException(
+          "Cannot load MDT layout " + cls + ": the metadata table meta client is not initialized");
+    }
+    // Stash the class string on a transient view of the MDT config so the factory can find it.
+    metadataMetaClient.getTableConfig().setValue(HoodieTableConfig.METADATA_LAYOUT_CLASS, cls);
+    metadataMetaClient.getTableConfig().setValue(HoodieTableConfig.METADATA_LAYOUT_BUCKET_SIZE,
+        String.valueOf(dataWriteConfig.getMetadataConfig().getMetadataLayoutBucketSize()));
+    return HoodieMetadataTableLayouts.load(metadataMetaClient.getTableConfig());
+  }
+
+  /**
+   * Persist the layout class + bucket size onto the MDT's hoodie.properties on the first init that
+   * involves a non-flat layout. Idempotent for subsequent partition inits within the same MDT.
+   */
+  private void maybePersistLayoutOnMDTInit(HoodieMetadataTableLayout layout) {
+    if (FlatMDTLayout.LAYOUT_ID.equals(layout.getLayoutId())) {
+      // Flat is the implicit default; persist nothing to keep on-disk layout bit-identical for
+      // tables that did not opt in.
+      return;
+    }
+    if (metadataMetaClient.getTableConfig().getMetadataLayoutClass().isPresent()) {
+      // Already persisted by an earlier partition's init.
+      return;
+    }
+    int bucketSize = dataWriteConfig.getMetadataConfig().getMetadataLayoutBucketSize();
+    metadataMetaClient.getTableConfig().setMetadataLayout(
+        metadataMetaClient,
+        layout.getClass().getName(),
+        bucketSize,
+        Collections.emptyMap());
   }
 
   void clearExistingMetadataPartition(String relativePartitionPath) throws IOException {
@@ -2081,6 +2192,14 @@ private SerializableFunction getRecordTagger(String
       FileSlice slice = fileSlices.get(mappingFunction.apply(r.getRecordKey(), fileGroupCount));
       r.unseal();
       r.setCurrentLocation(new HoodieRecordLocation(slice.getBaseInstantTime(), slice.getFileId()));
+      // Under layouts that place file groups in sub-directories (e.g. bucketing), the file slice's
+      // physical partition path may differ from the logical MDT partition name carried on the
+      // record. Realign the record's partition path with where the file slice actually lives so
+      // downstream HoodieAppendHandle's partition-path consistency check passes.
+ String physical = slice.getPartitionPath(); + if (physical != null && !physical.equals(r.getPartitionPath())) { + r.getKey().setPartitionPath(physical); + } r.seal(); return r; }; diff --git a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java index da30e0dd923ae..b33f78d6fe041 100644 --- a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java +++ b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java @@ -349,15 +349,27 @@ private Map> filterFiles(List part // API. Note that for COW table, the merging logic of two slices does not happen as there // is no compaction, thus there is no performance impact. HoodieTableFileSystemView finalFileSystemView = fileSystemView; + // For an MDT under a non-flat layout, the FS view indexes file slices by their physical + // partition (bucket sub-paths), but the partition list here uses logical partition names. + // Resolve via HoodieTableMetadataUtil which fans out across the layout's physical sub-paths. + final boolean isMdt = HoodieTableMetadata.isMetadataTable(basePath); return partitions.stream().collect( Collectors.toMap( Function.identity(), - partitionPath -> - queryInstant.map(instant -> - finalFileSystemView.getLatestMergedFileSlicesBeforeOrOn(partitionPath.path, queryInstant.get()) - ) - .orElseGet(() -> finalFileSystemView.getLatestFileSlices(partitionPath.path)) - .collect(Collectors.toList()) + partitionPath -> { + if (isMdt) { + return queryInstant.isPresent() + ? 
HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(metaClient, + finalFileSystemView, partitionPath.path) + : HoodieTableMetadataUtil.getPartitionLatestFileSlices(metaClient, + Option.of(finalFileSystemView), partitionPath.path); + } + return queryInstant.map(instant -> + finalFileSystemView.getLatestMergedFileSlicesBeforeOrOn(partitionPath.path, queryInstant.get()) + ) + .orElseGet(() -> finalFileSystemView.getLatestFileSlices(partitionPath.path)) + .collect(Collectors.toList()); + } )); } finally { long elapsedMs = timer.endTimer(); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java index cd4d03ee4ee70..87f177e14506d 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java @@ -687,6 +687,32 @@ public long getMaxLogFileSize() { return getLong(MAX_LOG_FILE_SIZE_BYTES_PROP); } + public static final ConfigProperty METADATA_LAYOUT_CLASS = ConfigProperty + .key("hoodie.metadata.layout.class") + .noDefaultValue() + .markAdvanced() + .sinceVersion("1.3.0") + .withDocumentation("Fully-qualified class name of the HoodieMetadataTableLayout implementation that organizes " + + "MDT file groups on disk. When unset, MDT uses the flat layout (file groups directly under each metadata " + + "partition). Applies only at MDT initialization; an MDT already on disk keeps its existing layout."); + + public static final ConfigProperty METADATA_LAYOUT_BUCKET_SIZE = ConfigProperty + .key("hoodie.metadata.layout.bucket.size") + .defaultValue(1000) + .markAdvanced() + .sinceVersion("1.3.0") + .withDocumentation("When the layout is SubDirBucketedMDTLayout, the maximum number of file groups per bucket " + + "sub-directory. Ignored for the flat layout. 
Default 1000."); + + public Option getMetadataLayoutClass() { + String cls = getString(METADATA_LAYOUT_CLASS); + return (cls == null || cls.isEmpty()) ? Option.empty() : Option.of(cls); + } + + public int getMetadataLayoutBucketSize() { + return getIntOrDefault(METADATA_LAYOUT_BUCKET_SIZE); + } + private HoodieMetadataConfig() { super(); } @@ -1349,6 +1375,16 @@ public Builder withAutoDeletePartitions(boolean autoDeletePartitions) { return this; } + public Builder withMetadataLayoutClass(String layoutClass) { + metadataConfig.setValue(METADATA_LAYOUT_CLASS, layoutClass); + return this; + } + + public Builder withMetadataLayoutBucketSize(int bucketSize) { + metadataConfig.setValue(METADATA_LAYOUT_BUCKET_SIZE, String.valueOf(bucketSize)); + return this; + } + public HoodieMetadataConfig build() { metadataConfig.setDefaultValue(ENABLE, getDefaultMetadataEnable(engineType)); metadataConfig.setDefaultValue(ENABLE_METADATA_INDEX_COLUMN_STATS, getDefaultColStatsEnable(engineType)); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java index ca775136257c4..faaa0c801bd08 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java @@ -374,6 +374,29 @@ public static final String getDefaultPayloadClassName() { .sinceVersion("1.1.0") .withDocumentation("This property when set, will define how two versions of the record will be merged together when records are partially formed"); + public static final ConfigProperty METADATA_LAYOUT_CLASS = ConfigProperty + .key("hoodie.metadata.layout.class") + .noDefaultValue() + .sinceVersion("1.3.0") + .withDocumentation("Fully-qualified class name of the HoodieMetadataTableLayout implementation that organizes " + + "MDT file groups on disk. 
When unset, MDT uses the flat layout (file groups directly under each metadata " + + "partition). Set once at MDT initialization; immutable thereafter."); + + public static final ConfigProperty METADATA_LAYOUT_BUCKET_SIZE = ConfigProperty + .key("hoodie.metadata.layout.bucket.size") + .defaultValue(1000) + .sinceVersion("1.3.0") + .withDocumentation("Layout-specific: maximum number of file groups per bucket sub-directory when the layout is " + + "SubDirBucketedMDTLayout. Ignored otherwise."); + + public static final ConfigProperty METADATA_LAYOUT_PARTITION_FILE_GROUP_COUNTS = ConfigProperty + .key("hoodie.metadata.layout.partition.file.group.counts") + .noDefaultValue() + .sinceVersion("1.3.0") + .withDocumentation("Comma-separated map of MDT partition name to file-group count, e.g. " + + "\"record_index=2500,files=1,col_stats=10\". Persisted on the MDT at initialization so readers can " + + "compute physical sub-paths without performing a filesystem listing."); + public static final ConfigProperty URL_ENCODE_PARTITIONING = KeyGeneratorOptions.URL_ENCODE_PARTITIONING; public static final ConfigProperty HIVE_STYLE_PARTITIONING_ENABLE = KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE; public static final ConfigProperty SLASH_SEPARATED_DATE_PARTITIONING = KeyGeneratorOptions.SLASH_SEPARATED_DATE_PARTITIONING; @@ -1357,6 +1380,77 @@ public void clearMetadataPartitions(HoodieTableMetaClient metaClient) { setMetadataPartitionState(metaClient, MetadataPartitionType.FILES.getPartitionPath(), false); } + /** + * @return the FQCN of the configured MDT layout class, or empty if no layout was set + * (in which case the flat layout is the implicit default). + */ + public Option getMetadataLayoutClass() { + return Option.ofNullable(getString(METADATA_LAYOUT_CLASS)); + } + + /** + * @return the configured layout bucket size; only meaningful when the layout class is a + * sub-directory bucketing implementation. 
+   */
+  public int getMetadataLayoutBucketSize() {
+    return getIntOrDefault(METADATA_LAYOUT_BUCKET_SIZE);
+  }
+
+  /**
+   * Decodes {@link #METADATA_LAYOUT_PARTITION_FILE_GROUP_COUNTS} into a partition-to-count map.
+   * Entries without a {@code =} or with an empty partition name are skipped; surrounding
+   * whitespace around the name and the count is ignored.
+   *
+   * @return the persisted per-MDT-partition file-group counts; empty when the property is not set
+   * @throws NumberFormatException if an entry's count is not a valid integer
+   */
+  public Map<String, Integer> getMetadataLayoutPartitionFileGroupCounts() {
+    String raw = getString(METADATA_LAYOUT_PARTITION_FILE_GROUP_COUNTS);
+    if (raw == null || raw.isEmpty()) {
+      return Collections.emptyMap();
+    }
+    Map<String, Integer> result = new HashMap<>();
+    for (String entry : raw.split(CONFIG_VALUES_DELIMITER)) {
+      // The count never contains '=', so split on the LAST one: that is the exact inverse of the
+      // encoder (key + "=" + count) even when a partition name itself contains '='.
+      int eq = entry.lastIndexOf('=');
+      if (eq <= 0) {
+        continue;
+      }
+      String partition = entry.substring(0, eq).trim();
+      String count = entry.substring(eq + 1).trim();
+      if (partition.isEmpty()) {
+        continue;
+      }
+      try {
+        result.put(partition, Integer.parseInt(count));
+      } catch (NumberFormatException e) {
+        // Same exception type as before, but name the offending entry so a hand-edited
+        // hoodie.properties can be diagnosed.
+        NumberFormatException detailed = new NumberFormatException(
+            "Invalid file-group count '" + count + "' for MDT partition '" + partition
+                + "' in hoodie.metadata.layout.partition.file.group.counts");
+        detailed.initCause(e);
+        throw detailed;
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Persist the layout class and its per-partition file-group counts on this (MDT) table config.
+   * Intended to be called once at MDT initialization; the layout is immutable thereafter.
+   */
+  public void setMetadataLayout(HoodieTableMetaClient metaClient,
+                                String layoutClass,
+                                int bucketSize,
+                                Map<String, Integer> partitionFileGroupCounts) {
+    setValue(METADATA_LAYOUT_CLASS, layoutClass);
+    setValue(METADATA_LAYOUT_BUCKET_SIZE, String.valueOf(bucketSize));
+    String encoded = partitionFileGroupCounts.entrySet().stream()
+        .sorted(Map.Entry.comparingByKey())
+        .map(e -> e.getKey() + "=" + e.getValue())
+        .collect(Collectors.joining(CONFIG_VALUES_DELIMITER));
+    setValue(METADATA_LAYOUT_PARTITION_FILE_GROUP_COUNTS, encoded);
+    update(metaClient.getStorage(), metaClient.getMetaPath(), getProps());
+  }
+
+  /**
+   * Append or update file-group counts for additional MDT partitions initialized after the first
+   * MDT bootstrap (e.g., a new index being enabled). Preserves existing entries and overwrites any
+   * key collision with the new count. 
+ */ + public void addMetadataLayoutPartitionFileGroupCounts(HoodieTableMetaClient metaClient, + Map additionalCounts) { + Map merged = new HashMap<>(getMetadataLayoutPartitionFileGroupCounts()); + merged.putAll(additionalCounts); + String encoded = merged.entrySet().stream() + .sorted(Map.Entry.comparingByKey()) + .map(e -> e.getKey() + "=" + e.getValue()) + .collect(Collectors.joining(CONFIG_VALUES_DELIMITER)); + setValue(METADATA_LAYOUT_PARTITION_FILE_GROUP_COUNTS, encoded); + update(metaClient.getStorage(), metaClient.getMetaPath(), getProps()); + } + /** * Returns the format to use for partition meta files. */ diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/FileIdInfo.java b/hudi-common/src/main/java/org/apache/hudi/metadata/FileIdInfo.java new file mode 100644 index 0000000000000..402481544f932 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/FileIdInfo.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.metadata; + +import org.apache.hudi.common.util.Option; + +import java.io.Serializable; + +/** + * Parsed representation of an MDT fileId, produced by + * {@link HoodieMetadataTableLayout#parseFileId}. 
+ */ +public final class FileIdInfo implements Serializable { + + private final int fileGroupIndex; + private final Option dataPartitionName; + + public FileIdInfo(int fileGroupIndex, Option dataPartitionName) { + this.fileGroupIndex = fileGroupIndex; + this.dataPartitionName = dataPartitionName; + } + + public int getFileGroupIndex() { + return fileGroupIndex; + } + + public Option getDataPartitionName() { + return dataPartitionName; + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java index 12b1c6757af2e..b96b5c5b62742 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java @@ -72,6 +72,12 @@ public class FileSystemBackedTableMetadata extends AbstractHoodieTableMetadata { private final String tableName; private final boolean hiveStylePartitioningEnabled; private final boolean urlEncodePartitioningEnabled; + // If this base path is an MDT, this is the MDT's configured layout. Resolves to FlatMDTLayout + // for every MDT that did not opt in. Null when the base path is not an MDT. + private final HoodieMetadataTableLayout mdtLayout; + // Per-MDT-partition file-group counts (sourced from the MDT's hoodie.properties). Empty for + // non-MDT base paths and for MDTs on the default flat layout. 
+ private final Map mdtLayoutPartitionFileGroupCounts; public FileSystemBackedTableMetadata(HoodieEngineContext engineContext, HoodieTableConfig tableConfig, HoodieStorage storage, String datasetBasePath) { @@ -80,6 +86,13 @@ public FileSystemBackedTableMetadata(HoodieEngineContext engineContext, HoodieTa this.tableName = tableConfig.getTableName(); this.hiveStylePartitioningEnabled = Boolean.parseBoolean(tableConfig.getHiveStylePartitioningEnable()); this.urlEncodePartitioningEnabled = Boolean.parseBoolean(tableConfig.getUrlEncodePartitioning()); + if (HoodieTableMetadata.isMetadataTable(datasetBasePath)) { + this.mdtLayout = HoodieMetadataTableLayouts.load(tableConfig); + this.mdtLayoutPartitionFileGroupCounts = tableConfig.getMetadataLayoutPartitionFileGroupCounts(); + } else { + this.mdtLayout = null; + this.mdtLayoutPartitionFileGroupCounts = Collections.emptyMap(); + } } public FileSystemBackedTableMetadata(HoodieEngineContext engineContext, @@ -96,13 +109,44 @@ public FileSystemBackedTableMetadata(HoodieEngineContext engineContext, Boolean.parseBoolean(tableConfig.getHiveStylePartitioningEnable()); this.urlEncodePartitioningEnabled = Boolean.parseBoolean(tableConfig.getUrlEncodePartitioning()); + if (HoodieTableMetadata.isMetadataTable(datasetBasePath)) { + this.mdtLayout = HoodieMetadataTableLayouts.load(tableConfig); + this.mdtLayoutPartitionFileGroupCounts = tableConfig.getMetadataLayoutPartitionFileGroupCounts(); + } else { + this.mdtLayout = null; + this.mdtLayoutPartitionFileGroupCounts = Collections.emptyMap(); + } } @Override public List getAllFilesInPartition(StoragePath partitionPath) throws IOException { + // For an MDT under a non-flat layout, the logical partition (e.g., "record_index") contains + // bucket sub-directories rather than file groups directly. 
Fan out the listing across the + // layout-supplied sub-paths so direct Spark queries (and any caller that goes through + // FileSystemBackedTableMetadata) see the file groups under the partition. + if (mdtLayout != null) { + String logicalPartition = FSUtils.getRelativePartitionPath(dataBasePath, partitionPath); + List physicalSubPaths = expandPhysicalSubPaths(logicalPartition); + if (physicalSubPaths.size() > 1 || (physicalSubPaths.size() == 1 && !physicalSubPaths.get(0).equals(logicalPartition))) { + List all = new ArrayList<>(); + for (String sub : physicalSubPaths) { + StoragePath subPath = new StoragePath(dataBasePath, sub); + all.addAll(FSUtils.getAllDataFilesInPartition(getStorage(), subPath)); + } + return all; + } + } return FSUtils.getAllDataFilesInPartition(getStorage(), partitionPath); } + private List expandPhysicalSubPaths(String logicalPartition) { + if (mdtLayout == null) { + return Collections.singletonList(logicalPartition); + } + int fgCount = mdtLayoutPartitionFileGroupCounts.getOrDefault(logicalPartition, 0); + return mdtLayout.getPhysicalPartitions(logicalPartition, fgCount); + } + @Override public List getAllPartitionPaths() throws IOException { return getPartitionPathWithPathPrefixes(Collections.singletonList("")); @@ -269,10 +313,29 @@ public Map> getAllFilesInPartitions(Collection layoutCountsSnapshot = mdtLayoutPartitionFileGroupCounts; + final String basePathStr = dataBasePath.toString(); List>> partitionToFiles = engineContext.map(new ArrayList<>(partitionPaths), partitionPathStr -> { StoragePath partitionPath = new StoragePath(partitionPathStr); + if (layoutSnapshot != null) { + String logicalPartition = FSUtils.getRelativePartitionPath(new StoragePath(basePathStr), partitionPath); + int fgCount = layoutCountsSnapshot.getOrDefault(logicalPartition, 0); + List physicalSubPaths = layoutSnapshot.getPhysicalPartitions(logicalPartition, fgCount); + if (physicalSubPaths.size() > 1 + || (physicalSubPaths.size() == 1 && 
!physicalSubPaths.get(0).equals(logicalPartition))) { + List aggregated = new ArrayList<>(); + for (String sub : physicalSubPaths) { + StoragePath subPath = new StoragePath(basePathStr, sub); + aggregated.addAll(FSUtils.getAllDataFilesInPartitionByPathFilter(getStorage(), subPath, pathFilterOption)); + } + return Pair.of(partitionPathStr, aggregated); + } + } return Pair.of(partitionPathStr, FSUtils.getAllDataFilesInPartitionByPathFilter(getStorage(), partitionPath, pathFilterOption)); }, parallelism); diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/FlatMDTLayout.java b/hudi-common/src/main/java/org/apache/hudi/metadata/FlatMDTLayout.java new file mode 100644 index 0000000000000..e81c355f31af6 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/FlatMDTLayout.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.metadata; + +import org.apache.hudi.common.util.Option; + +import java.util.Collections; +import java.util.List; + +/** + * Default MDT layout: every file group lives directly under its metadata + * partition directory, and a single {@code .hoodie_partition_metadata} marker + * lives at the partition root. 
This is the layout that existed before the
+ * layout SPI was introduced and remains the default for all tables that have
+ * not explicitly opted into a different layout.
+ */
+public final class FlatMDTLayout implements HoodieMetadataTableLayout {
+
+  /** Identifier returned by {@link #getLayoutId()}. */
+  public static final String LAYOUT_ID = "flat";
+
+  /** Returns {@link #LAYOUT_ID}. */
+  @Override
+  public String getLayoutId() {
+    return LAYOUT_ID;
+  }
+
+  /** Returns the metadata partition path of {@code ctx} unchanged; no sub-directory is appended. */
+  @Override
+  public String getFileGroupRelativePath(LayoutContext ctx) {
+    return ctx.getPartitionType().getPartitionPath();
+  }
+
+  /** Delegates fileId construction to {@link HoodieTableMetadataUtil#getFileIDForFileGroup}. */
+  @Override
+  public String getFileId(LayoutContext ctx) {
+    return HoodieTableMetadataUtil.getFileIDForFileGroup(
+        ctx.getPartitionType(),
+        ctx.getFileGroupIndex(),
+        ctx.getPartitionType().getPartitionPath(),
+        ctx.getDataPartitionName());
+  }
+
+  /**
+   * Recovers the file-group index encoded in {@code fileId}. {@code partitionType} is not
+   * consulted, and the result is always built with {@code Option.empty()} as its second argument.
+   */
+  @Override
+  public FileIdInfo parseFileId(MetadataPartitionType partitionType, String fileId) {
+    int idx = HoodieTableMetadataUtil.getFileGroupIndexFromFileId(fileId);
+    return new FileIdInfo(idx, Option.empty());
+  }
+
+  /** Returns a singleton list holding {@code logicalPartition}; {@code fileGroupCount} is ignored. */
+  @Override
+  public List<String> getPhysicalPartitions(String logicalPartition, int fileGroupCount) {
+    return Collections.singletonList(logicalPartition);
+  }
+
+  /** Returns a singleton list holding {@code logicalPartition}; {@code fileGroupCount} is ignored. */
+  @Override
+  public List<String> getPartitionMarkerPaths(String logicalPartition, int fileGroupCount) {
+    return Collections.singletonList(logicalPartition);
+  }
+}
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataTableLayout.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataTableLayout.java new file mode 100644 index 0000000000000..4eda80b19e06f --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataTableLayout.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.metadata; + +import java.io.Serializable; +import java.util.List; + +/** + * Strategy that controls how MDT file groups are organized on disk. + * + *

The layout is selected at MDT initialization via the + * {@code hoodie.metadata.layout.class} table property and is immutable for + * the lifetime of the MDT. Implementations must be safe to serialize across + * executors and must be deterministic for given inputs. + * + *

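Two implementations ship in OSS:
+ * <ul>
+ *   <li>{@link FlatMDTLayout}: the default; file groups live directly under the MDT partition directory.</li>
+ *   <li>{@link SubDirBucketedMDTLayout}: opt-in; file groups are distributed into bucket sub-directories.</li>
+ * </ul>
+ *
+ *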

<p>Third parties may ship custom implementations and wire them in via the
+ * {@code hoodie.metadata.layout.class} property.
+ */
+public interface HoodieMetadataTableLayout extends Serializable {
+
+  /**
+   * Stable identifier for this layout. Persisted on the MDT at init time and
+   * asserted on every open as a guard against the configured class being
+   * swapped to one with mismatched on-disk semantics.
+   *
+   * @return the layout identifier
+   */
+  String getLayoutId();
+
+  /**
+   * Return the path (relative to the MDT base path) where the file group
+   * described by {@code ctx} should live.
+   *
+   * <p>For the flat layout this is always the partition's path. For
+   * sub-directory bucketing this includes the bucket sub-directory.
+   *
+   * @param ctx description of the file group being placed
+   * @return the relative directory for that file group
+   */
+  String getFileGroupRelativePath(LayoutContext ctx);
+
+  /**
+   * Return the fileId to use for the file group described by {@code ctx}.
+   *
+   * <p>Most layouts will delegate to
+   * {@link HoodieTableMetadataUtil#getFileIDForFileGroup} since the fileId
+   * scheme is independent of the on-disk directory layout.
+   *
+   * @param ctx description of the file group being named
+   * @return the fileId for that file group
+   */
+  String getFileId(LayoutContext ctx);
+
+  /**
+   * Inverse of {@link #getFileId}: given a fileId observed on disk, recover
+   * its global file-group index and (for partitioned RLI) its data-partition
+   * name.
+   *
+   * @param partitionType the metadata partition the fileId belongs to
+   * @param fileId the fileId to decode
+   * @return the decoded file-group information
+   */
+  FileIdInfo parseFileId(MetadataPartitionType partitionType, String fileId);
+
+  /**
+   * Given a logical MDT partition name and the total number of file groups
+   * known to live under it, return the list of sub-paths to scan to discover
+   * all file groups.
+   *
+   * <p>For the flat layout this is {@code [logicalPartition]}. For
+   * sub-directory bucketing this returns the list of bucket directories
+   * (e.g. {@code ["record_index/0000", "record_index/0001", ...]}).
+   *
+   * <p>The {@code fileGroupCount} is supplied by the caller (sourced from MDT
+   * properties) so this method must not perform any filesystem listing.
+   *
+   * @param logicalPartition the logical MDT partition name
+   * @param fileGroupCount the number of file groups recorded for that partition
+   * @return the relative paths to scan
+   */
+  List<String> getPhysicalPartitions(String logicalPartition, int fileGroupCount);
+
+  /**
+   * Return the paths (relative to the MDT base path) where
+   * {@code .hoodie_partition_metadata} markers must be written for this
+   * logical partition.
+   *
+   * <p>To preserve MDT-as-Hudi-table semantics, layouts should place a single
+   * marker at the logical partition root, never inside a bucket
+   * sub-directory.
+   *
+   * @param logicalPartition the logical MDT partition name
+   * @param fileGroupCount the number of file groups recorded for that partition
+   * @return the relative paths that receive a marker
+   */
+  List<String> getPartitionMarkerPaths(String logicalPartition, int fileGroupCount);
+}
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataTableLayouts.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataTableLayouts.java new file mode 100644 index 0000000000000..8390ff38e7787 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataTableLayouts.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.metadata; + +import org.apache.hudi.common.table.HoodieTableConfig; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ReflectionUtils; +import org.apache.hudi.exception.HoodieMetadataException; + +/** + * Factory for resolving the {@link HoodieMetadataTableLayout} from an MDT's + * persisted {@link HoodieTableConfig}. + * + *

Resolution rules: + *

+ * <ol>
+ *   <li>If {@link HoodieTableConfig#METADATA_LAYOUT_CLASS} is unset or empty, return
+ *   {@link FlatMDTLayout}. This is the case for every existing MDT and
+ *   preserves backward compatibility.</li>
+ *   <li>If set to the FQCN of {@link FlatMDTLayout} or
+ *   {@link SubDirBucketedMDTLayout}, the corresponding built-in instance is
+ *   returned. {@code SubDirBucketedMDTLayout} reads its bucket size from
+ *   {@link HoodieTableConfig#METADATA_LAYOUT_BUCKET_SIZE}.</li>
+ *   <li>Otherwise, the class is loaded reflectively. The class must either
+ *   have a no-arg constructor or a constructor that accepts a single
+ *   {@link HoodieTableConfig}.</li>
+ * </ol>
+ */
+public final class HoodieMetadataTableLayouts {
+
+  private HoodieMetadataTableLayouts() {
+    // Static factory holder; never instantiated.
+  }
+
+  /**
+   * Resolve the layout for the given MDT table config.
+   *
+   * @param mdtConfig the {@link HoodieTableConfig} of the MDT (not the data table)
+   * @return a {@link FlatMDTLayout} when no layout class is configured (or it is empty), the
+   *         matching built-in layout for a built-in class name, otherwise a reflectively
+   *         created instance of the configured class
+   * @throws HoodieMetadataException if the configured class cannot be instantiated, or if the
+   *         instance is not a {@link HoodieMetadataTableLayout}
+   */
+  public static HoodieMetadataTableLayout load(HoodieTableConfig mdtConfig) {
+    Option<String> layoutClass = mdtConfig.getMetadataLayoutClass();
+    if (!layoutClass.isPresent() || layoutClass.get().isEmpty()) {
+      return new FlatMDTLayout();
+    }
+    String cls = layoutClass.get();
+    if (FlatMDTLayout.class.getName().equals(cls)) {
+      return new FlatMDTLayout();
+    }
+    if (SubDirBucketedMDTLayout.class.getName().equals(cls)) {
+      return new SubDirBucketedMDTLayout(mdtConfig.getMetadataLayoutBucketSize());
+    }
+    Object instance;
+    try {
+      // Prefer a constructor taking the table config; fall back to the no-arg constructor.
+      if (ReflectionUtils.hasConstructor(cls, new Class<?>[] {HoodieTableConfig.class}, true)) {
+        instance = ReflectionUtils.loadClass(cls, new Class<?>[] {HoodieTableConfig.class}, mdtConfig);
+      } else {
+        instance = ReflectionUtils.loadClass(cls);
+      }
+    } catch (Exception e) {
+      throw new HoodieMetadataException("Failed to load MDT layout class " + cls
+          + ". The MDT was initialized with this layout class; the same class must be available "
+          + "on the classpath of any writer or reader opening the table.", e);
+    }
+    // Checked outside the try block so a wrong-type class is reported as such instead of being
+    // re-wrapped as a class-loading failure by the catch above.
+    if (!(instance instanceof HoodieMetadataTableLayout)) {
+      throw new HoodieMetadataException("Configured MDT layout class " + cls
+          + " is not a HoodieMetadataTableLayout");
+    }
+    return (HoodieMetadataTableLayout) instance;
+  }
+}
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java index 0ede3ae5015e5..2a5664f3f7e9d 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java @@ -1514,27 +1514,52 @@ private static List getPartitionFileSlices(HoodieTableMetaClient meta HoodieTableFileSystemView fsView = null; try { fsView = fileSystemView.orElseGet(() -> getFileSystemViewForMetadataTable(metaClient)); - Stream fileSliceStream; - if (mergeFileSlices) { - if (metaClient.getActiveTimeline().filterCompletedInstants().lastInstant().isPresent()) { - fileSliceStream = fsView.getLatestMergedFileSlicesBeforeOrOn( - // including pending compaction instant as the last instant so that the finished delta commits - // that start earlier than the compaction can be queried. - partition, metaClient.getActiveTimeline().filterCompletedAndCompactionInstants().lastInstant().get().requestedTime()); + List physicalPartitions = resolvePhysicalPartitions(metaClient, partition); + List all = new ArrayList<>(); + boolean any = false; + for (String physical : physicalPartitions) { + Stream fileSliceStream; + if (mergeFileSlices) { + if (metaClient.getActiveTimeline().filterCompletedInstants().lastInstant().isPresent()) { + fileSliceStream = fsView.getLatestMergedFileSlicesBeforeOrOn( + // including pending compaction instant as the last instant so that the finished delta commits + // that start earlier than the compaction can be queried. 
+ physical, metaClient.getActiveTimeline().filterCompletedAndCompactionInstants().lastInstant().get().requestedTime()); + } else { + return Collections.emptyList(); + } } else { - return Collections.emptyList(); + fileSliceStream = fsView.getLatestFileSlices(physical); } - } else { - fileSliceStream = fsView.getLatestFileSlices(partition); + fileSliceStream.forEach(all::add); + any = true; + } + if (!any) { + return Collections.emptyList(); } - return fileSliceStream.sorted(Comparator.comparing(FileSlice::getFileId)).collect(Collectors.toList()); + all.sort(Comparator.comparing(FileSlice::getFileId)); + return all; } finally { - if (!fileSystemView.isPresent()) { + if (!fileSystemView.isPresent() && fsView != null) { fsView.close(); } } } + /** + * Resolve the physical sub-paths to scan for the given MDT partition under the configured + * layout. Layout-aware: for the flat layout this returns {@code [partition]}; for sub-directory + * bucketing it returns one entry per bucket directory. {@code fileGroupCount} is sourced from + * the MDT's persisted layout state — this method does not perform any filesystem listing. + */ + private static List resolvePhysicalPartitions(HoodieTableMetaClient metaClient, String partition) { + HoodieMetadataTableLayout layout = HoodieMetadataTableLayouts.load(metaClient.getTableConfig()); + int fgCount = metaClient.getTableConfig() + .getMetadataLayoutPartitionFileGroupCounts() + .getOrDefault(partition, 0); + return layout.getPhysicalPartitions(partition, fgCount); + } + /** * Get the latest file slices for a given partition including the inflight ones. 
* @@ -1549,10 +1574,13 @@ public static List getPartitionLatestFileSlicesIncludingInflight(Hood HoodieTableFileSystemView fsView = null; try { fsView = fileSystemView.orElseGet(() -> getFileSystemViewForMetadataTable(metaClient)); - Stream fileSliceStream = fsView.getLatestFileSlicesIncludingInflight(partition); - return fileSliceStream - .sorted(Comparator.comparing(FileSlice::getFileId)) - .collect(Collectors.toList()); + List physicalPartitions = resolvePhysicalPartitions(metaClient, partition); + List all = new ArrayList<>(); + for (String physical : physicalPartitions) { + fsView.getLatestFileSlicesIncludingInflight(physical).forEach(all::add); + } + all.sort(Comparator.comparing(FileSlice::getFileId)); + return all; } finally { if (!fileSystemView.isPresent() && fsView != null) { fsView.close(); diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/LayoutContext.java b/hudi-common/src/main/java/org/apache/hudi/metadata/LayoutContext.java new file mode 100644 index 0000000000000..ebecac877e19f --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/LayoutContext.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hudi.metadata; + +import org.apache.hudi.common.util.Option; + +import java.io.Serializable; + +/** + * Per-file-group context handed to a {@link HoodieMetadataTableLayout}. + * + *

<p>Carries everything a layout needs to compute the on-disk relative path
+ * and fileId for a single file group: the metadata partition type, the global
+ * file-group index, the total number of file groups in that partition, and
+ * the data-table partition name (set only for partitioned RLI).
+ */
+public final class LayoutContext implements Serializable {
+
+  private final MetadataPartitionType partitionType;
+  private final int fileGroupIndex;
+  private final int fileGroupCount;
+  private final Option<String> dataPartitionName;
+
+  /**
+   * Creates a context for one file group. Arguments are stored as given; no validation is
+   * performed here.
+   *
+   * @param partitionType the metadata partition type the file group belongs to
+   * @param fileGroupIndex the global file-group index
+   * @param fileGroupCount the total number of file groups in that partition
+   * @param dataPartitionName the data-table partition name (set only for partitioned RLI)
+   */
+  public LayoutContext(MetadataPartitionType partitionType,
+                       int fileGroupIndex,
+                       int fileGroupCount,
+                       Option<String> dataPartitionName) {
+    this.partitionType = partitionType;
+    this.fileGroupIndex = fileGroupIndex;
+    this.fileGroupCount = fileGroupCount;
+    this.dataPartitionName = dataPartitionName;
+  }
+
+  /** Returns the metadata partition type passed to the constructor. */
+  public MetadataPartitionType getPartitionType() {
+    return partitionType;
+  }
+
+  /** Returns the file-group index passed to the constructor. */
+  public int getFileGroupIndex() {
+    return fileGroupIndex;
+  }
+
+  /** Returns the file-group count passed to the constructor. */
+  public int getFileGroupCount() {
+    return fileGroupCount;
+  }
+
+  /** Returns the data-partition name passed to the constructor. */
+  public Option<String> getDataPartitionName() {
+    return dataPartitionName;
+  }
+}
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/SubDirBucketedMDTLayout.java b/hudi-common/src/main/java/org/apache/hudi/metadata/SubDirBucketedMDTLayout.java new file mode 100644 index 0000000000000..4785cf6a801df --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/SubDirBucketedMDTLayout.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.metadata; + +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.exception.HoodieMetadataException; +import org.apache.hudi.storage.StoragePath; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Locale; + +/** + * Opt-in MDT layout that distributes file groups into bucket sub-directories so a + * single MDT partition directory does not exceed per-directory file-count limits + * common on HDFS-style filesystems. + * + *

On-disk shape for {@code record_index} with {@code fileGroupCount=2500} and + * {@code bucketSize=1000}: + *

+ *   .hoodie/metadata/record_index/
+ *   ├── .hoodie_partition_metadata               ← single marker at the partition root
+ *   ├── 0000/
+ *   │   ├── .record-index-0000-0_<instant>.log
+ *   │   ├── ...
+ *   │   └── .record-index-0999-0_<instant>.log
+ *   ├── 0001/
+ *   │   └── ...
+ *   └── 0002/
+ *       └── ... (partial)
+ * 
+ * + *

Notes: + *

    + *
  • The fileId scheme is unchanged. The bucket for a given file group is + * recoverable from the file-group index, which is itself encoded in the + * fileId. No additional state is required to map a fileId to its bucket.
  • + *
  • A single {@code .hoodie_partition_metadata} marker lives at the logical + * MDT partition root. None are written inside bucket sub-directories. This + * preserves MDT-as-Hudi-table semantics so {@code FSUtils.getAllPartitionPaths} + * on the MDT returns logical partition names rather than physical bucket + * paths.
  • + *
  • Reads enumerate the physical sub-paths via + * {@link #getPhysicalPartitions(String, int)} sourced from the MDT's persisted + * {@code hoodie.metadata.layout.partition.file.group.counts} property. No + * filesystem listing is needed on the read path.
  • + *
+ * + *

<p>Scope: this initial implementation supports the non-partitioned MDT
+ * partitions (files, column_stats, bloom_filters, expression_index,
+ * secondary_index, and the global RLI). Partitioned RLI is rejected up front: if
+ * a writer enables this layout on a table whose RLI is in the partitioned mode,
+ * {@link #getFileGroupRelativePath} throws {@link HoodieMetadataException}.
+ * Partitioned-RLI bucketing requires a different growth model and lands in a
+ * separate follow-up.
+ */
+public final class SubDirBucketedMDTLayout implements HoodieMetadataTableLayout {
+
+  /** Identifier returned by {@link #getLayoutId()}. */
+  public static final String LAYOUT_ID = "subdir-bucketed";
+  /** Bucket size used by the no-arg constructor. */
+  public static final int DEFAULT_BUCKET_SIZE = 1000;
+
+  private final int bucketSize;
+
+  /** Creates a layout with {@link #DEFAULT_BUCKET_SIZE} file groups per bucket. */
+  public SubDirBucketedMDTLayout() {
+    this(DEFAULT_BUCKET_SIZE);
+  }
+
+  /**
+   * Creates a layout with the given number of file groups per bucket.
+   *
+   * @param bucketSize file groups per bucket directory; must be greater than zero
+   * @throws IllegalArgumentException if {@code bucketSize} is not positive
+   */
+  public SubDirBucketedMDTLayout(int bucketSize) {
+    ValidationUtils.checkArgument(bucketSize > 0,
+        "SubDirBucketedMDTLayout bucketSize must be > 0, got " + bucketSize);
+    this.bucketSize = bucketSize;
+  }
+
+  /** Returns {@link #LAYOUT_ID}. */
+  @Override
+  public String getLayoutId() {
+    return LAYOUT_ID;
+  }
+
+  /** Returns the number of file groups placed in each bucket directory. */
+  public int getBucketSize() {
+    return bucketSize;
+  }
+
+  /**
+   * Returns {@code <partition>/<bucket>}, where the bucket is the file-group index divided by
+   * the bucket size.
+   *
+   * @throws HoodieMetadataException if {@code ctx} carries a data-partition name (partitioned RLI)
+   */
+  @Override
+  public String getFileGroupRelativePath(LayoutContext ctx) {
+    if (ctx.getDataPartitionName().isPresent()) {
+      throw new HoodieMetadataException(
+          "SubDirBucketedMDTLayout does not support partitioned RLI. The MDT was opened with "
+              + "a data-partition-keyed file group for partition '"
+              + ctx.getPartitionType().getPartitionPath() + "' (data partition '"
+              + ctx.getDataPartitionName().get() + "'). Disable hoodie.metadata.layout.class "
+              + "or switch the RLI to the global (non-partitioned) mode.");
+    }
+    int bucket = ctx.getFileGroupIndex() / bucketSize;
+    return bucketRelativePath(ctx.getPartitionType().getPartitionPath(), bucket);
+  }
+
+  /** Delegates fileId construction to {@link HoodieTableMetadataUtil#getFileIDForFileGroup}. */
+  @Override
+  public String getFileId(LayoutContext ctx) {
+    // FileId scheme is bucket-independent. The bucket is recoverable from the
+    // file-group index, which is encoded in the fileId itself.
+    return HoodieTableMetadataUtil.getFileIDForFileGroup(
+        ctx.getPartitionType(),
+        ctx.getFileGroupIndex(),
+        ctx.getPartitionType().getPartitionPath(),
+        ctx.getDataPartitionName());
+  }
+
+  /**
+   * Recovers the file-group index encoded in {@code fileId}. {@code partitionType} is not
+   * consulted, and the result is always built with {@code Option.empty()} as its second argument.
+   */
+  @Override
+  public FileIdInfo parseFileId(MetadataPartitionType partitionType, String fileId) {
+    int idx = HoodieTableMetadataUtil.getFileGroupIndexFromFileId(fileId);
+    return new FileIdInfo(idx, Option.empty());
+  }
+
+  /**
+   * Returns one bucket path per {@code bucketSize} file groups (rounded up), or just
+   * {@code logicalPartition} when {@code fileGroupCount} is zero or negative.
+   */
+  @Override
+  public List<String> getPhysicalPartitions(String logicalPartition, int fileGroupCount) {
+    if (fileGroupCount <= 0) {
+      // Caller has no recorded count for this logical partition (e.g., the partition
+      // is not yet initialized or is being read by an early-init code path). Fall
+      // back to the partition root; the FS view will return zero file groups, which
+      // is the correct empty-state for callers that pre-check existence themselves.
+      return Collections.singletonList(logicalPartition);
+    }
+    // Integer ceiling division. fileGroupCount > 0 here, so (fileGroupCount - 1) is
+    // non-negative and cannot overflow.
+    int numBuckets = (fileGroupCount - 1) / bucketSize + 1;
+    List<String> paths = new ArrayList<>(numBuckets);
+    for (int b = 0; b < numBuckets; b++) {
+      paths.add(bucketRelativePath(logicalPartition, b));
+    }
+    return paths;
+  }
+
+  /** Returns a singleton list holding {@code logicalPartition}; {@code fileGroupCount} is ignored. */
+  @Override
+  public List<String> getPartitionMarkerPaths(String logicalPartition, int fileGroupCount) {
+    // Single marker at the logical partition root. Never inside a bucket dir;
+    // that is the central correctness property that lets partition discovery on
+    // the MDT still return logical names rather than physical sub-paths.
+    return Collections.singletonList(logicalPartition);
+  }
+
+  /** Builds {@code <partition>/<bucket>} with the bucket index zero-padded to four digits. */
+  private static String bucketRelativePath(String partitionRelativePath, int bucketIndex) {
+    // Locale.ROOT keeps the digits ASCII. With the default locale, %d may emit
+    // locale-specific digits, so writers and readers on different JVM locales
+    // would compute different directory names for the same bucket.
+    return String.format(Locale.ROOT, "%s%s%04d", partitionRelativePath, StoragePath.SEPARATOR, bucketIndex);
+  }
+}
diff --git a/hudi-common/src/test/java/org/apache/hudi/metadata/TestHoodieMetadataTableLayout.java b/hudi-common/src/test/java/org/apache/hudi/metadata/TestHoodieMetadataTableLayout.java new file mode 100644 index 0000000000000..d6d74def78354 --- /dev/null +++ b/hudi-common/src/test/java/org/apache/hudi/metadata/TestHoodieMetadataTableLayout.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.metadata; + +import org.apache.hudi.common.util.Option; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Unit tests for {@link HoodieMetadataTableLayout} implementations. 
Validates that the layout
+ * abstraction yields the expected on-disk paths and fileIds for both flat and sub-directory
+ * bucketed layouts, including the bucketed layout's explicit rejection of partitioned RLI.
+ */
+class TestHoodieMetadataTableLayout {
+
+  // ---- FlatMDTLayout --------------------------------------------------------
+
+  @Test
+  void flatLayout_returnsPartitionRootAsRelativePath() {
+    HoodieMetadataTableLayout layout = new FlatMDTLayout();
+    LayoutContext ctx = new LayoutContext(MetadataPartitionType.RECORD_INDEX, 1500, 2500, Option.empty());
+    assertEquals("record_index", layout.getFileGroupRelativePath(ctx));
+    assertEquals("record-index-1500-0", layout.getFileId(ctx));
+  }
+
+  @Test
+  void flatLayout_physicalPartitionsIsSingleton() {
+    HoodieMetadataTableLayout layout = new FlatMDTLayout();
+    assertEquals(java.util.Collections.singletonList("record_index"),
+        layout.getPhysicalPartitions("record_index", 2500));
+  }
+
+  @Test
+  void flatLayout_markerAtPartitionRoot() {
+    HoodieMetadataTableLayout layout = new FlatMDTLayout();
+    assertEquals(java.util.Collections.singletonList("record_index"),
+        layout.getPartitionMarkerPaths("record_index", 2500));
+  }
+
+  @Test
+  void flatLayout_layoutIdIsStable() {
+    assertEquals("flat", new FlatMDTLayout().getLayoutId());
+  }
+
+  // ---- SubDirBucketedMDTLayout ---------------------------------------------
+
+  @Test
+  void bucketedLayout_pathDerivedFromFileGroupIndex() {
+    HoodieMetadataTableLayout layout = new SubDirBucketedMDTLayout(1000);
+    assertEquals("record_index/0000",
+        layout.getFileGroupRelativePath(new LayoutContext(MetadataPartitionType.RECORD_INDEX, 0, 2500, Option.empty())));
+    assertEquals("record_index/0000",
+        layout.getFileGroupRelativePath(new LayoutContext(MetadataPartitionType.RECORD_INDEX, 999, 2500, Option.empty())));
+    assertEquals("record_index/0001",
+        layout.getFileGroupRelativePath(new LayoutContext(MetadataPartitionType.RECORD_INDEX, 1000, 2500, Option.empty())));
+    assertEquals("record_index/0001",
+        layout.getFileGroupRelativePath(new LayoutContext(MetadataPartitionType.RECORD_INDEX, 1500, 2500, Option.empty())));
+    assertEquals("record_index/0002",
+        layout.getFileGroupRelativePath(new LayoutContext(MetadataPartitionType.RECORD_INDEX, 2499, 2500, Option.empty())));
+  }
+
+  @Test
+  void bucketedLayout_fileIdSchemeUnchanged() {
+    HoodieMetadataTableLayout layout = new SubDirBucketedMDTLayout(1000);
+    // FileId encoding must be bucket-independent so that bucket = fileGroupIndex / bucketSize is
+    // recoverable from the fileId itself.
+    assertEquals("record-index-0000-0",
+        layout.getFileId(new LayoutContext(MetadataPartitionType.RECORD_INDEX, 0, 2500, Option.empty())));
+    assertEquals("record-index-1500-0",
+        layout.getFileId(new LayoutContext(MetadataPartitionType.RECORD_INDEX, 1500, 2500, Option.empty())));
+    assertEquals("record-index-2499-0",
+        layout.getFileId(new LayoutContext(MetadataPartitionType.RECORD_INDEX, 2499, 2500, Option.empty())));
+  }
+
+  @ParameterizedTest
+  @ValueSource(ints = {1, 100, 1000, 1024})
+  void bucketedLayout_physicalPartitionsHonorBucketSize(int bucketSize) {
+    SubDirBucketedMDTLayout layout = new SubDirBucketedMDTLayout(bucketSize);
+    // Exactly N full buckets.
+    List<String> exact = layout.getPhysicalPartitions("record_index", bucketSize * 3);
+    assertEquals(3, exact.size());
+    assertEquals("record_index/0000", exact.get(0));
+    assertEquals("record_index/0001", exact.get(1));
+    assertEquals("record_index/0002", exact.get(2));
+
+    // N+1 file groups → N+1 buckets (partial last).
+    List<String> partial = layout.getPhysicalPartitions("record_index", bucketSize * 3 + 1);
+    assertEquals(4, partial.size());
+    assertEquals("record_index/0003", partial.get(3));
+
+    // Fewer than bucketSize → one bucket, and it must be the first bucket directory.
+    List<String> small = layout.getPhysicalPartitions("record_index", Math.max(1, bucketSize / 2));
+    assertEquals(1, small.size());
+    assertEquals("record_index/0000", small.get(0));
+  }
+
+  @Test
+  void bucketedLayout_emptyPartitionReturnsRootOnly() {
+    // fileGroupCount=0 means the layout has nothing persisted for that partition; the caller is
+    // expected to fall back to the partition root (e.g., partitioned RLI on read).
+    HoodieMetadataTableLayout layout = new SubDirBucketedMDTLayout(1000);
+    assertEquals(java.util.Collections.singletonList("record_index"),
+        layout.getPhysicalPartitions("record_index", 0));
+  }
+
+  @Test
+  void bucketedLayout_markerOnlyAtPartitionRoot() {
+    // Central correctness property: never write .hoodie_partition_metadata inside a bucket dir.
+    HoodieMetadataTableLayout layout = new SubDirBucketedMDTLayout(1000);
+    List<String> markers = layout.getPartitionMarkerPaths("record_index", 2500);
+    assertEquals(1, markers.size());
+    assertEquals("record_index", markers.get(0));
+  }
+
+  @Test
+  void bucketedLayout_rejectsPartitionedRLI() {
+    // Partitioned RLI is explicitly unsupported in this initial implementation. Invoking the layout
+    // with a data partition present must fail loudly rather than silently produce a flat path; the
+    // partitioned-RLI growth model needs a distinct strategy that lands in a follow-up patch / RFC.
+    HoodieMetadataTableLayout layout = new SubDirBucketedMDTLayout(1000);
+    LayoutContext ctx = new LayoutContext(MetadataPartitionType.RECORD_INDEX, 3, 4, Option.of("p2"));
+    org.apache.hudi.exception.HoodieMetadataException ex = assertThrows(
+        org.apache.hudi.exception.HoodieMetadataException.class,
+        () -> layout.getFileGroupRelativePath(ctx));
+    assertTrue(ex.getMessage().contains("partitioned RLI"),
+        "exception message should call out partitioned RLI unsupported: " + ex.getMessage());
+  }
+
+  @Test
+  void bucketedLayout_rejectsZeroOrNegativeBucketSize() {
+    assertThrows(IllegalArgumentException.class, () -> new SubDirBucketedMDTLayout(0));
+    assertThrows(IllegalArgumentException.class, () -> new SubDirBucketedMDTLayout(-1));
+  }
+
+  @Test
+  void bucketedLayout_parseFileIdRoundTrip() {
+    HoodieMetadataTableLayout layout = new SubDirBucketedMDTLayout(1000);
+    for (int idx : new int[] {0, 1, 999, 1000, 1500, 2499}) {
+      LayoutContext ctx = new LayoutContext(MetadataPartitionType.RECORD_INDEX, idx, 2500, Option.empty());
+      String fileId = layout.getFileId(ctx);
+      FileIdInfo info = layout.parseFileId(MetadataPartitionType.RECORD_INDEX, fileId);
+      assertEquals(idx, info.getFileGroupIndex(), "round-trip fileId for index " + idx);
+    }
+  }
+
+  @Test
+  void layoutIdsAreDistinct() {
+    // assertNotEquals reports both ids on failure, unlike assertTrue(!a.equals(b)).
+    org.junit.jupiter.api.Assertions.assertNotEquals(
+        new FlatMDTLayout().getLayoutId(), new SubDirBucketedMDTLayout(1).getLayoutId());
+  }
+}
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java index 70567b9337d36..7e2313ea9387f 100644 --- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java +++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java @@ -384,7 +384,7 @@ void testDropInvalidConfigs() { @Test void testDefinedTableConfigs() { List> configProperties = HoodieTableConfig.definedTableConfigs(); - 
assertEquals(44, configProperties.size()); + assertEquals(47, configProperties.size()); configProperties.forEach(c -> { assertNotNull(c); assertFalse(c.doc().isEmpty()); diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala index 5c6919a280693..6f3c0b60c8c52 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala @@ -119,8 +119,13 @@ class RecordLevelIndexTestBase extends HoodieStatsIndexTestBase { } protected def getFileGroupCountForRecordIndex(writeConfig: HoodieWriteConfig): Long = { + // Count latest file slices via HoodieTableMetadataUtil so we pick up bucketed sub-paths under the configured + // MDT layout (flat returns the partition root only; bucketed fans out across bucket dirs). NOTE(review): assumes one latest slice per file group - confirm. 
Using(getHoodieTable(metaClient, writeConfig).getTableMetadata.asInstanceOf[HoodieBackedTableMetadata]) { metadataTable => - metadataTable.getMetadataFileSystemView.getAllFileGroups(MetadataPartitionType.RECORD_INDEX.getPartitionPath).count + HoodieTableMetadataUtil.getPartitionLatestFileSlices( + metadataTable.getMetadataMetaClient, + org.apache.hudi.common.util.Option.of(metadataTable.getMetadataFileSystemView), + MetadataPartitionType.RECORD_INDEX.getPartitionPath).size.toLong }.get } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMDTLayoutBucketing.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMDTLayoutBucketing.scala new file mode 100644 index 0000000000000..d54acbcba58c0 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMDTLayoutBucketing.scala @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.functional + +import org.apache.hudi.DataSourceWriteOptions._ +import org.apache.hudi.common.config.HoodieMetadataConfig +import org.apache.hudi.common.fs.FSUtils +import org.apache.hudi.common.table.HoodieTableMetaClient +import org.apache.hudi.metadata.{FlatMDTLayout, HoodieTableMetadata, MetadataPartitionType, SubDirBucketedMDTLayout} +import org.apache.hudi.storage.StoragePath + +import org.apache.spark.sql.SaveMode +import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNotNull, assertTrue} +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.ValueSource + +import scala.collection.JavaConverters._ + +/** + * Validates that the MDT layout SPI works end-to-end for the two OSS-shipped implementations: + * + * - [[FlatMDTLayout]] — today's behavior, file groups directly under each MDT partition. + * - [[SubDirBucketedMDTLayout]] — file groups grouped into bucket sub-directories. + * + * The same workload is run once per layout (parameterized) and the MDT contract is checked: + * + * - Every write goes through `doWriteAndValidateDataAndRecordIndex` under each layout (results are not compared across layouts). + * - Logical MDT partitions are discoverable as Hudi partitions regardless of bucketing + * (`FSUtils.getAllPartitionPaths` returns `[files, record_index, ...]`, NOT bucket + * paths). This is the central correctness property we are protecting. + * - Direct Spark queries on the MDT path return non-empty results under both layouts. + * - When bucketing is enabled, the on-disk structure actually uses bucket sub-directories. + */ +class TestMDTLayoutBucketing extends RecordLevelIndexTestBase { + + /** Builds the metadata-layout write options for a run; the map is empty when no layout class is given. + * @param layoutClass FQCN of the layout to test; null means do not override (flat default). + * @param bucketSize Bucket size to set when using sub-directory bucketing. Ignored otherwise. 
+ */ + private def layoutOpts(layoutClass: String, bucketSize: Int): Map[String, String] = { + if (layoutClass == null) { + Map.empty + } else { + Map( + HoodieMetadataConfig.METADATA_LAYOUT_CLASS.key -> layoutClass, + HoodieMetadataConfig.METADATA_LAYOUT_BUCKET_SIZE.key -> bucketSize.toString) + } + } + + @ParameterizedTest + @ValueSource(strings = Array( + "org.apache.hudi.metadata.FlatMDTLayout", + "org.apache.hudi.metadata.SubDirBucketedMDTLayout")) + def testRecordLevelIndexWritesAndLookupsAcrossLayouts(layoutClass: String): Unit = { + // Force a small bucket size so even a modest workload exercises >1 buckets under the bucketed + // layout. The flat layout ignores bucketSize. + val opts = commonOpts ++ layoutOpts(layoutClass, bucketSize = 2) + + // Bootstrap MDT + RLI with an INSERT. + doWriteAndValidateDataAndRecordIndex(opts, INSERT_OPERATION_OPT_VAL, SaveMode.Overwrite) + // A couple of UPSERTs to exercise reads against initialized file groups. + doWriteAndValidateDataAndRecordIndex(opts, UPSERT_OPERATION_OPT_VAL, SaveMode.Append) + doWriteAndValidateDataAndRecordIndex(opts, UPSERT_OPERATION_OPT_VAL, SaveMode.Append) + + metaClient = HoodieTableMetaClient.builder().setBasePath(basePath).setConf(storageConf).build() + + // Open MDT metaClient to inspect persisted layout state. + val mdtBasePath = HoodieTableMetadata.getMetadataTableBasePath(basePath) + val mdtMetaClient = HoodieTableMetaClient.builder() + .setBasePath(mdtBasePath).setConf(storageConf).build() + + if (layoutClass == classOf[FlatMDTLayout].getName) { + // Flat layout must not persist a layout class — the default is implicit, and existing tables + // (with no layout property) must continue to behave identically. 
+ assertFalse(mdtMetaClient.getTableConfig.getMetadataLayoutClass.isPresent, + "flat layout must not persist hoodie.metadata.layout.class") + } else { + assertTrue(mdtMetaClient.getTableConfig.getMetadataLayoutClass.isPresent, + "non-flat layout must persist hoodie.metadata.layout.class") + assertEquals(layoutClass, mdtMetaClient.getTableConfig.getMetadataLayoutClass.get) + assertTrue(mdtMetaClient.getTableConfig.getMetadataLayoutPartitionFileGroupCounts.asScala.nonEmpty, + "non-flat layout must persist per-partition file-group counts") + } + + // Central correctness property: partition discovery on the MDT must return logical names + // regardless of bucketing. + val mdtPartitions = FSUtils.getAllPartitionPaths( + context, mdtMetaClient, /* assumeDatePartitioning */ false).asScala.toSet + assertTrue(mdtPartitions.contains(MetadataPartitionType.FILES.getPartitionPath), + s"MDT must expose files partition; got: $mdtPartitions") + assertTrue(mdtPartitions.contains(MetadataPartitionType.RECORD_INDEX.getPartitionPath), + s"MDT must expose record_index partition; got: $mdtPartitions") + // None of the returned partitions should look like a bucket sub-path (4 digits at the end). + val bucketLike = mdtPartitions.filter(p => p.matches(".*/[0-9]{4}$")) + assertTrue(bucketLike.isEmpty, + s"MDT partition discovery must not expose bucket sub-paths as logical partitions; got bucket-like: $bucketLike") + + // For the bucketed layout, verify the on-disk structure actually uses sub-directories. 
+ if (layoutClass == classOf[SubDirBucketedMDTLayout].getName) { + val recordIndexDir = new StoragePath(mdtBasePath, MetadataPartitionType.RECORD_INDEX.getPartitionPath) + val children = mdtMetaClient.getStorage.listDirectEntries(recordIndexDir).asScala + val bucketDirs = children.filter(_.isDirectory) + assertTrue(bucketDirs.nonEmpty, + s"bucketed layout must produce at least one bucket sub-directory under record_index; got children=${children.map(_.getPath.getName)}") + // Each bucket dir must be %04d-formatted. + bucketDirs.foreach { d => + val name = d.getPath.getName + assertTrue(name.matches("[0-9]{4}"), + s"bucket sub-directory name must be %04d-formatted, got: $name") + } + // Marker must NOT live inside a bucket dir — it must live at the partition root. + bucketDirs.foreach { d => + val markerInsideBucket = new StoragePath(d.getPath, ".hoodie_partition_metadata") + assertFalse(mdtMetaClient.getStorage.exists(markerInsideBucket), + s".hoodie_partition_metadata must not exist inside bucket dir ${d.getPath}") + } + val markerAtRoot = new StoragePath(recordIndexDir, ".hoodie_partition_metadata") + assertTrue(mdtMetaClient.getStorage.exists(markerAtRoot), + s".hoodie_partition_metadata must exist at the logical partition root: $markerAtRoot") + } + + // Direct Spark query against the MDT path must return at least one row and a non-empty schema under either layout. + val mdtDf = spark.read.format("hudi").load(mdtBasePath) + val mdtCount = mdtDf.count() + assertTrue(mdtCount > 0L, + s"direct Spark scan on MDT path must return at least one row under layout $layoutClass; got $mdtCount") + assertTrue(mdtDf.schema.fieldNames.nonEmpty, "MDT schema must resolve via Spark datasource") + } +}