diff --git a/.github/workflows/bot.yml b/.github/workflows/bot.yml index 36742a585f81f..11ec387bb72be 100644 --- a/.github/workflows/bot.yml +++ b/.github/workflows/bot.yml @@ -311,14 +311,8 @@ jobs: strategy: matrix: include: - - scalaProfile: "scala-2.12" - sparkProfile: "spark3.3" - sparkModules: "hudi-spark-datasource/hudi-spark3.3.x" - - - scalaProfile: "scala-2.12" - sparkProfile: "spark3.4" - sparkModules: "hudi-spark-datasource/hudi-spark3.4.x" - + # Full suite runs only on the latest 3.x (Spark 3.5 / Scala 2.12 / Java 11). + # Older 3.x versions run the curated core set in test-spark-core-tests. - scalaProfile: "scala-2.12" sparkProfile: "spark3.5" sparkModules: "hudi-spark-datasource/hudi-spark3.5.x" @@ -368,14 +362,6 @@ jobs: strategy: matrix: include: - - scalaProfile: "scala-2.12" - sparkProfile: "spark3.3" - sparkModules: "hudi-spark-datasource/hudi-spark3.3.x" - - - scalaProfile: "scala-2.12" - sparkProfile: "spark3.4" - sparkModules: "hudi-spark-datasource/hudi-spark3.4.x" - - scalaProfile: "scala-2.12" sparkProfile: "spark3.5" sparkModules: "hudi-spark-datasource/hudi-spark3.5.x" @@ -440,14 +426,6 @@ jobs: strategy: matrix: include: - - scalaProfile: "scala-2.12" - sparkProfile: "spark3.3" - sparkModules: "hudi-spark-datasource/hudi-spark3.3.x" - - - scalaProfile: "scala-2.12" - sparkProfile: "spark3.4" - sparkModules: "hudi-spark-datasource/hudi-spark3.4.x" - - scalaProfile: "scala-2.12" sparkProfile: "spark3.5" sparkModules: "hudi-spark-datasource/hudi-spark3.5.x" @@ -505,14 +483,6 @@ jobs: strategy: matrix: include: - - scalaProfile: "scala-2.12" - sparkProfile: "spark3.3" - sparkModules: "hudi-spark-datasource/hudi-spark3.3.x" - - - scalaProfile: "scala-2.12" - sparkProfile: "spark3.4" - sparkModules: "hudi-spark-datasource/hudi-spark3.4.x" - - scalaProfile: "scala-2.12" sparkProfile: "spark3.5" sparkModules: "hudi-spark-datasource/hudi-spark3.5.x" @@ -570,14 +540,6 @@ jobs: strategy: matrix: include: - - scalaProfile: "scala-2.12" - sparkProfile: "spark3.3" - sparkModules: "hudi-spark-datasource/hudi-spark3.3.x" - - - scalaProfile: "scala-2.12" - sparkProfile: "spark3.4" - sparkModules: "hudi-spark-datasource/hudi-spark3.4.x" - - scalaProfile: "scala-2.12" sparkProfile: "spark3.5" sparkModules: "hudi-spark-datasource/hudi-spark3.5.x" @@ -685,15 +647,9 @@ jobs: strategy: matrix: include: - - scalaProfile: "scala-2.13" - sparkProfile: "spark3.5" - sparkModules: "hudi-spark-datasource/hudi-spark3.5.x" - - scalaProfile: "scala-2.13" - sparkProfile: "spark4.0" - sparkModules: "hudi-spark-datasource/hudi-spark4.0.x" - - scalaProfile: "scala-2.13" - sparkProfile: "spark4.1" - sparkModules: "hudi-spark-datasource/hudi-spark4.1.x" + # Full suite runs only on the latest 4.x (Spark 4.2 / Scala 2.13 / Java 17). + # Spark 3.5/4.0/4.1 on Java 17 run the curated core set in + # test-spark-core-tests. - scalaProfile: "scala-2.13" sparkProfile: "spark4.2" sparkModules: "hudi-spark-datasource/hudi-spark4.2.x" @@ -743,15 +699,6 @@ jobs: strategy: matrix: include: - - scalaProfile: "scala-2.13" - sparkProfile: "spark3.5" - sparkModules: "hudi-spark-datasource/hudi-spark3.5.x" - - scalaProfile: "scala-2.13" - sparkProfile: "spark4.0" - sparkModules: "hudi-spark-datasource/hudi-spark4.0.x" - - scalaProfile: "scala-2.13" - sparkProfile: "spark4.1" - sparkModules: "hudi-spark-datasource/hudi-spark4.1.x" - scalaProfile: "scala-2.13" sparkProfile: "spark4.2" sparkModules: "hudi-spark-datasource/hudi-spark4.2.x" @@ -816,15 +763,6 @@ jobs: strategy: matrix: include: - - scalaProfile: "scala-2.13" - sparkProfile: "spark3.5" - sparkModules: "hudi-spark-datasource/hudi-spark3.5.x" - - scalaProfile: "scala-2.13" - sparkProfile: "spark4.0" - sparkModules: "hudi-spark-datasource/hudi-spark4.0.x" - - scalaProfile: "scala-2.13" - sparkProfile: "spark4.1" - sparkModules: "hudi-spark-datasource/hudi-spark4.1.x" - scalaProfile: "scala-2.13" sparkProfile: "spark4.2" sparkModules: "hudi-spark-datasource/hudi-spark4.2.x" @@ -882,15 +820,6 @@ jobs: strategy: matrix: include: - - scalaProfile: "scala-2.13" - sparkProfile: "spark3.5" - sparkModules: "hudi-spark-datasource/hudi-spark3.5.x" - - scalaProfile: "scala-2.13" - sparkProfile: "spark4.0" - sparkModules: "hudi-spark-datasource/hudi-spark4.0.x" - - scalaProfile: "scala-2.13" - sparkProfile: "spark4.1" - sparkModules: "hudi-spark-datasource/hudi-spark4.1.x" - scalaProfile: "scala-2.13" sparkProfile: "spark4.2" sparkModules: "hudi-spark-datasource/hudi-spark4.2.x" @@ -948,15 +877,6 @@ jobs: strategy: matrix: include: - - scalaProfile: "scala-2.13" - sparkProfile: "spark3.5" - sparkModules: "hudi-spark-datasource/hudi-spark3.5.x" - - scalaProfile: "scala-2.13" - sparkProfile: "spark4.0" - sparkModules: "hudi-spark-datasource/hudi-spark4.0.x" - - scalaProfile: "scala-2.13" - sparkProfile: "spark4.1" - sparkModules: "hudi-spark-datasource/hudi-spark4.1.x" - scalaProfile: "scala-2.13" sparkProfile: "spark4.2" sparkModules: "hudi-spark-datasource/hudi-spark4.2.x" @@ -1008,6 +928,105 @@ jobs: flags: spark-scala-tests token: ${{ secrets.CODECOV_TOKEN }} + # Curated core flow subset (-Pcore-tests) on every Spark version, for both the + # Java 11 / Scala 2.12 and Java 17 / Scala 2.13 tracks in one matrix-driven job. + # The full unit/functional/scala suites above run only on the latest 3.x (spark3.5) + # and latest 4.x (spark4.2); these entries give the other versions a fast + # critical-path signal. The core set is excluded from the full suites + # (excludedGroups=core, tagsToExclude=SparkSQLCoreFlow), so no test runs twice. + # The per-entry mvnProfiles carries -Pjava17 for the Java 17 rows (empty for Java 11). + test-spark-core-tests: + name: test-spark-core-tests (${{ matrix.scalaProfile }}, ${{ matrix.sparkProfile }}, java${{ matrix.javaVersion }}) + runs-on: ubuntu-latest + needs: changes + strategy: + matrix: + include: + - scalaProfile: "scala-2.12" + sparkProfile: "spark3.3" + sparkModules: "hudi-spark-datasource/hudi-spark3.3.x" + javaVersion: "11" + mvnProfiles: "" + - scalaProfile: "scala-2.12" + sparkProfile: "spark3.4" + sparkModules: "hudi-spark-datasource/hudi-spark3.4.x" + javaVersion: "11" + mvnProfiles: "" + - scalaProfile: "scala-2.12" + sparkProfile: "spark3.5" + sparkModules: "hudi-spark-datasource/hudi-spark3.5.x" + javaVersion: "11" + mvnProfiles: "" + - scalaProfile: "scala-2.13" + sparkProfile: "spark3.5" + sparkModules: "hudi-spark-datasource/hudi-spark3.5.x" + javaVersion: "17" + mvnProfiles: "-Pjava17" + - scalaProfile: "scala-2.13" + sparkProfile: "spark4.0" + sparkModules: "hudi-spark-datasource/hudi-spark4.0.x" + javaVersion: "17" + mvnProfiles: "-Pjava17" + - scalaProfile: "scala-2.13" + sparkProfile: "spark4.1" + sparkModules: "hudi-spark-datasource/hudi-spark4.1.x" + javaVersion: "17" + mvnProfiles: "-Pjava17" + - scalaProfile: "scala-2.13" + sparkProfile: "spark4.2" + sparkModules: "hudi-spark-datasource/hudi-spark4.2.x" + javaVersion: "17" + mvnProfiles: "-Pjava17" + + steps: + - if: needs.changes.outputs.relevant == 'true' + uses: actions/checkout@v5 + - name: Set up JDK ${{ matrix.javaVersion }} + if: needs.changes.outputs.relevant == 'true' + uses: actions/setup-java@v5 + with: + java-version: "${{ matrix.javaVersion }}" + distribution: 'temurin' + architecture: x64 + cache: maven + - name: Build Project + if: needs.changes.outputs.relevant == 'true' + env: + SCALA_PROFILE: ${{ matrix.scalaProfile }} + SPARK_PROFILE: ${{ matrix.sparkProfile }} + SPARK_MODULES: ${{ matrix.sparkModules }} + MVN_PROFILES: ${{ matrix.mvnProfiles }} + run: + mvn clean install -T 2 $MVN_PROFILES -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -DskipTests=true $MVN_ARGS -am -pl "hudi-examples/hudi-examples-spark,hudi-common,$SPARK_COMMON_MODULES,$SPARK_MODULES" + - name: Core Tests - Common & Spark + if: needs.changes.outputs.relevant == 'true' + env: + SCALA_PROFILE: ${{ matrix.scalaProfile }} + SPARK_PROFILE: ${{ matrix.sparkProfile }} + SPARK_MODULES: ${{ matrix.sparkModules }} + MVN_PROFILES: ${{ matrix.mvnProfiles }} + run: + mvn test -Pcore-tests $MVN_PROFILES -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -fae -pl "hudi-common,$SPARK_COMMON_MODULES,$SPARK_MODULES" $MVN_ARGS -Djacoco.skip=false + - name: Quickstart Test + if: needs.changes.outputs.relevant == 'true' + env: + SCALA_PROFILE: ${{ matrix.scalaProfile }} + SPARK_PROFILE: ${{ matrix.sparkProfile }} + MVN_PROFILES: ${{ matrix.mvnProfiles }} + run: + mvn test -Punit-tests $MVN_PROFILES -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -pl hudi-examples/hudi-examples-spark $MVN_ARGS -Djacoco.skip=false + - name: Generate merged coverage report + if: always() && needs.changes.outputs.relevant == 'true' + run: ./scripts/jacoco/generate_merged_coverage_report.sh $GITHUB_WORKSPACE + - name: Upload coverage to Codecov + if: always() && needs.changes.outputs.relevant == 'true' + uses: codecov/codecov-action@75cd11691c0faa626561e295848008c8a7dddffe # v5 + with: + files: ./jacoco-report.xml + disable_search: true + flags: spark-core-tests + token: ${{ secrets.CODECOV_TOKEN }} + test-flink-1: runs-on: ubuntu-latest needs: changes diff --git a/hudi-spark-datasource/hudi-spark/pom.xml b/hudi-spark-datasource/hudi-spark/pom.xml index efc0f33915236..ff045d782eb3e 100644 --- a/hudi-spark-datasource/hudi-spark/pom.xml +++ b/hudi-spark-datasource/hudi-spark/pom.xml @@ -116,7 +116,7 @@ org.scalatest scalatest-maven-plugin - org.apache.hudi.functional.SparkSQLCoreFlow + ${hoodie.scalatest.tagsToExclude} diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkOrcReader.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkOrcReader.java index 596ac5bfca240..63c991d53fb07 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkOrcReader.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkOrcReader.java @@ -31,6 +31,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; @Tag("functional") +@Tag("core") public class TestSparkOrcReader extends TestBootstrapReadBase { @Test diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkParquetReader.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkParquetReader.java index 4b6ce42866fd3..8f2aa006cd12f 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkParquetReader.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkParquetReader.java @@ -31,6 +31,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; @Tag("functional") +@Tag("core") public class TestSparkParquetReader extends TestBootstrapReadBase { @Test diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/storage/row/TestHoodieInternalRowParquetWriter.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/storage/row/TestHoodieInternalRowParquetWriter.java index b2c9d24b1b961..3151fe3918a4b 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/storage/row/TestHoodieInternalRowParquetWriter.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/storage/row/TestHoodieInternalRowParquetWriter.java @@ -42,6 +42,7 @@ import org.apache.spark.sql.types.StructType; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; @@ -56,6 +57,7 @@ /** * Unit tests {@link HoodieInternalRowParquetWriter}. */ +@Tag("core") public class TestHoodieInternalRowParquetWriter extends HoodieSparkClientTestHarness { @BeforeEach diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/CoreFlow.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/CoreFlow.scala new file mode 100644 index 0000000000000..2f0280d07ff76 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/CoreFlow.scala @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.functional + +import org.scalatest.Tag + +/** + * ScalaTest tag for the curated "core flow" subset that runs on every Spark + * version in CI (via the {@code core-tests} Maven profile), as opposed to the + * full suite that runs only on the latest Spark major versions. + * + * The tag name matches the {@link SparkSQLCoreFlow} Java {@code @TagAnnotation} + * so that class-level {@code @SparkSQLCoreFlow} and per-test + * {@code taggedAs(CoreFlow)} are selected by the same scalatest + * {@code tagsToInclude}/{@code tagsToExclude} value. + */ +object CoreFlow extends Tag("org.apache.hudi.functional.SparkSQLCoreFlow") diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala index 599a105e6d001..c5b07d060b38b 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala @@ -55,7 +55,7 @@ import org.apache.spark.sql.hudi.HoodieSparkSessionExtension import org.apache.spark.sql.types.{ArrayType, DataTypes, DateType, IntegerType, LongType, MapType, StringType, StructField, StructType, TimestampType} import org.joda.time.DateTime import org.joda.time.format.DateTimeFormat -import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, Test} +import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, Tag, Test} import org.junit.jupiter.api.Assertions.{assertDoesNotThrow, assertEquals, assertFalse, assertTrue, fail} import org.junit.jupiter.api.function.Executable import org.junit.jupiter.params.ParameterizedTest @@ -800,6 +800,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup assertEquals(schema, actualSchema) } + @Tag("core") @ParameterizedTest @EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO", "SPARK")) def testCopyOnWriteDeletes(recordType: HoodieRecordType): Unit = { @@ -831,6 +832,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup assertEquals(snapshotDF2.count(), 80) } + @Tag("core") @Test def testCopyOnWriteUpserts(): Unit = { val recordType = HoodieRecordType.AVRO @@ -1217,6 +1219,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup assertEquals(hoodieIncViewDF2.count(), insert2NewKeyCnt) } + @Tag("core") @ParameterizedTest @EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO", "SPARK")) def testComplexDataTypeWriteAndReadConsistency(recordType: HoodieRecordType): Unit = { @@ -1549,6 +1552,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup assertEquals(false, Metrics.isInitialized(basePath)) } + @Tag("core") @ParameterizedTest @CsvSource(Array( "true,false,AVRO", "true,true,AVRO", "false,true,AVRO", "false,false,AVRO" @@ -1967,6 +1971,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup } /** Test case to verify MAKE_NEW_COLUMNS_NULLABLE config parameter. */ + @Tag("core") @Test def testSchemaEvolutionWithNewColumn(): Unit = { val df1 = spark.sql("select '1' as event_id, '2' as ts, '3' as version, 'foo' as event_date") @@ -2263,6 +2268,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup * - v6: Uses requestedTime for commits, open_close incremental ranges * - v9: Uses completionTime for commits, close_close incremental ranges */ + @Tag("core") @ParameterizedTest @CsvSource(Array("6", "9")) def testIncrementalAndTimeTravelWithEventTimeOrdering(tableVersion: String): Unit = { diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala index 81d049d432439..b88a33f7d2d35 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala @@ -47,7 +47,7 @@ import org.apache.spark.sql._ import org.apache.spark.sql.functions._ import org.apache.spark.sql.hudi.HoodieSparkSessionExtension import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType} -import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} +import org.junit.jupiter.api.{AfterEach, BeforeEach, Tag, Test} import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue} import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.{Arguments, CsvSource, EnumSource, MethodSource, ValueSource} @@ -104,6 +104,7 @@ class TestMORDataSource extends HoodieSparkClientTestBase with SparkDatasetMixin JFunction.toJavaConsumer((receiver: SparkSessionExtensions) => new HoodieSparkSessionExtension().apply(receiver))) ) + @Tag("core") @ParameterizedTest @CsvSource(Array( // Inferred as COMMIT_TIME_ORDERING @@ -494,6 +495,7 @@ class TestMORDataSource extends HoodieSparkClientTestBase with SparkDatasetMixin HoodieMemoryConfig.DEFAULT_MR_COMPACTION_MEMORY_FRACTION) } + @Tag("core") @ParameterizedTest @CsvSource(value = Array("AVRO,6", "AVRO,8", "SPARK,6", "SPARK,8")) def testPayloadDelete(recordType: HoodieRecordType, tableVersion: Int) { @@ -573,6 +575,7 @@ class TestMORDataSource extends HoodieSparkClientTestBase with SparkDatasetMixin assertEquals(0, hudiSnapshotDF3.count()) // 100 records were deleted, 0 record to load } + @Tag("core") @ParameterizedTest @CsvSource(value = Array("AVRO,6", "AVRO,8", "SPARK,6", "SPARK,8")) def testPrunedFiltered(recordType: HoodieRecordType, tableVersion: Int) { @@ -683,6 +686,7 @@ class TestMORDataSource extends HoodieSparkClientTestBase with SparkDatasetMixin assertEquals(0, hudiSnapshotDF3.filter("rider = 'rider-003'").count()) } + @Tag("core") @ParameterizedTest @EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO", "SPARK")) def testVectorizedReader(recordType: HoodieRecordType) { @@ -992,6 +996,7 @@ class TestMORDataSource extends HoodieSparkClientTestBase with SparkDatasetMixin assertEquals(partitionCounts("2021/03/03"), count7) } + @Tag("core") @ParameterizedTest @EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO", "SPARK")) def testReadLogOnlyMergeOnReadTable(recordType: HoodieRecordType): Unit = { @@ -1311,6 +1316,7 @@ class TestMORDataSource extends HoodieSparkClientTestBase with SparkDatasetMixin * * The read-optimized query should read `fg1_dc1.parquet` only in this case. */ + @Tag("core") @Test def testReadOptimizedQueryAfterInflightCompactionAndCompletedDeltaCommit(): Unit = { val (tableName, tablePath) = ("hoodie_mor_ro_read_test_table", s"${basePath}_mor_test_table") @@ -1406,6 +1412,7 @@ class TestMORDataSource extends HoodieSparkClientTestBase with SparkDatasetMixin roDf.where(col(recordKeyField) === 0).select(dataField).collect()(0).getLong(0)) } + @Tag("core") @ParameterizedTest @ValueSource(ints = Array(6, 8)) def testSnapshotQueryAfterInflightDeltaCommit(tableVersion: Int): Unit = { diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlCoreFlow.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlCoreFlow.scala index 2f37470ce208d..4b560897563f4 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlCoreFlow.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlCoreFlow.scala @@ -46,23 +46,19 @@ import scala.collection.JavaConverters._ class TestSparkSqlCoreFlow extends HoodieSparkSqlTestBase { val colsToCompare = "timestamp, _row_key, partition_path, rider, driver, begin_lat, begin_lon, end_lat, end_lon, fare.amount, fare.currency, _hoodie_is_deleted" - //params for core flow tests + // Params for core flow tests. This suite is the cross-Spark-version "core flow" anchor + // (runs on every Spark version via the core-tests profile), so the matrix is trimmed to a + // representative spread of table type, metadata on/off, key generator, and index type. + // The dropped keygen/index permutations are Spark-version-independent and remain covered + // by the full suite that runs on the latest Spark versions. val params: List[String] = List( "COPY_ON_WRITE|false|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_BLOOM", - "COPY_ON_WRITE|true|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_BLOOM", - "COPY_ON_WRITE|false|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_SIMPLE", "COPY_ON_WRITE|true|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_SIMPLE", "COPY_ON_WRITE|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|BLOOM", - "COPY_ON_WRITE|true|org.apache.hudi.keygen.NonpartitionedKeyGenerator|BLOOM", - "COPY_ON_WRITE|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|SIMPLE", "COPY_ON_WRITE|true|org.apache.hudi.keygen.NonpartitionedKeyGenerator|SIMPLE", "MERGE_ON_READ|false|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_BLOOM", - "MERGE_ON_READ|true|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_BLOOM", - "MERGE_ON_READ|false|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_SIMPLE", "MERGE_ON_READ|true|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_SIMPLE", "MERGE_ON_READ|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|BLOOM", - "MERGE_ON_READ|true|org.apache.hudi.keygen.NonpartitionedKeyGenerator|BLOOM", - "MERGE_ON_READ|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|SIMPLE", "MERGE_ON_READ|true|org.apache.hudi.keygen.NonpartitionedKeyGenerator|SIMPLE" ) @@ -409,39 +405,13 @@ class TestSparkSqlCoreFlow extends HoodieSparkSqlTestBase { HoodieRecord.FILENAME_METADATA_FIELD) } - //params for immutable user flow + // Params for immutable user flow. Trimmed to cover both table types and both immutable + // write ops (insert, bulk_insert) with a representative metadata/keygen/index spread; see + // the note on `params` above for why the full permutation matrix is not needed here. val paramsForImmutable: List[String] = List( "COPY_ON_WRITE|insert|false|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_BLOOM", - "COPY_ON_WRITE|insert|true|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_BLOOM", - "COPY_ON_WRITE|insert|false|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_SIMPLE", - "COPY_ON_WRITE|insert|true|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_SIMPLE", - "COPY_ON_WRITE|insert|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|BLOOM", - "COPY_ON_WRITE|insert|true|org.apache.hudi.keygen.NonpartitionedKeyGenerator|BLOOM", - "COPY_ON_WRITE|insert|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|SIMPLE", - "COPY_ON_WRITE|insert|true|org.apache.hudi.keygen.NonpartitionedKeyGenerator|SIMPLE", - "MERGE_ON_READ|insert|false|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_BLOOM", - "MERGE_ON_READ|insert|true|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_BLOOM", - "MERGE_ON_READ|insert|false|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_SIMPLE", - "MERGE_ON_READ|insert|true|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_SIMPLE", - "MERGE_ON_READ|insert|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|BLOOM", - "MERGE_ON_READ|insert|true|org.apache.hudi.keygen.NonpartitionedKeyGenerator|BLOOM", - "MERGE_ON_READ|insert|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|SIMPLE", - "MERGE_ON_READ|insert|true|org.apache.hudi.keygen.NonpartitionedKeyGenerator|SIMPLE", - "COPY_ON_WRITE|bulk_insert|false|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_BLOOM", - "COPY_ON_WRITE|bulk_insert|true|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_BLOOM", - "COPY_ON_WRITE|bulk_insert|false|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_SIMPLE", - "COPY_ON_WRITE|bulk_insert|true|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_SIMPLE", - "COPY_ON_WRITE|bulk_insert|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|BLOOM", - "COPY_ON_WRITE|bulk_insert|true|org.apache.hudi.keygen.NonpartitionedKeyGenerator|BLOOM", - "COPY_ON_WRITE|bulk_insert|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|SIMPLE", "COPY_ON_WRITE|bulk_insert|true|org.apache.hudi.keygen.NonpartitionedKeyGenerator|SIMPLE", - "MERGE_ON_READ|bulk_insert|false|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_BLOOM", - "MERGE_ON_READ|bulk_insert|true|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_BLOOM", - "MERGE_ON_READ|bulk_insert|false|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_SIMPLE", - "MERGE_ON_READ|bulk_insert|true|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_SIMPLE", - "MERGE_ON_READ|bulk_insert|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|BLOOM", - "MERGE_ON_READ|bulk_insert|true|org.apache.hudi.keygen.NonpartitionedKeyGenerator|BLOOM", - "MERGE_ON_READ|bulk_insert|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|SIMPLE", + "MERGE_ON_READ|insert|false|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_BLOOM", "MERGE_ON_READ|bulk_insert|true|org.apache.hudi.keygen.NonpartitionedKeyGenerator|SIMPLE" ) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestCreateTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestCreateTable.scala index 88ec4b7fb4c36..bf5e06dd1352a 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestCreateTable.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestCreateTable.scala @@ -24,6 +24,7 @@ import org.apache.hudi.common.schema.{HoodieSchema, HoodieSchemaType} import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient} import org.apache.hudi.common.util.PartitionPathEncodeUtils.escapePathName import org.apache.hudi.config.HoodieWriteConfig +import org.apache.hudi.functional.CoreFlow import org.apache.hudi.hadoop.fs.HadoopFSUtils import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat import org.apache.hudi.keygen.constant.KeyGeneratorType @@ -47,7 +48,7 @@ import scala.collection.JavaConverters._ class TestCreateTable extends HoodieSparkSqlTestBase { - test("Test Create Managed Hoodie Table") { + test("Test Create Managed Hoodie Table", CoreFlow) { val databaseName = "hudi_database" spark.sql(s"create database if not exists $databaseName") spark.sql(s"use $databaseName") @@ -101,7 +102,7 @@ class TestCreateTable extends HoodieSparkSqlTestBase { spark.sql("use default") } - test("Test Create Hoodie Table With Options") { + test("Test Create Hoodie Table With Options", CoreFlow) { val tableName = generateTableName spark.sql( s""" @@ -154,7 +155,7 @@ class TestCreateTable extends HoodieSparkSqlTestBase { assertFalse(tableConfig.contains(OPERATION.key())) } - test("Test Create External Hoodie Table") { + test("Test Create External Hoodie Table", CoreFlow) { withTempDir { tmp => // Test create cow table. val tableName = generateTableName @@ -236,7 +237,7 @@ class TestCreateTable extends HoodieSparkSqlTestBase { } } - test("Test Create External Hoodie Table with data") { + test("Test Create External Hoodie Table with data", CoreFlow) { withTempDir { tmp => val options = Map(DataSourceWriteOptions.TABLE_TYPE.key -> HoodieTableType.MERGE_ON_READ.name(), HoodieTableConfig.ORDERING_FIELDS.key -> "ordering", diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/insert/TestInsertTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/insert/TestInsertTable.scala index ebaa13770ed44..155ab028171c3 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/insert/TestInsertTable.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/insert/TestInsertTable.scala @@ -23,6 +23,7 @@ import org.apache.hudi.HoodieSparkUtils import org.apache.hudi.common.config.HoodieStorageConfig import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType import org.apache.hudi.exception.{HoodieDuplicateKeyException, HoodieException} +import org.apache.hudi.functional.CoreFlow import org.apache.hudi.testutils.HoodieClientTestUtils.createMetaClient import org.apache.spark.sql.hudi.HoodieSqlCommonUtils @@ -218,7 +219,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase { } } - test("Test Insert Into with values") { + test("Test Insert Into with values", CoreFlow) { withRecordType()(withTempDir { tmp => val tableName = generateTableName // Create a partitioned table @@ -305,7 +306,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase { } } - test("Test Insert Into with dynamic partition") { + test("Test Insert Into with dynamic partition", CoreFlow) { Seq("cow", "mor").foreach { tableType => withTempDir { tmp => val tableName = generateTableName @@ -413,7 +414,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase { } } - test("Test Insert Into Non Partitioned Table") { + test("Test Insert Into Non Partitioned Table", CoreFlow) { withRecordType(Seq(HoodieRecordType.AVRO, HoodieRecordType.SPARK))(withTempDir { tmp => val tableName = generateTableName withSQLConf("hoodie.datasource.insert.dup.policy" -> "fail") { @@ -632,7 +633,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase { }) } - test("Test Insert Overwrite") { + test("Test Insert Overwrite", CoreFlow) { withTempDir { tmp => Seq("cow", "mor").foreach { tableType => withTable(generateTableName) { tableName => diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestDeleteTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestDeleteTable.scala index 1bbf05557201f..99008ea9752fc 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestDeleteTable.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestDeleteTable.scala @@ -22,13 +22,14 @@ package org.apache.spark.sql.hudi.dml.others import org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.common.table.HoodieTableConfig import org.apache.hudi.config.HoodieWriteConfig +import org.apache.hudi.functional.CoreFlow import org.apache.spark.sql.SaveMode import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase class TestDeleteTable extends HoodieSparkSqlTestBase { - test("Test Delete Table") { + test("Test Delete Table", CoreFlow) { withTempDir { tmp => Seq(true, false).foreach { sparkSqlOptimizedWrites => Seq("cow", "mor").foreach { tableType => @@ -187,7 +188,7 @@ class TestDeleteTable extends HoodieSparkSqlTestBase { } } - test("Test Delete Table On Non-PK Condition") { + test("Test Delete Table On Non-PK Condition", CoreFlow) { withTempDir { tmp => Seq("cow", "mor").foreach { tableType => @@ -277,7 +278,7 @@ class TestDeleteTable extends HoodieSparkSqlTestBase { } } - test("Test Delete Table with op upsert") { + test("Test Delete Table with op upsert", CoreFlow) { withTempDir { tmp => Seq("cow", "mor").foreach { tableType => val tableName = generateTableName diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestMergeIntoTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestMergeIntoTable.scala index 6f480eae6134f..a600863eb0598 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestMergeIntoTable.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestMergeIntoTable.scala @@ -26,6 +26,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline import org.apache.hudi.common.testutils.HoodieTestUtils import org.apache.hudi.config.{HoodieClusteringConfig, HoodieWriteConfig} import org.apache.hudi.config.HoodieWriteConfig.MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT +import org.apache.hudi.functional.CoreFlow import org.apache.hudi.storage.StoragePath import org.apache.hudi.testutils.DataSourceTestUtils import org.apache.hudi.testutils.HoodieClientTestUtils.createMetaClient @@ -40,7 +41,7 @@ import org.slf4j.LoggerFactory class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSupport { private val log = LoggerFactory.getLogger(getClass) - test("Test MergeInto Basic") { + test("Test MergeInto Basic", CoreFlow) { Seq(true, false).foreach { sparkSqlOptimizedWrites => withRecordType()(withTempDir { tmp => withSparkSqlSessionConfig("hoodie.payload.combined.schema.validate" -> "false", @@ -474,7 +475,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo } } - test("Test MergeInto for MOR table") { + test("Test MergeInto for MOR table", CoreFlow) { withTempDir { tmp => withSQLConf("hoodie.payload.combined.schema.validate" -> "true", MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key() -> "0") { val tableName = generateTableName @@ -614,7 +615,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo } } - test("Test MergeInto with insert only") { + test("Test MergeInto with insert only", CoreFlow) { withTempDir { tmp => withSQLConf("hoodie.payload.combined.schema.validate" -> "true", MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key() -> "0") { // Create a partitioned mor table @@ -915,7 +916,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo } } - test("Test MergeInto with combination of delete update insert") { + test("Test MergeInto with combination of delete update insert", CoreFlow) { withRecordType()(withTempDir { tmp => withSQLConf("hoodie.payload.combined.schema.validate" -> "true") { val sourceTable = generateTableName diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestUpdateTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestUpdateTable.scala index 9d67da0db5cee..cd487fb4db4f9 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestUpdateTable.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestUpdateTable.scala @@ -25,6 +25,7 @@ import org.apache.hudi.HoodieSparkUtils.gteqSpark3_4 import org.apache.hudi.common.model.HoodieTableType import org.apache.hudi.common.table.timeline.HoodieInstant import org.apache.hudi.common.util.{Option => HOption} +import org.apache.hudi.functional.CoreFlow import org.apache.hudi.testutils.HoodieClientTestUtils.createMetaClient import org.apache.spark.sql.{AnalysisException, Row} @@ -33,7 +34,7 @@ import org.junit.jupiter.api.Assertions.assertEquals class TestUpdateTable extends HoodieSparkSqlTestBase { - test("Test Update Table") { + test("Test Update Table", CoreFlow) { withRecordType()(withTempDir { tmp => Seq(true, false).foreach { sparkSqlOptimizedWrites => Seq("cow", "mor").foreach { tableType => @@ -144,7 +145,7 @@ class TestUpdateTable extends HoodieSparkSqlTestBase { }) } - test("Test Update Table On Non-PK Condition") { + test("Test Update Table On Non-PK Condition", CoreFlow) { withRecordType()(withTempDir { tmp => Seq("cow", "mor").foreach {tableType => /** non-partitioned table */ @@ -233,7 +234,7 @@ class TestUpdateTable extends HoodieSparkSqlTestBase { }) } - test("Test ignoring case for Update Table") { + test("Test ignoring case for Update Table", CoreFlow) { withTempDir { tmp => Seq("cow", "mor").foreach { tableType => val tableName = generateTableName diff --git a/pom.xml b/pom.xml index f142ebf009b85..38a2a2965bd93 100644 --- a/pom.xml +++ b/pom.xml @@ -210,6 +210,10 @@ ${skipTests} ${skipTests} ${skipTests} + + org.apache.hudi.functional.SparkSQLCoreFlow UTF-8 ${project.basedir} provided @@ -2088,7 +2092,7 @@ ${skipUTs} 120 - functional,functional-b,functional-c + functional,functional-b,functional-c,core **/IT*.java **/testsuite/**/Test*.java @@ -2141,6 +2145,7 @@ 1 true functional + core @@ -2189,6 +2194,7 @@ 1 true functional-b + core @@ -2237,6 +2243,7 @@ 1 true functional-c + core @@ -2267,6 +2274,70 @@ + + + core-tests + + false + true + true + + + + + + org.apache.maven.plugins + maven-surefire-plugin + ${maven-surefire-plugin.version} + + ${skipUTs} + 120 + core + + **/IT*.java + **/testsuite/**/Test*.java + + + + + org.scalatest + scalatest-maven-plugin + + org.apache.hudi.functional.SparkSQLCoreFlow + + + + org.jacoco + jacoco-maven-plugin + + + + prepare-agent + + + ${project.build.directory}/jacoco-agent/${jacoco.agent.dest.filename} + + + + post-unit-tests + test + + report + + + ${project.build.directory}/jacoco-agent/${jacoco.agent.dest.filename} + ${project.reporting.outputDirectory}/jacoco-ut + + + + + + + hudi-platform-service