Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -249,12 +249,19 @@ private void init(HoodieRecord record) {
Option<FileSlice> fileSliceOpt = populateWriteStatAndFetchFileSlice(record, deltaWriteStat);
averageRecordSize = sizeEstimator.sizeEstimate(record);
try {
// Save hoodie partition meta in the partition path
HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(storage, instantTime,
new StoragePath(config.getBasePath()),
FSUtils.constructAbsolutePath(config.getBasePath(), partitionPath),
hoodieTable.getPartitionMetafileFormat());
partitionMetadata.trySave();
// Save hoodie partition meta in the partition path. For MDT writes under a non-flat layout
// (e.g., sub-directory bucketing), the physical partitionPath here is a layout sub-path
// (e.g. "record_index/0004") and the marker must live at the logical partition root instead.
// The marker at the logical root is written once at MDT initialization in
// HoodieBackedTableMetadataWriter.initializeFileGroups; skipping here keeps partition
// discovery returning logical names rather than per-bucket sub-paths.
if (!isMDTLayoutSubPath(partitionPath)) {
HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(storage, instantTime,
new StoragePath(config.getBasePath()),
FSUtils.constructAbsolutePath(config.getBasePath(), partitionPath),
hoodieTable.getPartitionMetafileFormat());
partitionMetadata.trySave();
}

String instantTime = config.getWriteVersion().greaterThanOrEquals(HoodieTableVersion.EIGHT)
? getInstantTimeForLogFile(record) : deltaWriteStat.getPrevCommit();
Expand All @@ -268,6 +275,37 @@ private void init(HoodieRecord record) {
doInit = false;
}

/**
 * Decides whether this append handle is writing to a layout sub-path of an MDT partition
 * (e.g. {@code record_index/0004} under sub-directory bucketing). For such paths the
 * {@code .hoodie_partition_metadata} marker must NOT be created at the bucket level; the caller
 * skips the marker write when this method returns {@code true}.
 *
 * <p>Heuristic: the table is a metadata table and the last {@code /}-separated segment of the
 * path is a run of at least four digits. The segment before the slash is not validated against
 * the set of MDT partition names. Bucket directory names are reportedly zero-padded to a
 * <em>minimum</em> of four digits (NOTE(review): confirm against
 * {@code SubDirBucketedMDTLayout}), so a bucket index of 10000 or more yields a longer name;
 * accepting only exactly four digits would let a marker be written inside such a bucket
 * directory. Third-party layouts using a different sub-path naming scheme are not recognized.
 *
 * @param physicalPartitionPath partition path relative to the table base path; may be {@code null}
 * @return {@code true} when the path looks like {@code <prefix>/<digits>} on a metadata table,
 *         {@code false} otherwise (including {@code null}, no slash, leading or trailing slash)
 */
private boolean isMDTLayoutSubPath(String physicalPartitionPath) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 This couples the engine-agnostic append handle to one layout's directory-naming convention through a string heuristic. Two concerns: (1) brittleness — SubDirBucketedMDTLayout.bucketRelativePath uses %04d, so once the bucket index reaches 10000 the suffix becomes 5 digits, isMDTLayoutSubPath returns false, and a .hoodie_partition_metadata marker gets written inside the bucket dir — which would then surface record_index/10000 as a logical partition and break the central invariant; (2) layering — could the marker-skip decision be driven by the resolved HoodieMetadataTableLayout (it already knows its scheme via getPartitionMarkerPaths) instead of re-deriving "is this a sub-path" heuristically here?

⚠️ AI-generated; verify before applying. React 👍/👎 to flag quality.

  if (!hoodieTable.isMetadataTable() || physicalPartitionPath == null) {
    return false;
  }
  // Require a non-empty prefix before the slash and a non-empty segment after it.
  int slash = physicalPartitionPath.lastIndexOf('/');
  if (slash <= 0 || slash >= physicalPartitionPath.length() - 1) {
    return false;
  }
  String last = physicalPartitionPath.substring(slash + 1);
  // Zero-padding sets a minimum width, not a maximum: accept "0004" as well as "10000".
  final int minBucketNameDigits = 4;
  if (last.length() < minBucketNameDigits) {
    return false;
  }
  for (int i = 0; i < last.length(); i++) {
    if (!Character.isDigit(last.charAt(i))) {
      return false;
    }
  }
  return true;
}

/**
* Returns the instant time to use in the log file name.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieIndexDefinition;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
Expand All @@ -57,6 +58,7 @@
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.schema.HoodieSchemaCache;
import org.apache.hudi.common.schema.HoodieSchemaUtils;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.log.HoodieLogFormat;
Expand Down Expand Up @@ -1163,14 +1165,14 @@ private List<DirectoryInfo> listAllPartitionsFromMDT(String initializationTime,
}

/**
* Initialize file groups for a partition. For file listing, we just have one file group.
* <p>
* All FileGroups for a given metadata partition has a fixed prefix as per the {@link MetadataPartitionType#getFileIdPrefix()}.
* Each file group is suffixed with 4 digits with increments of 1 starting with 0000.
* <p>
* Let's say we configure 10 file groups for record level index partition, and prefix as "record-index-bucket-"
* File groups will be named as :
* record-index-bucket-0000, .... -> ..., record-index-bucket-0009
* Initialize file groups for an MDT partition under the configured layout.
*
* <p>FileIds are prefixed by {@link MetadataPartitionType#getFileIdPrefix()} and suffixed with a 4-digit
* file-group index starting at 0000. The on-disk directory for each file group is supplied by the
* configured {@link HoodieMetadataTableLayout}: the flat layout writes everything under the partition
* root; sub-directory bucketing writes them into bucket sub-directories. The persisted layout state
* (class + per-partition fileGroupCount + bucket size) is later used by readers to enumerate the same
* sub-paths without needing a filesystem listing.
*/
private void initializeFileGroups(HoodieTableMetaClient dataMetaClient, MetadataPartitionType metadataPartition, String instantTime,
int fileGroupCount, String relativePartitionPath, Option<String> dataPartitionName) throws IOException {
Expand All @@ -1184,22 +1186,30 @@ private void initializeFileGroups(HoodieTableMetaClient dataMetaClient, Metadata
// during initial commit, then the fileGroup would still be recognized (as a FileSlice with no baseFiles but a
// valid logFile). Since these log files being created have no content, it is safe to add them here before
// the bulkInsert.
final String msg = String.format("Creating %d file groups for partition %s with base fileId %s at instant time %s",
fileGroupCount, relativePartitionPath, metadataPartition.getFileIdPrefix(), instantTime);
HoodieMetadataTableLayout layout = resolveLayoutForMDTInit();
final String msg = String.format("Creating %d file groups for partition %s with base fileId %s at instant time %s (layout=%s)",
fileGroupCount, relativePartitionPath, metadataPartition.getFileIdPrefix(), instantTime, layout.getLayoutId());
LOG.info(msg);
final List<String> fileGroupFileIds = IntStream.range(0, fileGroupCount)
.mapToObj(i -> HoodieTableMetadataUtil.getFileIDForFileGroup(metadataPartition, i, relativePartitionPath, dataPartitionName))

// Pair each file group's fileId with the relative directory it should live in, computed via the layout.
final List<Pair<String, String>> fileGroupIdAndPath = IntStream.range(0, fileGroupCount)
.mapToObj(i -> {
LayoutContext ctx = new LayoutContext(metadataPartition, i, fileGroupCount, dataPartitionName);
return Pair.of(layout.getFileId(ctx), layout.getFileGroupRelativePath(ctx));
})
.collect(Collectors.toList());
ValidationUtils.checkArgument(fileGroupFileIds.size() == fileGroupCount);
ValidationUtils.checkArgument(fileGroupIdAndPath.size() == fileGroupCount);
engineContext.setJobStatus(this.getClass().getSimpleName(), msg);
engineContext.foreach(fileGroupFileIds, fileGroupFileId -> {
engineContext.foreach(fileGroupIdAndPath, p -> {
final String fileGroupFileId = p.getLeft();
final String relativePath = p.getRight();
try {
final Map<HeaderMetadataType, String> blockHeader = Collections.singletonMap(HeaderMetadataType.INSTANT_TIME, instantTime);

final HoodieDeleteBlock block = new HoodieDeleteBlock(Collections.emptyList(), blockHeader);

try (HoodieLogFormat.Writer writer = HoodieLogFormatWriter.builder()
.withParentPath(FSUtils.constructAbsolutePath(metadataWriteConfig.getBasePath(), relativePartitionPath))
.withParentPath(FSUtils.constructAbsolutePath(metadataWriteConfig.getBasePath(), relativePath))
.withLogFileId(fileGroupFileId)
.withInstantTime(instantTime)
.withLogVersion(HoodieLogFile.LOGFILE_BASE_VERSION)
Expand All @@ -1212,9 +1222,110 @@ private void initializeFileGroups(HoodieTableMetaClient dataMetaClient, Metadata
writer.appendBlock(block);
}
} catch (InterruptedException e) {
throw new HoodieException(String.format("Failed to created fileGroup %s for partition %s", fileGroupFileId, relativePartitionPath), e);
throw new HoodieException(String.format("Failed to created fileGroup %s for partition %s", fileGroupFileId, relativePath), e);
}
}, fileGroupIdAndPath.size());

// For non-flat layouts, write .hoodie_partition_metadata at the logical partition root now,
// before the HoodieAppendHandle path (which would otherwise create one inside the bucket
// sub-dir). The AppendHandle skips marker creation at layout sub-paths via
// HoodieAppendHandle#isMDTLayoutSubPath, so this is the sole marker write under bucketing.
// For the flat default we leave marker creation to the AppendHandle and write nothing here —
// existing tables keep bit-identical behavior.
if (!FlatMDTLayout.LAYOUT_ID.equals(layout.getLayoutId())) {
for (String markerPath : layout.getPartitionMarkerPaths(relativePartitionPath, fileGroupCount)) {
HoodiePartitionMetadata marker = new HoodiePartitionMetadata(
dataMetaClient.getStorage(),
instantTime,
new StoragePath(metadataWriteConfig.getBasePath()),
FSUtils.constructAbsolutePath(metadataWriteConfig.getBasePath(), markerPath),
Option.empty());
marker.trySave();
}
}, fileGroupFileIds.size());
}

// Persist layout state for this partition (skipped entirely for the flat default so existing
// tables get the identical on-disk and properties layout as before). When a non-flat layout is
// in use, readers consult this property to enumerate physical sub-paths without an FS listing.
if (metadataMetaClient != null && !FlatMDTLayout.LAYOUT_ID.equals(layout.getLayoutId())) {
maybePersistLayoutOnMDTInit(layout);
metadataMetaClient.getTableConfig().addMetadataLayoutPartitionFileGroupCounts(
metadataMetaClient, Collections.singletonMap(relativePartitionPath, fileGroupCount));
}
}

/**
 * Resolves the layout to use for MDT initialization and validates it against the writer config.
 *
 * <p>The candidate layout comes from {@link #resolveLayoutForMDTInitInternal()}. A flat layout is
 * always accepted. Any other layout is rejected when
 * {@code dataWriteConfig.isRecordLevelIndexEnabled()} reports {@code true}; the error message
 * describes this as the partitioned-RLI combination, which the sub-directory bucketing layout
 * does not handle yet.
 * NOTE(review): confirm that {@code isRecordLevelIndexEnabled()} is true only for the partitioned
 * RLI mode and not for the global one, as the message implies.
 *
 * @return the layout to use for this MDT initialization
 * @throws HoodieMetadataException if a non-flat layout is combined with the record level index
 *         as described above
 */
private HoodieMetadataTableLayout resolveLayoutForMDTInit() {
  final HoodieMetadataTableLayout resolved = resolveLayoutForMDTInitInternal();
  final boolean flatLayout = FlatMDTLayout.LAYOUT_ID.equals(resolved.getLayoutId());
  if (flatLayout || !dataWriteConfig.isRecordLevelIndexEnabled()) {
    return resolved;
  }
  final String message = "MDT layout " + resolved.getClass().getName() + " (id=" + resolved.getLayoutId()
      + ") cannot be used together with partitioned RLI. Either disable "
      + "hoodie.metadata.layout.class or switch the RLI to the global (non-partitioned) "
      + "mode. Partitioned-RLI support for sub-directory bucketing will be addressed in a "
      + "follow-up patch / RFC.";
  throw new HoodieMetadataException(message);
}

/**
 * Picks the layout implementation for MDT initialization without validating it against the
 * writer's index configuration.
 *
 * <p>Resolution order:
 * <ol>
 *   <li>a non-empty layout class already present on the MDT table config, loaded through
 *       {@code HoodieMetadataTableLayouts.load};</li>
 *   <li>the writer's requested layout class: absent or {@link FlatMDTLayout} gives a flat layout,
 *       {@link SubDirBucketedMDTLayout} gives a bucketed layout with the configured bucket size;</li>
 *   <li>any other class name is handed to {@code HoodieMetadataTableLayouts.load}, which needs the
 *       MDT table config to carry the class name and bucket size.</li>
 * </ol>
 *
 * @return the resolved layout, never {@code null} on the paths visible here
 * @throws HoodieMetadataException if a user-defined layout class is requested while the MDT meta
 *         client is not available (previously this surfaced as a {@code NullPointerException})
 */
private HoodieMetadataTableLayout resolveLayoutForMDTInitInternal() {
  if (metadataMetaClient != null) {
    Option<String> existing = metadataMetaClient.getTableConfig().getMetadataLayoutClass();
    if (existing.isPresent() && !existing.get().isEmpty()) {
      return HoodieMetadataTableLayouts.load(metadataMetaClient.getTableConfig());
    }
  }
  Option<String> requested = dataWriteConfig.getMetadataConfig().getMetadataLayoutClass();
  if (!requested.isPresent()) {
    return new FlatMDTLayout();
  }
  String cls = requested.get();
  if (FlatMDTLayout.class.getName().equals(cls)) {
    return new FlatMDTLayout();
  }
  if (SubDirBucketedMDTLayout.class.getName().equals(cls)) {
    return new SubDirBucketedMDTLayout(dataWriteConfig.getMetadataConfig().getMetadataLayoutBucketSize());
  }
  // Fall through to the central factory for any user-defined layout class. The factory reads the
  // class name from the MDT table config, so without a meta client there is nothing to load from.
  if (metadataMetaClient == null) {
    throw new HoodieMetadataException(
        "Cannot resolve user-defined MDT layout class " + cls
            + " because the metadata table meta client is not available yet.");
  }
  // Stash the class string on a transient view of the MDT config so the factory can find it.
  // NOTE(review): this makes the layout class visible on the in-memory table config, and
  // maybePersistLayoutOnMDTInit treats a present layout class as "already persisted" — confirm
  // that user-defined layouts still get written to hoodie.properties.
  metadataMetaClient.getTableConfig().setValue(HoodieTableConfig.METADATA_LAYOUT_CLASS, cls);
  metadataMetaClient.getTableConfig().setValue(HoodieTableConfig.METADATA_LAYOUT_BUCKET_SIZE,
      String.valueOf(dataWriteConfig.getMetadataConfig().getMetadataLayoutBucketSize()));
  return HoodieMetadataTableLayouts.load(metadataMetaClient.getTableConfig());
}

/**
 * Records the layout class and bucket size on the MDT table config the first time a non-flat
 * layout is initialized; later calls for further partitions of the same MDT are no-ops.
 *
 * <p>Nothing is written when the layout is the flat default (so tables that did not opt in keep
 * their properties unchanged) or when the table config already reports a layout class.
 *
 * @param layout the layout resolved for the current MDT initialization
 */
private void maybePersistLayoutOnMDTInit(HoodieMetadataTableLayout layout) {
  final boolean flatDefault = FlatMDTLayout.LAYOUT_ID.equals(layout.getLayoutId());
  // Short-circuit keeps the table-config lookup from running for the flat default.
  if (flatDefault || metadataMetaClient.getTableConfig().getMetadataLayoutClass().isPresent()) {
    return;
  }
  final int filesPerBucket = dataWriteConfig.getMetadataConfig().getMetadataLayoutBucketSize();
  final String layoutClassName = layout.getClass().getName();
  // Per-partition file group counts start out empty at this point.
  metadataMetaClient.getTableConfig().setMetadataLayout(
      metadataMetaClient, layoutClassName, filesPerBucket, Collections.emptyMap());
}

void clearExistingMetadataPartition(String relativePartitionPath) throws IOException {
Expand Down Expand Up @@ -2081,6 +2192,14 @@ private SerializableFunction<HoodieRecord, HoodieRecord> getRecordTagger(String
FileSlice slice = fileSlices.get(mappingFunction.apply(r.getRecordKey(), fileGroupCount));
r.unseal();
r.setCurrentLocation(new HoodieRecordLocation(slice.getBaseInstantTime(), slice.getFileId()));
// Under layouts that place file groups in sub-directories (e.g. bucketing), the file slice's
// physical partition path may differ from the logical MDT partition name carried on the
// record. Realign the record's partition path with where the file slice actually lives so
// downstream HoodieAppendHandle's partition-path consistency check passes.
String physical = slice.getPartitionPath();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 Realigning the record to the physical bucket path makes MDT delta-commit write stats record record_index/0004, which incremental compaction/cleaning can use. But the full-listing fallbacks don't fan out: BaseTableServicePlanActionExecutor.getPartitions() returns logical names via FSUtils.getAllPartitionPaths, then BaseHoodieCompactionPlanGenerator calls fileSystemView.getLatestFileSlicesStateless(<logical>); cleaning's getPartitionPathsForFullCleaning() does the same via getAllPartitionPaths(). Neither sees file groups under bucket sub-paths. Have you verified compaction and cleaning actually process the bucketed RLI file groups? The test here only does 3 commits, so it likely never triggers them — and uncompacted log growth would defeat the scaling goal. @yihua might want to weigh in.

⚠️ AI-generated; verify before applying. React 👍/👎 to flag quality.

if (physical != null && !physical.equals(r.getPartitionPath())) {
r.getKey().setPartitionPath(physical);
}
r.seal();
return r;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -349,15 +349,27 @@ private Map<PartitionPath, List<FileSlice>> filterFiles(List<PartitionPath> part
// API. Note that for COW table, the merging logic of two slices does not happen as there
// is no compaction, thus there is no performance impact.
HoodieTableFileSystemView finalFileSystemView = fileSystemView;
// For an MDT under a non-flat layout, the FS view indexes file slices by their physical
// partition (bucket sub-paths), but the partition list here uses logical partition names.
// Resolve via HoodieTableMetadataUtil which fans out across the layout's physical sub-paths.
final boolean isMdt = HoodieTableMetadata.isMetadataTable(basePath);
return partitions.stream().collect(
Collectors.toMap(
Function.identity(),
partitionPath ->
queryInstant.map(instant ->
finalFileSystemView.getLatestMergedFileSlicesBeforeOrOn(partitionPath.path, queryInstant.get())
)
.orElseGet(() -> finalFileSystemView.getLatestFileSlices(partitionPath.path))
.collect(Collectors.toList())
partitionPath -> {
if (isMdt) {
return queryInstant.isPresent()

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 For an MDT time-travel query (queryInstant present), this routes to getPartitionLatestMergedFileSlices, which internally merges against the timeline's last completed (and compaction) instant rather than queryInstant. The previous code passed queryInstant.get() to getLatestMergedFileSlicesBeforeOrOn. Since isMdt is true for any MDT — including the flat default — this silently changes time-travel-on-MDT semantics to "always latest" even for tables that didn't opt into a layout. Is that intended? If time-travel on the MDT path matters, it may be worth threading queryInstant through a layout-aware merged-slice helper.

⚠️ AI-generated; verify before applying. React 👍/👎 to flag quality.

? HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(metaClient,
finalFileSystemView, partitionPath.path)
: HoodieTableMetadataUtil.getPartitionLatestFileSlices(metaClient,
Option.of(finalFileSystemView), partitionPath.path);
}
return queryInstant.map(instant ->
finalFileSystemView.getLatestMergedFileSlicesBeforeOrOn(partitionPath.path, queryInstant.get())
)
.orElseGet(() -> finalFileSystemView.getLatestFileSlices(partitionPath.path))
.collect(Collectors.toList());
}
));
} finally {
long elapsedMs = timer.endTimer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -687,6 +687,32 @@ public long getMaxLogFileSize() {
return getLong(MAX_LOG_FILE_SIZE_BYTES_PROP);
}

/**
 * Writer config naming the {@code HoodieMetadataTableLayout} implementation for the metadata
 * table. It has no default value; {@link #getMetadataLayoutClass()} maps an unset or empty value
 * to an empty option.
 */
public static final ConfigProperty<String> METADATA_LAYOUT_CLASS = ConfigProperty
.key("hoodie.metadata.layout.class")
.noDefaultValue()
.markAdvanced()
.sinceVersion("1.3.0")
.withDocumentation("Fully-qualified class name of the HoodieMetadataTableLayout implementation that organizes "
+ "MDT file groups on disk. When unset, MDT uses the flat layout (file groups directly under each metadata "
+ "partition). Applies only at MDT initialization; an MDT already on disk keeps its existing layout.");

/**
 * Writer config for the number of file groups per bucket sub-directory; defaults to 1000.
 * No range validation is applied at this declaration.
 */
public static final ConfigProperty<Integer> METADATA_LAYOUT_BUCKET_SIZE = ConfigProperty
.key("hoodie.metadata.layout.bucket.size")
.defaultValue(1000)
.markAdvanced()
.sinceVersion("1.3.0")
.withDocumentation("When the layout is SubDirBucketedMDTLayout, the maximum number of file groups per bucket "
+ "sub-directory. Ignored for the flat layout. Default 1000.");

/**
 * Returns the configured MDT layout class name, if any.
 *
 * @return the value of {@link #METADATA_LAYOUT_CLASS}, or an empty option when the value is
 *         unset or an empty string
 */
public Option<String> getMetadataLayoutClass() {
  final String layoutClassName = getString(METADATA_LAYOUT_CLASS);
  if (layoutClassName != null && !layoutClassName.isEmpty()) {
    return Option.of(layoutClassName);
  }
  return Option.empty();
}

/**
 * Returns the bucket size for bucketed MDT layouts.
 *
 * @return the value of {@link #METADATA_LAYOUT_BUCKET_SIZE}, falling back to its declared default
 */
public int getMetadataLayoutBucketSize() {
  final int fileGroupsPerBucket = getIntOrDefault(METADATA_LAYOUT_BUCKET_SIZE);
  return fileGroupsPerBucket;
}

private HoodieMetadataConfig() {
super();
}
Expand Down Expand Up @@ -1349,6 +1375,16 @@ public Builder withAutoDeletePartitions(boolean autoDeletePartitions) {
return this;
}

/**
 * Sets {@link #METADATA_LAYOUT_CLASS} on the config being built.
 *
 * @param layoutClass class name to store; passed to the config unchanged
 * @return this builder, for chaining
 */
public Builder withMetadataLayoutClass(String layoutClass) {
  final String requestedLayoutClass = layoutClass;
  metadataConfig.setValue(METADATA_LAYOUT_CLASS, requestedLayoutClass);
  return this;
}

/**
 * Sets {@link #METADATA_LAYOUT_BUCKET_SIZE} on the config being built.
 *
 * @param bucketSize number of file groups per bucket; stored as its decimal string form
 * @return this builder, for chaining
 */
public Builder withMetadataLayoutBucketSize(int bucketSize) {
  final String bucketSizeText = Integer.toString(bucketSize);
  metadataConfig.setValue(METADATA_LAYOUT_BUCKET_SIZE, bucketSizeText);
  return this;
}

public HoodieMetadataConfig build() {
metadataConfig.setDefaultValue(ENABLE, getDefaultMetadataEnable(engineType));
metadataConfig.setDefaultValue(ENABLE_METADATA_INDEX_COLUMN_STATS, getDefaultColStatsEnable(engineType));
Expand Down
Loading
Loading