Skip to content

#748 Add configuration option to control wrapping of sink exceptions.#749

Merged
yruslan merged 2 commits into
mainfrom
feature/748-remove-sink-exception-wrapper
May 19, 2026
Merged

#748 Add configuration option to control wrapping of sink exceptions.#749
yruslan merged 2 commits into
mainfrom
feature/748-remove-sink-exception-wrapper

Conversation

@yruslan
Copy link
Copy Markdown
Collaborator

@yruslan yruslan commented May 18, 2026

Summary by CodeRabbit

  • New Features

    • Added a configuration option to control whether sink errors are wrapped in a generic "Unable to write to the sink" message.
  • Bug Fixes

    • Restored an option to preserve legacy behavior where original sink exceptions are reported unchanged.
  • Tests

    • Updated tests to cover both wrapped and unwrapped sink-exception behaviors.

Review Change Stack

@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented May 18, 2026

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 415b19f5-7ea7-4dcf-9bdc-e1b518d63ac1

📥 Commits

Reviewing files that changed from the base of the PR and between 6eb22ee and b8a4326.

📒 Files selected for processing (1)
  • pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/SinkJobSuite.scala

Walkthrough

This PR adds a workflow-level config flag pramen.internal.wrap.sink.exceptions, threads the top-level Config through OperationSplitter → TransferJob → SinkJob, and makes SinkJob conditionally wrap sink write exceptions based on that flag; tests were updated to assert both wrapped and unwrapped behaviors.

Changes

Sink exception wrapping control

Layer / File(s) Summary
Configuration and key constant
pramen/core/src/main/resources/reference.conf, pramen/core/src/main/scala/za/co/absa/pramen/core/config/Keys.scala
Adds pramen.internal.wrap.sink.exceptions (default: false) and the WRAP_SINK_EXCEPTION key constant.
SinkJob exception wrapping implementation
pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/SinkJob.scala
Extends SinkJob to accept workflowConf, adds Keys import, replaces fixed wrapper throw with getSinkException that conditionally wraps or rethrows based on the config key.
Configuration threading through job hierarchy
pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/OperationSplitter.scala, pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/TransferJob.scala
Updates OperationSplitter to pass conf into TransferJob, and TransferJob to pass workflowConf into SinkJob so SinkJob can read the flag.
Test fixture and expectation updates
pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/SinkJobSuite.scala, pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/TransferJobSuite.scala
Updates test helpers to accept/pass Config into job constructors and adds/adjusts tests asserting both wrapped ("Unable to write to the sink.") and unwrapped ("Dummy Exception") behaviors.

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

Possibly related PRs

  • AbsaOSS/pramen#665: Concurrent modifications to the same job constructor signatures and wiring in OperationSplitter, TransferJob, and SinkJob.

Suggested reviewers

  • jozefbakus

Poem

🐇 I nudge a flag, then hop away,
Config threads guide errors' play,
Sinks may hide or show the cause,
Tests now check both sides because—
A rabbit cheers the clearer day!

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title accurately summarizes the main change: adding a configuration option to control sink exception wrapping behavior across multiple modified files.
Docstring Coverage ✅ Passed No functions found in the changed files to evaluate docstring coverage. Skipping docstring coverage check.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch feature/748-remove-sink-exception-wrapper

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/SinkJobSuite.scala (1)

229-237: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Fix the assertion contract for the unwrapped exception path.

On Line 229 you switched to intercept[RuntimeException], but on Lines 236-237 the test still asserts wrapped-exception fields. With ConfigFactory.empty() on Line 265, wrapping is off, so this should assert the original exception message/cause shape.

Suggested fix
-      assert(ex.getMessage == "Unable to write to the sink.")
-      assert(ex.getCause.getMessage == "Dummy Exception")
+      assert(ex.getMessage == "Dummy Exception")
+      assert(ex.getCause == null)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In
`@pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/SinkJobSuite.scala`
around lines 229 - 237, The test uses intercept[RuntimeException] around
job.save(...) but still asserts wrapped-exception messages (assert(ex.getMessage
== "Unable to write to the sink.") and assert(ex.getCause.getMessage == "Dummy
Exception")), while the config (ConfigFactory.empty()) disables wrapping; update
the assertions to reflect the unwrapped exception shape by asserting the
original exception message (e.g., "Dummy Exception") and any direct cause or
null as appropriate—locate the intercept block around job.save(...) and replace
the wrapped-exception assertions with checks against the unwrapped exception
message and cause.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Outside diff comments:
In
`@pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/SinkJobSuite.scala`:
- Around line 229-237: The test uses intercept[RuntimeException] around
job.save(...) but still asserts wrapped-exception messages (assert(ex.getMessage
== "Unable to write to the sink.") and assert(ex.getCause.getMessage == "Dummy
Exception")), while the config (ConfigFactory.empty()) disables wrapping; update
the assertions to reflect the unwrapped exception shape by asserting the
original exception message (e.g., "Dummy Exception") and any direct cause or
null as appropriate—locate the intercept block around job.save(...) and replace
the wrapped-exception assertions with checks against the unwrapped exception
message and cause.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 88a1b425-7a4e-4859-810a-4c99e9844de6

📥 Commits

Reviewing files that changed from the base of the PR and between a4cc52d and 6eb22ee.

📒 Files selected for processing (7)
  • pramen/core/src/main/resources/reference.conf
  • pramen/core/src/main/scala/za/co/absa/pramen/core/config/Keys.scala
  • pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/OperationSplitter.scala
  • pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/SinkJob.scala
  • pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/TransferJob.scala
  • pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/SinkJobSuite.scala
  • pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/TransferJobSuite.scala

@github-actions
Copy link
Copy Markdown

Unit Test Coverage

Overall Project 77.25% 🍏
Files changed 98.51% 🍏

Module Coverage
pramen:core Jacoco Report 78.29% 🍏
Files
Module File Coverage
pramen:core Jacoco Report OperationSplitter.scala 93.86% 🍏
SinkJob.scala 85.88% -0.13% 🍏
TransferJob.scala 76.97% 🍏

@yruslan yruslan merged commit 48b1e83 into main May 19, 2026
7 checks passed
@yruslan yruslan deleted the feature/748-remove-sink-exception-wrapper branch May 19, 2026 07:39
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant