Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 30 additions & 38 deletions src/sources/pulsar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -623,14 +623,18 @@ async fn pulsar_source(
broker_redelivery_enabled: bool,
) -> Result<(), ()> {
let (finalizer, mut ack_stream) =
OrderedFinalizer::<FinalizerEntry>::maybe_new(acknowledgements, Some(shutdown.clone()));
OrderedFinalizer::<FinalizerEntry>::new(Some(shutdown.clone()));

let bytes_received = register!(BytesReceived::from(Protocol::TCP));
let events_received = register!(EventsReceived);
let pulsar_error_events = register!(PulsarErrorEvent);

loop {
let msg_result = tokio::select! {
// Prioritize ack processing over consuming new messages to prevent
// unbounded memory growth when acks are processed slower than
// incoming messages.
biased;
_ = &mut shutdown => break,
entry = ack_stream.next() => {
if let Some((status, entry)) = entry {
Expand All @@ -651,10 +655,9 @@ async fn pulsar_source(
&decoder,
&finalizer,
&mut out,
&mut consumer,
acknowledgements,
log_namespace,
&events_received,
&pulsar_error_events,
)
.await;
}
Expand All @@ -680,12 +683,11 @@ async fn pulsar_source(
async fn parse_message(
msg: Message<String>,
decoder: &Decoder,
finalizer: &Option<OrderedFinalizer<FinalizerEntry>>,
finalizer: &OrderedFinalizer<FinalizerEntry>,
out: &mut SourceSender,
consumer: &mut Consumer<String, TokioExecutor>,
acknowledgements: bool,
log_namespace: LogNamespace,
events_received: &Registered<EventsReceived>,
pulsar_error_events: &Registered<PulsarErrorEvent>,
) {
let publish_time = i64::try_from(msg.payload.metadata.publish_time)
.ok()
Expand Down Expand Up @@ -796,54 +798,44 @@ async fn parse_message(
.boxed();

finalize_event_stream(
consumer,
finalizer,
out,
stream,
msg.topic.clone(),
msg.message_id().clone(),
pulsar_error_events,
acknowledgements,
)
.await;
}

/// Send the event stream created by the framed read to the `out` stream.
/// All events flow through the finalizer for throttling. The batch notifier is
/// only attached to events when sink acknowledgements are enabled; otherwise it
/// is dropped after send, resolving the receiver immediately.
async fn finalize_event_stream(
consumer: &mut Consumer<String, TokioExecutor>,
finalizer: &Option<OrderedFinalizer<FinalizerEntry>>,
finalizer: &OrderedFinalizer<FinalizerEntry>,
out: &mut SourceSender,
mut stream: std::pin::Pin<Box<dyn futures_util::Stream<Item = Event> + Send + '_>>,
stream: std::pin::Pin<Box<dyn futures_util::Stream<Item = Event> + Send + '_>>,
topic: String,
message_id: MessageIdData,
pulsar_error_events: &Registered<PulsarErrorEvent>,
acknowledgements: bool,
) {
match finalizer {
Some(finalizer) => {
let (batch, receiver) = BatchNotifier::new_with_receiver();
let mut stream = stream.map(|event| event.with_batch_notifier(&batch));

match out.send_event_stream(&mut stream).await {
Err(_error) => {
emit!(StreamClosedError { count: 1 });
}
Ok(_) => {
finalizer.add(FinalizerEntry { topic, message_id }, receiver);
}
}
let (batch, receiver) = BatchNotifier::new_with_receiver();
let mut stream = stream.map(|event| {
if acknowledgements {
event.with_batch_notifier(&batch)
} else {
event
}
});

match out.send_event_stream(&mut stream).await {
Err(_error) => {
emit!(StreamClosedError { count: 1 });
}
Ok(_) => {
finalizer.add(FinalizerEntry { topic, message_id }, receiver);
}
None => match out.send_event_stream(&mut stream).await {
Err(_error) => {
emit!(StreamClosedError { count: 1 });
}
Ok(_) => {
if let Err(error) = consumer.ack_with_id(topic.as_str(), message_id).await {
pulsar_error_events.emit(PulsarErrorEventData {
msg: error.to_string(),
error_type: PulsarErrorEventType::Ack,
});
}
}
},
}
}

Expand Down