-
Notifications
You must be signed in to change notification settings - Fork 2.5k
feat(hive-sync): parallelize DROP partitions in HiveQL sync mode #19033
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -37,6 +37,8 @@ | |
| import org.apache.hudi.hive.ddl.HiveSyncMode; | ||
| import org.apache.hudi.hive.ddl.JDBCBasedMetadataOperator; | ||
| import org.apache.hudi.hive.ddl.JDBCExecutor; | ||
| import org.apache.hudi.hive.util.HiveDriverPool; | ||
| import org.apache.hudi.hive.util.IMetaStoreClientPool; | ||
| import org.apache.hudi.hive.util.IMetaStoreClientUtil; | ||
| import org.apache.hudi.hive.util.PartitionFilterGenerator; | ||
| import org.apache.hudi.sync.common.HoodieSyncClient; | ||
|
|
@@ -66,6 +68,8 @@ | |
| import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.getInputFormatClassName; | ||
| import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.getOutputFormatClassName; | ||
| import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.getSerDeClassName; | ||
| import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_BATCHING_ENABLED; | ||
| import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_BATCHING_THREADS; | ||
| import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_MODE; | ||
| import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_USE_SPARK_CATALOG; | ||
| import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_USE_JDBC; | ||
|
|
@@ -86,6 +90,15 @@ public class HoodieHiveSyncClient extends HoodieSyncClient { | |
| private final Map<String, Table> initialTableByName = new HashMap<>(); | ||
| DDLExecutor ddlExecutor; | ||
| private IMetaStoreClient client; | ||
| // Non-null only when HIVE_SYNC_BATCHING_ENABLED and sync mode is HIVEQL. Owned by | ||
| // this class; closed in close() before Hive.closeCurrent(). HiveQueryDDLExecutor | ||
| // uses it only for DROP (Hive Thrift, not Hive Driver) — see IMetaStoreClientPool | ||
| // javadoc. | ||
| private IMetaStoreClientPool partitionClientPool; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🤖 nit: |
||
| // Present only when HIVE_SYNC_BATCHING_ENABLED and sync mode is HIVEQL (explicit | ||
| // or legacy default). Owned by HiveQueryDDLExecutor; this field is kept for | ||
| // reference only — close() is delegated through ddlExecutor.close(). | ||
| private Option<HiveDriverPool> partitionDriverPool = Option.empty(); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🤖 nit: the comment says this field is 'kept for reference only — close() is delegated through ddlExecutor.close()', which means it's never read after being passed into - AI-generated; verify before applying. React 👍/👎 to flag quality. |
||
|
|
||
| /** | ||
| * JDBC-based metadata operator, lazily initialized on first Thrift | ||
|
|
@@ -124,7 +137,10 @@ public HoodieHiveSyncClient(HiveSyncConfig config, HoodieTableMetaClient metaCli | |
| ddlExecutor = new HMSDDLExecutor(config, this.client); | ||
| break; | ||
| case HIVEQL: | ||
| ddlExecutor = new HiveQueryDDLExecutor(config, this.client); | ||
| this.partitionDriverPool = maybeBuildHiveDriverPool(config); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🤖 These two pools are built eagerly here, but if |
||
| this.partitionClientPool = maybeBuildPartitionClientPool(config); | ||
| ddlExecutor = new HiveQueryDDLExecutor(config, this.client, this.partitionDriverPool, | ||
| Option.ofNullable(this.partitionClientPool)); | ||
| break; | ||
| case JDBC: | ||
| JDBCExecutor jdbcExecutor = new JDBCExecutor(config); | ||
|
|
@@ -142,7 +158,10 @@ public HoodieHiveSyncClient(HiveSyncConfig config, HoodieTableMetaClient metaCli | |
| jdbcMetadataOperator = new JDBCBasedMetadataOperator( | ||
| jdbcExecutor.getConnection(), databaseName); | ||
| } else { | ||
| ddlExecutor = new HiveQueryDDLExecutor(config, this.client); | ||
| this.partitionDriverPool = maybeBuildHiveDriverPool(config); | ||
| this.partitionClientPool = maybeBuildPartitionClientPool(config); | ||
| ddlExecutor = new HiveQueryDDLExecutor(config, this.client, this.partitionDriverPool, | ||
| Option.ofNullable(this.partitionClientPool)); | ||
| } | ||
| } | ||
| } catch (Exception e) { | ||
|
|
@@ -201,6 +220,30 @@ private IMetaStoreClient createMetaStoreClient(HiveSyncConfig config) { | |
| } | ||
| } | ||
|
|
||
| /** Builds the IMetaStoreClientPool handed to HiveQueryDDLExecutor, or returns null when no pool should be used: either HIVE_SYNC_BATCHING_ENABLED is false, or HIVE_SYNC_USE_SPARK_CATALOG is true (a warning is logged in that case). Otherwise the pool is created from {@code config} with the HIVE_SYNC_BATCHING_THREADS value as its size. */ private IMetaStoreClientPool maybeBuildPartitionClientPool(HiveSyncConfig config) { | ||
| if (!config.getBooleanOrDefault(HIVE_SYNC_BATCHING_ENABLED)) { | ||
| return null; | ||
| } | ||
| if (config.getBooleanOrDefault(HIVE_SYNC_USE_SPARK_CATALOG)) { | ||
| // The Spark catalog client is constructed via reflection against a Spark-side | ||
| // class and isn't compatible with the direct RetryingMetaStoreClient pool path. | ||
| // Fall back to single-client sequential behavior rather than failing the sync. | ||
| log.warn("hive_sync.batching.enabled=true is not supported with use_spark_catalog=true; " | ||
| + "falling back to sequential partition sync."); | ||
| return null; | ||
| } | ||
| int size = config.getIntOrDefault(HIVE_SYNC_BATCHING_THREADS); | ||
| return new IMetaStoreClientPool(config, size); | ||
| } | ||
|
|
||
| /** Returns {@code Option.empty()} when HIVE_SYNC_BATCHING_ENABLED is false; otherwise an Option holding a new HiveDriverPool created from {@code config} with the HIVE_SYNC_BATCHING_THREADS value as its size. Unlike maybeBuildPartitionClientPool, this method does not consult HIVE_SYNC_USE_SPARK_CATALOG. */ private Option<HiveDriverPool> maybeBuildHiveDriverPool(HiveSyncConfig config) { | ||
| if (!config.getBooleanOrDefault(HIVE_SYNC_BATCHING_ENABLED)) { | ||
| return Option.empty(); | ||
| } | ||
| int size = config.getIntOrDefault(HIVE_SYNC_BATCHING_THREADS); | ||
| return Option.of(new HiveDriverPool(config, size)); | ||
| } | ||
|
|
||
| private Table getInitialTable(String table) { | ||
| return initialTableByName.computeIfAbsent(table, t -> { | ||
| try { | ||
|
|
@@ -597,6 +640,17 @@ public void deleteLastReplicatedTimeStamp(String tableName) { | |
| public void close() { | ||
| try { | ||
| ddlExecutor.close(); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🤖 If - AI-generated; verify before applying. React 👍/👎 to flag quality. |
||
| // Close the partition client pool before Hive.closeCurrent() so the | ||
| // RetryingMetaStoreClient instances held by the pool release their Thrift | ||
| // sockets without racing the ThreadLocal Hive cleanup. | ||
| if (partitionClientPool != null) { | ||
| try { | ||
| partitionClientPool.close(); | ||
| } catch (Exception e) { | ||
| log.warn("Error closing IMetaStoreClient pool", e); | ||
| } | ||
| partitionClientPool = null; | ||
| } | ||
| if (client != null) { | ||
| Hive.closeCurrent(); | ||
| client = null; | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -18,10 +18,14 @@ | |
|
|
||
| package org.apache.hudi.hive.ddl; | ||
|
|
||
| import org.apache.hudi.common.util.CollectionUtils; | ||
| import org.apache.hudi.common.util.HoodieTimer; | ||
| import org.apache.hudi.common.util.Option; | ||
| import org.apache.hudi.hive.HiveSyncConfig; | ||
| import org.apache.hudi.hive.HoodieHiveSyncException; | ||
| import org.apache.hudi.hive.util.HiveDriverPool; | ||
| import org.apache.hudi.hive.util.HivePartitionUtil; | ||
| import org.apache.hudi.hive.util.IMetaStoreClientPool; | ||
|
|
||
| import lombok.extern.slf4j.Slf4j; | ||
| import org.apache.hadoop.hive.metastore.IMetaStoreClient; | ||
|
|
@@ -39,8 +43,10 @@ | |
| import java.util.HashMap; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.concurrent.Future; | ||
| import java.util.stream.Collectors; | ||
|
|
||
| import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_BATCH_SYNC_PARTITION_NUM; | ||
| import static org.apache.hudi.sync.common.util.TableUtils.tableId; | ||
|
|
||
| /** | ||
|
|
@@ -52,10 +58,26 @@ public class HiveQueryDDLExecutor extends QueryBasedDDLExecutor { | |
| private final IMetaStoreClient metaStoreClient; | ||
| private SessionState sessionState; | ||
| private Driver hiveDriver; | ||
| // When present, partition-phase SQL lists fan out across this pool; table-level SQL | ||
| // (createTable, schema evolution, single-statement runSQL callers) always uses the | ||
| // session `hiveDriver` above. See HiveDriverPool javadoc. | ||
| private final Option<HiveDriverPool> driverPool; | ||
| // When present, dropPartitionsToTable fans batches across this Thrift client pool. | ||
| // Owned by HoodieHiveSyncClient; close() is delegated through there. See | ||
| // IMetaStoreClientPool javadoc for the usage contract (partition-row ops only). | ||
| private final Option<IMetaStoreClientPool> metaStoreClientPool; | ||
|
|
||
| /** Creates an executor with neither a driver pool nor a metastore client pool: delegates to the four-argument constructor with {@code Option.empty()} for both pools. */ public HiveQueryDDLExecutor(HiveSyncConfig config, IMetaStoreClient metaStoreClient) { | ||
| this(config, metaStoreClient, Option.empty(), Option.empty()); | ||
| } | ||
|
|
||
| public HiveQueryDDLExecutor(HiveSyncConfig config, IMetaStoreClient metaStoreClient, | ||
| Option<HiveDriverPool> driverPool, | ||
| Option<IMetaStoreClientPool> metaStoreClientPool) { | ||
| super(config); | ||
| this.metaStoreClient = metaStoreClient; | ||
| this.driverPool = driverPool; | ||
| this.metaStoreClientPool = metaStoreClientPool; | ||
| try { | ||
| this.sessionState = new SessionState(config.getHiveConf(), | ||
| UserGroupInformation.getCurrentUser().getShortUserName()); | ||
|
|
@@ -82,19 +104,61 @@ public void runSQL(String sql) { | |
| updateHiveSQLs(Collections.singletonList(sql)); | ||
| } | ||
|
|
||
| /** | ||
| * Partition-phase SQL fan-out. When the driver pool is present, any leading | ||
| * {@code USE database} statements are run on every worker (Hive 2.x's | ||
| * ALTER PARTITION SET LOCATION ignores db.table qualifiers and uses the | ||
| * connection's current database, so each worker needs to USE the right db | ||
| * before any partition ALTER). The remaining statements are then dispatched | ||
| * round-robin across the pool. Falls through to the sequential path on the | ||
| * session Driver when no pool is configured. | ||
| * | ||
| * <p>Only the leading run of USE statements is treated as setup: a USE that | ||
| * appears after the first non-USE statement is dispatched together with the | ||
| * partition statements. When the list consists solely of USE statements they | ||
| * are run on each worker and nothing else is dispatched. On the pooled path | ||
| * the method submits the remaining statements with {@code runAll} and then | ||
| * calls {@code awaitAll} on the returned futures before returning. | ||
| * | ||
| * @param sqls statements to run, in order; an empty list is a no-op. A null | ||
| *             list is not handled ({@code isEmpty()} is called unconditionally). | ||
| */ | ||
| @Override | ||
| protected void runSQLs(List<String> sqls) { | ||
| if (sqls.isEmpty()) { | ||
| return; | ||
| } | ||
| if (!driverPool.isPresent()) { | ||
| updateHiveSQLs(sqls); | ||
| return; | ||
| } | ||
| HiveDriverPool pool = driverPool.get(); | ||
| int firstNonUse = 0; /* index of the first statement that is not a USE; equals sqls.size() when every statement is a USE */ | ||
| while (firstNonUse < sqls.size() && isUseStatement(sqls.get(firstNonUse))) { | ||
| firstNonUse++; | ||
| } | ||
| if (firstNonUse > 0) { | ||
| List<String> setupStatements = sqls.subList(0, firstNonUse); | ||
| pool.runOnEachWorker(setupStatements); | ||
| } | ||
| List<String> partitionStatements = sqls.subList(firstNonUse, sqls.size()); | ||
| if (partitionStatements.isEmpty()) { | ||
| return; | ||
| } | ||
| List<Future<?>> futures = pool.runAll(partitionStatements); | ||
| pool.awaitAll(futures); | ||
| } | ||
|
|
||
| // Returns true when sql is non-null and its first four characters are "USE ", compared case-insensitively (the trailing space is required). | ||
| // No trimming is done: internal callers (constructPartitionAlterStatements) always emit the USE statement without leading whitespace, | ||
| // so do not call this with externally supplied SQL that might be padded. | ||
| private static boolean isUseStatement(String sql) { | ||
| return sql != null && sql.regionMatches(true, 0, "USE ", 0, 4); | ||
| } | ||
|
|
||
| /** Runs each statement in order on the session {@code hiveDriver} and returns the collected responses; while {@code hiveDriver} is null a statement is skipped and contributes no response. Any exception raised during execution is rethrown as a HoodieHiveSyncException ("Failed in executing SQL") carrying the original as its cause. NOTE(review): as shown here {@code timer} is declared both before the try and again inside the loop, which would not compile as a single method body; presumably removed and added diff lines are displayed together, so confirm against the actual file. */ private List<CommandProcessorResponse> updateHiveSQLs(List<String> sqls) { | ||
| List<CommandProcessorResponse> responses = new ArrayList<>(); | ||
| HoodieTimer timer = HoodieTimer.start(); | ||
| try { | ||
| for (String sql : sqls) { | ||
| if (hiveDriver != null) { | ||
| HoodieTimer timer = HoodieTimer.start(); | ||
| responses.add(hiveDriver.run(sql)); | ||
| log.info("Time taken to execute [{}]: {} ms", sql, timer.endTimer()); | ||
| } | ||
| } | ||
| } catch (Exception e) { | ||
| throw new HoodieHiveSyncException("Failed in executing SQL", e); | ||
| } | ||
| log.info("Executed {} SQL statements sequentially in {} ms", sqls.size(), timer.endTimer()); | ||
| return responses; | ||
| } | ||
|
|
||
|
|
@@ -132,23 +196,81 @@ public void dropPartitionsToTable(String tableName, List<String> partitionsToDro | |
|
|
||
| log.info("Drop partitions {} on {}", partitionsToDrop.size(), tableName); | ||
| try { | ||
| for (String dropPartition : partitionsToDrop) { | ||
| if (HivePartitionUtil.partitionExists(metaStoreClient, tableName, dropPartition, partitionValueExtractor, | ||
| config)) { | ||
| String partitionClause = | ||
| HivePartitionUtil.getPartitionClauseForDrop(dropPartition, partitionValueExtractor, config); | ||
| metaStoreClient.dropPartition(databaseName, tableName, partitionClause, false); | ||
| } | ||
| log.info("Drop partition {} on {}", dropPartition, tableName); | ||
| } | ||
| int batchSyncPartitionNum = config.getIntOrDefault(HIVE_BATCH_SYNC_PARTITION_NUM); | ||
| List<List<String>> batches = CollectionUtils.batches(partitionsToDrop, batchSyncPartitionNum); | ||
| runDropBatches(tableName, batches); | ||
| } catch (Exception e) { | ||
| log.error("{} drop partition failed", tableId(databaseName, tableName), e); | ||
| throw new HoodieHiveSyncException(tableId(databaseName, tableName) + " drop partition failed", e); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Drops partitions one batch at a time. When {@link #metaStoreClientPool} is present, | ||
| * batches fan out across the pool's worker threads (each borrowing an independent | ||
| * IMetaStoreClient); otherwise batches are dispatched sequentially against the | ||
| * session client. Hive has no batch-drop primitive that matches dropPartition's | ||
| * semantics, so each worker still iterates its chunk one partition at a time — the | ||
| * win is fanning chunks across independent Thrift clients. | ||
| * | ||
| * <p>On the pooled path one task per batch is submitted to {@code pool.executor()}, | ||
| * and every returned future is waited on before the method returns or throws, so a | ||
| * failing batch does not stop the others from being awaited. The first exception | ||
| * thrown by {@code Future.get()} is rethrown as-is (for a failed task that is | ||
| * normally an ExecutionException wrapping the task's own failure); later failures | ||
| * are only logged at WARN. | ||
| * | ||
| * <p>NOTE(review): the broad {@code catch (Exception e)} also catches | ||
| * InterruptedException from {@code Future.get()} and does not restore the thread's | ||
| * interrupt flag. | ||
| * | ||
| * @param tableName table whose partitions are dropped | ||
| * @param batches   partition lists, one list per unit of work | ||
| * @throws Exception the failure from the sequential path, or the first failure | ||
| *                   observed while waiting on the pooled tasks | ||
| */ | ||
| private void runDropBatches(String tableName, List<List<String>> batches) throws Exception { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🤖 nit: the parallel-path error-collection loop inside - AI-generated; verify before applying. React 👍/👎 to flag quality. |
||
| if (!metaStoreClientPool.isPresent()) { | ||
| for (List<String> batch : batches) { | ||
| applyDropBatch(metaStoreClient, tableName, batch); | ||
| } | ||
| return; | ||
| } | ||
| IMetaStoreClientPool pool = metaStoreClientPool.get(); | ||
| List<Future<Void>> futures = new ArrayList<>(batches.size()); | ||
| for (List<String> batch : batches) { | ||
| futures.add(pool.executor().submit(() -> | ||
| pool.run(poolClient -> { | ||
| applyDropBatch(poolClient, tableName, batch); | ||
| return null; | ||
| }) | ||
| )); | ||
| } | ||
| Exception firstError = null; | ||
| for (Future<Void> f : futures) { | ||
| try { | ||
| f.get(); | ||
| } catch (Exception e) { | ||
| if (firstError == null) { | ||
| firstError = e; | ||
| } else { | ||
| log.warn("Additional drop batch failed on {} (suppressed in favor of first error)", tableName, e); | ||
| } | ||
| } | ||
| } | ||
| if (firstError != null) { | ||
| throw firstError; | ||
| } | ||
| } | ||
|
|
||
| /** Drops the partitions of one batch, one at a time, through the given client. A partition is dropped only when HivePartitionUtil.partitionExists reports it present; the final {@code false} argument to dropPartition is presumably deleteData — confirm against the IMetaStoreClient API. The "Drop partition" info line is logged for every entry in the batch, including entries skipped because the partition did not exist. Despite the parameter name, the sequential path in runDropBatches passes the session {@code metaStoreClient} here rather than a pooled client. */ private void applyDropBatch(IMetaStoreClient poolClient, String tableName, List<String> batch) throws Exception { | ||
| for (String dropPartition : batch) { | ||
| if (HivePartitionUtil.partitionExists(poolClient, tableName, dropPartition, | ||
| partitionValueExtractor, config)) { | ||
| String partitionClause = | ||
| HivePartitionUtil.getPartitionClauseForDrop(dropPartition, partitionValueExtractor, config); | ||
| poolClient.dropPartition(databaseName, tableName, partitionClause, false); | ||
| } | ||
| log.info("Drop partition {} on {}", dropPartition, tableName); | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public void close() { | ||
| // Close the pool first so the worker threads stop dispatching against their | ||
| // Drivers before we tear down anything else. The pool's close() runs | ||
| // Driver/SessionState cleanup on each worker's own thread. | ||
| driverPool.ifPresent(pool -> { | ||
| try { | ||
| pool.close(); | ||
| } catch (Exception e) { | ||
| log.warn("Error closing HiveDriverPool", e); | ||
| } | ||
| }); | ||
| if (metaStoreClient != null) { | ||
| Hive.closeCurrent(); | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🤖 nit: the doc says "Pool size (number of worker resources) and worker-thread count" — could you simplify to something like "Number of worker threads (and pooled metastore clients) for parallel partition sync when `hoodie.datasource.hive_sync.batching.enabled` is true"? The current wording lists "pool size" and "worker-thread count" as if they're distinct, but they're the same number.