Skip to content

downstream: re-architect workers support #11775

Open
edsiper wants to merge 12 commits into master from downstream-workers

Conversation


@edsiper edsiper commented May 4, 2026

fixes #11757

Adds shared downstream worker support and enables workers for downstream-based inputs:

  • in_tcp
  • in_udp
  • in_forward

Also refactors flb_http_server to use the same shared downstream worker runtime, avoiding
duplicated worker lifecycle code across HTTP and non-HTTP downstream listeners.

Scope

  • Adds flb_downstream_worker helper for worker lifecycle:
    • thread startup/shutdown
    • per-worker event loop
    • DNS context setup
    • custom event dispatch
    • startup synchronization
    • safe join/cleanup
    • maintenance callbacks
  • Adds workers config option to:
    • in_tcp
    • in_udp
    • in_forward
  • Keeps default behavior unchanged:
    • workers defaults to 1
  • Adds integration coverage for:
    • concurrent TCP workers
    • concurrent UDP workers
    • concurrent Forward workers
    • dropped/partial connections
    • malformed UDP datagrams
    • TCP TLS workers
    • Forward TLS workers
    • bad TLS handshakes followed by valid traffic
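The lifecycle responsibilities listed above (startup synchronization, safe join/cleanup) follow a common pthread pattern. A minimal standalone sketch of that pattern is shown below; the names (`worker_ctx`, `worker_start`, `worker_stop`) are illustrative, not the actual flb_downstream_worker API:

```c
#include <pthread.h>
#include <assert.h>

struct worker_ctx {
    pthread_t       thread;
    pthread_mutex_t lock;
    pthread_cond_t  started;
    int             ready;          /* set by the worker once its loop is up */
    int             should_exit;    /* set by the owner to request shutdown */
    int             thread_created; /* guards the join on teardown */
};

static void *worker_main(void *arg)
{
    struct worker_ctx *w = arg;

    /* per-worker event loop / DNS context setup would happen here */

    /* signal the owner that startup finished before entering the loop */
    pthread_mutex_lock(&w->lock);
    w->ready = 1;
    pthread_cond_signal(&w->started);
    pthread_mutex_unlock(&w->lock);

    for (;;) {
        pthread_mutex_lock(&w->lock);
        int stop = w->should_exit;
        pthread_mutex_unlock(&w->lock);
        if (stop) {
            break;
        }
        /* dispatch events here */
    }
    return NULL;
}

static int worker_start(struct worker_ctx *w)
{
    pthread_mutex_init(&w->lock, NULL);
    pthread_cond_init(&w->started, NULL);
    w->ready = 0;
    w->should_exit = 0;

    if (pthread_create(&w->thread, NULL, worker_main, w) != 0) {
        return -1;
    }
    w->thread_created = 1;

    /* startup synchronization: block until the worker reports readiness */
    pthread_mutex_lock(&w->lock);
    while (!w->ready) {
        pthread_cond_wait(&w->started, &w->lock);
    }
    pthread_mutex_unlock(&w->lock);
    return 0;
}

static void worker_stop(struct worker_ctx *w)
{
    pthread_mutex_lock(&w->lock);
    w->should_exit = 1;
    pthread_mutex_unlock(&w->lock);

    /* safe join: only join threads that were actually created */
    if (w->thread_created) {
        pthread_join(w->thread, NULL);
    }
    pthread_cond_destroy(&w->started);
    pthread_mutex_destroy(&w->lock);
}
```

The condition-variable handshake is what lets the owner thread know all workers are running before the start call returns.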

Compatibility Notes

  • Default behavior remains single listener worker.
  • in_forward workers are supported for TCP listeners; Unix socket mode still runs without worker
    fan-out.
  • in_udp has no TLS worker test because the plugin does not expose TLS support.
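For reference, enabling the new option in a pipeline could look like the sketch below; the `workers` key is the option this PR adds, while the listener address/port values are placeholders:

```yaml
pipeline:
  inputs:
    - name: tcp
      listen: 0.0.0.0
      port: 5170
      # workers defaults to 1, preserving the single-listener behavior
      workers: 4
  outputs:
    - name: stdout
      match: '*'
```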

Fluent Bit is licensed under Apache 2.0; by submitting this pull request I understand that this code will be released under the terms of that license.

Summary by CodeRabbit

  • New Features

    • TCP, UDP and Forward inputs gain configurable multi-worker listeners (workers option) for higher concurrent throughput and improved resilience.
    • Core listener/worker handling unified to improve stability of multi-worker input processing.
  • Tests

    • New integration tests covering multi-worker concurrency, TLS and non‑TLS scenarios, and recovery from partial/invalid connections.

edsiper added 6 commits May 4, 2026 17:24
Signed-off-by: Eduardo Silva <eduardo@chronosphere.io>

coderabbitai Bot commented May 4, 2026

📝 Walkthrough

Adds a downstream worker runtime API and implementation, and integrates it into TCP, UDP, and Forward input plugins and the HTTP server. Plugins gain a workers configuration, per-worker event-loop listeners, ingress-queue routing helpers, and tests for concurrent/multi-worker scenarios.

Changes

Multi-Worker Input Framework & Plugin Integration

  • Public API: include/fluent-bit/flb_downstream_worker.h
    New public header: callback typedefs (init, exit, maintenance, foreach), flb_downstream_worker & flb_downstream_worker_options structs, and runtime APIs start, stop, foreach.
  • Core Runtime: src/flb_downstream_worker.c, src/CMakeLists.txt
    Implements the downstream worker runtime: per-worker event loop, init/maintenance/exit sequencing, startup synchronization, shutdown/join logic; added to the build.
  • HTTP Server Migration: src/http_server/flb_http_server.c, include/fluent-bit/http_server/flb_http_server.h
    Replaces the pthread-per-worker implementation with downstream-worker runtime callbacks (init/exit/maintenance); the runtime field now uses the downstream runtime type.
  • Plugin Headers & Helpers: plugins/in_forward/fw.h, plugins/in_tcp/tcp.h, plugins/in_udp/udp.h
    Add a runtime pointer and worker/listener state fields (workers, worker_id, use_ingress_queue, listener_registered, listener_event, event_loop); add inline ingestion helpers (fw_ingest_*, udp_ingest_logs; tcp_ingest_logs is reused).
  • Config Init / Destroy: plugins/in_forward/fw_config.c, plugins/in_tcp/tcp_config.c, plugins/in_udp/udp_config.c
    Initialize workers to 1; on destroy, conditionally unregister the listener event from the worker event loop via mk_event_del() when registered.
  • Plugin Startup / Wiring: plugins/in_forward/fw.c, plugins/in_tcp/tcp.c, plugins/in_udp/udp.c
    Add the workers config option (INT, default 1). Startup chooses the multi-worker runtime (enable ingress, start downstream runtime) when workers > 1, otherwise uses a single collector-socket listener; add worker init/exit/maintenance and pause/resume dispatching.
  • Data Path Changes: plugins/in_forward/fw_prot.c, plugins/in_tcp/tcp_conn.c, plugins/in_udp/udp_conn.c
    Replace direct flb_input_*_append() calls with plugin-specific ingestion helpers that route via the ingress queue when enabled; adjust ownership/cleanup of decoded metric/trace objects accordingly.
  • Integration Tests & Scenarios: tests/integration/scenarios/in_*/config/*workers*.yaml, tests/integration/scenarios/in_*/tests/test_in_*_001.py
    Add worker-enabled YAML scenarios and tests for TCP/UDP/Forward: concurrent sends, partial/malformed connection resilience, TLS handshake failure cases; test helpers for log polling and record-count waiting.
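The data-path change above can be pictured with a toy routing helper. In the sketch below, the names (ingest_logs, ingress_enqueue, direct_append) and the array-backed queue are stand-ins, not the real fw/tcp/udp helpers or the flb_input_ingress_queue_* API:

```c
#include <assert.h>

#define QUEUE_CAP 8

struct plugin_ctx {
    int use_ingress_queue;           /* workers > 1 enables the queue path */
    const char *queued[QUEUE_CAP];   /* toy stand-in for the ingress queue */
    int queued_count;
    const char *appended[QUEUE_CAP]; /* toy stand-in for direct append */
    int appended_count;
};

static int ingress_enqueue(struct plugin_ctx *ctx, const char *buf)
{
    if (ctx->queued_count >= QUEUE_CAP) {
        return -1;
    }
    ctx->queued[ctx->queued_count++] = buf;
    return 0;
}

static int direct_append(struct plugin_ctx *ctx, const char *buf)
{
    if (ctx->appended_count >= QUEUE_CAP) {
        return -1;
    }
    ctx->appended[ctx->appended_count++] = buf;
    return 0;
}

/* single call site used by the data path; the routing decision lives here */
static int ingest_logs(struct plugin_ctx *ctx, const char *buf)
{
    if (ctx->use_ingress_queue) {
        return ingress_enqueue(ctx, buf);
    }
    return direct_append(ctx, buf);
}
```

Concentrating the decision in one helper is what lets fw_prot.c, tcp_conn.c, and udp_conn.c stay unaware of whether they run single- or multi-worker.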

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant Runtime as Downstream_Runtime
    participant Worker as Worker(Thread/EventLoop)
    participant Engine
    participant Input as Plugin_Input

    Client->>Runtime: connect/send packet (socket)
    Runtime->>Worker: deliver socket event (worker event loop)
    Worker->>Input: invoke collect handler (in_*_collect_ctx)
    Input->>Input: decode/encode record
    Input->>Input: call ingest helper (fw/tcp/udp_ingest_*)
    alt use_ingress_queue
        Input->>Engine: flb_input_ingress_queue_* (enqueue)
    else direct_append
        Input->>Engine: flb_input_*_append (direct append)
    end
    Engine->>Worker: FLB_ENGINE_EV_CUSTOM dispatch (event loop)
    Worker->>Runtime: maintenance callbacks (periodic)

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~50 minutes

Possibly related PRs

  • fluent/fluent-bit#11114: Modifies in_forward connection/collection logic and flb_in_fw_config state — similar areas modified here.
  • fluent/fluent-bit#10790: Adds/changes downstream pause/resume lifecycle handling used by input plugins.

Suggested reviewers

  • cosmo0920
  • pwhelan
  • fujimotos

Poem

🐰 I stitched event loops, one by one,
Workers hop beneath the sun,
Packets sorted, queues align,
TCP, UDP, Forward — all fine.
Hooray for threaded hops and fun!

🚥 Pre-merge checks: 4 passed, 1 failed

❌ Failed checks (1 warning)

  • Docstring Coverage ⚠️ Warning: docstring coverage is 5.88%, below the required threshold of 80.00%. Resolution: write docstrings for the functions missing them to satisfy the coverage threshold.

✅ Passed checks (4)

  • Description Check: skipped because CodeRabbit's high-level summary is enabled.
  • Title check: the title 'downstream: re-architect workers support' accurately describes the primary change: introducing/refactoring shared downstream worker infrastructure and enabling multi-worker support across plugins.
  • Linked Issues check: all coding requirements from #11757 are met: workers config added to in_tcp, in_udp, and in_forward; default=1 preserves backward compatibility; documentation and integration tests provided.
  • Out of Scope Changes check: all changes align with the PR objectives: new flb_downstream_worker infrastructure for worker lifecycle, plugin integration (in_tcp/in_udp/in_forward), HTTP server refactoring to use shared workers, and comprehensive integration tests.



@edsiper edsiper added this to the Fluent Bit v5.1 milestone May 4, 2026

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 9e576e909b


Comment thread plugins/in_tcp/tcp.c
Comment on lines +361 to +365
if (ctx->runtime != NULL) {
    flb_downstream_worker_runtime_foreach(ctx->runtime,
                                          in_tcp_worker_pause,
                                          NULL);
    return;

P1 Badge Close active TCP worker connections during pause

With workers > 1, in_tcp_pause exits after dispatching in_tcp_worker_pause, and that callback only sets flb_downstream_pause on each listener. Unlike the single-worker path, no worker connection list is drained (tcp_conn_del / pending_close), so already-accepted sockets keep their event handlers and can continue ingesting records during backpressure pauses.


Comment thread plugins/in_forward/fw.c
Comment on lines +469 to +473
if (ctx->downstream != NULL) {
flb_downstream_pause(ctx->downstream);
ctx->is_paused = FLB_TRUE;
ctx->state = FW_INSTANCE_STATE_PAUSED;
}

P1 Badge Drop worker Forward connections when pausing ingestion

In worker mode, the pause callback only marks each worker context as paused and pauses the downstream listener, but it never closes existing worker connections. Because those connections remain registered, established clients can continue sending traffic while paused, which breaks the plugin’s own pause contract to close active connections under backpressure.


Comment thread src/flb_downstream_worker.c Outdated
Comment on lines +195 to +199
runtime->workers[i].should_exit = FLB_TRUE;
if (runtime->workers[i].thread_created == FLB_TRUE) {
pthread_join(runtime->workers[i].thread, NULL);
}
downstream_worker_context_cleanup(&runtime->workers[i]);

P2 Badge Skip pthread cleanup for workers never initialized

Startup failure paths call flb_downstream_worker_runtime_stop, which currently destroys mutex/cond objects for every slot in worker_count. However, only workers that reached downstream_worker_context_reset were pthread-initialized; later slots were merely zeroed by calloc, so destroying them is undefined behavior and can crash when worker creation/init fails partway through startup.
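The guarded-cleanup fix this comment asks for is a standard pattern: record which slots actually completed pthread initialization and skip the rest during teardown. A minimal sketch with illustrative names (not the flb_downstream_worker layout):

```c
#include <pthread.h>
#include <assert.h>
#include <stdlib.h>

struct worker_slot {
    pthread_mutex_t lock;
    pthread_cond_t  cond;
    int             initialized;  /* set only after both inits succeed */
};

static int slot_init(struct worker_slot *s)
{
    if (pthread_mutex_init(&s->lock, NULL) != 0) {
        return -1;
    }
    if (pthread_cond_init(&s->cond, NULL) != 0) {
        pthread_mutex_destroy(&s->lock);
        return -1;
    }
    s->initialized = 1;
    return 0;
}

/* safe to call on every slot, including ones calloc() merely zeroed */
static void slot_cleanup(struct worker_slot *s)
{
    if (!s->initialized) {
        return;
    }
    pthread_cond_destroy(&s->cond);
    pthread_mutex_destroy(&s->lock);
    s->initialized = 0;
}
```

Because zeroed slots never have `initialized` set, a blanket cleanup loop over the whole array no longer destroys objects that were never created.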



@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 7

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
plugins/in_forward/fw.c (1)

159-172: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Move the trace log before flb_downstream_conn_release to avoid logging a closed file descriptor.

At line 168, connection->fd is dereferenced after flb_downstream_conn_release(connection) is called on line 167. While the connection struct remains in memory, prepare_destroy_conn closes the underlying socket via flb_socket_close(connection->fd), making the fd a closed resource. Logging the fd after closure is semantically incorrect and should be moved before the release.

Suggested fix
    if(ctx->is_paused) {
+       flb_plg_trace(ctx->ins, "TCP connection will be closed FD=%i", connection->fd);
        flb_downstream_conn_release(connection);
-       flb_plg_trace(ctx->ins, "TCP connection will be closed FD=%i", connection->fd);
        ctx->state = state_backup;

        return -1;
    }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@plugins/in_forward/fw.c` around lines 159 - 172, The trace log is emitted
after flb_downstream_conn_release(connection) which can close connection->fd;
move the flb_plg_trace call so it runs before flb_downstream_conn_release to
avoid logging a closed FD. Specifically, in the branch that checks
ctx->is_paused, call flb_plg_trace(ctx->ins, "TCP connection will be closed
FD=%i", connection->fd) before flb_downstream_conn_release(connection), then set
ctx->state = state_backup and return -1, preserving the same behavior but
ensuring the FD is logged while still valid.
🧹 Nitpick comments (3)
tests/integration/scenarios/in_udp/tests/test_in_udp_001.py (2)

75-81: 💤 Low value

Minor: flattened_records() is evaluated twice per poll.

The lambda in wait_for_record_count calls self.flattened_records() twice on every poll cycle (once for the length check, once to return the list). For larger payload counts this doubles the iteration work. Caching the call result avoids it.

♻️ Optional refactor
     def wait_for_record_count(self, minimum_count, timeout=10):
+        def _check():
+            records = self.flattened_records()
+            return records if len(records) >= minimum_count else None
+
         return self.service.wait_for_condition(
-            lambda: self.flattened_records() if len(self.flattened_records()) >= minimum_count else None,
+            _check,
             timeout=timeout,
             interval=0.2,
             description=f"{minimum_count} forwarded UDP payloads",
         )
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@tests/integration/scenarios/in_udp/tests/test_in_udp_001.py` around lines 75
- 81, The lambda passed to wait_for_condition in wait_for_record_count calls
self.flattened_records() twice per poll, wasting work; modify the lambda so it
calls self.flattened_records() once (e.g., assign to a local variable like
records inside the lambda), then check len(records) against minimum_count and
return records when ready. Update the lambda used in wait_for_record_count to
reuse that single records value and keep the same timeout/interval/description
parameters for wait_for_condition.

148-174: ⚡ Quick win

The config correctly uses format: json to drop malformed datagrams.

The in_udp_json_workers.yaml file uses format: json (confirmed), which drops invalid records rather than forwarding them as fallback log lines. The test will work as intended—malformed datagrams are discarded and only valid records reach the assertion, so the KeyError risk is not present here.

That said, adding an explicit assert len(records) == valid_records check before the values assertion would provide a clearer failure message if the count unexpectedly diverges, improving test robustness.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@tests/integration/scenarios/in_udp/tests/test_in_udp_001.py` around lines 148
- 174, The test test_in_udp_workers_drop_malformed_datagrams_and_continue relies
on wait_for_record_count but lacks an explicit check that the number of records
equals valid_records; add an assertion like assert len(records) == valid_records
immediately after records = service.wait_for_record_count(...) to produce a
clearer failure message if counts differ before extracting values from records.
tests/integration/scenarios/in_forward/tests/test_in_forward_001.py (1)

424-432: ⚡ Quick win

Consolidate duplicate TCP-send helpers.

_drop_partial_tcp_payload and _drop_raw_tls_connection are byte-for-byte identical to the existing _send_tcp_payload — three copies of the same two-line helper. The differentiation only lives in the names (and _drop_raw_tls_connection's name is also misleading: it doesn't speak TLS, it just dumps raw bytes against a TLS port). Either drop the new helpers and call _send_tcp_payload at the test sites with a brief inline comment, or keep one helper whose name explains the negative-path intent.

♻️ Proposed consolidation
-def _drop_partial_tcp_payload(port, payload):
-    with socket.create_connection(("127.0.0.1", port), timeout=5) as sock:
-        sock.sendall(payload)
-
-
 def _send_unix_payload(path, payload):
@@
-def _drop_raw_tls_connection(port, payload):
-    with socket.create_connection(("127.0.0.1", port), timeout=5) as sock:
-        sock.sendall(payload)
-
-
 def _recv_msgpack_value(sock):

…and update the two call sites:

-                lambda _: _drop_partial_tcp_payload(service.flb_listener_port, b"\x93\xa4test"),
+                # Send a truncated msgpack array and close to exercise partial-payload handling.
+                lambda _: _send_tcp_payload(service.flb_listener_port, b"\x93\xa4test"),
@@
-                lambda i: _drop_raw_tls_connection(service.flb_listener_port,
-                                                   f"not-tls-{i}".encode("utf-8")),
+                # Send raw (non-TLS) bytes to a TLS port to exercise bad-handshake recovery.
+                lambda i: _send_tcp_payload(service.flb_listener_port,
+                                            f"not-tls-{i}".encode("utf-8")),

Also applies to: 448-451

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@tests/integration/scenarios/in_forward/tests/test_in_forward_001.py` around
lines 424 - 432, Duplicate two-line TCP helpers _drop_partial_tcp_payload and
_drop_raw_tls_connection should be consolidated with the existing
_send_tcp_payload (or replace all three with a single clearer-named helper like
_send_raw_bytes_or_abort) — remove the duplicate functions, update all call
sites that currently call _drop_partial_tcp_payload or _drop_raw_tls_connection
to call the chosen single helper (_send_tcp_payload or the new name), and add a
brief inline comment at those call sites indicating the negative-path intent
(e.g., "send raw bytes and close to simulate partial/drop"). Ensure references
to _send_tcp_payload, _drop_partial_tcp_payload and _drop_raw_tls_connection in
tests are updated accordingly.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@plugins/in_forward/fw_prot.c`:
- Around line 1140-1151: The error paths for metrics and traces call
cmt_decode_msgpack_destroy(cmt) unconditionally and can double-free when
ownership was transferred to the ingress queue; update both error branches (the
metrics path around the fw_ingest_metrics call and the traces path around the
corresponding fw_ingest_traces call) to only call
cmt_decode_msgpack_destroy(cmt) when conn->ctx->use_ingress_queue == FLB_FALSE
(matching the success path), ensuring that when ingress queue ownership is used
the ingress event/queue cleanup (e.g., flb_input_ingress_event_destroy) remains
responsible for freeing cmt.

In `@plugins/in_tcp/tcp_conn.c`:
- Around line 143-145: The tcp_ingest_logs() calls currently ignore its return
value and can drop records; update both call sites (the two tcp_ingest_logs(ctx,
ctx->log_encoder->output_buffer, ctx->log_encoder->output_length) invocations)
to check the returned status, log a failure with context (include the returned
error code and any relevant ctx identifiers), and propagate the error out of the
enclosing function instead of silently continuing—i.e., if tcp_ingest_logs()
returns non-zero, call the logger on ctx (or appropriate logger) with a
descriptive message and return/forward a non-success code (or set the enclosing
function’s error return) so upstream code can handle the failure.

In `@plugins/in_tcp/tcp.c`:
- Around line 206-220: The worker pause path (in_tcp_worker_pause) currently
only calls flb_downstream_pause(ctx->downstream) and fails to walk
ctx->connections to drain/close worker-owned client sessions; update
in_tcp_worker_pause (and the analogous block around lines 361-366) to iterate
ctx->connections and for each connection perform the same shutdown/close steps
the single-listener path uses (e.g., stop reads, flush/drain pending data and
then close/destroy the connection using the existing connection shutdown/close
helpers), ensuring connections cannot continue reading after pause.

In `@plugins/in_udp/udp.c`:
- Around line 88-110: The startup path creates ctx->dummy_conn via udp_conn_add
but if collector registration fails (flb_input_set_collector_socket or
flb_input_collector_get_event returns NULL) the current error path calls
udp_config_destroy(ctx) without first destroying the dummy connection; modify
the failure branches in the block that sets the collector (the code around
flb_input_set_collector_socket/in_udp_collect and the subsequent
ctx->collector_event check) to call in_udp_dummy_conn_destroy(ctx) (or otherwise
free ctx->dummy_conn) before returning/calling udp_config_destroy so the dummy
connection is cleaned up; apply the same fix to the similar failure path around
lines 288-291.

In `@src/flb_downstream_worker.c`:
- Around line 84-96: The loop is reading worker->should_exit while
flb_downstream_worker_runtime_stop() writes it from another thread, causing a
data race; change the shutdown flag to be synchronized (either replace
worker->should_exit with an atomic type and use atomic load/store, or guard
accesses with the worker runtime mutex), update the writer in
flb_downstream_worker_runtime_stop() to use the same atomic/store or mutex, and
also fix the other occurrence noted (the second read/write pair around the later
shutdown code) so all reads/writes to the shutdown flag use the same
synchronization primitive.
- Around line 147-180: The shutdown path calls
flb_downstream_worker_runtime_stop() which unconditionally runs
downstream_worker_context_cleanup() over all runtime->workers, potentially
destroying mutex/cond for slots never initialized; fix by tracking how many
slots were actually set up and only cleaning those: either set
runtime->worker_count (or a new field like runtime->active_workers) to i before
calling flb_downstream_worker_runtime_stop(), or change
flb_downstream_worker_runtime_stop() to inspect each
runtime->workers[k].thread_created or runtime->workers[k].initialized and skip
cleanup for entries that are false so downstream_worker_context_cleanup() is
only called for properly reset/initialized workers (references:
downstream_worker_context_reset, downstream_worker_context_cleanup,
flb_downstream_worker_runtime_stop, runtime->workers, thread_created,
initialized).

In `@src/http_server/flb_http_server.c`:
- Around line 483-486: flb_http_server_start() failure path leaks the
partially-initialized per-worker server because flb_http_server_worker_init()
only assigns *worker_context on success and the normal
flb_http_server_worker_exit() won't be invoked; before freeing context on start
failure, explicitly call the worker cleanup routine (flb_http_server_worker_exit
or the server-destroy helper that releases context->server) to tear down
context->server, then free context, and make the identical change for the other
symmetric failure block that also frees context after a failed
flb_http_server_start().
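The shutdown-flag data race noted for src/flb_downstream_worker.c is typically resolved with a C11 atomic; below is a minimal sketch of that approach (type and function names are illustrative, not the actual code):

```c
#include <stdatomic.h>
#include <pthread.h>
#include <assert.h>

struct shutdown_worker {
    pthread_t  thread;
    atomic_int should_exit;  /* written by the stopper, read by the loop */
};

static void *shutdown_loop(void *arg)
{
    struct shutdown_worker *w = arg;

    /* acquire-load pairs with the release-store in request_stop() */
    while (atomic_load_explicit(&w->should_exit, memory_order_acquire) == 0) {
        /* dispatch events here */
    }
    return NULL;
}

static int start_worker(struct shutdown_worker *w)
{
    atomic_init(&w->should_exit, 0);
    return pthread_create(&w->thread, NULL, shutdown_loop, w);
}

static void request_stop(struct shutdown_worker *w)
{
    atomic_store_explicit(&w->should_exit, 1, memory_order_release);
    pthread_join(w->thread, NULL);
}
```

Guarding the flag with the runtime mutex, as the comment also suggests, is equally valid; the atomic just avoids taking a lock on every loop iteration.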

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 71fad1c8-6e23-49bd-8a3f-8580866af9a0

📥 Commits

Reviewing files that changed from the base of the PR and between 86ec64a and 9e576e9.

📒 Files selected for processing (26)
  • include/fluent-bit/flb_downstream_worker.h
  • include/fluent-bit/http_server/flb_http_server.h
  • plugins/in_forward/fw.c
  • plugins/in_forward/fw.h
  • plugins/in_forward/fw_config.c
  • plugins/in_forward/fw_prot.c
  • plugins/in_tcp/tcp.c
  • plugins/in_tcp/tcp.h
  • plugins/in_tcp/tcp_config.c
  • plugins/in_tcp/tcp_conn.c
  • plugins/in_udp/udp.c
  • plugins/in_udp/udp.h
  • plugins/in_udp/udp_config.c
  • plugins/in_udp/udp_conn.c
  • src/CMakeLists.txt
  • src/flb_downstream_worker.c
  • src/http_server/flb_http_server.c
  • tests/integration/scenarios/in_forward/config/in_forward_tls_workers.yaml
  • tests/integration/scenarios/in_forward/config/in_forward_workers.yaml
  • tests/integration/scenarios/in_forward/tests/test_in_forward_001.py
  • tests/integration/scenarios/in_tcp/config/in_tcp_parser_json_tls_workers.yaml
  • tests/integration/scenarios/in_tcp/config/in_tcp_parser_json_workers.yaml
  • tests/integration/scenarios/in_tcp/tests/test_in_tcp_001.py
  • tests/integration/scenarios/in_udp/config/in_udp_json_workers.yaml
  • tests/integration/scenarios/in_udp/config/in_udp_parser_json_workers.yaml
  • tests/integration/scenarios/in_udp/tests/test_in_udp_001.py

Comment thread plugins/in_forward/fw_prot.c
Comment thread plugins/in_tcp/tcp_conn.c Outdated
Comment thread plugins/in_tcp/tcp.c
Comment thread plugins/in_udp/udp.c
Comment thread src/flb_downstream_worker.c Outdated
Comment thread src/flb_downstream_worker.c
Comment thread src/http_server/flb_http_server.c
edsiper added 6 commits May 5, 2026 09:34
Signed-off-by: Eduardo Silva <eduardo@chronosphere.io>

@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@tests/integration/scenarios/in_forward/tests/test_in_forward_001.py`:
- Around line 619-620: The startup log wait call
service.wait_for_log_message("with 4 workers") (and the other similar
wait_for_log_message calls) must be moved inside the try/finally that surrounds
the test runtime so service.stop() always runs; modify each affected test to
enter the try block first, call wait_for_log_message(...) inside that try, then
keep service.stop() in the finally block (ensuring the same unique symbols
service.wait_for_log_message and service.stop are used and no early returns
occur before the finally).
ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: de10bd2e-7153-43d6-96b3-d5d3bec44ca9

📥 Commits

Reviewing files that changed from the base of the PR and between 9e576e9 and 22a2ceb.

📒 Files selected for processing (10)
  • include/fluent-bit/flb_downstream_worker.h
  • plugins/in_forward/fw.c
  • plugins/in_forward/fw_prot.c
  • plugins/in_tcp/tcp.c
  • plugins/in_tcp/tcp_conn.c
  • plugins/in_udp/udp.c
  • src/flb_downstream_worker.c
  • src/http_server/flb_http_server.c
  • tests/integration/scenarios/in_forward/tests/test_in_forward_001.py
  • tests/integration/scenarios/in_udp/tests/test_in_udp_001.py
✅ Files skipped from review due to trivial changes (2)
  • include/fluent-bit/flb_downstream_worker.h
  • plugins/in_forward/fw.c
🚧 Files skipped from review as they are similar to previous changes (5)
  • plugins/in_forward/fw_prot.c
  • plugins/in_udp/udp.c
  • tests/integration/scenarios/in_udp/tests/test_in_udp_001.py
  • src/http_server/flb_http_server.c
  • plugins/in_tcp/tcp.c

Comment on lines +619 to +620
service.wait_for_log_message("with 4 workers", timeout=10)



⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Move startup log wait inside try/finally to guarantee teardown.

At Line 619, Line 644, Line 860, and Line 886, wait_for_log_message(...) runs before try. If it times out, service.stop() is never called and the test can leak a running Fluent Bit instance.

Suggested fix pattern
 def test_in_forward_workers_concurrent_message_mode_records():
     total_records = 16
     service = Service("in_forward_workers.yaml")
     service.start()
-    service.wait_for_log_message("with 4 workers", timeout=10)

     try:
+        service.wait_for_log_message("with 4 workers", timeout=10)
         payloads = [
             _message_mode_payload(TEST_TAG, {"message": f"worker-{i}", "value": i})
             for i in range(total_records)
         ]
         with ThreadPoolExecutor(max_workers=total_records) as executor:

Apply the same placement change to the other three worker tests.

Also applies to: 644-645, 860-861, 886-887

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@tests/integration/scenarios/in_forward/tests/test_in_forward_001.py` around
lines 619 - 620, The startup log wait call service.wait_for_log_message("with 4
workers") (and the other similar wait_for_log_message calls) must be moved
inside the try/finally that surrounds the test runtime so service.stop() always
runs; modify each affected test to enter the try block first, call
wait_for_log_message(...) inside that try, then keep service.stop() in the
finally block (ensuring the same unique symbols service.wait_for_log_message and
service.stop are used and no early returns occur before the finally).

Successfully merging this pull request may close these issues.

Add workers to tcp, udp, and forward input plugins
