Skip to content

fix(pulsar): adjust event finalization/acking#71

Draft
mdeltito wants to merge 1 commit intomasterfrom
feature/LOG-23596
Draft

fix(pulsar): adjust event finalization/acking#71
mdeltito wants to merge 1 commit intomasterfrom
feature/LOG-23596

Conversation

@mdeltito
Copy link
Copy Markdown
Member

@mdeltito mdeltito commented Apr 9, 2026

Three changes here:

  1. Always create the OrderedFinalizer and ack_stream

Previously this was only done when acks were enabled (either globally, through the sink configuration, or the legacy source option).

  1. Always route events through the finalizer, attaching BatchNotifier only when acknowledgements are configured

Instead of preemptively acking messages to the broker after parsing when sink acks are not enabled, we instead use the Drop of the finalizer to signal when the event is processed and should be acked. This forces the broker acknowledgement through the ack_stream in the same way regardless of the sink configuration, which provides natural throttling of the consumer. This also matches the kafka behavior where they appear to have learned the same lesson.

  1. Add biased; to the tokio::select! loop to prioritize ack processing over message consumption.

Again, this matches kafka behavior: src/sources/kafka.rs#L640-L645

Ref: LOG-23596

Three changes here:

1. Always create the OrderedFinalizer and `ack_stream`

Previously this was only done when acks were enabled (either globally,
through the sink configuration, or the legacy source option).

2. Always route events through the finalizer, attaching BatchNotifier
only when acknowledgements are configured

Instead of preemptively acking messages to the broker after parsing
when sink acks are **not** enabled, we instead use the `Drop` of the
finalizer to signal when the event is processed and should be acked.
This forces the broker acknowledgement through the `ack_stream` in
the same way regardless of the sink configuration, which provides
natural throttling of the consumer. This also matches the kafka
behavior where they appear to have learned the same lesson.

3. Add biased; to the `tokio::select!` loop to prioritize ack
processing over message consumption.

Again, this matches kafka behavior: `src/sources/kafka.rs#L640-L645`

Ref: LOG-23596
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant