diff --git a/pramen/core/src/main/resources/reference.conf b/pramen/core/src/main/resources/reference.conf index b775429a..82a34d5e 100644 --- a/pramen/core/src/main/resources/reference.conf +++ b/pramen/core/src/main/resources/reference.conf @@ -234,6 +234,9 @@ pramen { connection.retries.default = 3 connection.backoff.min.ms = 10000 connection.backoff.max.ms = 60000 + + # If true, restores the legacy behavior when sink exceptions were wrapped in "Unable to write to sink" exception. + wrap.sink.exceptions = false } } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/config/Keys.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/config/Keys.scala index ebfe453a..681dee03 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/config/Keys.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/config/Keys.scala @@ -53,6 +53,8 @@ object Keys { val LOG_EXECUTOR_NODES = "pramen.log.executor.nodes" val LOG_EFFECTIVE_CONFIG = "pramen.log.effective.config" + val WRAP_SINK_EXCEPTION = "pramen.internal.wrap.sink.exceptions" + final val KEYS_TO_REDACT: Set[String] = Set("password", "secret", "pwd", "access.key", "api.key", "api_key", "session.token", "access_key", "session_token", "auth.user.info") final val CONFIG_KEYS_TO_REDACT = Set( diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/OperationSplitter.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/OperationSplitter.scala index 987872c1..f18401ce 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/OperationSplitter.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/OperationSplitter.scala @@ -103,9 +103,9 @@ class OperationSplitter(conf: Config, if (operationDef.schedule == Schedule.Incremental) { val latestOffsets = bookkeeper.getOffsetManager.getMaxInfoDateAndOffset(outputTable.name, None) - new TransferJob(operationDef, metastore, bookkeeper, notificationTargets, latestOffsets, batchId, sourceName, source, transferTable, outputTable, sinkName, sink, specialCharacters, temporaryDirectory, disableCountQuery) + new TransferJob(operationDef, metastore, bookkeeper, notificationTargets, latestOffsets, batchId, sourceName, source, transferTable, outputTable, sinkName, sink, specialCharacters, temporaryDirectory, disableCountQuery, conf) } else { - new TransferJob(operationDef, metastore, bookkeeper, notificationTargets, None, batchId, sourceName, source, transferTable, outputTable, sinkName, sink, specialCharacters, temporaryDirectory, disableCountQuery) + new TransferJob(operationDef, metastore, bookkeeper, notificationTargets, None, batchId, sourceName, source, transferTable, outputTable, sinkName, sink, specialCharacters, temporaryDirectory, disableCountQuery, conf) } }) } @@ -171,7 +171,7 @@ class OperationSplitter(conf: Config, bookkeeper.getOffsetManager.getMaxInfoDateAndOffset(outputTableName, None).map(_.maximumInfoDate) } else None - new SinkJob(operationDef, metastore, bookkeeper, notificationTargets, latestInfoDateOpt, outputTable, sinkName, sink, sinkTable) + new SinkJob(operationDef, metastore, bookkeeper, notificationTargets, latestInfoDateOpt, outputTable, sinkName, sink, sinkTable, conf) }) } } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/SinkJob.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/SinkJob.scala index dc1f16f6..f3999ad5 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/SinkJob.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/SinkJob.scala @@ -22,6 +22,7 @@ import za.co.absa.pramen.api.jobdef.SinkTable import za.co.absa.pramen.api.status.{DependencyWarning, JobType, TaskRunReason} import za.co.absa.pramen.api.{DataFormat, MetastoreReader, Reason, Sink} import za.co.absa.pramen.core.bookkeeper.Bookkeeper +import za.co.absa.pramen.core.config.Keys import za.co.absa.pramen.core.exceptions.LazyJobErrorWrapper import za.co.absa.pramen.core.metastore.model.{MetaTable, ReaderMode} import za.co.absa.pramen.core.metastore.{MetaTableStats, Metastore, MetastoreReaderIncremental} @@ -42,7 +43,8 @@ class SinkJob(operationDef: OperationDef, outputTable: MetaTable, sinkName: String, sink: Sink, - sinkTable: SinkTable) + sinkTable: SinkTable, + workflowConf: Config) (implicit spark: SparkSession) extends JobBase(operationDef, metastore, bookkeeper, notificationTargets, outputTable) { import JobBase._ @@ -182,7 +184,8 @@ class SinkJob(operationDef: OperationDef, val stats = MetaTableStats(Option(sinkResult.recordsSent)) SaveResult(stats, sinkResult.filesSent, sinkResult.hiveTables, sinkResult.warnings ++ tooLongWarnings) } catch { - case NonFatal(ex) => throw new IllegalStateException("Unable to write to the sink.", ex) + case NonFatal(ex) => + throw getSinkException(ex) } finally { Try { sink.close() @@ -190,6 +193,17 @@ class SinkJob(operationDef: OperationDef, } } + /** The wrapping of the exception here is redundant since users already know that the sink has failed. But + * the behavior made configurable for backwards compatibility. */ + private def getSinkException(cause: Throwable): Throwable = { + val wrapException = ConfigUtils.getOptionBoolean(workflowConf, Keys.WRAP_SINK_EXCEPTION).getOrElse(false) + if (wrapException) { + new IllegalStateException("Unable to write to the sink.", cause) + } else { + cause + } + } + private def getDataDf(infoDate: LocalDate, metastoreReader: MetastoreReader): DataFrame = { try { if (isIncremental) { diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/TransferJob.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/TransferJob.scala index a99049be..c01727dd 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/TransferJob.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/TransferJob.scala @@ -43,7 +43,8 @@ class TransferJob(operationDef: OperationDef, sink: Sink, specialCharacters: String, tempDirectory: Option[String], - disableCountQuery: Boolean) + disableCountQuery: Boolean, + workflowConf: Config) (implicit spark: SparkSession) extends JobBase(operationDef, metastore, bookkeeper, notificationTargets, bookkeepingMetaTable) { @@ -57,7 +58,7 @@ class TransferJob(operationDef: OperationDef, } } - val sinkJob: SinkJob = new SinkJob(operationDef, metastore, bookkeeper, notificationTargets, latestInfoDate, bookkeepingMetaTable, sinkName, sink, TransferTableParser.getSinkTable(table)) + val sinkJob: SinkJob = new SinkJob(operationDef, metastore, bookkeeper, notificationTargets, latestInfoDate, bookkeepingMetaTable, sinkName, sink, TransferTableParser.getSinkTable(table), workflowConf) override val scheduleStrategy: ScheduleStrategy = ingestionJob.scheduleStrategy diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/SinkJobSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/SinkJobSuite.scala index fe1bc91c..a929a941 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/SinkJobSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/SinkJobSuite.scala @@ -16,7 +16,7 @@ package za.co.absa.pramen.core.pipeline -import com.typesafe.config.{ConfigFactory, ConfigValueFactory} +import com.typesafe.config.{Config, ConfigFactory, ConfigValueFactory} import org.apache.spark.sql.functions.col import org.apache.spark.sql.{AnalysisException, DataFrame} import org.scalatest.wordspec.AnyWordSpec @@ -221,12 +221,16 @@ class SinkJobSuite extends AnyWordSpec with SparkTestBase with TextComparisonFix assert(ex.getCause.getMessage == "Dummy Exception") } - "throw an exception when write() throws" in { + "throw an exception when write() throws with wrapper" in { val sink = new SinkSpy(writeException = new RuntimeException("Dummy Exception")) + val conf = ConfigFactory.parseString( + """ + |pramen.internal.wrap.sink.exceptions = true + |""".stripMargin) - val (job, _) = getUseCase(tableDf = exampleDf, sink = sink) + val (job, _) = getUseCase(tableDf = exampleDf, sink = sink, conf = conf) - val ex = intercept[IllegalStateException] { + val ex = intercept[RuntimeException] { job.save(exampleDf, infoDate, runReason, conf, Instant.now(), Some(10L)) } @@ -237,6 +241,21 @@ class SinkJobSuite extends AnyWordSpec with SparkTestBase with TextComparisonFix assert(ex.getCause.getMessage == "Dummy Exception") } + "throw an exception when write() throws without wrapper" in { + val sink = new SinkSpy(writeException = new RuntimeException("Dummy Exception")) + + val (job, _) = getUseCase(tableDf = exampleDf, sink = sink) + + val ex = intercept[RuntimeException] { + job.save(exampleDf, infoDate, runReason, conf, Instant.now(), Some(10L)) + } + + assert(sink.connectCalled == 1) + assert(sink.writeCalled == 1) + assert(sink.closeCalled == 1) + assert(ex.getMessage == "Dummy Exception") + } + "ignore if close() throws" in { val sink = new SinkSpy(closeException = new RuntimeException("Dummy Exception")) @@ -253,7 +272,8 @@ class SinkJobSuite extends AnyWordSpec with SparkTestBase with TextComparisonFix def getUseCase(sinkTable: SinkTable = SinkTableFactory.getDummySinkTable(), tableDf: DataFrame = null, tableException: Throwable = null, - sink: Sink = SinkSpy(conf, "", spark)): (SinkJob, SyncBookkeeperMock) = { + sink: Sink = SinkSpy(conf, "", spark), + conf: Config = ConfigFactory.empty()): (SinkJob, SyncBookkeeperMock) = { val operation = OperationDefFactory.getDummyOperationDef(extraOptions = Map[String, String]("value" -> "7")) val bk = new SyncBookkeeperMock @@ -262,7 +282,7 @@ class SinkJobSuite extends AnyWordSpec with SparkTestBase with TextComparisonFix val outputTable = MetaTableFactory.getDummyMetaTable(name = "table1->mysink") - (new SinkJob(operation, metastore, bk, Nil, None, outputTable, "sink_name", sink, sinkTable), bk) + (new SinkJob(operation, metastore, bk, Nil, None, outputTable, "sink_name", sink, sinkTable, conf), bk) } } diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/TransferJobSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/TransferJobSuite.scala index 98dcb459..fec52dce 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/TransferJobSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/TransferJobSuite.scala @@ -273,7 +273,7 @@ class TransferJobSuite extends AnyWordSpec with SparkTestBase with TextCompariso val outputTable = TransferTableParser.getMetaTable(transferTable) - (new TransferJob(operation, metastore, bk, Nil, None, 123L, "testSource", source, transferTable, outputTable, "sink_name", sink, " ", tempDirectory, disableCountQuery), bk) + (new TransferJob(operation, metastore, bk, Nil, None, 123L, "testSource", source, transferTable, outputTable, "sink_name", sink, " ", tempDirectory, disableCountQuery, sourceConfig), bk) } }