Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,23 @@ public class HiveSyncConfigHolder {
.defaultValue(1000)
.markAdvanced()
.withDocumentation("The number of partitions one batch when synchronous partitions to hive.");
public static final ConfigProperty<Boolean> HIVE_SYNC_BATCHING_ENABLED = ConfigProperty
.key("hoodie.datasource.hive_sync.batching.enabled")
.defaultValue(false)
.markAdvanced()
.sinceVersion("1.1.0")
.withDocumentation("When true, partition operations (add/update/touch/drop) in the active sync mode "
+ "are split into batches of `hoodie.datasource.hive_sync.batch_num` and dispatched in parallel "
+ "to a pool of workers. Table-level operations (create/alter table, last commit time, writer "
+ "version) continue to use the single session client/driver. Default off; existing behavior is "
+ "unchanged unless explicitly opted in.");
public static final ConfigProperty<Integer> HIVE_SYNC_BATCHING_THREADS = ConfigProperty
.key("hoodie.datasource.hive_sync.batching.threads")
.defaultValue(4)
.markAdvanced()

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 nit: the doc says "Pool size (number of worker resources) and worker-thread count" — could you simplify to something like "Number of worker threads (and pooled metastore clients) for parallel partition sync when hoodie.datasource.hive_sync.batching.enabled is true"? The current wording lists "pool size" and "worker-thread count" as if they're distinct, but they're the same number.

⚠️ AI-generated; verify before applying. React 👍/👎 to flag quality.

.sinceVersion("1.1.0")
.withDocumentation("Pool size (number of worker resources) and worker-thread count used for parallel "
+ "partition sync when `hoodie.datasource.hive_sync.batching.enabled` is true.");
public static final ConfigProperty<String> HIVE_SYNC_MODE = ConfigProperty
.key("hoodie.datasource.hive_sync.mode")
.noDefaultValue()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import org.apache.hudi.hive.ddl.HiveSyncMode;
import org.apache.hudi.hive.ddl.JDBCBasedMetadataOperator;
import org.apache.hudi.hive.ddl.JDBCExecutor;
import org.apache.hudi.hive.util.HiveDriverPool;
import org.apache.hudi.hive.util.IMetaStoreClientPool;
import org.apache.hudi.hive.util.IMetaStoreClientUtil;
import org.apache.hudi.hive.util.PartitionFilterGenerator;
import org.apache.hudi.sync.common.HoodieSyncClient;
Expand Down Expand Up @@ -66,6 +68,8 @@
import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.getInputFormatClassName;
import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.getOutputFormatClassName;
import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.getSerDeClassName;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_BATCHING_ENABLED;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_BATCHING_THREADS;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_MODE;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_USE_SPARK_CATALOG;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_USE_JDBC;
Expand All @@ -86,6 +90,15 @@ public class HoodieHiveSyncClient extends HoodieSyncClient {
private final Map<String, Table> initialTableByName = new HashMap<>();
DDLExecutor ddlExecutor;
private IMetaStoreClient client;
// Non-null only when HIVE_SYNC_BATCHING_ENABLED and sync mode is HIVEQL. Owned by
// this class; closed in close() before Hive.closeCurrent(). HiveQueryDDLExecutor
// uses it only for DROP (Hive Thrift, not Hive Driver) — see IMetaStoreClientPool
// javadoc.
private IMetaStoreClientPool partitionClientPool;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 nit: partitionClientPool is a raw nullable while partitionDriverPool just below (line 101) is wrapped in Option<> — both represent the same "batching enabled" condition, so it might be worth making the container type consistent (either both Option<>, or both nullable with the same guard pattern) to avoid a future reader wondering if there's a semantic difference.

⚠️ AI-generated; verify before applying. React 👍/👎 to flag quality.

// Present only when HIVE_SYNC_BATCHING_ENABLED and sync mode is HIVEQL (explicit
// or legacy default). Owned by HiveQueryDDLExecutor; this field is kept for
// reference only — close() is delegated through ddlExecutor.close().
private Option<HiveDriverPool> partitionDriverPool = Option.empty();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 nit: the comment says this field is 'kept for reference only — close() is delegated through ddlExecutor.close()', which means it's never read after being passed into HiveQueryDDLExecutor. Could you make it a local variable in the constructor instead? A field that isn't read after construction will make future readers wonder what lifecycle role it plays.

- AI-generated; verify before applying. React 👍/👎 to flag quality.


/**
* JDBC-based metadata operator, lazily initialized on first Thrift
Expand Down Expand Up @@ -124,7 +137,10 @@ public HoodieHiveSyncClient(HiveSyncConfig config, HoodieTableMetaClient metaCli
ddlExecutor = new HMSDDLExecutor(config, this.client);
break;
case HIVEQL:
ddlExecutor = new HiveQueryDDLExecutor(config, this.client);
this.partitionDriverPool = maybeBuildHiveDriverPool(config);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 These two pools are built eagerly here, but if maybeBuildPartitionClientPool(...) or the HiveQueryDDLExecutor constructor on the next line throws, the outer catch (line ~148) just rethrows — and since the constructor failed, close() is never called, so the already-built pool's worker threads + Thrift connections leak. Could we close partitionDriverPool/partitionClientPool in the catch before rethrowing? Same pattern in the legacy default-mode branch below.

⚠️ AI-generated; verify before applying. React 👍/👎 to flag quality.

this.partitionClientPool = maybeBuildPartitionClientPool(config);
ddlExecutor = new HiveQueryDDLExecutor(config, this.client, this.partitionDriverPool,
Option.ofNullable(this.partitionClientPool));
break;
case JDBC:
JDBCExecutor jdbcExecutor = new JDBCExecutor(config);
Expand All @@ -142,7 +158,10 @@ public HoodieHiveSyncClient(HiveSyncConfig config, HoodieTableMetaClient metaCli
jdbcMetadataOperator = new JDBCBasedMetadataOperator(
jdbcExecutor.getConnection(), databaseName);
} else {
ddlExecutor = new HiveQueryDDLExecutor(config, this.client);
this.partitionDriverPool = maybeBuildHiveDriverPool(config);
this.partitionClientPool = maybeBuildPartitionClientPool(config);
ddlExecutor = new HiveQueryDDLExecutor(config, this.client, this.partitionDriverPool,
Option.ofNullable(this.partitionClientPool));
}
}
} catch (Exception e) {
Expand Down Expand Up @@ -201,6 +220,30 @@ private IMetaStoreClient createMetaStoreClient(HiveSyncConfig config) {
}
}

private IMetaStoreClientPool maybeBuildPartitionClientPool(HiveSyncConfig config) {
if (!config.getBooleanOrDefault(HIVE_SYNC_BATCHING_ENABLED)) {
return null;
}
if (config.getBooleanOrDefault(HIVE_SYNC_USE_SPARK_CATALOG)) {
// The Spark catalog client is constructed via reflection against a Spark-side
// class and isn't compatible with the direct RetryingMetaStoreClient pool path.
// Fall back to single-client sequential behavior rather than failing the sync.
log.warn("hive_sync.batching.enabled=true is not supported with use_spark_catalog=true; "
+ "falling back to sequential partition sync.");
return null;
}
int size = config.getIntOrDefault(HIVE_SYNC_BATCHING_THREADS);
return new IMetaStoreClientPool(config, size);
}

private Option<HiveDriverPool> maybeBuildHiveDriverPool(HiveSyncConfig config) {
if (!config.getBooleanOrDefault(HIVE_SYNC_BATCHING_ENABLED)) {
return Option.empty();
}
int size = config.getIntOrDefault(HIVE_SYNC_BATCHING_THREADS);
return Option.of(new HiveDriverPool(config, size));
}

private Table getInitialTable(String table) {
return initialTableByName.computeIfAbsent(table, t -> {
try {
Expand Down Expand Up @@ -597,6 +640,17 @@ public void deleteLastReplicatedTimeStamp(String tableName) {
public void close() {
try {
ddlExecutor.close();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 If ddlExecutor.close() throws, control jumps to the outer catch (Exception e) { log.error(...) } (which doesn't rethrow), so partitionClientPool.close() never runs and the Thrift sockets + worker threads leak silently. Could you wrap the pool cleanup in a try-finally so it runs regardless of whether ddlExecutor.close() succeeds? E.g. try { ddlExecutor.close(); } finally { if (partitionClientPool != null) { ... } }.

- AI-generated; verify before applying. React 👍/👎 to flag quality.

// Close the partition client pool before Hive.closeCurrent() so the
// RetryingMetaStoreClient instances held by the pool release their Thrift
// sockets without racing the ThreadLocal Hive cleanup.
if (partitionClientPool != null) {
try {
partitionClientPool.close();
} catch (Exception e) {
log.warn("Error closing IMetaStoreClient pool", e);
}
partitionClientPool = null;
}
if (client != null) {
Hive.closeCurrent();
client = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,14 @@

package org.apache.hudi.hive.ddl;

import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HoodieHiveSyncException;
import org.apache.hudi.hive.util.HiveDriverPool;
import org.apache.hudi.hive.util.HivePartitionUtil;
import org.apache.hudi.hive.util.IMetaStoreClientPool;

import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
Expand All @@ -39,8 +43,10 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.stream.Collectors;

import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_BATCH_SYNC_PARTITION_NUM;
import static org.apache.hudi.sync.common.util.TableUtils.tableId;

/**
Expand All @@ -52,10 +58,26 @@ public class HiveQueryDDLExecutor extends QueryBasedDDLExecutor {
private final IMetaStoreClient metaStoreClient;
private SessionState sessionState;
private Driver hiveDriver;
// When present, partition-phase SQL lists fan out across this pool; table-level SQL
// (createTable, schema evolution, single-statement runSQL callers) always uses the
// session `hiveDriver` above. See HiveDriverPool javadoc.
private final Option<HiveDriverPool> driverPool;
// When present, dropPartitionsToTable fans batches across this Thrift client pool.
// Owned by HoodieHiveSyncClient; close() is delegated through there. See
// IMetaStoreClientPool javadoc for the usage contract (partition-row ops only).
private final Option<IMetaStoreClientPool> metaStoreClientPool;

public HiveQueryDDLExecutor(HiveSyncConfig config, IMetaStoreClient metaStoreClient) {
this(config, metaStoreClient, Option.empty(), Option.empty());
}

public HiveQueryDDLExecutor(HiveSyncConfig config, IMetaStoreClient metaStoreClient,
Option<HiveDriverPool> driverPool,
Option<IMetaStoreClientPool> metaStoreClientPool) {
super(config);
this.metaStoreClient = metaStoreClient;
this.driverPool = driverPool;
this.metaStoreClientPool = metaStoreClientPool;
try {
this.sessionState = new SessionState(config.getHiveConf(),
UserGroupInformation.getCurrentUser().getShortUserName());
Expand All @@ -82,19 +104,61 @@ public void runSQL(String sql) {
updateHiveSQLs(Collections.singletonList(sql));
}

/**
* Partition-phase SQL fan-out. When the driver pool is present, any leading
* {@code USE database} statements are run on every worker (Hive 2.x's
* ALTER PARTITION SET LOCATION ignores db.table qualifiers and uses the
* connection's current database, so each worker needs to USE the right db
* before any partition ALTER). The remaining statements are then dispatched
* round-robin across the pool. Falls through to the sequential path on the
* session Driver when no pool is configured.
*/
@Override
protected void runSQLs(List<String> sqls) {
if (sqls.isEmpty()) {
return;
}
if (!driverPool.isPresent()) {
updateHiveSQLs(sqls);
return;
}
HiveDriverPool pool = driverPool.get();
int firstNonUse = 0;
while (firstNonUse < sqls.size() && isUseStatement(sqls.get(firstNonUse))) {
firstNonUse++;
}
if (firstNonUse > 0) {
List<String> setupStatements = sqls.subList(0, firstNonUse);
pool.runOnEachWorker(setupStatements);
}
List<String> partitionStatements = sqls.subList(firstNonUse, sqls.size());
if (partitionStatements.isEmpty()) {
return;
}
List<Future<?>> futures = pool.runAll(partitionStatements);
pool.awaitAll(futures);
}

// Strict 4-char prefix match on "USE ". Internal callers (constructPartitionAlterStatements)
// always emit the USE statement without leading whitespace; do not call with externally
// supplied SQL that might be padded.
private static boolean isUseStatement(String sql) {
return sql != null && sql.regionMatches(true, 0, "USE ", 0, 4);
}

private List<CommandProcessorResponse> updateHiveSQLs(List<String> sqls) {
List<CommandProcessorResponse> responses = new ArrayList<>();
HoodieTimer timer = HoodieTimer.start();
try {
for (String sql : sqls) {
if (hiveDriver != null) {
HoodieTimer timer = HoodieTimer.start();
responses.add(hiveDriver.run(sql));
log.info("Time taken to execute [{}]: {} ms", sql, timer.endTimer());
}
}
} catch (Exception e) {
throw new HoodieHiveSyncException("Failed in executing SQL", e);
}
log.info("Executed {} SQL statements sequentially in {} ms", sqls.size(), timer.endTimer());
return responses;
}

Expand Down Expand Up @@ -132,23 +196,81 @@ public void dropPartitionsToTable(String tableName, List<String> partitionsToDro

log.info("Drop partitions {} on {}", partitionsToDrop.size(), tableName);
try {
for (String dropPartition : partitionsToDrop) {
if (HivePartitionUtil.partitionExists(metaStoreClient, tableName, dropPartition, partitionValueExtractor,
config)) {
String partitionClause =
HivePartitionUtil.getPartitionClauseForDrop(dropPartition, partitionValueExtractor, config);
metaStoreClient.dropPartition(databaseName, tableName, partitionClause, false);
}
log.info("Drop partition {} on {}", dropPartition, tableName);
}
int batchSyncPartitionNum = config.getIntOrDefault(HIVE_BATCH_SYNC_PARTITION_NUM);
List<List<String>> batches = CollectionUtils.batches(partitionsToDrop, batchSyncPartitionNum);
runDropBatches(tableName, batches);
} catch (Exception e) {
log.error("{} drop partition failed", tableId(databaseName, tableName), e);
throw new HoodieHiveSyncException(tableId(databaseName, tableName) + " drop partition failed", e);
}
}

/**
* Drops partitions one batch at a time. When {@link #metaStoreClientPool} is present,
* batches fan out across the pool's worker threads (each borrowing an independent
* IMetaStoreClient); otherwise batches are dispatched sequentially against the
* session client. Hive has no batch-drop primitive that matches dropPartition's
* semantics, so each worker still iterates its chunk one partition at a time — the
* win is fanning chunks across independent Thrift clients.
*/
private void runDropBatches(String tableName, List<List<String>> batches) throws Exception {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 nit: the parallel-path error-collection loop inside runDropBatches (Exception firstError = null; for (Future<Void> f : futures) …) is nearly identical to the one in HMSDDLExecutor.runBatches. Since HiveDriverPool already has a well-factored awaitAll(List<Future<?>>) for exactly this pattern, it might be worth adding a similar awaitAll(List<Future<Void>>) to IMetaStoreClientPool so both callers can delegate to it instead.

- AI-generated; verify before applying. React 👍/👎 to flag quality.

if (!metaStoreClientPool.isPresent()) {
for (List<String> batch : batches) {
applyDropBatch(metaStoreClient, tableName, batch);
}
return;
}
IMetaStoreClientPool pool = metaStoreClientPool.get();
List<Future<Void>> futures = new ArrayList<>(batches.size());
for (List<String> batch : batches) {
futures.add(pool.executor().submit(() ->
pool.run(poolClient -> {
applyDropBatch(poolClient, tableName, batch);
return null;
})
));
}
Exception firstError = null;
for (Future<Void> f : futures) {
try {
f.get();
} catch (Exception e) {
if (firstError == null) {
firstError = e;
} else {
log.warn("Additional drop batch failed on {} (suppressed in favor of first error)", tableName, e);
}
}
}
if (firstError != null) {
throw firstError;
}
}

private void applyDropBatch(IMetaStoreClient poolClient, String tableName, List<String> batch) throws Exception {
for (String dropPartition : batch) {
if (HivePartitionUtil.partitionExists(poolClient, tableName, dropPartition,
partitionValueExtractor, config)) {
String partitionClause =
HivePartitionUtil.getPartitionClauseForDrop(dropPartition, partitionValueExtractor, config);
poolClient.dropPartition(databaseName, tableName, partitionClause, false);
}
log.info("Drop partition {} on {}", dropPartition, tableName);
}
}

@Override
public void close() {
// Close the pool first so the worker threads stop dispatching against their
// Drivers before we tear down anything else. The pool's close() runs
// Driver/SessionState cleanup on each worker's own thread.
driverPool.ifPresent(pool -> {
try {
pool.close();
} catch (Exception e) {
log.warn("Error closing HiveDriverPool", e);
}
});
if (metaStoreClient != null) {
Hive.closeCurrent();
}
Expand Down
Loading
Loading