diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieTTLConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieTTLConfig.java index 1f9a4e40e9830..e671b64451ba2 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieTTLConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieTTLConfig.java @@ -84,6 +84,32 @@ public class HoodieTTLConfig extends HoodieConfig { .sinceVersion("1.0.0") .withDocumentation("max partitions to delete in partition ttl management"); + public static final ConfigProperty EVENT_TIME_FORMAT = ConfigProperty + .key(PARTITION_TTL_STRATEGY_PARAM_PREFIX + "event.time.format") + .defaultValue("yyyy-MM-dd") + .markAdvanced() + .sinceVersion("1.3.0") + .withDocumentation("Used by KEEP_BY_EVENT_TIME. Date-time pattern for the event time encoded in the partition path. " + + "A '/' in the pattern means the time spans multiple path segments. Examples: 'yyyy-MM-dd' (default), " + + "'yyyyMMdd', 'yyyy-MM-dd/HH', 'yyyyMMdd/HH'."); + + public static final ConfigProperty EVENT_TIME_PARTITION_START_INDEX = ConfigProperty + .key(PARTITION_TTL_STRATEGY_PARAM_PREFIX + "event.time.partition.start.index") + .defaultValue(0) + .markAdvanced() + .sinceVersion("1.3.0") + .withDocumentation("Used by KEEP_BY_EVENT_TIME. 0-based index of the first path segment that carries the event time. " + + "Defaults to 0 for pure time partitions like 'dt=2026-04-24'. Set to a higher value when non-time prefix segments exist, " + + "e.g. 1 for 'region=us/20260424/05'."); + + public static final ConfigProperty EVENT_TIME_DELETE_HIVE_DEFAULT_PARTITION = ConfigProperty + .key(PARTITION_TTL_STRATEGY_PARAM_PREFIX + "event.time.delete.hive.default.partition") + .defaultValue(false) + .markAdvanced() + .sinceVersion("1.3.0") + .withDocumentation("When true, KEEP_BY_EVENT_TIME treats partitions containing __HIVE_DEFAULT_PARTITION__ as expired and removes them. " + + "Defaults to false so such partitions are skipped (with a WARN log) and the user keeps explicit control over their lifecycle."); + public static class Builder { private final HoodieTTLConfig ttlConfig = new HoodieTTLConfig(); @@ -112,6 +138,21 @@ public HoodieTTLConfig.Builder withTTLStrategyType(PartitionTTLStrategyType ttlS return this; } + public HoodieTTLConfig.Builder withEventTimeFormat(String format) { + ttlConfig.setValue(EVENT_TIME_FORMAT, format); + return this; + } + + public HoodieTTLConfig.Builder withEventTimePartitionStartIndex(int startIndex) { + ttlConfig.setValue(EVENT_TIME_PARTITION_START_INDEX, Integer.toString(startIndex)); + return this; + } + + public HoodieTTLConfig.Builder withDeleteHiveDefaultPartition(boolean enable) { + ttlConfig.setValue(EVENT_TIME_DELETE_HIVE_DEFAULT_PARTITION, Boolean.toString(enable)); + return this; + } + public HoodieTTLConfig.Builder fromProperties(Properties props) { this.ttlConfig.getProps().putAll(props); return this; diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index b7bafe0f0b21d..432b30ed3052a 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -3056,6 +3056,23 @@ public Integer getPartitionTTLMaxPartitionsToDelete() { return getInt(HoodieTTLConfig.MAX_PARTITION_TO_DELETE); } + // The three event-time TTL getters all use *OrDefault flavors so the ConfigProperty defaults + // are honored even on paths that bypass HoodieTTLConfig.Builder (e.g. raw properties injected + // straight into HoodieWriteConfig without the builder's setDefaults pass). + // Note: getBoolean already short-circuits to getBooleanOrDefault when a default exists, so + // shouldDeleteHiveDefaultPartitionForEventTimeTTL keeps the plain call. + public String getPartitionTTLEventTimeFormat() { + return getStringOrDefault(HoodieTTLConfig.EVENT_TIME_FORMAT); + } + + public int getPartitionTTLEventTimePartitionStartIndex() { + return getIntOrDefault(HoodieTTLConfig.EVENT_TIME_PARTITION_START_INDEX); + } + + public boolean shouldDeleteHiveDefaultPartitionForEventTimeTTL() { + return getBoolean(HoodieTTLConfig.EVENT_TIME_DELETE_HIVE_DEFAULT_PARTITION); + } + public boolean isSecondaryIndexEnabled() { return metadataConfig.isSecondaryIndexEnabled(); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/ttl/strategy/HoodiePartitionTTLStrategyFactory.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/ttl/strategy/HoodiePartitionTTLStrategyFactory.java index c50e4a4589d4b..7401e7ebdfdf9 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/ttl/strategy/HoodiePartitionTTLStrategyFactory.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/ttl/strategy/HoodiePartitionTTLStrategyFactory.java @@ -79,6 +79,8 @@ public static String getPartitionTTLStrategyFromType(PartitionTTLStrategyType ty return KeepByTimeStrategy.class.getName(); case KEEP_BY_CREATION_TIME: return KeepByCreationTimeStrategy.class.getName(); + case KEEP_BY_EVENT_TIME: + return KeepByEventTimeStrategy.class.getName(); default: throw new HoodieException("Unsupported PartitionTTLStrategy Type " + type); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/ttl/strategy/KeepByEventTimeStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/ttl/strategy/KeepByEventTimeStrategy.java new file mode 100644 index 0000000000000..0e3c484e02468 --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/ttl/strategy/KeepByEventTimeStrategy.java @@ -0,0 +1,275 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.action.ttl.strategy; + +import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator; +import org.apache.hudi.common.util.PartitionPathEncodeUtils; +import org.apache.hudi.table.HoodieTable; + +import lombok.extern.slf4j.Slf4j; + +import java.text.ParseException; +import java.time.ZoneOffset; +import java.time.format.DateTimeFormatter; +import java.time.format.DateTimeParseException; +import java.time.temporal.TemporalAccessor; +import java.util.List; +import java.util.stream.Collectors; + +/** + * Event-time based partition TTL strategy: lifetime is read from the partition path, not from + * commit metadata. Late-arriving writes into an old partition do not extend its lifetime, and + * backfilled historic partitions are still considered old. Compare with {@link KeepByTimeStrategy} + * (last commit time) and {@link KeepByCreationTimeStrategy} (creation commit time). + * + *

Supported path shapes

+ * The first-class, tested shapes are day and hour granularity. Each example shows the partition + * path on the left and the required {@code startIndex} on the right (see + * Locating the time block below); non-time segments may appear before and/or after the + * time block. + *
    + *
  • Day, {@code format=yyyy-MM-dd} + *
      + *
    • time only: {@code 2026-06-27}, {@code dt=2026-06-27} — startIndex {@code 0}
    • + *
    • prefix + time: {@code region=us/2026-06-27}, {@code region=us/dt=2026-06-27} — startIndex {@code 1}
    • + *
    • time + suffix: {@code 2026-06-27/source=app}, {@code dt=2026-06-27/source=app} — startIndex {@code 0}
    • + *
    • prefix + time + suffix: {@code region=us/dt=2026-06-27/source=app} — startIndex {@code 1}
    • + *
    + *
  • + *
  • Day, {@code format=yyyyMMdd} + *
      + *
    • time only: {@code 20260627}, {@code dt=20260627} — startIndex {@code 0}
    • + *
    • prefix + time: {@code region=us/20260627}, {@code region=us/dt=20260627} — startIndex {@code 1}
    • + *
    • time + suffix: {@code 20260627/source=app}, {@code dt=20260627/source=app} — startIndex {@code 0}
    • + *
    • prefix + time + suffix: {@code region=us/dt=20260627/source=app} — startIndex {@code 1}
    • + *
    + *
  • + *
  • Hour, {@code format=yyyy-MM-dd/HH} + *
      + *
    • time only: {@code 2026-06-27/12}, {@code dt=2026-06-27/hh=12} — startIndex {@code 0}
    • + *
    • prefix + time: {@code region=us/2026-06-27/12}, {@code region=us/dt=2026-06-27/hh=12} — startIndex {@code 1}
    • + *
    • time + suffix: {@code 2026-06-27/12/source=app}, {@code dt=2026-06-27/hh=12/source=app} — startIndex {@code 0}
    • + *
    • prefix + time + suffix: {@code region=us/dt=2026-06-27/hh=12/source=app} — startIndex {@code 1}
    • + *
    + *
  • + *
  • Hour, {@code format=yyyyMMdd/HH} + *
      + *
    • time only: {@code 20260627/12}, {@code dt=20260627/hh=12} — startIndex {@code 0}
    • + *
    • prefix + time: {@code region=us/20260627/12}, {@code region=us/dt=20260627/hh=12} — startIndex {@code 1}
    • + *
    • time + suffix: {@code 20260627/12/source=app}, {@code dt=20260627/hh=12/source=app} — startIndex {@code 0}
    • + *
    • prefix + time + suffix: {@code region=us/dt=20260627/hh=12/source=app} — startIndex {@code 1}
    • + *
    + *
  • + *
+ * Hive-style key names are not constrained: {@code dt=}, {@code day=}, {@code event_date=}, + * {@code hh=}, {@code hour=} all work; only the value after {@code =} is parsed. + *

+ * Any {@link java.time.format.DateTimeFormatter} pattern works as long as the resulting + * {@link java.time.temporal.TemporalAccessor} can be resolved to either an {@link java.time.Instant} + * or a {@link java.time.LocalDate} (day-only patterns are anchored at UTC start-of-day). A + * {@code /} in the pattern means the time value spans that many consecutive path segments. + * Patterns missing day-of-month, e.g. month-only {@code yyyy-MM}, cannot be resolved and will + * raise the standard parse-failure error at runtime. + * + *

Locating the time block

+ * The time block must occupy a contiguous segment range of the partition path. Only the + * segments before it count toward {@code startIndex}; segments after are ignored. Interleaved + * layouts such as {@code dt=20260627/source=app/hh=12} are not supported -- the time block must + * be in one piece. + * + *

Time zone

+ * Both the partition's event time and the cutoff derived from {@code instantTime} are interpreted + * in UTC. Set {@code hoodie.table.timeline.timezone=UTC} so the timeline writes instants under the + * same convention; otherwise the cutoff drifts by the JVM's UTC offset -- a boundary effect at + * day granularity, a full-offset shift at hour granularity. + * + *

Configuration

+ * All three knobs come with defaults, so a table whose partition path is purely a date in + * {@code yyyy-MM-dd} form works out of the box. + *
    + *
  • {@link org.apache.hudi.config.HoodieTTLConfig#EVENT_TIME_FORMAT} — date-time pattern of + * the time block in the partition path. Default {@code yyyy-MM-dd}. A {@code /} in the + * pattern means the time block spans that many consecutive segments.
  • + *
  • {@link org.apache.hudi.config.HoodieTTLConfig#EVENT_TIME_PARTITION_START_INDEX} — + * 0-based index of the first segment that carries the time block. Default {@code 0}. + * Raise it when non-time segments come before the time block (see examples above).
  • + *
  • {@link org.apache.hudi.config.HoodieTTLConfig#EVENT_TIME_DELETE_HIVE_DEFAULT_PARTITION} — + * whether to treat partitions containing {@code __HIVE_DEFAULT_PARTITION__} (i.e. data + * whose event-time column was {@code NULL}) as expired. Default {@code false}, i.e. such + * partitions are skipped with a WARN and the user keeps explicit control over them.
  • + *
+ */ +@Slf4j +public class KeepByEventTimeStrategy extends KeepByTimeStrategy { + + private final String eventTimeFormat; + private final int startIndex; + private final boolean deleteHiveDefaultPartition; + private final boolean hiveStylePartitioning; + + public KeepByEventTimeStrategy(HoodieTable hoodieTable, String instantTime) { + super(hoodieTable, instantTime); + // Defaults: format='yyyy-MM-dd', startIndex=0, deleteHiveDefaultPartition=false. The two + // guards below catch users who set the values explicitly to obviously-broken inputs. + this.eventTimeFormat = writeConfig.getPartitionTTLEventTimeFormat(); + if (eventTimeFormat == null || eventTimeFormat.isEmpty()) { + throw new IllegalArgumentException( + "hoodie.partition.ttl.strategy.event.time.format must not be empty."); + } + this.startIndex = writeConfig.getPartitionTTLEventTimePartitionStartIndex(); + if (startIndex < 0) { + throw new IllegalArgumentException( + "hoodie.partition.ttl.strategy.event.time.partition.start.index must be >= 0, got " + startIndex); + } + this.deleteHiveDefaultPartition = writeConfig.shouldDeleteHiveDefaultPartitionForEventTimeTTL(); + // Hive-style partitioning is a table-level property recorded at table creation; trust it + // rather than guessing per-segment from a stray '=' character in a value. + this.hiveStylePartitioning = Boolean.parseBoolean( + hoodieTable.getMetaClient().getTableConfig().getHiveStylePartitioningEnable()); + } + + @Override + protected List getExpiredPartitionsForTimeStrategy(List partitionPathsForTTL) { + long cutoffMillis = resolveCutoffMillis(instantTime, ttlInMilis); + DateTimeFormatter formatter = DateTimeFormatter.ofPattern(eventTimeFormat).withZone(ZoneOffset.UTC); + int segCount = segmentCount(eventTimeFormat); + return partitionPathsForTTL.stream().parallel() + .filter(path -> isPartitionExpiredByEventTime( + path, formatter, startIndex, segCount, cutoffMillis, deleteHiveDefaultPartition, hiveStylePartitioning)) + .collect(Collectors.toList()); + } + + /** + * Number of '/'-separated path segments the configured format occupies. + * Example: {@code yyyy-MM-dd/HH} -> 2. + */ + static int segmentCount(String format) { + return format.split("/").length; + } + + /** + * Resolve the "now" reference timestamp from {@code instantTime}, in UTC. + *

+ * Anchoring on {@code instantTime} keeps the strategy idempotent across retries of the same + * replace commit. We parse it in UTC so the cutoff and the partition's event time (also parsed + * in UTC above) sit on the same axis -- otherwise expiry drifts by the JVM's UTC offset, which + * is negligible at day granularity but a full-offset shift at hour granularity. + */ + static long resolveCutoffMillis(String instantTime, long ttlInMillis) { + try { + return HoodieInstantTimeGenerator.parseDateFromInstantTime(instantTime, ZoneOffset.UTC).getTime() - ttlInMillis; + } catch (ParseException e) { + throw new IllegalStateException("Failed to parse instant time " + instantTime, e); + } + } + + /** + * Decide whether a partition path is expired. + *

+ * The strategy treats any partition that cannot be parsed under the configured format / + * start-index / hive-style as a hard error. Reasoning: this class derives lifetime from the + * path itself, so a partition we cannot parse has no defined lifetime, and silently skipping + * it would leave it in the table forever while the rest of TTL appears to succeed. Switch to + * {@code KEEP_BY_TIME} or {@code KEEP_BY_CREATION_TIME} (which key off commit metadata, not + * the path) if the table contains partitions that don't conform to a single event-time shape. + *

+ * Package-private so unit tests can exercise it directly without spinning up a HoodieTable. + */ + static boolean isPartitionExpiredByEventTime(String partitionPath, + DateTimeFormatter formatter, + int startIndex, + int segCount, + long cutoffMillis, + boolean deleteHiveDefaultPartition, + boolean hiveStylePartitioning) { + String[] segments = partitionPath.split("/"); + if (segments.length < startIndex + segCount) { + throw new IllegalArgumentException(String.format( + "Partition '%s' has %d segment(s) but the configured event time spans %d segment(s) starting at index %d. " + + "Check hoodie.partition.ttl.strategy.event.time.format and event.time.partition.start.index, " + + "or switch to KEEP_BY_TIME / KEEP_BY_CREATION_TIME if not all partitions of this table follow an event-time shape.", + partitionPath, segments.length, segCount, startIndex)); + } + + String[] timeSegs = new String[segCount]; + for (int i = 0; i < segCount; i++) { + String seg = segments[startIndex + i]; + if (hiveStylePartitioning) { + int eq = seg.indexOf('='); + if (eq < 0) { + throw new IllegalArgumentException(String.format( + "Partition '%s' segment '%s' has no hive-style 'field=value' prefix but " + + "hoodie.datasource.write.hive_style_partitioning=true on the table. " + + "Switch to KEEP_BY_TIME / KEEP_BY_CREATION_TIME if such legacy partitions must coexist.", + partitionPath, seg)); + } + timeSegs[i] = seg.substring(eq + 1); + } else { + timeSegs[i] = seg; + } + if (PartitionPathEncodeUtils.DEFAULT_PARTITION_PATH.equals(timeSegs[i])) { + // Hive default partition is explicitly user-controlled (see config above); not a parse error. + if (deleteHiveDefaultPartition) { + log.info("Partition '{}' contains {} and delete switch is on; marking expired", + partitionPath, PartitionPathEncodeUtils.DEFAULT_PARTITION_PATH); + return true; + } + log.warn("Skipping partition '{}': contains {} (set hoodie.partition.ttl.strategy." + + "event.time.delete.hive.default.partition=true to delete)", + partitionPath, PartitionPathEncodeUtils.DEFAULT_PARTITION_PATH); + return false; + } + } + + String timeStr = String.join("/", timeSegs); + Long eventMillis = parseEventMillis(timeStr, formatter); + if (eventMillis == null) { + throw new IllegalArgumentException(String.format( + "Partition '%s': cannot parse '%s' with pattern '%s'. " + + "Fix hoodie.partition.ttl.strategy.event.time.format, or switch to " + + "KEEP_BY_TIME / KEEP_BY_CREATION_TIME if such partitions must remain in the table.", + partitionPath, timeStr, formatter)); + } + return eventMillis < cutoffMillis; + } + + /** + * Parse {@code timeStr} into epoch millis. Tries full date-time first; falls back to a + * date-only parse anchored at UTC start-of-day so day-level patterns (e.g. {@code yyyy-MM-dd}) + * also work. + */ + static Long parseEventMillis(String timeStr, DateTimeFormatter formatter) { + TemporalAccessor parsed; + try { + parsed = formatter.parse(timeStr); + } catch (DateTimeParseException e) { + return null; + } + try { + return java.time.Instant.from(parsed).toEpochMilli(); + } catch (java.time.DateTimeException ignore) { + // Day-level pattern with no time-of-day: anchor at UTC start of day. + try { + return java.time.LocalDate.from(parsed).atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli(); + } catch (java.time.DateTimeException e) { + return null; + } + } + } +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/ttl/strategy/PartitionTTLStrategyType.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/ttl/strategy/PartitionTTLStrategyType.java index 6a7ed12b3c059..45f6fe1af31a5 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/ttl/strategy/PartitionTTLStrategyType.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/ttl/strategy/PartitionTTLStrategyType.java @@ -36,7 +36,8 @@ */ public enum PartitionTTLStrategyType { KEEP_BY_TIME("org.apache.hudi.table.action.ttl.strategy.KeepByTimeStrategy"), - KEEP_BY_CREATION_TIME("org.apache.hudi.table.action.ttl.strategy.KeepByCreationTimeStrategy"); + KEEP_BY_CREATION_TIME("org.apache.hudi.table.action.ttl.strategy.KeepByCreationTimeStrategy"), + KEEP_BY_EVENT_TIME("org.apache.hudi.table.action.ttl.strategy.KeepByEventTimeStrategy"); @Getter private final String className; diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/ttl/strategy/TestKeepByEventTimeStrategy.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/ttl/strategy/TestKeepByEventTimeStrategy.java new file mode 100644 index 0000000000000..dfd4053c4f17f --- /dev/null +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/ttl/strategy/TestKeepByEventTimeStrategy.java @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.action.ttl.strategy; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; + +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.time.format.DateTimeFormatter; +import java.util.concurrent.TimeUnit; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Tests for {@link KeepByEventTimeStrategy}'s pure parsing logic. Exercises the static helpers + * directly so we don't need to spin up a HoodieTable / write client. + */ +public class TestKeepByEventTimeStrategy { + + @Test + public void segmentCountCountsSlashes() { + assertEquals(1, KeepByEventTimeStrategy.segmentCount("yyyy-MM-dd")); + assertEquals(1, KeepByEventTimeStrategy.segmentCount("yyyyMMdd")); + assertEquals(2, KeepByEventTimeStrategy.segmentCount("yyyy-MM-dd/HH")); + assertEquals(2, KeepByEventTimeStrategy.segmentCount("yyyyMMdd/HH")); + } + + // path, format, startIdx, hiveStyle, expectedExpired + // + // Full matrix mirroring the class-level JavaDoc: 4 formats x 4 position shapes + // (time only / prefix+time / time+suffix / prefix+time+suffix) x hive vs non-hive. + // Cutoff is "2026-04-30 00:00 UTC" so any 2026-04-22 path expires; the two + // "not expired" rows pin down the strict-less-than boundary. + @ParameterizedTest + @CsvSource({ + // -------- Day, format=yyyy-MM-dd -------- + // time only + "'dt=2026-04-22', 'yyyy-MM-dd', 0, true, true", + "'dt=2026-04-30', 'yyyy-MM-dd', 0, true, false", + "'2026-04-22', 'yyyy-MM-dd', 0, false, true", + "'2026-04-30', 'yyyy-MM-dd', 0, false, false", + // prefix + time + "'region=us/dt=2026-04-22', 'yyyy-MM-dd', 1, true, true", + "'eventType=login/dt=2026-04-30', 'yyyy-MM-dd', 1, true, false", + "'region=us/2026-04-22', 'yyyy-MM-dd', 1, false, true", + // time + suffix + "'dt=2026-04-22/source=app', 'yyyy-MM-dd', 0, true, true", + "'2026-04-22/source=app', 'yyyy-MM-dd', 0, false, true", + // prefix + time + suffix + "'region=us/dt=2026-04-22/source=app', 'yyyy-MM-dd', 1, true, true", + "'region=us/2026-04-22/source=app', 'yyyy-MM-dd', 1, false, true", + + // -------- Day, format=yyyyMMdd -------- + // time only + "'dt=20260422', 'yyyyMMdd', 0, true, true", + "'dt=20260430', 'yyyyMMdd', 0, true, false", + "'20260422', 'yyyyMMdd', 0, false, true", + // prefix + time + "'region=us/dt=20260422', 'yyyyMMdd', 1, true, true", + "'region=us/20260422', 'yyyyMMdd', 1, false, true", + // time + suffix + "'dt=20260422/source=app', 'yyyyMMdd', 0, true, true", + "'20260422/source=app', 'yyyyMMdd', 0, false, true", + // prefix + time + suffix + "'region=us/dt=20260422/source=app', 'yyyyMMdd', 1, true, true", + "'region=us/20260422/source=app', 'yyyyMMdd', 1, false, true", + + // -------- Hour, format=yyyy-MM-dd/HH -------- + // time only + "'dt=2026-04-22/hh=06', 'yyyy-MM-dd/HH', 0, true, true", + "'2026-04-22/06', 'yyyy-MM-dd/HH', 0, false, true", + // prefix + time + "'region=us/dt=2026-04-22/hh=06', 'yyyy-MM-dd/HH', 1, true, true", + "'region=us/2026-04-22/06', 'yyyy-MM-dd/HH', 1, false, true", + // time + suffix + "'dt=2026-04-22/hh=06/source=app', 'yyyy-MM-dd/HH', 0, true, true", + "'2026-04-22/06/source=app', 'yyyy-MM-dd/HH', 0, false, true", + // prefix + time + suffix + "'region=us/dt=2026-04-22/hh=06/source=app', 'yyyy-MM-dd/HH', 1, true, true", + "'region=us/2026-04-22/06/source=app', 'yyyy-MM-dd/HH', 1, false, true", + + // -------- Hour, format=yyyyMMdd/HH -------- + // time only + "'dt=20260422/hh=06', 'yyyyMMdd/HH', 0, true, true", + "'20260422/06', 'yyyyMMdd/HH', 0, false, true", + // prefix + time + "'eventType=login/dt=20260422/hh=06', 'yyyyMMdd/HH', 1, true, true", + "'login/20260422/06', 'yyyyMMdd/HH', 1, false, true", + // time + suffix + "'dt=20260422/hh=06/eventType=login', 'yyyyMMdd/HH', 0, true, true", + "'20260422/06/source=app', 'yyyyMMdd/HH', 0, false, true", + // prefix + time + suffix + "'region=us/dt=20260422/hh=06/source=app', 'yyyyMMdd/HH', 1, true, true", + "'region=us/20260422/06/source=app', 'yyyyMMdd/HH', 1, false, true", + }) + public void parsesPartitionAndCompares(String path, + String format, + int startIdx, + boolean hiveStyle, + boolean expectedExpired) { + // Cutoff is "2026-04-30 00:00 UTC" so 2026-04-22 is expired (strictly before) while 2026-04-30 is not. + long cutoff = LocalDate.of(2026, 4, 30).atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli(); + DateTimeFormatter formatter = DateTimeFormatter.ofPattern(format).withZone(ZoneOffset.UTC); + int segCount = KeepByEventTimeStrategy.segmentCount(format); + + boolean expired = KeepByEventTimeStrategy.isPartitionExpiredByEventTime( + path, formatter, startIdx, segCount, cutoff, false, hiveStyle); + + assertEquals(expectedExpired, expired, "path=" + path + " format=" + format); + } + + @Test + public void parseFailureThrows() { + // A partition whose time segment can't be parsed has no defined lifetime under event-time + // semantics, so the strategy fails fast and asks the user to switch to commit-time TTL. + DateTimeFormatter f = DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneOffset.UTC); + assertThrows(IllegalArgumentException.class, () -> + KeepByEventTimeStrategy.isPartitionExpiredByEventTime( + "dt=not-a-date", f, 0, 1, Long.MAX_VALUE, false, true)); + } + + @Test + public void shorterPathThanFormatThrows() { + // Partition layout is fixed per table — a mismatch is a configuration error, not a per-row skip. + DateTimeFormatter f = DateTimeFormatter.ofPattern("yyyy-MM-dd/HH").withZone(ZoneOffset.UTC); + assertThrows(IllegalArgumentException.class, () -> + KeepByEventTimeStrategy.isPartitionExpiredByEventTime( + "dt=2026-04-22", f, 0, 2, Long.MAX_VALUE, false, true)); + } + + @Test + public void startIndexOutOfRangeThrows() { + DateTimeFormatter f = DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneOffset.UTC); + assertThrows(IllegalArgumentException.class, () -> + KeepByEventTimeStrategy.isPartitionExpiredByEventTime( + "dt=2026-04-22", f, 5, 1, Long.MAX_VALUE, false, true)); + } + + @Test + public void hiveStyleSegmentWithoutEqualsThrows() { + // Table is hive-style but the segment lacks 'field=' prefix -> misconfiguration, fail fast. + DateTimeFormatter f = DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneOffset.UTC); + assertThrows(IllegalArgumentException.class, () -> + KeepByEventTimeStrategy.isPartitionExpiredByEventTime( + "2026-04-22", f, 0, 1, Long.MAX_VALUE, false, /*hiveStyle*/ true)); + } + + @Test + public void nonHiveStyleSegmentWithEqualsThrows() { + // hiveStyle=false: segment is taken verbatim, so '=' becomes part of the time string and + // fails parsing -> hard error, same as any other unparseable partition. + DateTimeFormatter f = DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneOffset.UTC); + assertThrows(IllegalArgumentException.class, () -> + KeepByEventTimeStrategy.isPartitionExpiredByEventTime( + "dt=2026-04-22", f, 0, 1, Long.MAX_VALUE, false, /*hiveStyle*/ false)); + } + + @Test + public void hiveDefaultPartitionSkippedWhenSwitchOff() { + DateTimeFormatter f = DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneOffset.UTC); + boolean expired = KeepByEventTimeStrategy.isPartitionExpiredByEventTime( + "dt=__HIVE_DEFAULT_PARTITION__", f, 0, 1, Long.MAX_VALUE, false, true); + assertFalse(expired); + } + + @Test + public void hiveDefaultPartitionDeletedWhenSwitchOn() { + DateTimeFormatter f = DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneOffset.UTC); + boolean expired = KeepByEventTimeStrategy.isPartitionExpiredByEventTime( + "dt=__HIVE_DEFAULT_PARTITION__", f, 0, 1, /*cutoff*/ 0, /*deleteHiveDefault*/ true, /*hiveStyle*/ true); + assertTrue(expired); + } + + @Test + public void resolveCutoffMillisInterpretsInstantInUtc() { + // The instant string '20260430120000000' must be read as 2026-04-30T12:00:00Z regardless of the + // JVM default zone. With ttl=0 the cutoff equals that exact instant; this pins down the contract + // that resolveCutoffMillis and the partition formatter both speak UTC. + long expected = LocalDateTime.of(2026, 4, 30, 12, 0).toInstant(ZoneOffset.UTC).toEpochMilli(); + assertEquals(expected, KeepByEventTimeStrategy.resolveCutoffMillis("20260430120000000", 0)); + + // ttl=1d shifts back exactly 24h, again with no dependence on the JVM zone. + long oneDay = TimeUnit.DAYS.toMillis(1); + assertEquals(expected - oneDay, + KeepByEventTimeStrategy.resolveCutoffMillis("20260430120000000", oneDay)); + } + + @Test + public void hourBoundaryRespected() { + // Cutoff is 2026-04-22 12:00 UTC. 11:00 expired, 12:00 not expired. + long cutoff = LocalDateTime.of(2026, 4, 22, 12, 0).toInstant(ZoneOffset.UTC).toEpochMilli(); + DateTimeFormatter f = DateTimeFormatter.ofPattern("yyyy-MM-dd/HH").withZone(ZoneOffset.UTC); + + assertTrue(KeepByEventTimeStrategy.isPartitionExpiredByEventTime( + "2026-04-22/11", f, 0, 2, cutoff, false, /*hiveStyle*/ false)); + assertFalse(KeepByEventTimeStrategy.isPartitionExpiredByEventTime( + "2026-04-22/12", f, 0, 2, cutoff, false, /*hiveStyle*/ false)); + } +} diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/ttl/strategy/TestPartitionTTLStrategyType.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/ttl/strategy/TestPartitionTTLStrategyType.java index 276273cb8953e..782ec16af0cc9 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/ttl/strategy/TestPartitionTTLStrategyType.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/ttl/strategy/TestPartitionTTLStrategyType.java @@ -51,6 +51,16 @@ public void resolvesKeepByCreationTimeFromType() { PartitionTTLStrategyType.getPartitionTTLStrategyClassName(config)); } + @Test + public void resolvesKeepByEventTimeFromType() { + HoodieConfig config = new HoodieConfig(); + config.setValue(HoodieTTLConfig.PARTITION_TTL_STRATEGY_TYPE, + PartitionTTLStrategyType.KEEP_BY_EVENT_TIME.name()); + + assertEquals(PartitionTTLStrategyType.KEEP_BY_EVENT_TIME.getClassName(), + PartitionTTLStrategyType.getPartitionTTLStrategyClassName(config)); + } + @Test public void throwsOnUnknownType() { HoodieConfig config = new HoodieConfig(); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestPartitionTTLManagement.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestPartitionTTLManagement.java index cd783c10302bf..ea25e4e189be9 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestPartitionTTLManagement.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestPartitionTTLManagement.java @@ -47,6 +47,7 @@ import java.nio.file.Paths; import java.util.Arrays; +import java.util.HashMap; import java.util.List; import java.util.stream.Collectors; @@ -147,6 +148,46 @@ public void testKeepByTime() { } } + @Test + public void testKeepByEventTime() { + final HoodieWriteConfig cfg = getConfigBuilder() + .withPath(metaClient.getBasePath()) + .withTTLConfig(HoodieTTLConfig + .newBuilder() + .withTTLDaysRetain(10) + .withTTLStrategyType(PartitionTTLStrategyType.KEEP_BY_EVENT_TIME) + .withEventTimeFormat("yyyy/MM/dd") + .build()) + .withMetadataConfig(HoodieMetadataConfig.newBuilder().build()) + .build(); + // Use default partition paths (yyyy/MM/dd) plus one in the far future so we exercise both branches. + String futurePartition = "2099/01/01"; + String[] partitions = { + HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, // 2016/03/15 -> expired + HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, // 2015/03/16 -> expired + futurePartition // 2099/01/01 -> not expired + }; + HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(0xDEED, partitions, new HashMap<>()); + try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) { + String instant0 = getCommitTimeAtUTC(0); + writeRecordsForPartition(client, dataGen, partitions[0], instant0); + + String instant1 = getCommitTimeAtUTC(1000); + writeRecordsForPartition(client, dataGen, partitions[1], instant1); + + String currentInstant = WriteClientTestUtils.createNewInstantTime(); + writeRecordsForPartition(client, dataGen, partitions[2], currentInstant); + + String instantTime = client.startDeletePartitionCommit(metaClient); + HoodieWriteResult result = client.managePartitionTTL(instantTime); + + // Both historic partitions are expired by event time; the future one is preserved. + Assertions.assertEquals(Sets.newHashSet(partitions[0], partitions[1]), + result.getPartitionToReplaceFileIds().keySet()); + Assertions.assertEquals(10, readRecords(partitions).size()); + } + } + @Test public void testInlinePartitionTTL() { final HoodieWriteConfig cfg = getConfigBuilder() diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstantTimeGenerator.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstantTimeGenerator.java index 6224b5a921354..094d9f36f446a 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstantTimeGenerator.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstantTimeGenerator.java @@ -84,10 +84,22 @@ public static String createNewInstantTime(boolean shouldLock, TimeGenerator time } public static Date parseDateFromInstantTime(String timestamp) throws ParseException { + return parseDateFromInstantTime(timestamp, ZoneId.systemDefault()); + } + + /** + * Parse an instant time string into a {@link Date}, interpreting the digits in {@code zoneId}. + *

+ * Instant times are bare {@code yyyyMMddHHmmssSSS} digits with no zone information, so the + * caller decides which zone produced them -- typically the table's + * {@code hoodie.table.timeline.timezone}. The no-arg overload preserves the legacy behaviour + * of {@link ZoneId#systemDefault()}. + */ + public static Date parseDateFromInstantTime(String timestamp, ZoneId zoneId) throws ParseException { try { String timestampInMillis = fixInstantTimeCompatibility(timestamp); LocalDateTime dt = LocalDateTime.parse(timestampInMillis, MILLIS_INSTANT_TIME_FORMATTER); - return Date.from(dt.atZone(ZoneId.systemDefault()).toInstant()); + return Date.from(dt.atZone(zoneId).toInstant()); } catch (DateTimeParseException e) { throw new ParseException(e.getMessage(), e.getErrorIndex()); }