diff --git a/.github/workflows/bot.yml b/.github/workflows/bot.yml
index 36742a585f81f..11ec387bb72be 100644
--- a/.github/workflows/bot.yml
+++ b/.github/workflows/bot.yml
@@ -311,14 +311,8 @@ jobs:
strategy:
matrix:
include:
- - scalaProfile: "scala-2.12"
- sparkProfile: "spark3.3"
- sparkModules: "hudi-spark-datasource/hudi-spark3.3.x"
-
- - scalaProfile: "scala-2.12"
- sparkProfile: "spark3.4"
- sparkModules: "hudi-spark-datasource/hudi-spark3.4.x"
-
+ # Full suite runs only on the latest 3.x (Spark 3.5 / Scala 2.12 / Java 11).
+ # Every 3.x version (3.5 included) runs the curated core set in test-spark-core-tests.
- scalaProfile: "scala-2.12"
sparkProfile: "spark3.5"
sparkModules: "hudi-spark-datasource/hudi-spark3.5.x"
@@ -368,14 +362,6 @@ jobs:
strategy:
matrix:
include:
- - scalaProfile: "scala-2.12"
- sparkProfile: "spark3.3"
- sparkModules: "hudi-spark-datasource/hudi-spark3.3.x"
-
- - scalaProfile: "scala-2.12"
- sparkProfile: "spark3.4"
- sparkModules: "hudi-spark-datasource/hudi-spark3.4.x"
-
- scalaProfile: "scala-2.12"
sparkProfile: "spark3.5"
sparkModules: "hudi-spark-datasource/hudi-spark3.5.x"
@@ -440,14 +426,6 @@ jobs:
strategy:
matrix:
include:
- - scalaProfile: "scala-2.12"
- sparkProfile: "spark3.3"
- sparkModules: "hudi-spark-datasource/hudi-spark3.3.x"
-
- - scalaProfile: "scala-2.12"
- sparkProfile: "spark3.4"
- sparkModules: "hudi-spark-datasource/hudi-spark3.4.x"
-
- scalaProfile: "scala-2.12"
sparkProfile: "spark3.5"
sparkModules: "hudi-spark-datasource/hudi-spark3.5.x"
@@ -505,14 +483,6 @@ jobs:
strategy:
matrix:
include:
- - scalaProfile: "scala-2.12"
- sparkProfile: "spark3.3"
- sparkModules: "hudi-spark-datasource/hudi-spark3.3.x"
-
- - scalaProfile: "scala-2.12"
- sparkProfile: "spark3.4"
- sparkModules: "hudi-spark-datasource/hudi-spark3.4.x"
-
- scalaProfile: "scala-2.12"
sparkProfile: "spark3.5"
sparkModules: "hudi-spark-datasource/hudi-spark3.5.x"
@@ -570,14 +540,6 @@ jobs:
strategy:
matrix:
include:
- - scalaProfile: "scala-2.12"
- sparkProfile: "spark3.3"
- sparkModules: "hudi-spark-datasource/hudi-spark3.3.x"
-
- - scalaProfile: "scala-2.12"
- sparkProfile: "spark3.4"
- sparkModules: "hudi-spark-datasource/hudi-spark3.4.x"
-
- scalaProfile: "scala-2.12"
sparkProfile: "spark3.5"
sparkModules: "hudi-spark-datasource/hudi-spark3.5.x"
@@ -685,15 +647,9 @@ jobs:
strategy:
matrix:
include:
- - scalaProfile: "scala-2.13"
- sparkProfile: "spark3.5"
- sparkModules: "hudi-spark-datasource/hudi-spark3.5.x"
- - scalaProfile: "scala-2.13"
- sparkProfile: "spark4.0"
- sparkModules: "hudi-spark-datasource/hudi-spark4.0.x"
- - scalaProfile: "scala-2.13"
- sparkProfile: "spark4.1"
- sparkModules: "hudi-spark-datasource/hudi-spark4.1.x"
+ # Full suite runs only on the latest 4.x (Spark 4.2 / Scala 2.13 / Java 17).
+ # Spark 3.5/4.0/4.1/4.2 on Java 17 run the curated core set in
+ # test-spark-core-tests.
- scalaProfile: "scala-2.13"
sparkProfile: "spark4.2"
sparkModules: "hudi-spark-datasource/hudi-spark4.2.x"
@@ -743,15 +699,6 @@ jobs:
strategy:
matrix:
include:
- - scalaProfile: "scala-2.13"
- sparkProfile: "spark3.5"
- sparkModules: "hudi-spark-datasource/hudi-spark3.5.x"
- - scalaProfile: "scala-2.13"
- sparkProfile: "spark4.0"
- sparkModules: "hudi-spark-datasource/hudi-spark4.0.x"
- - scalaProfile: "scala-2.13"
- sparkProfile: "spark4.1"
- sparkModules: "hudi-spark-datasource/hudi-spark4.1.x"
- scalaProfile: "scala-2.13"
sparkProfile: "spark4.2"
sparkModules: "hudi-spark-datasource/hudi-spark4.2.x"
@@ -816,15 +763,6 @@ jobs:
strategy:
matrix:
include:
- - scalaProfile: "scala-2.13"
- sparkProfile: "spark3.5"
- sparkModules: "hudi-spark-datasource/hudi-spark3.5.x"
- - scalaProfile: "scala-2.13"
- sparkProfile: "spark4.0"
- sparkModules: "hudi-spark-datasource/hudi-spark4.0.x"
- - scalaProfile: "scala-2.13"
- sparkProfile: "spark4.1"
- sparkModules: "hudi-spark-datasource/hudi-spark4.1.x"
- scalaProfile: "scala-2.13"
sparkProfile: "spark4.2"
sparkModules: "hudi-spark-datasource/hudi-spark4.2.x"
@@ -882,15 +820,6 @@ jobs:
strategy:
matrix:
include:
- - scalaProfile: "scala-2.13"
- sparkProfile: "spark3.5"
- sparkModules: "hudi-spark-datasource/hudi-spark3.5.x"
- - scalaProfile: "scala-2.13"
- sparkProfile: "spark4.0"
- sparkModules: "hudi-spark-datasource/hudi-spark4.0.x"
- - scalaProfile: "scala-2.13"
- sparkProfile: "spark4.1"
- sparkModules: "hudi-spark-datasource/hudi-spark4.1.x"
- scalaProfile: "scala-2.13"
sparkProfile: "spark4.2"
sparkModules: "hudi-spark-datasource/hudi-spark4.2.x"
@@ -948,15 +877,6 @@ jobs:
strategy:
matrix:
include:
- - scalaProfile: "scala-2.13"
- sparkProfile: "spark3.5"
- sparkModules: "hudi-spark-datasource/hudi-spark3.5.x"
- - scalaProfile: "scala-2.13"
- sparkProfile: "spark4.0"
- sparkModules: "hudi-spark-datasource/hudi-spark4.0.x"
- - scalaProfile: "scala-2.13"
- sparkProfile: "spark4.1"
- sparkModules: "hudi-spark-datasource/hudi-spark4.1.x"
- scalaProfile: "scala-2.13"
sparkProfile: "spark4.2"
sparkModules: "hudi-spark-datasource/hudi-spark4.2.x"
@@ -1008,6 +928,105 @@ jobs:
flags: spark-scala-tests
token: ${{ secrets.CODECOV_TOKEN }}
+ # Curated core flow subset (-Pcore-tests) on every Spark version, for both the
+ # Java 11 / Scala 2.12 and Java 17 / Scala 2.13 tracks in one matrix-driven job.
+ # The full unit/functional/scala suites above run only on the latest 3.x (spark3.5)
+ # and latest 4.x (spark4.2); this job gives every version, including those two, the
+ # critical-path signal, because the core set is excluded from the full suites
+ # (excludedGroups=core, tagsToExclude=SparkSQLCoreFlow), so no test runs twice.
+ # The per-entry mvnProfiles carries -Pjava17 for the Java 17 rows (empty for Java 11).
+ test-spark-core-tests:
+ name: test-spark-core-tests (${{ matrix.scalaProfile }}, ${{ matrix.sparkProfile }}, java${{ matrix.javaVersion }})
+ runs-on: ubuntu-latest
+ needs: changes
+ strategy:
+ matrix:
+ include:
+ - scalaProfile: "scala-2.12"
+ sparkProfile: "spark3.3"
+ sparkModules: "hudi-spark-datasource/hudi-spark3.3.x"
+ javaVersion: "11"
+ mvnProfiles: ""
+ - scalaProfile: "scala-2.12"
+ sparkProfile: "spark3.4"
+ sparkModules: "hudi-spark-datasource/hudi-spark3.4.x"
+ javaVersion: "11"
+ mvnProfiles: ""
+ - scalaProfile: "scala-2.12"
+ sparkProfile: "spark3.5"
+ sparkModules: "hudi-spark-datasource/hudi-spark3.5.x"
+ javaVersion: "11"
+ mvnProfiles: ""
+ - scalaProfile: "scala-2.13"
+ sparkProfile: "spark3.5"
+ sparkModules: "hudi-spark-datasource/hudi-spark3.5.x"
+ javaVersion: "17"
+ mvnProfiles: "-Pjava17"
+ - scalaProfile: "scala-2.13"
+ sparkProfile: "spark4.0"
+ sparkModules: "hudi-spark-datasource/hudi-spark4.0.x"
+ javaVersion: "17"
+ mvnProfiles: "-Pjava17"
+ - scalaProfile: "scala-2.13"
+ sparkProfile: "spark4.1"
+ sparkModules: "hudi-spark-datasource/hudi-spark4.1.x"
+ javaVersion: "17"
+ mvnProfiles: "-Pjava17"
+ - scalaProfile: "scala-2.13"
+ sparkProfile: "spark4.2"
+ sparkModules: "hudi-spark-datasource/hudi-spark4.2.x"
+ javaVersion: "17"
+ mvnProfiles: "-Pjava17"
+
+ steps:
+ - if: needs.changes.outputs.relevant == 'true'
+ uses: actions/checkout@v5
+ - name: Set up JDK ${{ matrix.javaVersion }}
+ if: needs.changes.outputs.relevant == 'true'
+ uses: actions/setup-java@v5
+ with:
+ java-version: "${{ matrix.javaVersion }}"
+ distribution: 'temurin'
+ architecture: x64
+ cache: maven
+ - name: Build Project
+ if: needs.changes.outputs.relevant == 'true'
+ env:
+ SCALA_PROFILE: ${{ matrix.scalaProfile }}
+ SPARK_PROFILE: ${{ matrix.sparkProfile }}
+ SPARK_MODULES: ${{ matrix.sparkModules }}
+ MVN_PROFILES: ${{ matrix.mvnProfiles }}
+ run:
+ mvn clean install -T 2 $MVN_PROFILES -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -DskipTests=true $MVN_ARGS -am -pl "hudi-examples/hudi-examples-spark,hudi-common,$SPARK_COMMON_MODULES,$SPARK_MODULES"
+ - name: Core Tests - Common & Spark
+ if: needs.changes.outputs.relevant == 'true'
+ env:
+ SCALA_PROFILE: ${{ matrix.scalaProfile }}
+ SPARK_PROFILE: ${{ matrix.sparkProfile }}
+ SPARK_MODULES: ${{ matrix.sparkModules }}
+ MVN_PROFILES: ${{ matrix.mvnProfiles }}
+ run:
+ mvn test -Pcore-tests $MVN_PROFILES -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -fae -pl "hudi-common,$SPARK_COMMON_MODULES,$SPARK_MODULES" $MVN_ARGS -Djacoco.skip=false
+ - name: Quickstart Test
+ if: needs.changes.outputs.relevant == 'true'
+ env:
+ SCALA_PROFILE: ${{ matrix.scalaProfile }}
+ SPARK_PROFILE: ${{ matrix.sparkProfile }}
+ MVN_PROFILES: ${{ matrix.mvnProfiles }}
+ run:
+ mvn test -Punit-tests $MVN_PROFILES -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -pl hudi-examples/hudi-examples-spark $MVN_ARGS -Djacoco.skip=false
+ - name: Generate merged coverage report
+ if: always() && needs.changes.outputs.relevant == 'true'
+ run: ./scripts/jacoco/generate_merged_coverage_report.sh $GITHUB_WORKSPACE
+ - name: Upload coverage to Codecov
+ if: always() && needs.changes.outputs.relevant == 'true'
+ uses: codecov/codecov-action@75cd11691c0faa626561e295848008c8a7dddffe # v5
+ with:
+ files: ./jacoco-report.xml
+ disable_search: true
+ flags: spark-core-tests
+ token: ${{ secrets.CODECOV_TOKEN }}
+
test-flink-1:
runs-on: ubuntu-latest
needs: changes
diff --git a/hudi-spark-datasource/hudi-spark/pom.xml b/hudi-spark-datasource/hudi-spark/pom.xml
index efc0f33915236..ff045d782eb3e 100644
--- a/hudi-spark-datasource/hudi-spark/pom.xml
+++ b/hudi-spark-datasource/hudi-spark/pom.xml
@@ -116,7 +116,7 @@
org.scalatest
scalatest-maven-plugin
- org.apache.hudi.functional.SparkSQLCoreFlow
+ ${hoodie.scalatest.tagsToExclude}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkOrcReader.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkOrcReader.java
index 596ac5bfca240..63c991d53fb07 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkOrcReader.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkOrcReader.java
@@ -31,6 +31,7 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
@Tag("functional")
+@Tag("core")
public class TestSparkOrcReader extends TestBootstrapReadBase {
@Test
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkParquetReader.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkParquetReader.java
index 4b6ce42866fd3..8f2aa006cd12f 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkParquetReader.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkParquetReader.java
@@ -31,6 +31,7 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
@Tag("functional")
+@Tag("core")
public class TestSparkParquetReader extends TestBootstrapReadBase {
@Test
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/storage/row/TestHoodieInternalRowParquetWriter.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/storage/row/TestHoodieInternalRowParquetWriter.java
index b2c9d24b1b961..3151fe3918a4b 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/storage/row/TestHoodieInternalRowParquetWriter.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/storage/row/TestHoodieInternalRowParquetWriter.java
@@ -42,6 +42,7 @@
import org.apache.spark.sql.types.StructType;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
@@ -56,6 +57,7 @@
/**
* Unit tests {@link HoodieInternalRowParquetWriter}.
*/
+@Tag("core")
public class TestHoodieInternalRowParquetWriter extends HoodieSparkClientTestHarness {
@BeforeEach
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/CoreFlow.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/CoreFlow.scala
new file mode 100644
index 0000000000000..2f0280d07ff76
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/CoreFlow.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.functional
+
+import org.scalatest.Tag
+
+/**
+ * ScalaTest tag for the curated "core flow" subset that runs on every Spark
+ * version in CI (via the `core-tests` Maven profile), as opposed to the
+ * full suite that runs only on the latest Spark major versions.
+ *
+ * The tag name matches the [[SparkSQLCoreFlow]] Java `@TagAnnotation`
+ * so that class-level `@SparkSQLCoreFlow` and per-test tagging, e.g.
+ * `test("...", CoreFlow)`, are selected by the same scalatest
+ * `tagsToInclude`/`tagsToExclude` value.
+ */
+object CoreFlow extends Tag("org.apache.hudi.functional.SparkSQLCoreFlow")
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index 599a105e6d001..c5b07d060b38b 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -55,7 +55,7 @@ import org.apache.spark.sql.hudi.HoodieSparkSessionExtension
import org.apache.spark.sql.types.{ArrayType, DataTypes, DateType, IntegerType, LongType, MapType, StringType, StructField, StructType, TimestampType}
import org.joda.time.DateTime
import org.joda.time.format.DateTimeFormat
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, Tag, Test}
import org.junit.jupiter.api.Assertions.{assertDoesNotThrow, assertEquals, assertFalse, assertTrue, fail}
import org.junit.jupiter.api.function.Executable
import org.junit.jupiter.params.ParameterizedTest
@@ -800,6 +800,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
assertEquals(schema, actualSchema)
}
+ @Tag("core")
@ParameterizedTest
@EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO", "SPARK"))
def testCopyOnWriteDeletes(recordType: HoodieRecordType): Unit = {
@@ -831,6 +832,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
assertEquals(snapshotDF2.count(), 80)
}
+ @Tag("core")
@Test
def testCopyOnWriteUpserts(): Unit = {
val recordType = HoodieRecordType.AVRO
@@ -1217,6 +1219,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
assertEquals(hoodieIncViewDF2.count(), insert2NewKeyCnt)
}
+ @Tag("core")
@ParameterizedTest
@EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO", "SPARK"))
def testComplexDataTypeWriteAndReadConsistency(recordType: HoodieRecordType): Unit = {
@@ -1549,6 +1552,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
assertEquals(false, Metrics.isInitialized(basePath))
}
+ @Tag("core")
@ParameterizedTest
@CsvSource(Array(
"true,false,AVRO", "true,true,AVRO", "false,true,AVRO", "false,false,AVRO"
@@ -1967,6 +1971,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
}
/** Test case to verify MAKE_NEW_COLUMNS_NULLABLE config parameter. */
+ @Tag("core")
@Test
def testSchemaEvolutionWithNewColumn(): Unit = {
val df1 = spark.sql("select '1' as event_id, '2' as ts, '3' as version, 'foo' as event_date")
@@ -2263,6 +2268,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
* - v6: Uses requestedTime for commits, open_close incremental ranges
* - v9: Uses completionTime for commits, close_close incremental ranges
*/
+ @Tag("core")
@ParameterizedTest
@CsvSource(Array("6", "9"))
def testIncrementalAndTimeTravelWithEventTimeOrdering(tableVersion: String): Unit = {
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
index 81d049d432439..b88a33f7d2d35 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
@@ -47,7 +47,7 @@ import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.hudi.HoodieSparkSessionExtension
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Tag, Test}
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.{Arguments, CsvSource, EnumSource, MethodSource, ValueSource}
@@ -104,6 +104,7 @@ class TestMORDataSource extends HoodieSparkClientTestBase with SparkDatasetMixin
JFunction.toJavaConsumer((receiver: SparkSessionExtensions) => new HoodieSparkSessionExtension().apply(receiver)))
)
+ @Tag("core")
@ParameterizedTest
@CsvSource(Array(
// Inferred as COMMIT_TIME_ORDERING
@@ -494,6 +495,7 @@ class TestMORDataSource extends HoodieSparkClientTestBase with SparkDatasetMixin
HoodieMemoryConfig.DEFAULT_MR_COMPACTION_MEMORY_FRACTION)
}
+ @Tag("core")
@ParameterizedTest
@CsvSource(value = Array("AVRO,6", "AVRO,8", "SPARK,6", "SPARK,8"))
def testPayloadDelete(recordType: HoodieRecordType, tableVersion: Int) {
@@ -573,6 +575,7 @@ class TestMORDataSource extends HoodieSparkClientTestBase with SparkDatasetMixin
assertEquals(0, hudiSnapshotDF3.count()) // 100 records were deleted, 0 record to load
}
+ @Tag("core")
@ParameterizedTest
@CsvSource(value = Array("AVRO,6", "AVRO,8", "SPARK,6", "SPARK,8"))
def testPrunedFiltered(recordType: HoodieRecordType, tableVersion: Int) {
@@ -683,6 +686,7 @@ class TestMORDataSource extends HoodieSparkClientTestBase with SparkDatasetMixin
assertEquals(0, hudiSnapshotDF3.filter("rider = 'rider-003'").count())
}
+ @Tag("core")
@ParameterizedTest
@EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO", "SPARK"))
def testVectorizedReader(recordType: HoodieRecordType) {
@@ -992,6 +996,7 @@ class TestMORDataSource extends HoodieSparkClientTestBase with SparkDatasetMixin
assertEquals(partitionCounts("2021/03/03"), count7)
}
+ @Tag("core")
@ParameterizedTest
@EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO", "SPARK"))
def testReadLogOnlyMergeOnReadTable(recordType: HoodieRecordType): Unit = {
@@ -1311,6 +1316,7 @@ class TestMORDataSource extends HoodieSparkClientTestBase with SparkDatasetMixin
*
* The read-optimized query should read `fg1_dc1.parquet` only in this case.
*/
+ @Tag("core")
@Test
def testReadOptimizedQueryAfterInflightCompactionAndCompletedDeltaCommit(): Unit = {
val (tableName, tablePath) = ("hoodie_mor_ro_read_test_table", s"${basePath}_mor_test_table")
@@ -1406,6 +1412,7 @@ class TestMORDataSource extends HoodieSparkClientTestBase with SparkDatasetMixin
roDf.where(col(recordKeyField) === 0).select(dataField).collect()(0).getLong(0))
}
+ @Tag("core")
@ParameterizedTest
@ValueSource(ints = Array(6, 8))
def testSnapshotQueryAfterInflightDeltaCommit(tableVersion: Int): Unit = {
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlCoreFlow.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlCoreFlow.scala
index 2f37470ce208d..4b560897563f4 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlCoreFlow.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlCoreFlow.scala
@@ -46,23 +46,19 @@ import scala.collection.JavaConverters._
class TestSparkSqlCoreFlow extends HoodieSparkSqlTestBase {
val colsToCompare = "timestamp, _row_key, partition_path, rider, driver, begin_lat, begin_lon, end_lat, end_lon, fare.amount, fare.currency, _hoodie_is_deleted"
- //params for core flow tests
+ // Params for core flow tests. This suite is the cross-Spark-version "core flow" anchor
+ // (runs on every Spark version via the core-tests profile), so the matrix is trimmed to a
+ // representative spread of table type, metadata on/off, key generator, and index type.
+ // The dropped keygen/index permutations no longer run in this suite on any Spark version;
+ // they are assumed Spark-version-independent (TODO confirm other full-suite tests cover them).
val params: List[String] = List(
"COPY_ON_WRITE|false|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_BLOOM",
- "COPY_ON_WRITE|true|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_BLOOM",
- "COPY_ON_WRITE|false|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_SIMPLE",
"COPY_ON_WRITE|true|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_SIMPLE",
"COPY_ON_WRITE|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|BLOOM",
- "COPY_ON_WRITE|true|org.apache.hudi.keygen.NonpartitionedKeyGenerator|BLOOM",
- "COPY_ON_WRITE|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|SIMPLE",
"COPY_ON_WRITE|true|org.apache.hudi.keygen.NonpartitionedKeyGenerator|SIMPLE",
"MERGE_ON_READ|false|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_BLOOM",
- "MERGE_ON_READ|true|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_BLOOM",
- "MERGE_ON_READ|false|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_SIMPLE",
"MERGE_ON_READ|true|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_SIMPLE",
"MERGE_ON_READ|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|BLOOM",
- "MERGE_ON_READ|true|org.apache.hudi.keygen.NonpartitionedKeyGenerator|BLOOM",
- "MERGE_ON_READ|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|SIMPLE",
"MERGE_ON_READ|true|org.apache.hudi.keygen.NonpartitionedKeyGenerator|SIMPLE"
)
@@ -409,39 +405,13 @@ class TestSparkSqlCoreFlow extends HoodieSparkSqlTestBase {
HoodieRecord.FILENAME_METADATA_FIELD)
}
- //params for immutable user flow
+ // Params for immutable user flow. Trimmed to cover both table types and both immutable
+ // write ops (insert, bulk_insert) with a representative metadata/keygen/index spread; see
+ // the note on `params` above for why the full permutation matrix is not needed here.
val paramsForImmutable: List[String] = List(
"COPY_ON_WRITE|insert|false|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_BLOOM",
- "COPY_ON_WRITE|insert|true|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_BLOOM",
- "COPY_ON_WRITE|insert|false|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_SIMPLE",
- "COPY_ON_WRITE|insert|true|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_SIMPLE",
- "COPY_ON_WRITE|insert|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|BLOOM",
- "COPY_ON_WRITE|insert|true|org.apache.hudi.keygen.NonpartitionedKeyGenerator|BLOOM",
- "COPY_ON_WRITE|insert|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|SIMPLE",
- "COPY_ON_WRITE|insert|true|org.apache.hudi.keygen.NonpartitionedKeyGenerator|SIMPLE",
- "MERGE_ON_READ|insert|false|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_BLOOM",
- "MERGE_ON_READ|insert|true|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_BLOOM",
- "MERGE_ON_READ|insert|false|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_SIMPLE",
- "MERGE_ON_READ|insert|true|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_SIMPLE",
- "MERGE_ON_READ|insert|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|BLOOM",
- "MERGE_ON_READ|insert|true|org.apache.hudi.keygen.NonpartitionedKeyGenerator|BLOOM",
- "MERGE_ON_READ|insert|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|SIMPLE",
- "MERGE_ON_READ|insert|true|org.apache.hudi.keygen.NonpartitionedKeyGenerator|SIMPLE",
- "COPY_ON_WRITE|bulk_insert|false|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_BLOOM",
- "COPY_ON_WRITE|bulk_insert|true|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_BLOOM",
- "COPY_ON_WRITE|bulk_insert|false|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_SIMPLE",
- "COPY_ON_WRITE|bulk_insert|true|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_SIMPLE",
- "COPY_ON_WRITE|bulk_insert|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|BLOOM",
- "COPY_ON_WRITE|bulk_insert|true|org.apache.hudi.keygen.NonpartitionedKeyGenerator|BLOOM",
- "COPY_ON_WRITE|bulk_insert|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|SIMPLE",
"COPY_ON_WRITE|bulk_insert|true|org.apache.hudi.keygen.NonpartitionedKeyGenerator|SIMPLE",
- "MERGE_ON_READ|bulk_insert|false|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_BLOOM",
- "MERGE_ON_READ|bulk_insert|true|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_BLOOM",
- "MERGE_ON_READ|bulk_insert|false|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_SIMPLE",
- "MERGE_ON_READ|bulk_insert|true|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_SIMPLE",
- "MERGE_ON_READ|bulk_insert|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|BLOOM",
- "MERGE_ON_READ|bulk_insert|true|org.apache.hudi.keygen.NonpartitionedKeyGenerator|BLOOM",
- "MERGE_ON_READ|bulk_insert|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|SIMPLE",
+ "MERGE_ON_READ|insert|false|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_BLOOM",
"MERGE_ON_READ|bulk_insert|true|org.apache.hudi.keygen.NonpartitionedKeyGenerator|SIMPLE"
)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestCreateTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestCreateTable.scala
index 88ec4b7fb4c36..bf5e06dd1352a 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestCreateTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestCreateTable.scala
@@ -24,6 +24,7 @@ import org.apache.hudi.common.schema.{HoodieSchema, HoodieSchemaType}
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
import org.apache.hudi.common.util.PartitionPathEncodeUtils.escapePathName
import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.functional.CoreFlow
import org.apache.hudi.hadoop.fs.HadoopFSUtils
import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat
import org.apache.hudi.keygen.constant.KeyGeneratorType
@@ -47,7 +48,7 @@ import scala.collection.JavaConverters._
class TestCreateTable extends HoodieSparkSqlTestBase {
- test("Test Create Managed Hoodie Table") {
+ test("Test Create Managed Hoodie Table", CoreFlow) {
val databaseName = "hudi_database"
spark.sql(s"create database if not exists $databaseName")
spark.sql(s"use $databaseName")
@@ -101,7 +102,7 @@ class TestCreateTable extends HoodieSparkSqlTestBase {
spark.sql("use default")
}
- test("Test Create Hoodie Table With Options") {
+ test("Test Create Hoodie Table With Options", CoreFlow) {
val tableName = generateTableName
spark.sql(
s"""
@@ -154,7 +155,7 @@ class TestCreateTable extends HoodieSparkSqlTestBase {
assertFalse(tableConfig.contains(OPERATION.key()))
}
- test("Test Create External Hoodie Table") {
+ test("Test Create External Hoodie Table", CoreFlow) {
withTempDir { tmp =>
// Test create cow table.
val tableName = generateTableName
@@ -236,7 +237,7 @@ class TestCreateTable extends HoodieSparkSqlTestBase {
}
}
- test("Test Create External Hoodie Table with data") {
+ test("Test Create External Hoodie Table with data", CoreFlow) {
withTempDir { tmp =>
val options = Map(DataSourceWriteOptions.TABLE_TYPE.key -> HoodieTableType.MERGE_ON_READ.name(),
HoodieTableConfig.ORDERING_FIELDS.key -> "ordering",
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/insert/TestInsertTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/insert/TestInsertTable.scala
index ebaa13770ed44..155ab028171c3 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/insert/TestInsertTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/insert/TestInsertTable.scala
@@ -23,6 +23,7 @@ import org.apache.hudi.HoodieSparkUtils
import org.apache.hudi.common.config.HoodieStorageConfig
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
import org.apache.hudi.exception.{HoodieDuplicateKeyException, HoodieException}
+import org.apache.hudi.functional.CoreFlow
import org.apache.hudi.testutils.HoodieClientTestUtils.createMetaClient
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
@@ -218,7 +219,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
}
}
- test("Test Insert Into with values") {
+ test("Test Insert Into with values", CoreFlow) {
withRecordType()(withTempDir { tmp =>
val tableName = generateTableName
// Create a partitioned table
@@ -305,7 +306,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
}
}
- test("Test Insert Into with dynamic partition") {
+ test("Test Insert Into with dynamic partition", CoreFlow) {
Seq("cow", "mor").foreach { tableType =>
withTempDir { tmp =>
val tableName = generateTableName
@@ -413,7 +414,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
}
}
- test("Test Insert Into Non Partitioned Table") {
+ test("Test Insert Into Non Partitioned Table", CoreFlow) {
withRecordType(Seq(HoodieRecordType.AVRO, HoodieRecordType.SPARK))(withTempDir { tmp =>
val tableName = generateTableName
withSQLConf("hoodie.datasource.insert.dup.policy" -> "fail") {
@@ -632,7 +633,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
})
}
- test("Test Insert Overwrite") {
+ test("Test Insert Overwrite", CoreFlow) {
withTempDir { tmp =>
Seq("cow", "mor").foreach { tableType =>
withTable(generateTableName) { tableName =>
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestDeleteTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestDeleteTable.scala
index 1bbf05557201f..99008ea9752fc 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestDeleteTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestDeleteTable.scala
@@ -22,13 +22,14 @@ package org.apache.spark.sql.hudi.dml.others
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.functional.CoreFlow
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
class TestDeleteTable extends HoodieSparkSqlTestBase {
- test("Test Delete Table") {
+ test("Test Delete Table", CoreFlow) {
withTempDir { tmp =>
Seq(true, false).foreach { sparkSqlOptimizedWrites =>
Seq("cow", "mor").foreach { tableType =>
@@ -187,7 +188,7 @@ class TestDeleteTable extends HoodieSparkSqlTestBase {
}
}
- test("Test Delete Table On Non-PK Condition") {
+ test("Test Delete Table On Non-PK Condition", CoreFlow) {
withTempDir { tmp =>
Seq("cow", "mor").foreach { tableType =>
@@ -277,7 +278,7 @@ class TestDeleteTable extends HoodieSparkSqlTestBase {
}
}
- test("Test Delete Table with op upsert") {
+ test("Test Delete Table with op upsert", CoreFlow) {
withTempDir { tmp =>
Seq("cow", "mor").foreach { tableType =>
val tableName = generateTableName
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestMergeIntoTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestMergeIntoTable.scala
index 6f480eae6134f..a600863eb0598 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestMergeIntoTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestMergeIntoTable.scala
@@ -26,6 +26,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline
import org.apache.hudi.common.testutils.HoodieTestUtils
import org.apache.hudi.config.{HoodieClusteringConfig, HoodieWriteConfig}
import org.apache.hudi.config.HoodieWriteConfig.MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT
+import org.apache.hudi.functional.CoreFlow
import org.apache.hudi.storage.StoragePath
import org.apache.hudi.testutils.DataSourceTestUtils
import org.apache.hudi.testutils.HoodieClientTestUtils.createMetaClient
@@ -40,7 +41,7 @@ import org.slf4j.LoggerFactory
class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSupport {
private val log = LoggerFactory.getLogger(getClass)
- test("Test MergeInto Basic") {
+ test("Test MergeInto Basic", CoreFlow) {
Seq(true, false).foreach { sparkSqlOptimizedWrites =>
withRecordType()(withTempDir { tmp =>
withSparkSqlSessionConfig("hoodie.payload.combined.schema.validate" -> "false",
@@ -474,7 +475,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo
}
}
- test("Test MergeInto for MOR table") {
+ test("Test MergeInto for MOR table", CoreFlow) {
withTempDir { tmp =>
withSQLConf("hoodie.payload.combined.schema.validate" -> "true", MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key() -> "0") {
val tableName = generateTableName
@@ -614,7 +615,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo
}
}
- test("Test MergeInto with insert only") {
+ test("Test MergeInto with insert only", CoreFlow) {
withTempDir { tmp =>
withSQLConf("hoodie.payload.combined.schema.validate" -> "true", MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key() -> "0") {
// Create a partitioned mor table
@@ -915,7 +916,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo
}
}
- test("Test MergeInto with combination of delete update insert") {
+ test("Test MergeInto with combination of delete update insert", CoreFlow) {
withRecordType()(withTempDir { tmp =>
withSQLConf("hoodie.payload.combined.schema.validate" -> "true") {
val sourceTable = generateTableName
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestUpdateTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestUpdateTable.scala
index 9d67da0db5cee..cd487fb4db4f9 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestUpdateTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestUpdateTable.scala
@@ -25,6 +25,7 @@ import org.apache.hudi.HoodieSparkUtils.gteqSpark3_4
import org.apache.hudi.common.model.HoodieTableType
import org.apache.hudi.common.table.timeline.HoodieInstant
import org.apache.hudi.common.util.{Option => HOption}
+import org.apache.hudi.functional.CoreFlow
import org.apache.hudi.testutils.HoodieClientTestUtils.createMetaClient
import org.apache.spark.sql.{AnalysisException, Row}
@@ -33,7 +34,7 @@ import org.junit.jupiter.api.Assertions.assertEquals
class TestUpdateTable extends HoodieSparkSqlTestBase {
- test("Test Update Table") {
+ test("Test Update Table", CoreFlow) {
withRecordType()(withTempDir { tmp =>
Seq(true, false).foreach { sparkSqlOptimizedWrites =>
Seq("cow", "mor").foreach { tableType =>
@@ -144,7 +145,7 @@ class TestUpdateTable extends HoodieSparkSqlTestBase {
})
}
- test("Test Update Table On Non-PK Condition") {
+ test("Test Update Table On Non-PK Condition", CoreFlow) {
withRecordType()(withTempDir { tmp =>
Seq("cow", "mor").foreach {tableType =>
/** non-partitioned table */
@@ -233,7 +234,7 @@ class TestUpdateTable extends HoodieSparkSqlTestBase {
})
}
- test("Test ignoring case for Update Table") {
+ test("Test ignoring case for Update Table", CoreFlow) {
withTempDir { tmp =>
Seq("cow", "mor").foreach { tableType =>
val tableName = generateTableName
diff --git a/pom.xml b/pom.xml
index f142ebf009b85..38a2a2965bd93 100644
--- a/pom.xml
+++ b/pom.xml
@@ -210,6 +210,10 @@
${skipTests}
${skipTests}
${skipTests}
+
+ org.apache.hudi.functional.SparkSQLCoreFlow
UTF-8
${project.basedir}
provided
@@ -2088,7 +2092,7 @@
${skipUTs}
120
- functional,functional-b,functional-c
+ functional,functional-b,functional-c,core
**/IT*.java
**/testsuite/**/Test*.java
@@ -2141,6 +2145,7 @@
1
true
functional
+ core
@@ -2189,6 +2194,7 @@
1
true
functional-b
+ core
@@ -2237,6 +2243,7 @@
1
true
functional-c
+ core
@@ -2267,6 +2274,70 @@
+
+
+ core-tests
+
+ false
+ true
+ true
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-surefire-plugin
+ ${maven-surefire-plugin.version}
+
+ ${skipUTs}
+ 120
+ core
+
+ **/IT*.java
+ **/testsuite/**/Test*.java
+
+
+
+
+ org.scalatest
+ scalatest-maven-plugin
+
+ org.apache.hudi.functional.SparkSQLCoreFlow
+
+
+
+ org.jacoco
+ jacoco-maven-plugin
+
+
+
+ prepare-agent
+
+
+ ${project.build.directory}/jacoco-agent/${jacoco.agent.dest.filename}
+
+
+
+ post-unit-tests
+ test
+
+ report
+
+
+ ${project.build.directory}/jacoco-agent/${jacoco.agent.dest.filename}
+ ${project.reporting.outputDirectory}/jacoco-ut
+
+
+
+
+
+
+
hudi-platform-service