From dfc692d06963f64274e460c1d1344e9ec4abc270 Mon Sep 17 00:00:00 2001 From: sivabalan Date: Tue, 23 Jun 2026 13:25:15 -0700 Subject: [PATCH 1/2] feat(hive-sync): batch and parallelize HiveQL partition operations Adds an opt-in pool that splits HiveQL partition DDL (add/update/touch/drop) into batches of `hoodie.datasource.hive_sync.batch_num` and dispatches them in parallel across a pool of single-thread workers. Each worker owns its own Hive `Driver` + `SessionState` (both thread-bound in Hive 2.x), so the fan-out is implemented as a fixed pool of dedicated single-thread executors rather than a shared thread pool. Table-level operations (create/alter table, last commit time, writer version) continue to use the single session `Driver`. Partition-phase SQL lists run through the pool only when `hoodie.datasource.hive_sync.batching.enabled` is set to true; default off, existing behavior unchanged. Hive 2.x's `ALTER PARTITION SET LOCATION` ignores db.table qualifiers and uses the connection's current database, so each worker is primed with the correct USE statement before any partition ALTER is dispatched. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- .../hudi/hive/HiveSyncConfigHolder.java | 17 + .../hudi/hive/HoodieHiveSyncClient.java | 21 +- .../hudi/hive/ddl/HiveQueryDDLExecutor.java | 69 +++- .../hudi/hive/ddl/QueryBasedDDLExecutor.java | 52 ++- .../apache/hudi/hive/util/HiveDriverPool.java | 340 ++++++++++++++++++ .../apache/hudi/hive/TestHiveSyncTool.java | 102 ++++++ .../hudi/hive/util/TestHiveDriverPool.java | 260 ++++++++++++++ 7 files changed, 843 insertions(+), 18 deletions(-) create mode 100644 hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveDriverPool.java create mode 100644 hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/util/TestHiveDriverPool.java diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfigHolder.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfigHolder.java index 418676d591627..f2d521601fff7 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfigHolder.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfigHolder.java @@ -122,6 +122,23 @@ public class HiveSyncConfigHolder { .defaultValue(1000) .markAdvanced() .withDocumentation("The number of partitions one batch when synchronous partitions to hive."); + public static final ConfigProperty HIVE_SYNC_BATCHING_ENABLED = ConfigProperty + .key("hoodie.datasource.hive_sync.batching.enabled") + .defaultValue(false) + .markAdvanced() + .sinceVersion("1.1.0") + .withDocumentation("When true, partition operations (add/update/touch/drop) in the active sync mode " + + "are split into batches of `hoodie.datasource.hive_sync.batch_num` and dispatched in parallel " + + "to a pool of workers. Table-level operations (create/alter table, last commit time, writer " + + "version) continue to use the single session client/driver. 
Default off; existing behavior is " + + "unchanged unless explicitly opted in."); + public static final ConfigProperty HIVE_SYNC_BATCHING_THREADS = ConfigProperty + .key("hoodie.datasource.hive_sync.batching.threads") + .defaultValue(4) + .markAdvanced() + .sinceVersion("1.1.0") + .withDocumentation("Pool size (number of worker resources) and worker-thread count used for parallel " + + "partition sync when `hoodie.datasource.hive_sync.batching.enabled` is true."); public static final ConfigProperty HIVE_SYNC_MODE = ConfigProperty .key("hoodie.datasource.hive_sync.mode") .noDefaultValue() diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java index a3d04737b8c5c..a83225ae63f10 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java @@ -37,6 +37,7 @@ import org.apache.hudi.hive.ddl.HiveSyncMode; import org.apache.hudi.hive.ddl.JDBCBasedMetadataOperator; import org.apache.hudi.hive.ddl.JDBCExecutor; +import org.apache.hudi.hive.util.HiveDriverPool; import org.apache.hudi.hive.util.IMetaStoreClientUtil; import org.apache.hudi.hive.util.PartitionFilterGenerator; import org.apache.hudi.sync.common.HoodieSyncClient; @@ -66,6 +67,8 @@ import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.getInputFormatClassName; import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.getOutputFormatClassName; import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.getSerDeClassName; +import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_BATCHING_ENABLED; +import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_BATCHING_THREADS; import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_MODE; import static 
org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_USE_SPARK_CATALOG; import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_USE_JDBC; @@ -86,6 +89,10 @@ public class HoodieHiveSyncClient extends HoodieSyncClient { private final Map initialTableByName = new HashMap<>(); DDLExecutor ddlExecutor; private IMetaStoreClient client; + // Present only when HIVE_SYNC_BATCHING_ENABLED and sync mode is HIVEQL (explicit + // or legacy default). Owned by HiveQueryDDLExecutor; this field is kept for + // reference only — close() is delegated through ddlExecutor.close(). + private Option partitionDriverPool = Option.empty(); /** * JDBC-based metadata operator, lazily initialized on first Thrift @@ -124,7 +131,8 @@ public HoodieHiveSyncClient(HiveSyncConfig config, HoodieTableMetaClient metaCli ddlExecutor = new HMSDDLExecutor(config, this.client); break; case HIVEQL: - ddlExecutor = new HiveQueryDDLExecutor(config, this.client); + this.partitionDriverPool = maybeBuildHiveDriverPool(config); + ddlExecutor = new HiveQueryDDLExecutor(config, this.client, this.partitionDriverPool); break; case JDBC: JDBCExecutor jdbcExecutor = new JDBCExecutor(config); @@ -142,7 +150,8 @@ public HoodieHiveSyncClient(HiveSyncConfig config, HoodieTableMetaClient metaCli jdbcMetadataOperator = new JDBCBasedMetadataOperator( jdbcExecutor.getConnection(), databaseName); } else { - ddlExecutor = new HiveQueryDDLExecutor(config, this.client); + this.partitionDriverPool = maybeBuildHiveDriverPool(config); + ddlExecutor = new HiveQueryDDLExecutor(config, this.client, this.partitionDriverPool); } } } catch (Exception e) { @@ -201,6 +210,14 @@ private IMetaStoreClient createMetaStoreClient(HiveSyncConfig config) { } } + private Option maybeBuildHiveDriverPool(HiveSyncConfig config) { + if (!config.getBooleanOrDefault(HIVE_SYNC_BATCHING_ENABLED)) { + return Option.empty(); + } + int size = config.getIntOrDefault(HIVE_SYNC_BATCHING_THREADS); + return Option.of(new HiveDriverPool(config, size)); + } + 
private Table getInitialTable(String table) { return initialTableByName.computeIfAbsent(table, t -> { try { diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HiveQueryDDLExecutor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HiveQueryDDLExecutor.java index 25434d29eb3ff..8f52c099f1c02 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HiveQueryDDLExecutor.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HiveQueryDDLExecutor.java @@ -19,8 +19,10 @@ package org.apache.hudi.hive.ddl; import org.apache.hudi.common.util.HoodieTimer; +import org.apache.hudi.common.util.Option; import org.apache.hudi.hive.HiveSyncConfig; import org.apache.hudi.hive.HoodieHiveSyncException; +import org.apache.hudi.hive.util.HiveDriverPool; import org.apache.hudi.hive.util.HivePartitionUtil; import lombok.extern.slf4j.Slf4j; @@ -39,6 +41,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.Future; import java.util.stream.Collectors; import static org.apache.hudi.sync.common.util.TableUtils.tableId; @@ -52,10 +55,20 @@ public class HiveQueryDDLExecutor extends QueryBasedDDLExecutor { private final IMetaStoreClient metaStoreClient; private SessionState sessionState; private Driver hiveDriver; + // When present, partition-phase SQL lists fan out across this pool; table-level SQL + // (createTable, schema evolution, single-statement runSQL callers) always uses the + // session `hiveDriver` above. See HiveDriverPool javadoc. 
+ private final Option driverPool; public HiveQueryDDLExecutor(HiveSyncConfig config, IMetaStoreClient metaStoreClient) { + this(config, metaStoreClient, Option.empty()); + } + + public HiveQueryDDLExecutor(HiveSyncConfig config, IMetaStoreClient metaStoreClient, + Option driverPool) { super(config); this.metaStoreClient = metaStoreClient; + this.driverPool = driverPool; try { this.sessionState = new SessionState(config.getHiveConf(), UserGroupInformation.getCurrentUser().getShortUserName()); @@ -82,19 +95,61 @@ public void runSQL(String sql) { updateHiveSQLs(Collections.singletonList(sql)); } + /** + * Partition-phase SQL fan-out. When the driver pool is present, any leading + * {@code USE database} statements are run on every worker (Hive 2.x's + * ALTER PARTITION SET LOCATION ignores db.table qualifiers and uses the + * connection's current database, so each worker needs to USE the right db + * before any partition ALTER). The remaining statements are then dispatched + * round-robin across the pool. Falls through to the sequential path on the + * session Driver when no pool is configured. + */ + @Override + protected void runSQLs(List sqls) { + if (sqls.isEmpty()) { + return; + } + if (!driverPool.isPresent()) { + updateHiveSQLs(sqls); + return; + } + HiveDriverPool pool = driverPool.get(); + int firstNonUse = 0; + while (firstNonUse < sqls.size() && isUseStatement(sqls.get(firstNonUse))) { + firstNonUse++; + } + if (firstNonUse > 0) { + List setupStatements = sqls.subList(0, firstNonUse); + pool.runOnEachWorker(setupStatements); + } + List partitionStatements = sqls.subList(firstNonUse, sqls.size()); + if (partitionStatements.isEmpty()) { + return; + } + List> futures = pool.runAll(partitionStatements); + pool.awaitAll(futures); + } + + // Strict 4-char prefix match on "USE ". Internal callers (constructPartitionAlterStatements) + // always emit the USE statement without leading whitespace; do not call with externally + // supplied SQL that might be padded. 
+ private static boolean isUseStatement(String sql) { + return sql != null && sql.regionMatches(true, 0, "USE ", 0, 4); + } + private List updateHiveSQLs(List sqls) { List responses = new ArrayList<>(); + HoodieTimer timer = HoodieTimer.start(); try { for (String sql : sqls) { if (hiveDriver != null) { - HoodieTimer timer = HoodieTimer.start(); responses.add(hiveDriver.run(sql)); - log.info("Time taken to execute [{}]: {} ms", sql, timer.endTimer()); } } } catch (Exception e) { throw new HoodieHiveSyncException("Failed in executing SQL", e); } + log.info("Executed {} SQL statements sequentially in {} ms", sqls.size(), timer.endTimer()); return responses; } @@ -149,6 +204,16 @@ public void dropPartitionsToTable(String tableName, List partitionsToDro @Override public void close() { + // Close the pool first so the worker threads stop dispatching against their + // Drivers before we tear down anything else. The pool's close() runs + // Driver/SessionState cleanup on each worker's own thread. 
+ driverPool.ifPresent(pool -> { + try { + pool.close(); + } catch (Exception e) { + log.warn("Error closing HiveDriverPool", e); + } + }); if (metaStoreClient != null) { Hive.closeCurrent(); } diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/QueryBasedDDLExecutor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/QueryBasedDDLExecutor.java index 7f776f2f7a04d..a68644ae86f05 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/QueryBasedDDLExecutor.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/QueryBasedDDLExecutor.java @@ -20,6 +20,7 @@ import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.common.util.PartitionPathEncodeUtils; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.common.util.collection.Pair; @@ -75,6 +76,20 @@ public QueryBasedDDLExecutor(HiveSyncConfig config) { */ public abstract void runSQL(String sql); + /** + * Runs a list of SQL statements. The default implementation executes them + * sequentially via {@link #runSQL(String)}. Subclasses that can parallelize + * (e.g. {@link HiveQueryDDLExecutor} with a driver pool) override this hook + * to fan the list out across workers. The contract requires that the list + * has no positional dependencies — callers must fully qualify table names + * with {@code `db`.`tbl`} so any statement can run on any worker. 
+ */ + protected void runSQLs(List sqls) { + for (String sql : sqls) { + runSQL(sql); + } + } + @Override public void createDatabase(String databaseName) { runSQL("create database if not exists " + databaseName); @@ -120,7 +135,7 @@ public void addPartitionsToTable(String tableName, List partitionsToAdd) } log.info("Adding partitions {} to table {}", partitionsToAdd.size(), tableName); List sqls = constructAddPartitions(tableName, partitionsToAdd); - sqls.stream().forEach(sql -> runSQL(sql)); + runSQLs(sqls); } @Override @@ -131,9 +146,7 @@ public void updatePartitionsToTable(String tableName, List changedPartit } log.info("Changing partitions {} on {}", changedPartitions.size(), tableName); List sqls = constructPartitionAlterStatements(tableName, changedPartitions, PartitionAlterType.SET_LOCATION); - for (String sql : sqls) { - runSQL(sql); - } + runSQLs(sqls); } @Override @@ -210,29 +223,40 @@ public void touchPartitionsToTable(String tableName, List touchPartition } log.info("Touching partitions " + touchPartitions.size() + " on " + tableName); List sqls = constructPartitionAlterStatements(tableName, touchPartitions, PartitionAlterType.TOUCH); - for (String sql : sqls) { - runSQL(sql); - } + runSQLs(sqls); } /** * Builds SQL statements to either touch partitions or set their location. - * TOUCH: one ALTER TABLE ... TOUCH PARTITION (p1) PARTITION (p2) ... - * SET_LOCATION: one ALTER TABLE ... PARTITION (p) SET LOCATION '...' per partition. + * + *

<p>The first element of the returned list is always a {@code USE database} + * statement. Hive 2.x's ALTER PARTITION ... SET LOCATION does not respect the + * {@code db.table} qualifier (silently routes to the connection's current + * database), so the {@code USE} is load-bearing. Parallel execution paths must + * run this statement on every worker before fanning out the rest. + * + *

<p>TOUCH: one {@code ALTER TABLE ... TOUCH PARTITION (p1) ...} per batch of + * {@code HIVE_BATCH_SYNC_PARTITION_NUM} partitions. + * + *

SET_LOCATION: one {@code ALTER TABLE ... PARTITION (p) SET LOCATION '...'} + * per partition (Hive SQL does not support multi-partition SET LOCATION in one + * statement). */ private List constructPartitionAlterStatements(String tableName, List partitions, PartitionAlterType alterType) { List result = new ArrayList<>(); - // Hive 2.x doesn't like db.table name for operations, hence we need to change to using the database first String useDatabase = "USE " + HIVE_ESCAPE_CHARACTER + databaseName + HIVE_ESCAPE_CHARACTER; result.add(useDatabase); String alterTablePrefix = "ALTER TABLE " + HIVE_ESCAPE_CHARACTER + tableName + HIVE_ESCAPE_CHARACTER; + int batchSyncPartitionNum = config.getIntOrDefault(HIVE_BATCH_SYNC_PARTITION_NUM); switch (alterType) { case TOUCH: - String alterTable = alterTablePrefix + " TOUCH"; - for (String partition : partitions) { - alterTable += " PARTITION (" + getPartitionClause(partition) + ")"; + for (List batch : CollectionUtils.batches(partitions, batchSyncPartitionNum)) { + StringBuilder alterTable = new StringBuilder(alterTablePrefix).append(" TOUCH"); + for (String partition : batch) { + alterTable.append(" PARTITION (").append(getPartitionClause(partition)).append(")"); + } + result.add(alterTable.toString()); } - result.add(alterTable); break; case SET_LOCATION: for (String partition : partitions) { diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveDriverPool.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveDriverPool.java new file mode 100644 index 0000000000000..2ea2ebc5e6e22 --- /dev/null +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveDriverPool.java @@ -0,0 +1,340 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.hive.util; + +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.hive.HiveSyncConfig; +import org.apache.hudi.hive.HoodieHiveSyncException; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.Driver; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.security.UserGroupInformation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_DATABASE_NAME; + +/** + * Pool of Hive {@link Driver} + {@link SessionState} pairs for parallel HiveQL DDL. + * + *

<p>Hive's {@code SessionState.start(state)} binds state to the calling thread's + * thread-local, and {@code Driver} reads from that thread-local during {@code run()}. + * A Driver constructed on one thread cannot be safely used from another. This pool + * solves that by giving each slot its own dedicated worker thread (a single-thread + * executor) — a bootstrap task builds the Driver on that thread (the default factory + * also attaches one pool-wide, shared SessionState to it via {@code SessionState.start}), + * and all subsequent SQL for that slot runs on the same thread. + * + *

<p>Usage contract: use this pool only for partition-row DDL statements that + * are independent of each other and freely shuffleable across workers. Table-level + * statements (createTable, schema evolution) must continue to run on + * the session {@code Driver} held by {@code HiveQueryDDLExecutor} on the sync driver + * thread. Session-setup statements such as {@code USE database} are the exception: + * they must be replayed on every worker via {@link #runOnEachWorker(List)} before any + * partition statement is dispatched. The pool is gated behind + * {@code hoodie.datasource.hive_sync.batching.enabled} + * and is constructed only for HiveQL sync mode. + */ +public class HiveDriverPool implements AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(HiveDriverPool.class); + + // Per-worker Driver construction has to be fast in practice (a few hundred ms + // for the SessionState + Driver init). A 60s ceiling per worker leaves plenty of + // headroom for a slow JVM warm-up but bounds the failure mode if the metastore + // is unreachable or Hive hangs during init. + private static final long BOOTSTRAP_TIMEOUT_SECONDS = 60; + + private final List workers; + private final int size; + private volatile boolean closed; + + public HiveDriverPool(HiveSyncConfig config, int size) { + this(config, size, new DefaultDriverFactory(config)); + } + + // Package-private for tests: accepts a DriverFactory so unit tests can inject + // mock Driver instances without standing up a real Hive instance. 
+ HiveDriverPool(HiveSyncConfig config, int size, DriverFactory factory) { + if (size < 1) { + throw new IllegalArgumentException("Pool size must be >= 1, got " + size); + } + this.size = size; + this.workers = new ArrayList<>(size); + String databaseName = config.getStringOrDefault(META_SYNC_DATABASE_NAME); + PoolThreadFactory threadFactory = new PoolThreadFactory(); + List> bootstrapFutures = new ArrayList<>(size); + try { + for (int i = 0; i < size; i++) { + Worker worker = new Worker(threadFactory); + workers.add(worker); + bootstrapFutures.add(worker.executor.submit(() -> { + worker.driver = factory.newDriver(databaseName); + return null; + })); + } + // Block until all bootstraps complete so we surface construction errors + // before any caller hands us SQL. Bounded by BOOTSTRAP_TIMEOUT_SECONDS so a + // hung Hive init doesn't deadlock the sync driver thread. + for (Future f : bootstrapFutures) { + f.get(BOOTSTRAP_TIMEOUT_SECONDS, TimeUnit.SECONDS); + } + } catch (Exception e) { + tearDown(); + throw new HoodieException("Failed to construct HiveDriverPool of size " + size, e); + } + LOG.info("Initialized HiveDriverPool with {} workers", size); + } + + /** + * Runs each given SQL on every worker, in order. Used for setup statements + * (e.g. {@code USE database}) that must establish per-thread session context + * before any partition statement runs. Blocks until all workers have completed + * the setup. Throws on first error. + */ + public void runOnEachWorker(List setupSqls) { + if (closed) { + throw new IllegalStateException("Cannot dispatch to a closed HiveDriverPool"); + } + if (setupSqls.isEmpty()) { + return; + } + List> futures = new ArrayList<>(workers.size()); + for (Worker worker : workers) { + futures.add(worker.executor.submit(() -> { + for (String sql : setupSqls) { + worker.driver.run(sql); + } + return null; + })); + } + awaitAll(futures); + } + + /** + * Dispatches each SQL string to a worker (round-robin) and returns the list of + * futures. 
The caller is responsible for awaiting and collecting errors. SQL text + * is intentionally not logged per-statement here: batched TOUCH/ADD statements can + * be many kilobytes, and N parallel workers would multiply the log volume. See + * {@link #awaitAll(List)} for the per-call summary log. + */ + public List> runAll(List sqls) { + if (closed) { + throw new IllegalStateException("Cannot dispatch to a closed HiveDriverPool"); + } + List> futures = new ArrayList<>(sqls.size()); + for (int i = 0; i < sqls.size(); i++) { + String sql = sqls.get(i); + Worker worker = workers.get(i % workers.size()); + futures.add(worker.executor.submit(() -> { + worker.driver.run(sql); + return null; + })); + } + return futures; + } + + /** + * Awaits all futures and throws the first exception encountered. On first failure, + * cancels the remaining (not yet started) futures so workers don't keep running + * pointless work after a fatal error. Any errors that finished before cancellation + * are logged at WARN. Callers do not need per-statement results (Hive's Driver.run + * side-effects the metastore), so this method is void. + */ + public void awaitAll(List> futures) { + long start = System.currentTimeMillis(); + Exception firstError = null; + int completed = 0; + int cancelled = 0; + for (int i = 0; i < futures.size(); i++) { + Future f = futures.get(i); + try { + f.get(); + completed++; + } catch (CancellationException ce) { + // We cancelled this future ourselves after a prior error. Don't treat it + // as a new failure; just note it for the summary log. 
+ cancelled++; + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + if (firstError == null) { + firstError = ie; + cancelled += cancelRemaining(futures, i + 1); + } + } catch (ExecutionException ee) { + Exception cause = unwrap(ee); + if (firstError == null) { + firstError = cause; + cancelled += cancelRemaining(futures, i + 1); + } else { + LOG.warn("Additional SQL batch failed (suppressed in favor of first error)", cause); + } + } + } + if (firstError != null) { + throw new HoodieHiveSyncException("Failed in executing SQL", firstError); + } + LOG.info("Completed {} SQL statements ({} cancelled) in {} ms across {} workers", + completed, cancelled, System.currentTimeMillis() - start, size); + } + + private static int cancelRemaining(List> futures, int fromIndex) { + int cancelled = 0; + for (int j = fromIndex; j < futures.size(); j++) { + // mayInterruptIfRunning=false: the worker thread is bound to a Hive Driver + // whose state we don't want to corrupt mid-statement. Cancel only those that + // haven't started yet; in-flight statements run to completion. + if (futures.get(j).cancel(false)) { + cancelled++; + } + } + return cancelled; + } + + private static Exception unwrap(ExecutionException ee) { + Throwable cause = ee.getCause(); + return (cause instanceof Exception) ? (Exception) cause : ee; + } + + public int size() { + return size; + } + + @Override + public void close() { + if (closed) { + return; + } + closed = true; + tearDown(); + } + + private void tearDown() { + // Close each worker's Driver/SessionState on its own thread, then shut the + // executor down. Running close() on the bound thread keeps SessionState's + // thread-local cleanup correct. 
+ for (Worker worker : workers) { + try { + worker.executor.submit(() -> { + if (worker.driver != null) { + try { + worker.driver.close(); + } catch (Exception e) { + LOG.warn("Error closing pooled Driver", e); + } + } + SessionState ss = SessionState.get(); + if (ss != null) { + try { + ss.close(); + } catch (Exception e) { + LOG.warn("Error closing pooled SessionState", e); + } + } + return null; + }).get(30, TimeUnit.SECONDS); + } catch (Exception e) { + LOG.warn("Error during pool worker shutdown", e); + } + worker.executor.shutdown(); + try { + if (!worker.executor.awaitTermination(10, TimeUnit.SECONDS)) { + worker.executor.shutdownNow(); + } + } catch (InterruptedException ie) { + worker.executor.shutdownNow(); + Thread.currentThread().interrupt(); + } + } + workers.clear(); + } + + /** + * Per-slot state: a single-thread executor and the Driver bound to its thread. + * Driver is volatile because it is written by the bootstrap task and read by + * subsequent dispatch tasks on the same executor. + */ + private static final class Worker { + final ExecutorService executor; + volatile Driver driver; + + Worker(ThreadFactory threadFactory) { + this.executor = Executors.newSingleThreadExecutor(threadFactory); + } + } + + @FunctionalInterface + interface DriverFactory { + Driver newDriver(String databaseName) throws Exception; + } + + /** + * Builds a real Hive {@link Driver} on the calling thread. The SessionState is + * constructed lazily (once, on the first worker thread that builds a Driver) and + * shared across all worker threads — Hive uses ThreadLocal attachment, not + * exclusive ownership, so multiple workers calling + * {@code SessionState.start(sharedState)} all see the same config and scratch dir + * without each spending the cost of building their own SessionState (and risking + * resource-dir creation races during the constructor). 
+ */ + private static final class DefaultDriverFactory implements DriverFactory { + private final HiveConf hiveConf; + private volatile SessionState sharedSessionState; + + DefaultDriverFactory(HiveSyncConfig config) { + this.hiveConf = config.getHiveConf(); + } + + @Override + public synchronized Driver newDriver(String databaseName) throws Exception { + // SessionState is shared across workers; build it once on the first call (with + // currentDatabase already set) and attach it to each worker's thread-local on + // subsequent calls. The database is a pool-wide property and never changes + // across workers, so setting it once at construction time is sufficient. + if (sharedSessionState == null) { + sharedSessionState = new SessionState(hiveConf, + UserGroupInformation.getCurrentUser().getShortUserName()); + sharedSessionState.setCurrentDatabase(databaseName); + } + SessionState.start(sharedSessionState); + return new Driver(hiveConf); + } + } + + private static final class PoolThreadFactory implements ThreadFactory { + private static final AtomicInteger POOL_ID = new AtomicInteger(0); + private final AtomicInteger threadId = new AtomicInteger(0); + private final String namePrefix = "hudi-hive-driver-pool-" + POOL_ID.incrementAndGet() + "-"; + + @Override + public Thread newThread(Runnable r) { + Thread t = new Thread(r, namePrefix + threadId.incrementAndGet()); + t.setDaemon(true); + return t; + } + } +} diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java index 6d971bcc52263..221b52a1259be 100644 --- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java +++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java @@ -96,9 +96,12 @@ import static org.apache.hudi.hive.HiveSyncConfig.HIVE_SYNC_FILTER_PUSHDOWN_ENABLED; import static 
org.apache.hudi.hive.HiveSyncConfig.RECREATE_HIVE_TABLE_ON_ERROR; import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_AUTO_CREATE_DATABASE; +import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_BATCH_SYNC_PARTITION_NUM; import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_CREATE_MANAGED_TABLE; import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_IGNORE_EXCEPTIONS; import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_AS_DATA_SOURCE_TABLE; +import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_BATCHING_ENABLED; +import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_BATCHING_THREADS; import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_COMMENT; import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_MODE; import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_TABLE_STRATEGY; @@ -329,6 +332,105 @@ public void testDropUpperCasePartitionWithHMS() throws Exception { "Table partitions should match the number of partitions we wrote"); } + /** + * Exercises HiveQL sync with parallel partition batching enabled. Routes through + * the HiveDriverPool — each worker thread owns a Driver+SessionState pair, and + * the SQL list (qualified with `db`.`tbl`) is fanned out across them. 
+ */ + @Test + public void testHiveQLSyncWithBatchingEnabled() throws Exception { + hiveSyncProps.setProperty(HIVE_SYNC_MODE.key(), HiveSyncMode.HIVEQL.name()); + hiveSyncProps.setProperty(HIVE_SYNC_BATCHING_ENABLED.key(), "true"); + hiveSyncProps.setProperty(HIVE_SYNC_BATCHING_THREADS.key(), "3"); + hiveSyncProps.setProperty(HIVE_BATCH_SYNC_PARTITION_NUM.key(), "3"); + + int partitionCount = 10; + HiveTestUtil.createCOWTable("100", partitionCount, true); + + reInitHiveSyncClient(); + assertFalse(hiveClient.tableExists(HiveTestUtil.TABLE_NAME), + "Table should not exist before initial sync"); + reSyncHiveTable(); + assertEquals(partitionCount, hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME).size(), + "All partitions should be added under parallel HiveQL batching"); + + // Add more partitions, then sync again to exercise the parallel update path. + HiveTestUtil.addCOWPartition("2050/01/01", true, true, "101"); + HiveTestUtil.addCOWPartition("2050/01/02", true, true, "102"); + HiveTestUtil.addCOWPartition("2050/01/03", true, true, "103"); + HiveTestUtil.addCOWPartition("2050/01/04", true, true, "104"); + reInitHiveSyncClient(); + reSyncHiveTable(); + assertEquals(partitionCount + 4, hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME).size(), + "Incremental add via parallel HiveQL batching should sync the new partitions"); + } + + /** + * Exercises the SET_LOCATION path in HiveQL mode with batching on. SET_LOCATION + * emits one ALTER PARTITION ... SET LOCATION statement per partition (Hive SQL + * has no multi-partition SET LOCATION), so this is the fan-out path most likely + * to exercise concurrent ALTER PARTITION calls against the same table. 
+ */ + @Test + public void testHiveQLSetLocationWithBatching() throws Exception { + hiveSyncProps.setProperty(HIVE_SYNC_MODE.key(), HiveSyncMode.HIVEQL.name()); + hiveSyncProps.setProperty(HIVE_SYNC_BATCHING_ENABLED.key(), "true"); + hiveSyncProps.setProperty(HIVE_SYNC_BATCHING_THREADS.key(), "3"); + hiveSyncProps.setProperty(HIVE_BATCH_SYNC_PARTITION_NUM.key(), "2"); + + int partitionCount = 6; + HiveTestUtil.createCOWTable("100", partitionCount, true); + reInitHiveSyncClient(); + reSyncHiveTable(); + assertEquals(partitionCount, hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME).size()); + + // Drive the SET_LOCATION path by directly calling updatePartitionsToTable with + // existing partition paths. Each partition produces its own ALTER ... SET LOCATION + // statement, fanned out across the 3 workers in the pool. + List existingPartitions = hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME).stream() + .map(p -> getRelativePartitionPath(new Path(basePath), new Path(p.getStorageLocation()))) + .collect(Collectors.toList()); + hiveClient.updatePartitionsToTable(HiveTestUtil.TABLE_NAME, existingPartitions); + + List after = hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME); + assertEquals(partitionCount, after.size(), + "Parallel SET_LOCATION must not change the partition set"); + Set relativePaths = after.stream() + .map(p -> getRelativePartitionPath(new Path(basePath), new Path(p.getStorageLocation()))) + .collect(Collectors.toSet()); + assertEquals(partitionCount, relativePaths.size(), + "Each partition should resolve to a unique relative path after parallel SET_LOCATION"); + assertTrue(relativePaths.containsAll(existingPartitions), + "All original partition paths should still be present after parallel SET_LOCATION"); + } + + /** + * Exercises the TOUCH path in HiveQL mode with batching on. Verifies that + * splitting one giant ALTER TABLE TOUCH PARTITION(...)... into multiple smaller + * statements does not break partition visibility downstream. 
+ */ + @Test + public void testHiveQLTouchPartitionsWithBatching() throws Exception { + hiveSyncProps.setProperty(HIVE_SYNC_MODE.key(), HiveSyncMode.HIVEQL.name()); + hiveSyncProps.setProperty(HIVE_SYNC_BATCHING_ENABLED.key(), "true"); + hiveSyncProps.setProperty(HIVE_SYNC_BATCHING_THREADS.key(), "2"); + hiveSyncProps.setProperty(HIVE_BATCH_SYNC_PARTITION_NUM.key(), "2"); + hiveSyncProps.setProperty(META_SYNC_TOUCH_PARTITIONS_ENABLED.key(), "true"); + + int partitionCount = 6; + HiveTestUtil.createCOWTable("100", partitionCount, true); + reInitHiveSyncClient(); + reSyncHiveTable(); + assertEquals(partitionCount, hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME).size()); + + // Touch existing partitions (no new data) — should hit the batched TOUCH path + // without changing the partition count. + reInitHiveSyncClient(); + reSyncHiveTable(); + assertEquals(partitionCount, hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME).size(), + "TOUCH batching must not change the partition set"); + } + @ParameterizedTest @MethodSource({"syncModeAndSchemaFromCommitMetadata"}) public void testBasicSync(boolean useSchemaFromCommitMetadata, String syncMode, String enablePushDown) throws Exception { diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/util/TestHiveDriverPool.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/util/TestHiveDriverPool.java new file mode 100644 index 0000000000000..cac89dcf54bf9 --- /dev/null +++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/util/TestHiveDriverPool.java @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.hive.util; + +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.hive.HiveSyncConfig; +import org.apache.hudi.hive.HoodieHiveSyncException; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.Driver; +import org.junit.jupiter.api.Test; +import org.mockito.invocation.InvocationOnMock; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +/** + * Unit tests for {@link HiveDriverPool} that exercise bootstrap, dispatch, error + * propagation, and close semantics without standing up a real Hive instance. 
+ */ +class TestHiveDriverPool { + + private static HiveSyncConfig configWithEmptyHiveConf() { + HiveSyncConfig config = mock(HiveSyncConfig.class); + doAnswer(inv -> new HiveConf()).when(config).getHiveConf(); + doAnswer(inv -> "default").when(config).getStringOrDefault( + org.mockito.ArgumentMatchers.any()); + return config; + } + + @Test + void bootstrapBuildsOneDriverPerSlot() throws Exception { + HiveSyncConfig config = configWithEmptyHiveConf(); + AtomicInteger built = new AtomicInteger(); + HiveDriverPool.DriverFactory factory = (db) -> { + built.incrementAndGet(); + return mock(Driver.class); + }; + try (HiveDriverPool pool = new HiveDriverPool(config, 3, factory)) { + assertEquals(3, pool.size()); + assertEquals(3, built.get(), "One Driver per slot should be constructed eagerly"); + } + } + + @Test + void bootstrapFailurePropagatesAndTearsDown() { + HiveSyncConfig config = configWithEmptyHiveConf(); + AtomicInteger calls = new AtomicInteger(); + HiveDriverPool.DriverFactory factory = (db) -> { + int n = calls.incrementAndGet(); + if (n == 2) { + throw new RuntimeException("simulated driver build failure"); + } + return mock(Driver.class); + }; + HoodieException ex = assertThrows(HoodieException.class, + () -> new HiveDriverPool(config, 3, factory)); + assertTrue(ex.getMessage().contains("Failed to construct HiveDriverPool")); + } + + @Test + void runAllDispatchesEachSqlAcrossWorkers() throws Exception { + HiveSyncConfig config = configWithEmptyHiveConf(); + // Each worker counts how many SQLs it received and remembers the thread. 
+ ConcurrentHashMap> seenThreadsByDriver = new ConcurrentHashMap<>(); + HiveDriverPool.DriverFactory factory = (db) -> { + Driver d = mock(Driver.class); + seenThreadsByDriver.put(d, ConcurrentHashMap.newKeySet()); + doAnswer((InvocationOnMock inv) -> { + seenThreadsByDriver.get(d).add(Thread.currentThread().getName()); + return null; + }).when(d).run(anyString()); + return d; + }; + try (HiveDriverPool pool = new HiveDriverPool(config, 2, factory)) { + List sqls = Arrays.asList("SELECT 1", "SELECT 2", "SELECT 3", "SELECT 4"); + List> futures = pool.runAll(sqls); + pool.awaitAll(futures); + assertEquals(2, seenThreadsByDriver.size(), "Expected exactly 2 worker Drivers"); + int totalCalls = seenThreadsByDriver.values().stream().mapToInt(Set::size).sum(); + assertTrue(totalCalls >= 1, "At least one worker should have logged a thread"); + // Each Driver should have been invoked exactly twice (round-robin with 4 sqls, 2 workers). + for (Driver d : seenThreadsByDriver.keySet()) { + verify(d, times(2)).run(anyString()); + } + } + } + + @Test + void awaitAllThrowsFirstError() throws Exception { + HiveSyncConfig config = configWithEmptyHiveConf(); + HiveDriverPool.DriverFactory factory = (db) -> { + Driver d = mock(Driver.class); + doAnswer(inv -> { + String sql = inv.getArgument(0); + if (sql.equals("FAIL")) { + throw new RuntimeException("boom: " + sql); + } + return null; + }).when(d).run(anyString()); + return d; + }; + try (HiveDriverPool pool = new HiveDriverPool(config, 2, factory)) { + List> futures = pool.runAll(Arrays.asList("OK", "FAIL", "OK")); + HoodieHiveSyncException ex = assertThrows(HoodieHiveSyncException.class, + () -> pool.awaitAll(futures)); + assertTrue(ex.getCause() != null && ex.getCause().getMessage().contains("boom")); + } + } + + @Test + void concurrentDispatchBoundedByPoolSize() throws Exception { + HiveSyncConfig config = configWithEmptyHiveConf(); + AtomicInteger inFlight = new AtomicInteger(); + AtomicInteger maxInFlight = new 
AtomicInteger(); + CountDownLatch hold = new CountDownLatch(1); + HiveDriverPool.DriverFactory factory = (db) -> { + Driver d = mock(Driver.class); + doAnswer(inv -> { + int now = inFlight.incrementAndGet(); + maxInFlight.updateAndGet(prev -> Math.max(prev, now)); + hold.await(2, TimeUnit.SECONDS); + inFlight.decrementAndGet(); + return null; + }).when(d).run(anyString()); + return d; + }; + try (HiveDriverPool pool = new HiveDriverPool(config, 2, factory)) { + // 5 SQLs against pool of size 2 → max in-flight should be 2. + List> futures = pool.runAll(Arrays.asList("a", "b", "c", "d", "e")); + // Release after a short wait so all SQLs progress. + Thread.sleep(150); + hold.countDown(); + pool.awaitAll(futures); + assertTrue(maxInFlight.get() <= 2, + "Max concurrent dispatches must not exceed pool size, observed " + maxInFlight.get()); + assertTrue(maxInFlight.get() >= 1, "Sanity: at least one dispatch ran"); + } + } + + @Test + void closeIsIdempotentAndPreventsFurtherDispatch() throws Exception { + HiveSyncConfig config = configWithEmptyHiveConf(); + HiveDriverPool.DriverFactory factory = (db) -> mock(Driver.class); + HiveDriverPool pool = new HiveDriverPool(config, 2, factory); + pool.close(); + pool.close(); + assertThrows(IllegalStateException.class, + () -> pool.runAll(Arrays.asList("anything"))); + } + + @Test + void invalidSizeRejected() { + HiveSyncConfig config = configWithEmptyHiveConf(); + HiveDriverPool.DriverFactory factory = (db) -> mock(Driver.class); + assertThrows(IllegalArgumentException.class, + () -> new HiveDriverPool(config, 0, factory)); + } + + /** + * runOnEachWorker must execute the setup SQL on every worker (each on its bound + * thread) before {@code runAll()} fans the partition statements out. Without this, + * Hive 2.x's SET LOCATION would silently route to the wrong database on the workers + * that never saw the leading USE statement. 
+ */ + @Test + void runOnEachWorkerRunsSetupOnEveryWorker() throws Exception { + HiveSyncConfig config = configWithEmptyHiveConf(); + ConcurrentHashMap> sqlsByDriver = new ConcurrentHashMap<>(); + HiveDriverPool.DriverFactory factory = (db) -> { + Driver d = mock(Driver.class); + sqlsByDriver.put(d, java.util.Collections.synchronizedList(new java.util.ArrayList<>())); + doAnswer((InvocationOnMock inv) -> { + sqlsByDriver.get(d).add(inv.getArgument(0)); + return null; + }).when(d).run(anyString()); + return d; + }; + try (HiveDriverPool pool = new HiveDriverPool(config, 3, factory)) { + pool.runOnEachWorker(Arrays.asList("USE `db1`")); + List> futures = pool.runAll(Arrays.asList("ALTER 1", "ALTER 2", "ALTER 3")); + pool.awaitAll(futures); + + assertEquals(3, sqlsByDriver.size(), "Expected one Driver per worker"); + for (Map.Entry> e : sqlsByDriver.entrySet()) { + List seen = e.getValue(); + assertTrue(!seen.isEmpty() && seen.get(0).equals("USE `db1`"), + "Each worker must see USE first; saw " + seen); + } + } + } + + /** + * On the first failure, awaitAll must throw a {@code HoodieHiveSyncException} that + * carries the original failure as its cause, and cancel any futures that have not + * started yet. Futures already in-flight are not interrupted + * (per the cancel-with-mayInterruptIfRunning=false contract). + */ + @Test + void awaitAllCancelsPendingFuturesOnFirstError() throws Exception { + HiveSyncConfig config = configWithEmptyHiveConf(); + // Single-worker pool so SQLs run strictly sequentially → the 2nd SQL is + // pending when the 1st errors, and must be cancelled.
+ CountDownLatch fired = new CountDownLatch(1); + HiveDriverPool.DriverFactory factory = (db) -> { + Driver d = mock(Driver.class); + doAnswer(inv -> { + String sql = inv.getArgument(0); + if (sql.equals("FAIL")) { + fired.countDown(); + throw new RuntimeException("boom"); + } + return null; + }).when(d).run(anyString()); + return d; + }; + try (HiveDriverPool pool = new HiveDriverPool(config, 1, factory)) { + List> futures = pool.runAll(Arrays.asList("FAIL", "PENDING_A", "PENDING_B")); + HoodieHiveSyncException ex = assertThrows(HoodieHiveSyncException.class, + () -> pool.awaitAll(futures)); + assertTrue(ex.getCause() != null && ex.getCause().getMessage().contains("boom")); + // The first future failed, so it's done (not cancelled). The remaining two + // were pending behind it on the single worker and should now be cancelled. + assertTrue(fired.await(1, TimeUnit.SECONDS), "Failing SQL must have run"); + assertTrue(futures.get(1).isCancelled(), "Pending future after error must be cancelled"); + assertTrue(futures.get(2).isCancelled(), "Pending future after error must be cancelled"); + } + } +} From 5e3c1fdec6c9dd95e339a2027ca5be20002dea8c Mon Sep 17 00:00:00 2001 From: sivabalan Date: Tue, 23 Jun 2026 13:54:10 -0700 Subject: [PATCH 2/2] feat(hive-sync): parallelize DROP partitions in HiveQL sync mode MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Follow-up to the HiveQL partition batching introduced in the prior commit (PR #18984). HiveQueryDDLExecutor.dropPartitionsToTable goes through IMetaStoreClient.dropPartition (Thrift), not the Hive Driver — so it can't reuse HiveDriverPool. This change wires an IMetaStoreClientPool into HiveQueryDDLExecutor and uses it to fan drop batches across the pool's worker threads. Behavior: - batching.enabled=false (default): unchanged. dropPartitionsToTable iterates the partition list sequentially on the session metastore client, exactly as before. 
- batching.enabled=true: partitions are split into batches of HIVE_BATCH_SYNC_PARTITION_NUM, batches fan out across the pool's workers (one IMetaStoreClient per worker), and first-error semantics match the existing HMS-mode implementation (first failure thrown, subsequent failures logged at WARN). HoodieHiveSyncClient now builds partitionClientPool for the HIVEQL mode branches (explicit `mode=hiveql` and the legacy default-mode branch). The pool is closed in HoodieHiveSyncClient.close() before Hive.closeCurrent() so the RetryingMetaStoreClient instances release their Thrift sockets without racing the ThreadLocal Hive cleanup. IMetaStoreClientPool is brought in as part of this PR. An analogous parallelization for HMS sync mode (separately tracked) would introduce the same class; whichever lands first owns the file going forward. Tests: - New end-to-end test: testHiveQLDropPartitionsWithBatching — creates 8 partitions, drops 4 through the parallel pool path with threads=3 and batch_num=2 (so multiple drop batches race), asserts the remaining partition set matches. Configs: no new configs. Reuses hoodie.datasource.hive_sync.batching.* from #18984. 
Related: #18331 Co-Authored-By: Claude Opus 4.7 (1M context) --- .../hudi/hive/HoodieHiveSyncClient.java | 41 +++- .../hudi/hive/ddl/HiveQueryDDLExecutor.java | 79 ++++++- .../hudi/hive/util/IMetaStoreClientPool.java | 207 ++++++++++++++++++ .../apache/hudi/hive/TestHiveSyncTool.java | 40 ++++ 4 files changed, 354 insertions(+), 13 deletions(-) create mode 100644 hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/IMetaStoreClientPool.java diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java index a83225ae63f10..021d8f1f91992 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java @@ -38,6 +38,7 @@ import org.apache.hudi.hive.ddl.JDBCBasedMetadataOperator; import org.apache.hudi.hive.ddl.JDBCExecutor; import org.apache.hudi.hive.util.HiveDriverPool; +import org.apache.hudi.hive.util.IMetaStoreClientPool; import org.apache.hudi.hive.util.IMetaStoreClientUtil; import org.apache.hudi.hive.util.PartitionFilterGenerator; import org.apache.hudi.sync.common.HoodieSyncClient; @@ -89,6 +90,11 @@ public class HoodieHiveSyncClient extends HoodieSyncClient { private final Map initialTableByName = new HashMap<>(); DDLExecutor ddlExecutor; private IMetaStoreClient client; + // Non-null only when HIVE_SYNC_BATCHING_ENABLED and sync mode is HIVEQL. Owned by + // this class; closed in close() before Hive.closeCurrent(). HiveQueryDDLExecutor + // uses it only for DROP (Hive Thrift, not Hive Driver) — see IMetaStoreClientPool + // javadoc. + private IMetaStoreClientPool partitionClientPool; // Present only when HIVE_SYNC_BATCHING_ENABLED and sync mode is HIVEQL (explicit // or legacy default). 
Owned by HiveQueryDDLExecutor; this field is kept for // reference only — close() is delegated through ddlExecutor.close(). @@ -132,7 +138,9 @@ public HoodieHiveSyncClient(HiveSyncConfig config, HoodieTableMetaClient metaCli break; case HIVEQL: this.partitionDriverPool = maybeBuildHiveDriverPool(config); - ddlExecutor = new HiveQueryDDLExecutor(config, this.client, this.partitionDriverPool); + this.partitionClientPool = maybeBuildPartitionClientPool(config); + ddlExecutor = new HiveQueryDDLExecutor(config, this.client, this.partitionDriverPool, + Option.ofNullable(this.partitionClientPool)); break; case JDBC: JDBCExecutor jdbcExecutor = new JDBCExecutor(config); @@ -151,7 +159,9 @@ public HoodieHiveSyncClient(HiveSyncConfig config, HoodieTableMetaClient metaCli jdbcExecutor.getConnection(), databaseName); } else { this.partitionDriverPool = maybeBuildHiveDriverPool(config); - ddlExecutor = new HiveQueryDDLExecutor(config, this.client, this.partitionDriverPool); + this.partitionClientPool = maybeBuildPartitionClientPool(config); + ddlExecutor = new HiveQueryDDLExecutor(config, this.client, this.partitionDriverPool, + Option.ofNullable(this.partitionClientPool)); } } } catch (Exception e) { @@ -210,6 +220,22 @@ private IMetaStoreClient createMetaStoreClient(HiveSyncConfig config) { } } + private IMetaStoreClientPool maybeBuildPartitionClientPool(HiveSyncConfig config) { + if (!config.getBooleanOrDefault(HIVE_SYNC_BATCHING_ENABLED)) { + return null; + } + if (config.getBooleanOrDefault(HIVE_SYNC_USE_SPARK_CATALOG)) { + // The Spark catalog client is constructed via reflection against a Spark-side + // class and isn't compatible with the direct RetryingMetaStoreClient pool path. + // Fall back to single-client sequential behavior rather than failing the sync. 
+ log.warn("hive_sync.batching.enabled=true is not supported with use_spark_catalog=true; " + + "falling back to sequential partition sync."); + return null; + } + int size = config.getIntOrDefault(HIVE_SYNC_BATCHING_THREADS); + return new IMetaStoreClientPool(config, size); + } + private Option maybeBuildHiveDriverPool(HiveSyncConfig config) { if (!config.getBooleanOrDefault(HIVE_SYNC_BATCHING_ENABLED)) { return Option.empty(); @@ -614,6 +640,17 @@ public void deleteLastReplicatedTimeStamp(String tableName) { public void close() { try { ddlExecutor.close(); + // Close the partition client pool before Hive.closeCurrent() so the + // RetryingMetaStoreClient instances held by the pool release their Thrift + // sockets without racing the ThreadLocal Hive cleanup. + if (partitionClientPool != null) { + try { + partitionClientPool.close(); + } catch (Exception e) { + log.warn("Error closing IMetaStoreClient pool", e); + } + partitionClientPool = null; + } if (client != null) { Hive.closeCurrent(); client = null; diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HiveQueryDDLExecutor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HiveQueryDDLExecutor.java index 8f52c099f1c02..13f6550b1c72f 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HiveQueryDDLExecutor.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HiveQueryDDLExecutor.java @@ -18,12 +18,14 @@ package org.apache.hudi.hive.ddl; +import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.common.util.HoodieTimer; import org.apache.hudi.common.util.Option; import org.apache.hudi.hive.HiveSyncConfig; import org.apache.hudi.hive.HoodieHiveSyncException; import org.apache.hudi.hive.util.HiveDriverPool; import org.apache.hudi.hive.util.HivePartitionUtil; +import org.apache.hudi.hive.util.IMetaStoreClientPool; import lombok.extern.slf4j.Slf4j; import org.apache.hadoop.hive.metastore.IMetaStoreClient; @@ 
-44,6 +46,7 @@ import java.util.concurrent.Future; import java.util.stream.Collectors; +import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_BATCH_SYNC_PARTITION_NUM; import static org.apache.hudi.sync.common.util.TableUtils.tableId; /** @@ -59,16 +62,22 @@ public class HiveQueryDDLExecutor extends QueryBasedDDLExecutor { // (createTable, schema evolution, single-statement runSQL callers) always uses the // session `hiveDriver` above. See HiveDriverPool javadoc. private final Option driverPool; + // When present, dropPartitionsToTable fans batches across this Thrift client pool. + // Owned by HoodieHiveSyncClient; close() is delegated through there. See + // IMetaStoreClientPool javadoc for the usage contract (partition-row ops only). + private final Option metaStoreClientPool; public HiveQueryDDLExecutor(HiveSyncConfig config, IMetaStoreClient metaStoreClient) { - this(config, metaStoreClient, Option.empty()); + this(config, metaStoreClient, Option.empty(), Option.empty()); } public HiveQueryDDLExecutor(HiveSyncConfig config, IMetaStoreClient metaStoreClient, - Option driverPool) { + Option driverPool, + Option metaStoreClientPool) { super(config); this.metaStoreClient = metaStoreClient; this.driverPool = driverPool; + this.metaStoreClientPool = metaStoreClientPool; try { this.sessionState = new SessionState(config.getHiveConf(), UserGroupInformation.getCurrentUser().getShortUserName()); @@ -187,21 +196,69 @@ public void dropPartitionsToTable(String tableName, List partitionsToDro log.info("Drop partitions {} on {}", partitionsToDrop.size(), tableName); try { - for (String dropPartition : partitionsToDrop) { - if (HivePartitionUtil.partitionExists(metaStoreClient, tableName, dropPartition, partitionValueExtractor, - config)) { - String partitionClause = - HivePartitionUtil.getPartitionClauseForDrop(dropPartition, partitionValueExtractor, config); - metaStoreClient.dropPartition(databaseName, tableName, partitionClause, false); - } - log.info("Drop 
partition {} on {}", dropPartition, tableName); - } + int batchSyncPartitionNum = config.getIntOrDefault(HIVE_BATCH_SYNC_PARTITION_NUM); + List> batches = CollectionUtils.batches(partitionsToDrop, batchSyncPartitionNum); + runDropBatches(tableName, batches); } catch (Exception e) { log.error("{} drop partition failed", tableId(databaseName, tableName), e); throw new HoodieHiveSyncException(tableId(databaseName, tableName) + " drop partition failed", e); } } + /** + * Drops partitions one batch at a time. When {@link #metaStoreClientPool} is present, + * batches fan out across the pool's worker threads (each borrowing an independent + * IMetaStoreClient); otherwise batches are dispatched sequentially against the + * session client. Hive has no batch-drop primitive that matches dropPartition's + * semantics, so each worker still iterates its chunk one partition at a time — the + * win is fanning chunks across independent Thrift clients. + */ + private void runDropBatches(String tableName, List> batches) throws Exception { + if (!metaStoreClientPool.isPresent()) { + for (List batch : batches) { + applyDropBatch(metaStoreClient, tableName, batch); + } + return; + } + IMetaStoreClientPool pool = metaStoreClientPool.get(); + List> futures = new ArrayList<>(batches.size()); + for (List batch : batches) { + futures.add(pool.executor().submit(() -> + pool.run(poolClient -> { + applyDropBatch(poolClient, tableName, batch); + return null; + }) + )); + } + Exception firstError = null; + for (Future f : futures) { + try { + f.get(); + } catch (Exception e) { + if (firstError == null) { + firstError = e; + } else { + log.warn("Additional drop batch failed on {} (suppressed in favor of first error)", tableName, e); + } + } + } + if (firstError != null) { + throw firstError; + } + } + + private void applyDropBatch(IMetaStoreClient poolClient, String tableName, List batch) throws Exception { + for (String dropPartition : batch) { + if (HivePartitionUtil.partitionExists(poolClient, 
tableName, dropPartition, + partitionValueExtractor, config)) { + String partitionClause = + HivePartitionUtil.getPartitionClauseForDrop(dropPartition, partitionValueExtractor, config); + poolClient.dropPartition(databaseName, tableName, partitionClause, false); + } + log.info("Drop partition {} on {}", dropPartition, tableName); + } + } + @Override public void close() { // Close the pool first so the worker threads stop dispatching against their diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/IMetaStoreClientPool.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/IMetaStoreClientPool.java new file mode 100644 index 0000000000000..930fb705cbbda --- /dev/null +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/IMetaStoreClientPool.java @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.hive.util; + +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.hive.HiveSyncConfig; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Pool of {@link IMetaStoreClient} instances for parallel partition sync. + * + *

<p>Each pooled client wraps an independent Thrift connection to the Hive Metastore. + * Callers borrow a client via {@link #run(ClientAction)}, which blocks until a client + * is available, executes the action, and returns the client to the pool. A worker + * thread pool of the same size is exposed via {@link #executor()} so callers can fan + * out their batches to match the number of available clients. + * + *

<p>Usage contract: pool clients must be used only for partition-row + * operations — {@code add_partitions}, {@code alter_partitions}, {@code dropPartition}, + * {@code getPartition}. Table-row operations ({@code createTable}, {@code alter_table}, + * {@code getTable} used as the read half of a read-modify-write of table parameters) + * must continue to go through the session client held by + * {@code HoodieHiveSyncClient.client} on the sync driver thread. Mixing the two would + * risk lost updates on table parameters such as the last-commit-time-synced marker. + * + *

<p>The pool is gated behind {@code hoodie.datasource.hive_sync.batching.enabled} and + * is constructed only for the HIVEQL sync mode (explicit or legacy default), where it + * backs parallel partition drops. + */ +public class IMetaStoreClientPool implements AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(IMetaStoreClientPool.class); + + private final ArrayBlockingQueue available; + private final List all; + private final ExecutorService executor; + private final int size; + private volatile boolean closed; + + public IMetaStoreClientPool(HiveSyncConfig config, int size) { + this(buildClients(config, size), size); + } + + // Package-private for tests: accepts a pre-built list of clients so we can + // exercise borrow/return/close semantics without a live metastore. + IMetaStoreClientPool(List clients, int size) { + if (size < 1) { + throw new IllegalArgumentException("Pool size must be >= 1, got " + size); + } + if (clients.size() != size) { + throw new IllegalArgumentException("Expected " + size + " clients, got " + clients.size()); + } + this.size = size; + this.available = new ArrayBlockingQueue<>(size); + this.all = new ArrayList<>(clients); + this.available.addAll(clients); + this.executor = Executors.newFixedThreadPool(size, new PoolThreadFactory()); + LOG.info("Initialized IMetaStoreClient pool with {} clients", size); + } + + private static List buildClients(HiveSyncConfig config, int size) { + if (size < 1) { + throw new IllegalArgumentException("Pool size must be >= 1, got " + size); + } + HiveConf hiveConf = config.getHiveConf(); + List clients = new ArrayList<>(size); + try { + for (int i = 0; i < size; i++) { + clients.add(newClient(hiveConf)); + } + return clients; + } catch (Exception e) { + // Construction failed mid-way; close any clients we already built before + // surfacing the error so we don't leak Thrift sockets.
+ for (IMetaStoreClient c : clients) { + try { + c.close(); + } catch (Exception ignore) { + // intentional: best-effort cleanup during failure + } + } + throw new HoodieException("Failed to construct IMetaStoreClient pool of size " + size, e); + } + } + + private static IMetaStoreClient newClient(HiveConf hiveConf) { + try { + // RetryingMetaStoreClient.getProxy returns an independent IMetaStoreClient + // (one Thrift socket per call), bypassing the Hive thread-local cache that + // Hive.get(conf) would use. This is what gives us N truly independent clients. + return RetryingMetaStoreClient.getProxy(hiveConf, true); + } catch (Exception e) { + throw new HoodieException("Failed to create IMetaStoreClient for pool", e); + } + } + + /** + * Borrows a client, runs the action, and returns the client to the pool. + * Blocks if all clients are in use until one becomes available. + */ + public T run(ClientAction action) throws Exception { + if (closed) { + throw new IllegalStateException("Cannot borrow from a closed IMetaStoreClient pool"); + } + IMetaStoreClient client = available.take(); + try { + return action.apply(client); + } finally { + // Always return the client to the pool, even on failure. Thrift clients + // recover transparently from transactional errors at the HMS layer; + // RetryingMetaStoreClient handles transient socket failures internally. + if (!closed) { + available.offer(client); + } + } + } + + /** + * Worker thread pool sized to match the client pool. Use this to fan out + * batches so the number of in-flight Thrift calls cannot exceed the + * number of pooled clients. 
+ */ + public ExecutorService executor() { + return executor; + } + + public int size() { + return size; + } + + @Override + public void close() { + if (closed) { + return; + } + closed = true; + executor.shutdown(); + try { + if (!executor.awaitTermination(30, TimeUnit.SECONDS)) { + executor.shutdownNow(); + } + } catch (InterruptedException ie) { + executor.shutdownNow(); + Thread.currentThread().interrupt(); + } + closeQuietly(); + } + + private void closeQuietly() { + for (IMetaStoreClient client : all) { + try { + client.close(); + } catch (Exception e) { + LOG.warn("Error closing pooled IMetaStoreClient", e); + } + } + available.clear(); + all.clear(); + } + + @FunctionalInterface + public interface ClientAction { + T apply(IMetaStoreClient client) throws Exception; + } + + private static final class PoolThreadFactory implements ThreadFactory { + private static final AtomicInteger POOL_ID = new AtomicInteger(0); + private final AtomicInteger threadId = new AtomicInteger(0); + private final String namePrefix = "hudi-hive-sync-pool-" + POOL_ID.incrementAndGet() + "-"; + + @Override + public Thread newThread(Runnable r) { + Thread t = new Thread(r, namePrefix + threadId.incrementAndGet()); + t.setDaemon(true); + return t; + } + } +} diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java index 221b52a1259be..4dea7f0543ce6 100644 --- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java +++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java @@ -365,6 +365,46 @@ public void testHiveQLSyncWithBatchingEnabled() throws Exception { "Incremental add via parallel HiveQL batching should sync the new partitions"); } + /** + * Exercises the DROP path in HiveQL mode with batching on. 
DROP routes through
+   * IMetaStoreClient.dropPartition (Thrift, not Hive Driver), so when batching is
+   * enabled it fans out across IMetaStoreClientPool. Verifies the partition set
+   * shrinks as expected when batches drop in parallel.
+   */
+  @Test
+  public void testHiveQLDropPartitionsWithBatching() throws Exception {
+    hiveSyncProps.setProperty(HIVE_SYNC_MODE.key(), HiveSyncMode.HIVEQL.name());
+    hiveSyncProps.setProperty(HIVE_SYNC_BATCHING_ENABLED.key(), "true");
+    hiveSyncProps.setProperty(HIVE_SYNC_BATCHING_THREADS.key(), "3");
+    // Small batch_num so we get multiple drop batches dispatched in parallel.
+    hiveSyncProps.setProperty(HIVE_BATCH_SYNC_PARTITION_NUM.key(), "2");
+
+    int partitionCount = 8;
+    HiveTestUtil.createCOWTable("100", partitionCount, true);
+    reInitHiveSyncClient();
+    reSyncHiveTable();
+    assertEquals(partitionCount, hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME).size(),
+        "All partitions should be added before drop test");
+
+    // Drop half the partitions through the parallel pool path.
+    // With batch_num=2, the 4 dropped partitions span more than one batch.
+    List<String> existing = hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME).stream()
+        .map(p -> getRelativePartitionPath(new Path(basePath), new Path(p.getStorageLocation())))
+        .collect(Collectors.toList());
+    List<String> toDrop = existing.subList(0, partitionCount / 2);
+    hiveClient.dropPartitions(HiveTestUtil.TABLE_NAME, toDrop);
+
+    List<Partition> remaining = hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME);
+    assertEquals(partitionCount - toDrop.size(), remaining.size(),
+        "Parallel DROP should remove exactly the requested partitions");
+    Set<String> remainingPaths = remaining.stream()
+        .map(p -> getRelativePartitionPath(new Path(basePath), new Path(p.getStorageLocation())))
+        .collect(Collectors.toSet());
+    // The count check alone would pass if the wrong partitions were dropped; verify identity too.
+    for (String dropped : toDrop) {
+      assertFalse(remainingPaths.contains(dropped),
+          "Dropped partition " + dropped + " must not appear in remaining set");
+    }
+  }
+
+  /**
+   * Exercises the SET_LOCATION path in HiveQL mode with batching on.
SET_LOCATION * emits one ALTER PARTITION ... SET LOCATION statement per partition (Hive SQL