Conversation
Signed-off-by: Eduardo Silva <eduardo@chronosphere.io>
📝 Walkthrough

Adds a downstream worker runtime API and implementation, and integrates it into the TCP, UDP, and Forward input plugins and the HTTP server.

Changes: Multi-Worker Input Framework & Plugin Integration
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Client
    participant Runtime as Downstream_Runtime
    participant Worker as Worker(Thread/EventLoop)
    participant Engine
    participant Input as Plugin_Input
    Client->>Runtime: connect/send packet (socket)
    Runtime->>Worker: deliver socket event (worker event loop)
    Worker->>Input: invoke collect handler (in_*_collect_ctx)
    Input->>Input: decode/encode record
    Input->>Input: call ingest helper (fw/tcp/udp_ingest_*)
    alt use_ingress_queue
        Input->>Engine: flb_input_ingress_queue_* (enqueue)
    else direct_append
        Input->>Engine: flb_input_*_append (direct append)
    end
    Engine->>Worker: FLB_ENGINE_EV_CUSTOM dispatch (event loop)
    Worker->>Runtime: maintenance callbacks (periodic)
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~50 minutes
🚥 Pre-merge checks: ❌ 1 failed (1 warning) | ✅ 4 passed
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 9e576e909b
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
```c
if (ctx->runtime != NULL) {
    flb_downstream_worker_runtime_foreach(ctx->runtime,
                                          in_tcp_worker_pause,
                                          NULL);
    return;
```
Close active TCP worker connections during pause
With workers > 1, in_tcp_pause exits after dispatching in_tcp_worker_pause, and that callback only sets flb_downstream_pause on each listener. Unlike the single-worker path, no worker connection list is drained (tcp_conn_del / pending_close), so already-accepted sockets keep their event handlers and can continue ingesting records during backpressure pauses.
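The drain-on-pause pattern the reviewer describes can be sketched in isolation. This is a minimal illustration, not the actual Fluent Bit code: `worker_ctx_t`, `conn_add`, and `worker_pause` are hypothetical stand-ins for the plugin context, the `tcp_conn` list, and the pause callback.

```c
/* Minimal sketch of drain-on-pause: pausing must not only stop the
 * listener, it must also walk the per-worker connection list and close
 * every accepted socket. All names here are hypothetical. */
#include <assert.h>
#include <stdlib.h>

typedef struct conn {
    int fd;              /* accepted socket (simulated) */
    int closed;
    struct conn *next;
} conn_t;

typedef struct {
    conn_t *connections; /* singly linked list of accepted sockets */
    int paused;
} worker_ctx_t;

static conn_t *conn_add(worker_ctx_t *ctx, int fd)
{
    conn_t *c = calloc(1, sizeof(conn_t));
    if (c == NULL) {
        return NULL;
    }
    c->fd = fd;
    c->next = ctx->connections;
    ctx->connections = c;
    return c;
}

/* Returns how many established connections were drained. */
static int worker_pause(worker_ctx_t *ctx)
{
    conn_t *c;
    int drained = 0;

    ctx->paused = 1;            /* stands in for flb_downstream_pause */
    for (c = ctx->connections; c != NULL; c = c->next) {
        if (!c->closed) {
            c->closed = 1;      /* stands in for tcp_conn_del / pending_close */
            drained++;
        }
    }
    return drained;
}
```

Without the loop, already-accepted sockets keep their event handlers registered, which is exactly the gap the review points out.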
```c
if (ctx->downstream != NULL) {
    flb_downstream_pause(ctx->downstream);
    ctx->is_paused = FLB_TRUE;
    ctx->state = FW_INSTANCE_STATE_PAUSED;
}
```
Drop worker Forward connections when pausing ingestion
In worker mode, the pause callback only marks each worker context as paused and pauses the downstream listener, but it never closes existing worker connections. Because those connections remain registered, established clients can continue sending traffic while paused, which breaks the plugin’s own pause contract to close active connections under backpressure.
```c
runtime->workers[i].should_exit = FLB_TRUE;
if (runtime->workers[i].thread_created == FLB_TRUE) {
    pthread_join(runtime->workers[i].thread, NULL);
}
downstream_worker_context_cleanup(&runtime->workers[i]);
```
Skip pthread cleanup for workers never initialized
Startup failure paths call flb_downstream_worker_runtime_stop, which currently destroys mutex/cond objects for every slot in worker_count. However, only workers that reached downstream_worker_context_reset were pthread-initialized; later slots were merely zeroed by calloc, so destroying them is undefined behavior and can crash when worker creation/init fails partway through startup.
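The partial-initialization hazard can be sketched with plain pthreads. Everything here is illustrative: `worker_slot_t`, `runtime_start`, and the `initialized` flag are hypothetical names, not the real `flb_downstream_worker` internals. The point is that teardown must skip slots that were only zeroed by `calloc`.

```c
/* Sketch: only destroy mutex/cond objects for slots that actually
 * completed pthread initialization. Names are hypothetical. */
#include <assert.h>
#include <pthread.h>
#include <stdlib.h>

typedef struct {
    pthread_mutex_t lock;
    pthread_cond_t  cond;
    int initialized;         /* set only after pthread init succeeded */
} worker_slot_t;

typedef struct {
    worker_slot_t *workers;
    size_t worker_count;
} runtime_t;

/* fail_at simulates worker creation failing partway through startup. */
static int runtime_start(runtime_t *rt, size_t count, size_t fail_at)
{
    size_t i;

    rt->workers = calloc(count, sizeof(worker_slot_t));
    if (rt->workers == NULL) {
        return -1;
    }
    rt->worker_count = count;

    for (i = 0; i < count; i++) {
        if (i == fail_at) {
            return -1;       /* later slots remain calloc-zeroed */
        }
        pthread_mutex_init(&rt->workers[i].lock, NULL);
        pthread_cond_init(&rt->workers[i].cond, NULL);
        rt->workers[i].initialized = 1;
    }
    return 0;
}

/* Destroying a never-initialized mutex/cond is undefined behavior,
 * so the stop path checks the flag before cleaning each slot. */
static size_t runtime_stop(runtime_t *rt)
{
    size_t i, cleaned = 0;

    for (i = 0; i < rt->worker_count; i++) {
        if (!rt->workers[i].initialized) {
            continue;
        }
        pthread_mutex_destroy(&rt->workers[i].lock);
        pthread_cond_destroy(&rt->workers[i].cond);
        rt->workers[i].initialized = 0;
        cleaned++;
    }
    free(rt->workers);
    rt->workers = NULL;
    rt->worker_count = 0;
    return cleaned;
}
```

An equivalent alternative, also suggested by the review, is to shrink `worker_count` to the number of slots actually set up before invoking the stop routine.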
Actionable comments posted: 7
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
plugins/in_forward/fw.c (1)
159-172: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Move the trace log before `flb_downstream_conn_release` to avoid logging a closed file descriptor.

At line 168, `connection->fd` is dereferenced after `flb_downstream_conn_release(connection)` is called on line 167. While the connection struct remains in memory, `prepare_destroy_conn` closes the underlying socket via `flb_socket_close(connection->fd)`, making the fd a closed resource. Logging the fd after closure is semantically incorrect and should be moved before the release.

Suggested fix

```diff
 if (ctx->is_paused) {
+    flb_plg_trace(ctx->ins, "TCP connection will be closed FD=%i", connection->fd);
     flb_downstream_conn_release(connection);
-    flb_plg_trace(ctx->ins, "TCP connection will be closed FD=%i", connection->fd);
     ctx->state = state_backup;
     return -1;
 }
```

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@plugins/in_forward/fw.c` around lines 159 - 172, The trace log is emitted after flb_downstream_conn_release(connection) which can close connection->fd; move the flb_plg_trace call so it runs before flb_downstream_conn_release to avoid logging a closed FD. Specifically, in the branch that checks ctx->is_paused, call flb_plg_trace(ctx->ins, "TCP connection will be closed FD=%i", connection->fd) before flb_downstream_conn_release(connection), then set ctx->state = state_backup and return -1, preserving the same behavior but ensuring the FD is logged while still valid.
🧹 Nitpick comments (3)
tests/integration/scenarios/in_udp/tests/test_in_udp_001.py (2)
75-81: 💤 Low value | Minor: `flattened_records()` is evaluated twice per poll.

The lambda in `wait_for_record_count` calls `self.flattened_records()` twice on every poll cycle (once for the length check, once to return the list). For larger payload counts this doubles the iteration work. Caching the call result avoids it.

♻️ Optional refactor

```diff
 def wait_for_record_count(self, minimum_count, timeout=10):
+    def _check():
+        records = self.flattened_records()
+        return records if len(records) >= minimum_count else None
+
     return self.service.wait_for_condition(
-        lambda: self.flattened_records() if len(self.flattened_records()) >= minimum_count else None,
+        _check,
         timeout=timeout,
         interval=0.2,
         description=f"{minimum_count} forwarded UDP payloads",
     )
```

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@tests/integration/scenarios/in_udp/tests/test_in_udp_001.py` around lines 75 - 81, The lambda passed to wait_for_condition in wait_for_record_count calls self.flattened_records() twice per poll, wasting work; modify the lambda so it calls self.flattened_records() once (e.g., assign to a local variable like records inside the lambda), then check len(records) against minimum_count and return records when ready. Update the lambda used in wait_for_record_count to reuse that single records value and keep the same timeout/interval/description parameters for wait_for_condition.
148-174: ⚡ Quick win

The config correctly uses `format: json` to drop malformed datagrams.

The `in_udp_json_workers.yaml` file uses `format: json` (confirmed), which drops invalid records rather than forwarding them as fallback log lines. The test will work as intended: malformed datagrams are discarded and only valid records reach the assertion, so the `KeyError` risk is not present here.

That said, adding an explicit `assert len(records) == valid_records` check before the values assertion would provide a clearer failure message if the count unexpectedly diverges, improving test robustness.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@tests/integration/scenarios/in_udp/tests/test_in_udp_001.py` around lines 148 - 174, The test test_in_udp_workers_drop_malformed_datagrams_and_continue relies on wait_for_record_count but lacks an explicit check that the number of records equals valid_records; add an assertion like assert len(records) == valid_records immediately after records = service.wait_for_record_count(...) to produce a clearer failure message if counts differ before extracting values from records.

tests/integration/scenarios/in_forward/tests/test_in_forward_001.py (1)
424-432: ⚡ Quick win

Consolidate duplicate TCP-send helpers.

`_drop_partial_tcp_payload` and `_drop_raw_tls_connection` are byte-for-byte identical to the existing `_send_tcp_payload`: three copies of the same two-line helper. The differentiation only lives in the names (and `_drop_raw_tls_connection`'s name is also misleading: it doesn't speak TLS, it just dumps raw bytes against a TLS port). Either drop the new helpers and call `_send_tcp_payload` at the test sites with a brief inline comment, or keep one helper whose name explains the negative-path intent.

♻️ Proposed consolidation

```diff
-def _drop_partial_tcp_payload(port, payload):
-    with socket.create_connection(("127.0.0.1", port), timeout=5) as sock:
-        sock.sendall(payload)
-
-
 def _send_unix_payload(path, payload):
@@
-def _drop_raw_tls_connection(port, payload):
-    with socket.create_connection(("127.0.0.1", port), timeout=5) as sock:
-        sock.sendall(payload)
-
-
 def _recv_msgpack_value(sock):
```

…and update the two call sites:

```diff
-    lambda _: _drop_partial_tcp_payload(service.flb_listener_port, b"\x93\xa4test"),
+    # Send a truncated msgpack array and close to exercise partial-payload handling.
+    lambda _: _send_tcp_payload(service.flb_listener_port, b"\x93\xa4test"),
@@
-    lambda i: _drop_raw_tls_connection(service.flb_listener_port,
-                                       f"not-tls-{i}".encode("utf-8")),
+    # Send raw (non-TLS) bytes to a TLS port to exercise bad-handshake recovery.
+    lambda i: _send_tcp_payload(service.flb_listener_port,
+                                f"not-tls-{i}".encode("utf-8")),
```

Also applies to: 448-451
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@tests/integration/scenarios/in_forward/tests/test_in_forward_001.py` around lines 424 - 432, Duplicate two-line TCP helpers _drop_partial_tcp_payload and _drop_raw_tls_connection should be consolidated with the existing _send_tcp_payload (or replace all three with a single clearer-named helper like _send_raw_bytes_or_abort) — remove the duplicate functions, update all call sites that currently call _drop_partial_tcp_payload or _drop_raw_tls_connection to call the chosen single helper (_send_tcp_payload or the new name), and add a brief inline comment at those call sites indicating the negative-path intent (e.g., "send raw bytes and close to simulate partial/drop"). Ensure references to _send_tcp_payload, _drop_partial_tcp_payload and _drop_raw_tls_connection in tests are updated accordingly.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@plugins/in_forward/fw_prot.c`:
- Around line 1140-1151: The error paths for metrics and traces call
cmt_decode_msgpack_destroy(cmt) unconditionally and can double-free when
ownership was transferred to the ingress queue; update both error branches (the
metrics path around the fw_ingest_metrics call and the traces path around the
corresponding fw_ingest_traces call) to only call
cmt_decode_msgpack_destroy(cmt) when conn->ctx->use_ingress_queue == FLB_FALSE
(matching the success path), ensuring that when ingress queue ownership is used
the ingress event/queue cleanup (e.g., flb_input_ingress_event_destroy) remains
responsible for freeing cmt.
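The ownership rule behind this cmt double-free fix can be shown with a toy allocator. All names here (`record_t`, `ingest`, `handle`) are hypothetical; the sketch only demonstrates the invariant that exactly one side frees the object, matching the suggested `use_ingress_queue == FLB_FALSE` guard.

```c
/* Sketch of conditional ownership: when the ingress queue accepts the
 * object it owns the free (even on error paths); the caller frees only
 * on the direct path. Names are hypothetical stand-ins. */
#include <assert.h>
#include <stdlib.h>

typedef struct {
    int *payload;
} record_t;

static int freed_count = 0;

static void record_destroy(record_t *r)
{
    free(r->payload);
    r->payload = NULL;
    freed_count++;
}

/* Returns 0 on success, -1 on failure. With use_ingress_queue set,
 * ownership transfers and the queue-side cleanup frees the record. */
static int ingest(record_t *r, int use_ingress_queue, int simulate_error)
{
    if (use_ingress_queue) {
        record_destroy(r);               /* queue-side cleanup owns it */
        return simulate_error ? -1 : 0;
    }
    return simulate_error ? -1 : 0;      /* caller keeps ownership */
}

static int handle(int use_ingress_queue, int simulate_error)
{
    record_t r = { .payload = malloc(sizeof(int)) };
    int ret = ingest(&r, use_ingress_queue, simulate_error);

    /* The fix: free only when ownership was NOT transferred, on both
     * the success and the error branch, so the two sides never both
     * destroy the same object. */
    if (!use_ingress_queue) {
        record_destroy(&r);
    }
    return ret;
}
```

The reported bug corresponds to an unconditional `record_destroy` in the error branch of `handle`, which would double-free whenever the queue path had already taken ownership.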
In `@plugins/in_tcp/tcp_conn.c`:
- Around line 143-145: The tcp_ingest_logs() calls currently ignore its return
value and can drop records; update both call sites (the two tcp_ingest_logs(ctx,
ctx->log_encoder->output_buffer, ctx->log_encoder->output_length) invocations)
to check the returned status, log a failure with context (include the returned
error code and any relevant ctx identifiers), and propagate the error out of the
enclosing function instead of silently continuing—i.e., if tcp_ingest_logs()
returns non-zero, call the logger on ctx (or appropriate logger) with a
descriptive message and return/forward a non-success code (or set the enclosing
function’s error return) so upstream code can handle the failure.
In `@plugins/in_tcp/tcp.c`:
- Around line 206-220: The worker pause path (in_tcp_worker_pause) currently
only calls flb_downstream_pause(ctx->downstream) and fails to walk
ctx->connections to drain/close worker-owned client sessions; update
in_tcp_worker_pause (and the analogous block around lines 361-366) to iterate
ctx->connections and for each connection perform the same shutdown/close steps
the single-listener path uses (e.g., stop reads, flush/drain pending data and
then close/destroy the connection using the existing connection shutdown/close
helpers), ensuring connections cannot continue reading after pause.
In `@plugins/in_udp/udp.c`:
- Around line 88-110: The startup path creates ctx->dummy_conn via udp_conn_add
but if collector registration fails (flb_input_set_collector_socket or
flb_input_collector_get_event returns NULL) the current error path calls
udp_config_destroy(ctx) without first destroying the dummy connection; modify
the failure branches in the block that sets the collector (the code around
flb_input_set_collector_socket/in_udp_collect and the subsequent
ctx->collector_event check) to call in_udp_dummy_conn_destroy(ctx) (or otherwise
free ctx->dummy_conn) before returning/calling udp_config_destroy so the dummy
connection is cleaned up; apply the same fix to the similar failure path around
lines 288-291.
In `@src/flb_downstream_worker.c`:
- Around line 84-96: The loop is reading worker->should_exit while
flb_downstream_worker_runtime_stop() writes it from another thread, causing a
data race; change the shutdown flag to be synchronized (either replace
worker->should_exit with an atomic type and use atomic load/store, or guard
accesses with the worker runtime mutex), update the writer in
flb_downstream_worker_runtime_stop() to use the same atomic/store or mutex, and
also fix the other occurrence noted (the second read/write pair around the later
shutdown code) so all reads/writes to the shutdown flag use the same
synchronization primitive.
- Around line 147-180: The shutdown path calls
flb_downstream_worker_runtime_stop() which unconditionally runs
downstream_worker_context_cleanup() over all runtime->workers, potentially
destroying mutex/cond for slots never initialized; fix by tracking how many
slots were actually set up and only cleaning those: either set
runtime->worker_count (or a new field like runtime->active_workers) to i before
calling flb_downstream_worker_runtime_stop(), or change
flb_downstream_worker_runtime_stop() to inspect each
runtime->workers[k].thread_created or runtime->workers[k].initialized and skip
cleanup for entries that are false so downstream_worker_context_cleanup() is
only called for properly reset/initialized workers (references:
downstream_worker_context_reset, downstream_worker_context_cleanup,
flb_downstream_worker_runtime_stop, runtime->workers, thread_created,
initialized).
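The data-race fix for the shutdown flag can be sketched with C11 atomics and plain pthreads. `worker_t`, `worker_main`, and `run_and_stop` are illustrative names, not the real `flb_downstream_worker.c` symbols; the point is simply that the loop's read and the stopper's write go through the same atomic.

```c
/* Sketch: the worker loop polls an atomic flag and the stopper sets it
 * from another thread, so there is no data race on should_exit. */
#include <assert.h>
#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>

typedef struct {
    atomic_bool should_exit;
    long iterations;
} worker_t;

static void *worker_main(void *arg)
{
    worker_t *w = arg;

    /* atomic_load pairs with the atomic_store in the stopper */
    while (!atomic_load(&w->should_exit)) {
        w->iterations++;
    }
    return NULL;
}

static long run_and_stop(void)
{
    pthread_t th;
    worker_t w;

    atomic_init(&w.should_exit, false);
    w.iterations = 0;

    pthread_create(&th, NULL, worker_main, &w);
    atomic_store(&w.should_exit, true);   /* what the stop routine would do */
    pthread_join(th, NULL);
    return w.iterations;
}
```

Guarding both accesses with the runtime mutex, as the comment also allows, is equivalent for correctness; the atomic keeps the hot loop lock-free.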
In `@src/http_server/flb_http_server.c`:
- Around line 483-486: flb_http_server_start() failure path leaks the
partially-initialized per-worker server because flb_http_server_worker_init()
only assigns *worker_context on success and the normal
flb_http_server_worker_exit() won't be invoked; before freeing context on start
failure, explicitly call the worker cleanup routine (flb_http_server_worker_exit
or the server-destroy helper that releases context->server) to tear down
context->server, then free context, and make the identical change for the other
symmetric failure block that also frees context after a failed
flb_http_server_start().
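The symmetric-teardown rule behind this leak fix can be sketched as follows. `server_t`, `worker_init`, and `worker_exit` are hypothetical stand-ins for the `flb_http_server` worker lifecycle; the key is that the failure path runs the same cleanup as the normal exit path before freeing the context.

```c
/* Sketch: if start fails after part of the context was built, run the
 * regular worker-exit teardown before freeing, so nothing leaks. */
#include <assert.h>
#include <stdlib.h>

typedef struct { int started; } server_t;
typedef struct { server_t *server; } worker_context_t;

static int live_servers = 0;   /* leak detector for the sketch */

static server_t *server_create(void)
{
    live_servers++;
    return calloc(1, sizeof(server_t));
}

static void server_destroy(server_t *s)
{
    live_servers--;
    free(s);
}

static int server_start(server_t *s, int simulate_failure)
{
    if (simulate_failure) {
        return -1;
    }
    s->started = 1;
    return 0;
}

static void worker_exit(worker_context_t *ctx)
{
    if (ctx->server != NULL) {
        server_destroy(ctx->server);
        ctx->server = NULL;
    }
}

static int worker_init(worker_context_t **out, int simulate_failure)
{
    worker_context_t *ctx = calloc(1, sizeof(worker_context_t));
    if (ctx == NULL) {
        return -1;
    }
    ctx->server = server_create();
    if (server_start(ctx->server, simulate_failure) != 0) {
        /* the fix: tear down the partially built server, then free */
        worker_exit(ctx);
        free(ctx);
        return -1;
    }
    *out = ctx;   /* assigned only on success, as in the report */
    return 0;
}
```

The reported leak corresponds to the failure branch calling only `free(ctx)` without the `worker_exit` teardown.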
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 71fad1c8-6e23-49bd-8a3f-8580866af9a0
📒 Files selected for processing (26)
- include/fluent-bit/flb_downstream_worker.h
- include/fluent-bit/http_server/flb_http_server.h
- plugins/in_forward/fw.c
- plugins/in_forward/fw.h
- plugins/in_forward/fw_config.c
- plugins/in_forward/fw_prot.c
- plugins/in_tcp/tcp.c
- plugins/in_tcp/tcp.h
- plugins/in_tcp/tcp_config.c
- plugins/in_tcp/tcp_conn.c
- plugins/in_udp/udp.c
- plugins/in_udp/udp.h
- plugins/in_udp/udp_config.c
- plugins/in_udp/udp_conn.c
- src/CMakeLists.txt
- src/flb_downstream_worker.c
- src/http_server/flb_http_server.c
- tests/integration/scenarios/in_forward/config/in_forward_tls_workers.yaml
- tests/integration/scenarios/in_forward/config/in_forward_workers.yaml
- tests/integration/scenarios/in_forward/tests/test_in_forward_001.py
- tests/integration/scenarios/in_tcp/config/in_tcp_parser_json_tls_workers.yaml
- tests/integration/scenarios/in_tcp/config/in_tcp_parser_json_workers.yaml
- tests/integration/scenarios/in_tcp/tests/test_in_tcp_001.py
- tests/integration/scenarios/in_udp/config/in_udp_json_workers.yaml
- tests/integration/scenarios/in_udp/config/in_udp_parser_json_workers.yaml
- tests/integration/scenarios/in_udp/tests/test_in_udp_001.py
Signed-off-by: Eduardo Silva <eduardo@chronosphere.io>
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@tests/integration/scenarios/in_forward/tests/test_in_forward_001.py`:
- Around line 619-620: The startup log wait call
service.wait_for_log_message("with 4 workers") (and the other similar
wait_for_log_message calls) must be moved inside the try/finally that surrounds
the test runtime so service.stop() always runs; modify each affected test to
enter the try block first, call wait_for_log_message(...) inside that try, then
keep service.stop() in the finally block (ensuring the same unique symbols
service.wait_for_log_message and service.stop are used and no early returns
occur before the finally).
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: de10bd2e-7153-43d6-96b3-d5d3bec44ca9
📒 Files selected for processing (10)
- include/fluent-bit/flb_downstream_worker.h
- plugins/in_forward/fw.c
- plugins/in_forward/fw_prot.c
- plugins/in_tcp/tcp.c
- plugins/in_tcp/tcp_conn.c
- plugins/in_udp/udp.c
- src/flb_downstream_worker.c
- src/http_server/flb_http_server.c
- tests/integration/scenarios/in_forward/tests/test_in_forward_001.py
- tests/integration/scenarios/in_udp/tests/test_in_udp_001.py
✅ Files skipped from review due to trivial changes (2)
- include/fluent-bit/flb_downstream_worker.h
- plugins/in_forward/fw.c
🚧 Files skipped from review as they are similar to previous changes (5)
- plugins/in_forward/fw_prot.c
- plugins/in_udp/udp.c
- tests/integration/scenarios/in_udp/tests/test_in_udp_001.py
- src/http_server/flb_http_server.c
- plugins/in_tcp/tcp.c
```python
service.wait_for_log_message("with 4 workers", timeout=10)
```
Move startup log wait inside try/finally to guarantee teardown.
At Line 619, Line 644, Line 860, and Line 886, wait_for_log_message(...) runs before try. If it times out, service.stop() is never called and the test can leak a running Fluent Bit instance.
Suggested fix pattern

```diff
 def test_in_forward_workers_concurrent_message_mode_records():
     total_records = 16
     service = Service("in_forward_workers.yaml")
     service.start()
-    service.wait_for_log_message("with 4 workers", timeout=10)
     try:
+        service.wait_for_log_message("with 4 workers", timeout=10)
         payloads = [
             _message_mode_payload(TEST_TAG, {"message": f"worker-{i}", "value": i})
             for i in range(total_records)
         ]
         with ThreadPoolExecutor(max_workers=total_records) as executor:
```

Apply the same placement change to the other three worker tests.
Also applies to: 644-645, 860-861, 886-887
fixes #11757
Adds shared downstream worker support and enables workers for downstream-based inputs:
Also refactors flb_http_server to use the same shared downstream worker runtime, avoiding
duplicated worker lifecycle code across HTTP and non-HTTP downstream listeners.
Scope
Compatibility Notes
fan-out.
Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.