Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,32 @@ public class HoodieTTLConfig extends HoodieConfig {
.sinceVersion("1.0.0")
.withDocumentation("max partitions to delete in partition ttl management");

public static final ConfigProperty<String> EVENT_TIME_FORMAT = ConfigProperty
.key(PARTITION_TTL_STRATEGY_PARAM_PREFIX + "event.time.format")
.defaultValue("yyyy-MM-dd")
.markAdvanced()
.sinceVersion("1.3.0")
.withDocumentation("Used by KEEP_BY_EVENT_TIME. Date-time pattern for the event time encoded in the partition path. "
+ "A '/' in the pattern means the time spans multiple path segments. Examples: 'yyyy-MM-dd' (default), "
+ "'yyyyMMdd', 'yyyy-MM-dd/HH', 'yyyyMMdd/HH'.");

public static final ConfigProperty<Integer> EVENT_TIME_PARTITION_START_INDEX = ConfigProperty
.key(PARTITION_TTL_STRATEGY_PARAM_PREFIX + "event.time.partition.start.index")
.defaultValue(0)
.markAdvanced()
.sinceVersion("1.3.0")
.withDocumentation("Used by KEEP_BY_EVENT_TIME. 0-based index of the first path segment that carries the event time. "
+ "Defaults to 0 for pure time partitions like 'dt=2026-04-24'. Set to a higher value when non-time prefix segments exist, "
+ "e.g. 1 for 'region=us/20260424/05'.");

public static final ConfigProperty<Boolean> EVENT_TIME_DELETE_HIVE_DEFAULT_PARTITION = ConfigProperty
.key(PARTITION_TTL_STRATEGY_PARAM_PREFIX + "event.time.delete.hive.default.partition")
.defaultValue(false)
.markAdvanced()
.sinceVersion("1.3.0")
.withDocumentation("When true, KEEP_BY_EVENT_TIME treats partitions containing __HIVE_DEFAULT_PARTITION__ as expired and removes them. "
+ "Defaults to false so such partitions are skipped (with a WARN log) and the user keeps explicit control over their lifecycle.");

public static class Builder {
private final HoodieTTLConfig ttlConfig = new HoodieTTLConfig();

Expand Down Expand Up @@ -112,6 +138,21 @@ public HoodieTTLConfig.Builder withTTLStrategyType(PartitionTTLStrategyType ttlS
return this;
}

public HoodieTTLConfig.Builder withEventTimeFormat(String format) {
ttlConfig.setValue(EVENT_TIME_FORMAT, format);
return this;
}

public HoodieTTLConfig.Builder withEventTimePartitionStartIndex(int startIndex) {
ttlConfig.setValue(EVENT_TIME_PARTITION_START_INDEX, Integer.toString(startIndex));
return this;
}

public HoodieTTLConfig.Builder withDeleteHiveDefaultPartition(boolean enable) {
ttlConfig.setValue(EVENT_TIME_DELETE_HIVE_DEFAULT_PARTITION, Boolean.toString(enable));
return this;
}

public HoodieTTLConfig.Builder fromProperties(Properties props) {
this.ttlConfig.getProps().putAll(props);
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3056,6 +3056,23 @@ public Integer getPartitionTTLMaxPartitionsToDelete() {
return getInt(HoodieTTLConfig.MAX_PARTITION_TO_DELETE);
}

// The three event-time TTL getters all use *OrDefault flavors so the ConfigProperty defaults
// are honored even on paths that bypass HoodieTTLConfig.Builder (e.g. raw properties injected
// straight into HoodieWriteConfig without the builder's setDefaults pass).
// Note: getBoolean already short-circuits to getBooleanOrDefault when a default exists, so
// shouldDeleteHiveDefaultPartitionForEventTimeTTL keeps the plain call.
public String getPartitionTTLEventTimeFormat() {
return getStringOrDefault(HoodieTTLConfig.EVENT_TIME_FORMAT);
}

public int getPartitionTTLEventTimePartitionStartIndex() {
return getIntOrDefault(HoodieTTLConfig.EVENT_TIME_PARTITION_START_INDEX);
}

public boolean shouldDeleteHiveDefaultPartitionForEventTimeTTL() {
return getBoolean(HoodieTTLConfig.EVENT_TIME_DELETE_HIVE_DEFAULT_PARTITION);
}

public boolean isSecondaryIndexEnabled() {
return metadataConfig.isSecondaryIndexEnabled();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ public static String getPartitionTTLStrategyFromType(PartitionTTLStrategyType ty
return KeepByTimeStrategy.class.getName();
case KEEP_BY_CREATION_TIME:
return KeepByCreationTimeStrategy.class.getName();
case KEEP_BY_EVENT_TIME:
return KeepByEventTimeStrategy.class.getName();
default:
throw new HoodieException("Unsupported PartitionTTLStrategy Type " + type);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,275 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.table.action.ttl.strategy;

import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator;
import org.apache.hudi.common.util.PartitionPathEncodeUtils;
import org.apache.hudi.table.HoodieTable;

import lombok.extern.slf4j.Slf4j;

import java.text.ParseException;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.time.temporal.TemporalAccessor;
import java.util.List;
import java.util.stream.Collectors;

/**
* Event-time based partition TTL strategy: lifetime is read from the partition path, not from
* commit metadata. Late-arriving writes into an old partition do not extend its lifetime, and
* backfilled historic partitions are still considered old. Compare with {@link KeepByTimeStrategy}
* (last commit time) and {@link KeepByCreationTimeStrategy} (creation commit time).
*
* <h3>Supported path shapes</h3>
* The first-class, tested shapes are day and hour granularity. Each example shows the partition
* path on the left and the required {@code startIndex} on the right (see
* <i>Locating the time block</i> below); non-time segments may appear before and/or after the
* time block.
* <ul>
* <li>Day, {@code format=yyyy-MM-dd}
* <ul>
* <li>time only: {@code 2026-06-27}, {@code dt=2026-06-27} — startIndex {@code 0}</li>
* <li>prefix + time: {@code region=us/2026-06-27}, {@code region=us/dt=2026-06-27} — startIndex {@code 1}</li>
* <li>time + suffix: {@code 2026-06-27/source=app}, {@code dt=2026-06-27/source=app} — startIndex {@code 0}</li>
* <li>prefix + time + suffix: {@code region=us/dt=2026-06-27/source=app} — startIndex {@code 1}</li>
* </ul>
* </li>
* <li>Day, {@code format=yyyyMMdd}
* <ul>
* <li>time only: {@code 20260627}, {@code dt=20260627} — startIndex {@code 0}</li>
* <li>prefix + time: {@code region=us/20260627}, {@code region=us/dt=20260627} — startIndex {@code 1}</li>
* <li>time + suffix: {@code 20260627/source=app}, {@code dt=20260627/source=app} — startIndex {@code 0}</li>
* <li>prefix + time + suffix: {@code region=us/dt=20260627/source=app} — startIndex {@code 1}</li>
* </ul>
* </li>
* <li>Hour, {@code format=yyyy-MM-dd/HH}
* <ul>
* <li>time only: {@code 2026-06-27/12}, {@code dt=2026-06-27/hh=12} — startIndex {@code 0}</li>
* <li>prefix + time: {@code region=us/2026-06-27/12}, {@code region=us/dt=2026-06-27/hh=12} — startIndex {@code 1}</li>
* <li>time + suffix: {@code 2026-06-27/12/source=app}, {@code dt=2026-06-27/hh=12/source=app} — startIndex {@code 0}</li>
* <li>prefix + time + suffix: {@code region=us/dt=2026-06-27/hh=12/source=app} — startIndex {@code 1}</li>
* </ul>
* </li>
* <li>Hour, {@code format=yyyyMMdd/HH}
* <ul>
* <li>time only: {@code 20260627/12}, {@code dt=20260627/hh=12} — startIndex {@code 0}</li>
* <li>prefix + time: {@code region=us/20260627/12}, {@code region=us/dt=20260627/hh=12} — startIndex {@code 1}</li>
* <li>time + suffix: {@code 20260627/12/source=app}, {@code dt=20260627/hh=12/source=app} — startIndex {@code 0}</li>
* <li>prefix + time + suffix: {@code region=us/dt=20260627/hh=12/source=app} — startIndex {@code 1}</li>
* </ul>
* </li>
* </ul>
* Hive-style key names are not constrained: {@code dt=}, {@code day=}, {@code event_date=},
* {@code hh=}, {@code hour=} all work; only the value after {@code =} is parsed.
* <p>
* Any {@link java.time.format.DateTimeFormatter} pattern works as long as the resulting
* {@link java.time.temporal.TemporalAccessor} can be resolved to either an {@link java.time.Instant}
* or a {@link java.time.LocalDate} (day-only patterns are anchored at UTC start-of-day). A
* {@code /} in the pattern means the time value spans that many consecutive path segments.
* Patterns missing day-of-month, e.g. month-only {@code yyyy-MM}, cannot be resolved and will
* raise the standard parse-failure error at runtime.
*
* <h3>Locating the time block</h3>
* The time block must occupy a <i>contiguous</i> segment range of the partition path. Only the
* segments before it count toward {@code startIndex}; segments after are ignored. Interleaved
* layouts such as {@code dt=20260627/source=app/hh=12} are not supported -- the time block must
* be in one piece.
*
* <h3>Time zone</h3>
* Both the partition's event time and the cutoff derived from {@code instantTime} are interpreted
* in UTC. Set {@code hoodie.table.timeline.timezone=UTC} so the timeline writes instants under the
* same convention; otherwise the cutoff drifts by the JVM's UTC offset -- a boundary effect at
* day granularity, a full-offset shift at hour granularity.
*
* <h3>Configuration</h3>
* All three knobs come with defaults, so a table whose partition path is purely a date in
* {@code yyyy-MM-dd} form works out of the box.
* <ul>
* <li>{@link org.apache.hudi.config.HoodieTTLConfig#EVENT_TIME_FORMAT} — date-time pattern of
* the time block in the partition path. Default {@code yyyy-MM-dd}. A {@code /} in the
* pattern means the time block spans that many consecutive segments.</li>
* <li>{@link org.apache.hudi.config.HoodieTTLConfig#EVENT_TIME_PARTITION_START_INDEX} —
* 0-based index of the first segment that carries the time block. Default {@code 0}.
* Raise it when non-time segments come before the time block (see examples above).</li>
* <li>{@link org.apache.hudi.config.HoodieTTLConfig#EVENT_TIME_DELETE_HIVE_DEFAULT_PARTITION} —
* whether to treat partitions containing {@code __HIVE_DEFAULT_PARTITION__} (i.e. data
* whose event-time column was {@code NULL}) as expired. Default {@code false}, i.e. such
* partitions are skipped with a WARN and the user keeps explicit control over them.</li>
* </ul>
*/
@Slf4j
public class KeepByEventTimeStrategy extends KeepByTimeStrategy {

private final String eventTimeFormat;
private final int startIndex;
private final boolean deleteHiveDefaultPartition;
private final boolean hiveStylePartitioning;

public KeepByEventTimeStrategy(HoodieTable hoodieTable, String instantTime) {
super(hoodieTable, instantTime);
// Defaults: format='yyyy-MM-dd', startIndex=0, deleteHiveDefaultPartition=false. The two
// guards below catch users who set the values explicitly to obviously-broken inputs.
this.eventTimeFormat = writeConfig.getPartitionTTLEventTimeFormat();
if (eventTimeFormat == null || eventTimeFormat.isEmpty()) {
throw new IllegalArgumentException(
"hoodie.partition.ttl.strategy.event.time.format must not be empty.");
}
this.startIndex = writeConfig.getPartitionTTLEventTimePartitionStartIndex();
if (startIndex < 0) {
throw new IllegalArgumentException(
"hoodie.partition.ttl.strategy.event.time.partition.start.index must be >= 0, got " + startIndex);
}
this.deleteHiveDefaultPartition = writeConfig.shouldDeleteHiveDefaultPartitionForEventTimeTTL();
// Hive-style partitioning is a table-level property recorded at table creation; trust it
// rather than guessing per-segment from a stray '=' character in a value.
this.hiveStylePartitioning = Boolean.parseBoolean(
hoodieTable.getMetaClient().getTableConfig().getHiveStylePartitioningEnable());
}

@Override
protected List<String> getExpiredPartitionsForTimeStrategy(List<String> partitionPathsForTTL) {
long cutoffMillis = resolveCutoffMillis(instantTime, ttlInMilis);
Comment thread
wangxianghu marked this conversation as resolved.
DateTimeFormatter formatter = DateTimeFormatter.ofPattern(eventTimeFormat).withZone(ZoneOffset.UTC);
int segCount = segmentCount(eventTimeFormat);
return partitionPathsForTTL.stream().parallel()
.filter(path -> isPartitionExpiredByEventTime(
path, formatter, startIndex, segCount, cutoffMillis, deleteHiveDefaultPartition, hiveStylePartitioning))
.collect(Collectors.toList());
}

/**
* Number of '/'-separated path segments the configured format occupies.
* Example: {@code yyyy-MM-dd/HH} -> 2.
*/
static int segmentCount(String format) {
return format.split("/").length;
}

/**
* Resolve the "now" reference timestamp from {@code instantTime}, in UTC.
* <p>
* Anchoring on {@code instantTime} keeps the strategy idempotent across retries of the same
* replace commit. We parse it in UTC so the cutoff and the partition's event time (also parsed
* in UTC above) sit on the same axis -- otherwise expiry drifts by the JVM's UTC offset, which
* is negligible at day granularity but a full-offset shift at hour granularity.
*/
static long resolveCutoffMillis(String instantTime, long ttlInMillis) {
try {
return HoodieInstantTimeGenerator.parseDateFromInstantTime(instantTime, ZoneOffset.UTC).getTime() - ttlInMillis;
} catch (ParseException e) {
throw new IllegalStateException("Failed to parse instant time " + instantTime, e);
}
}

/**
* Decide whether a partition path is expired.
* <p>
* The strategy treats any partition that cannot be parsed under the configured format /
* start-index / hive-style as a hard error. Reasoning: this class derives lifetime from the
* path itself, so a partition we cannot parse has no defined lifetime, and silently skipping
* it would leave it in the table forever while the rest of TTL appears to succeed. Switch to
* {@code KEEP_BY_TIME} or {@code KEEP_BY_CREATION_TIME} (which key off commit metadata, not
* the path) if the table contains partitions that don't conform to a single event-time shape.
* <p>
* Package-private so unit tests can exercise it directly without spinning up a HoodieTable.
*/
static boolean isPartitionExpiredByEventTime(String partitionPath,
DateTimeFormatter formatter,
int startIndex,
int segCount,
long cutoffMillis,
boolean deleteHiveDefaultPartition,
boolean hiveStylePartitioning) {
String[] segments = partitionPath.split("/");
Comment thread
wangxianghu marked this conversation as resolved.
if (segments.length < startIndex + segCount) {
throw new IllegalArgumentException(String.format(
"Partition '%s' has %d segment(s) but the configured event time spans %d segment(s) starting at index %d. "
+ "Check hoodie.partition.ttl.strategy.event.time.format and event.time.partition.start.index, "
+ "or switch to KEEP_BY_TIME / KEEP_BY_CREATION_TIME if not all partitions of this table follow an event-time shape.",
partitionPath, segments.length, segCount, startIndex));
}

String[] timeSegs = new String[segCount];
for (int i = 0; i < segCount; i++) {
String seg = segments[startIndex + i];
if (hiveStylePartitioning) {
int eq = seg.indexOf('=');
if (eq < 0) {
throw new IllegalArgumentException(String.format(
"Partition '%s' segment '%s' has no hive-style 'field=value' prefix but "
+ "hoodie.datasource.write.hive_style_partitioning=true on the table. "
+ "Switch to KEEP_BY_TIME / KEEP_BY_CREATION_TIME if such legacy partitions must coexist.",
partitionPath, seg));
}
timeSegs[i] = seg.substring(eq + 1);
} else {
timeSegs[i] = seg;
}
if (PartitionPathEncodeUtils.DEFAULT_PARTITION_PATH.equals(timeSegs[i])) {
// Hive default partition is explicitly user-controlled (see config above); not a parse error.
if (deleteHiveDefaultPartition) {
log.info("Partition '{}' contains {} and delete switch is on; marking expired",
partitionPath, PartitionPathEncodeUtils.DEFAULT_PARTITION_PATH);
return true;
}
log.warn("Skipping partition '{}': contains {} (set hoodie.partition.ttl.strategy."
+ "event.time.delete.hive.default.partition=true to delete)",
partitionPath, PartitionPathEncodeUtils.DEFAULT_PARTITION_PATH);
return false;
}
}

String timeStr = String.join("/", timeSegs);
Long eventMillis = parseEventMillis(timeStr, formatter);
if (eventMillis == null) {
throw new IllegalArgumentException(String.format(
"Partition '%s': cannot parse '%s' with pattern '%s'. "
+ "Fix hoodie.partition.ttl.strategy.event.time.format, or switch to "
+ "KEEP_BY_TIME / KEEP_BY_CREATION_TIME if such partitions must remain in the table.",
partitionPath, timeStr, formatter));
}
return eventMillis < cutoffMillis;
}

/**
* Parse {@code timeStr} into epoch millis. Tries full date-time first; falls back to a
* date-only parse anchored at UTC start-of-day so day-level patterns (e.g. {@code yyyy-MM-dd})
* also work.
*/
static Long parseEventMillis(String timeStr, DateTimeFormatter formatter) {
TemporalAccessor parsed;
try {
parsed = formatter.parse(timeStr);
} catch (DateTimeParseException e) {
return null;
}
try {
return java.time.Instant.from(parsed).toEpochMilli();
} catch (java.time.DateTimeException ignore) {
// Day-level pattern with no time-of-day: anchor at UTC start of day.
try {
return java.time.LocalDate.from(parsed).atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli();
} catch (java.time.DateTimeException e) {
return null;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@
*/
public enum PartitionTTLStrategyType {
KEEP_BY_TIME("org.apache.hudi.table.action.ttl.strategy.KeepByTimeStrategy"),
KEEP_BY_CREATION_TIME("org.apache.hudi.table.action.ttl.strategy.KeepByCreationTimeStrategy");
KEEP_BY_CREATION_TIME("org.apache.hudi.table.action.ttl.strategy.KeepByCreationTimeStrategy"),
KEEP_BY_EVENT_TIME("org.apache.hudi.table.action.ttl.strategy.KeepByEventTimeStrategy");

@Getter
private final String className;
Expand Down
Loading
Loading