http_server: run internal HTTP server on dedicated worker thread #11770
jackfletch wants to merge 2 commits into fluent:master
The internal HTTP server was hardcoded to run on the main engine event loop (`use_caller_event_loop=FLB_TRUE`). This causes a deadlock when the exec input plugin curls the server's own endpoints: the event loop blocks waiting for the child process, and the server cannot respond because it runs on that same blocked loop.
Before the v5.0 migration from Monkey to `flb_http_server`, Monkey ran on its own thread via `mk_start()`, so this pattern worked fine.
Give the internal HTTP server its own worker thread by setting `use_caller_event_loop=FLB_FALSE`, using the same runtime path that HTTP input plugins already use.
Fixes fluent#11769
Signed-off-by: Jack Fletcher <jacksondfletcher@gmail.com>
Add a test that verifies the internal HTTP server remains responsive after the exec input plugin makes a request to its own endpoints.
Ref fluent#11769
Signed-off-by: Jack Fletcher <jacksondfletcher@gmail.com>
📝 Walkthrough
This PR fixes a deadlock in Fluent Bit's HTTP server that occurred when pipeline components (like the exec input) made HTTP requests to internal endpoints. The fix disables the caller-provided event loop in HTTP server initialization and adds integration tests to prevent regression.
Changes: HTTP Server Deadlock Fix & Validation
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes
🚥 Pre-merge checks: ✅ 4 passed | ❌ 1 failed (1 warning)
Example configuration
[SERVICE]
    HTTP_Server   On
    HTTP_Listen   0.0.0.0
    HTTP_Port     2020
    Flush         1
    Log_Level     debug
[INPUT]
    Name          exec
    Tag           test
    Command       curl -s http://127.0.0.1:2020/api/v1/metrics/prometheus
    Interval_Sec  30
[OUTPUT]
    Name          null
    Match         *
Debug log output (patched v5.0.4)
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/http_server/flb_hs.c`:
- Around line 373-375: The shared buffer fields in struct flb_hs_buf (users,
data, raw_data, pending_free) and the hs->health_metrics list need mutual
exclusion to prevent data races between worker-thread request handlers (e.g.,
cb_metrics, cb_storage) and engine-thread push functions (flb_hs_push_metrics,
flb_hs_push_pipeline_metrics, flb_hs_push_storage_metrics,
flb_hs_push_health_metrics) and flb_hs_buf_cleanup; add a pthread_mutex_t (or
similar) to struct flb_hs_buf and a mutex protecting hs->health_metrics,
initialize/destroy them alongside the existing struct lifecycle, and surround
all accesses/updates to users/data/raw_data/pending_free and any insert/remove
operations on hs->health_metrics with the mutex (lock before reading/updating,
unlock after) so increment/decrement of users and nulling/freeing of data are
atomic and race-free.
📒 Files selected for processing (3)
- src/http_server/flb_hs.c
- tests/integration/scenarios/internal_http_server/config/internal_http_server_exec_deadlock.yaml
- tests/integration/scenarios/internal_http_server/tests/test_internal_http_server_exec_deadlock_001.py
  options.event_loop = NULL;
  options.system_context = config;
- options.use_caller_event_loop = FLB_TRUE;
+ options.use_caller_event_loop = FLB_FALSE;
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Search for any synchronisation primitives protecting hs buffer fields
# in the http_server module (headers, source, callers)
echo "=== mutex / rwlock / atomic references in http_server module ==="
rg -rn --type c \
"pthread_mutex|flb_mutex|flb_lock|pthread_rwlock|rwlock|atomic|_Atomic" \
--include="*.c" --include="*.h" \
-g "*/http_server/*"
echo ""
echo "=== flb_hs struct definition (looking for lock fields) ==="
rg -rn --type c -A 40 "struct flb_hs\b" \
--include="*.h"
echo ""
echo "=== callers of flb_hs_push_pipeline_metrics, flb_hs_push_metrics, etc. ==="
rg -rn --type c \
"flb_hs_push_(pipeline_metrics|metrics|storage_metrics|health_metrics)\b"
Repository: fluent/fluent-bit
Length of output: 2147
🏁 Script executed:
#!/bin/bash
echo "=== flb_hs struct definition from header ==="
fd -t f "flb_hs.h" --exec cat -n {} \;
echo ""
echo "=== Content around lines 373-375 in flb_hs.c ==="
sed -n '360,390p' src/http_server/flb_hs.c | cat -n
echo ""
echo "=== Search for any locks/mutexes in flb_hs.c ==="
grep -n "pthread_mutex\|flb_mutex\|flb_lock\|rwlock\|atomic" src/http_server/flb_hs.c
echo ""
echo "=== Search for pthread in entire http_server dir ==="
find src/http_server -name "*.c" -o -name "*.h" | xargs grep -l "pthread" 2>/dev/null
echo ""
echo "=== Search for flb_hs_push functions to see their real names ==="
grep -n "flb_hs_push" include/fluent-bit/http_server/flb_hs.h
echo ""
echo "=== Check if flb_http_server_running_on_caller_context is defined ==="
grep -rn "flb_http_server_running_on_caller_context" src/ include/
Repository: fluent/fluent-bit
Length of output: 6066
🏁 Script executed:
#!/bin/bash
echo "=== flb_http_server.c for threading and synchronization ==="
grep -n "pthread\|mutex\|lock" src/http_server/flb_http_server.c | head -20
echo ""
echo "=== flb_hs_push_metrics and related implementations ==="
grep -n "flb_hs_push" src/http_server/flb_hs.c | head -20
echo ""
echo "=== Implementation of flb_hs_push_metrics function ==="
sed -n '1,50p' src/http_server/flb_hs.c | grep -A 30 "flb_hs_push_metrics"
echo ""
echo "=== Search for flb_hs_buf_cleanup ==="
grep -rn "flb_hs_buf_cleanup" src/http_server/
echo ""
echo "=== Search for where flb_hs_buf data/raw_data are accessed in handlers ==="
rg -n "hs->metrics|hs->storage_metrics|hs->health_metrics" src/http_server/flb_hs.c | head -30
echo ""
echo "=== Check flb_http_server_init_with_options for worker thread setup ==="
grep -n "flb_http_server_init_with_options" src/http_server/flb_http_server.c
Repository: fluent/fluent-bit
Length of output: 3721
🏁 Script executed:
#!/bin/bash
echo "=== api_v1_registration and endpoint callbacks ==="
grep -n "api_v1_registration\|api_v1_" src/http_server/flb_hs.c | head -20
echo ""
echo "=== Look for /api/v1/metrics or similar handlers ==="
grep -rn "metrics.*callback\|flb_hs_register_endpoint" src/http_server/flb_hs.c | head -20
echo ""
echo "=== Find handler implementations for metrics endpoint ==="
sed -n '1,500p' src/http_server/flb_hs.c | grep -B 5 -A 15 "\.data\|\.raw_data" | head -60
echo ""
echo "=== Check api_v1.c for endpoint implementations ==="
find src/http_server -name "*api_v1*" -o -name "*endpoint*" | head -10
echo ""
echo "=== Check where push_* functions are called from ==="
rg -rn "flb_hs_push_metrics|flb_hs_push_health|flb_hs_push_pipeline|flb_hs_push_storage" src/ | grep -v "\.h:" | grep -v "flb_hs\.c:"
Repository: fluent/fluent-bit
Length of output: 2208
🏁 Script executed:
#!/bin/bash
echo "=== flb_hs_endpoints.c - find endpoint handlers ==="
wc -l src/http_server/flb_hs_endpoints.c
echo ""
echo "=== Look for handlers that read metrics buffers ==="
grep -n "hs->metrics\|hs->storage_metrics\|hs->health_metrics" src/http_server/flb_hs_endpoints.c | head -30
echo ""
echo "=== Search for callback functions that handle requests ==="
grep -n "static.*callback\|int.*callback" src/http_server/flb_hs_endpoints.c | head -20
echo ""
echo "=== Get the metrics endpoint handler ==="
grep -B 5 -A 20 "hs->metrics\.data\|hs->metrics\.raw_data" src/http_server/flb_hs_endpoints.c | head -60
echo ""
echo "=== Confirm use_caller_event_loop behavior ==="
grep -B 5 -A 10 "use_caller_event_loop.*FLB_FALSE\|use_caller_event_loop.*FLB_TRUE" src/http_server/flb_http_server.c | head -40
Repository: fluent/fluent-bit
Length of output: 1686
🏁 Script executed:
#!/bin/bash
echo "=== Full flb_hs_endpoints.c file ==="
cat -n src/http_server/flb_hs_endpoints.c
echo ""
echo "=== Look for all occurrences of 'hs' parameter usage in callbacks ==="
grep -n "hs->" src/http_server/flb_hs_endpoints.c
Repository: fluent/fluent-bit
Length of output: 4701
🏁 Script executed:
#!/bin/bash
echo "=== Find api_v1_registration function ==="
rg -n "api_v1_registration" src/ --type c -A 30 | head -80
echo ""
echo "=== Search for actual HTTP request callbacks in http_server ==="
rg -n "flb_http_request\|flb_http_response" src/http_server/ --type c | grep -v "\.h:" | head -40
echo ""
echo "=== Look for where endpoint callbacks are called from ==="
rg -n "callback\(" src/http_server/flb_http_server.c | head -20
echo ""
echo "=== Check how worker threads invoke callbacks ==="
grep -B 5 -A 15 "request.*response\|handler.*callback" src/http_server/flb_http_server.c | head -60
Repository: fluent/fluent-bit
Length of output: 4286
🏁 Script executed:
#!/bin/bash
echo "=== Find api_v1_metrics implementation ==="
find src/http_server/api -name "*.c" | head -20
echo ""
echo "=== List all api endpoint files ==="
find src/http_server/api -type f -name "*.c"
echo ""
echo "=== Check api_v1_metrics.c ==="
wc -l src/http_server/api/v1/metrics.c
echo ""
echo "=== Search for callback functions in v1 api ==="
grep -rn "static int.*callback\|endpoint_metrics\|cb_" src/http_server/api/v1/ --include="*.c" | head -20
echo ""
echo "=== Check metrics.c for buffer access ==="
grep -n "hs->metrics\|hs->storage_metrics" src/http_server/api/v1/metrics.c
Repository: fluent/fluent-bit
Length of output: 2593
🏁 Script executed:
#!/bin/bash
echo "=== View cb_metrics function in metrics.c ==="
sed -n '405,430p' src/http_server/api/v1/metrics.c
echo ""
echo "=== View cb_metrics_prometheus and how it accesses buffers ==="
sed -n '124,160p' src/http_server/api/v1/metrics.c
echo ""
echo "=== Check cb_storage function in storage.c ==="
sed -n '40,70p' src/http_server/api/v1/storage.c
echo ""
echo "=== Check cb_health function ==="
sed -n '152,180p' src/http_server/api/v1/health.c
echo ""
echo "=== Find request_callback assignment in flb_http_server.c ==="
grep -n "request_callback.*=" src/http_server/flb_http_server.c | head -10
Repository: fluent/fluent-bit
Length of output: 3860
🏁 Script executed:
#!/bin/bash
echo "=== View metrics_get_latest function ==="
grep -B 5 -A 10 "metrics_get_latest" src/http_server/api/v1/metrics.c | head -30
echo ""
echo "=== View flb_hs_buf_cleanup implementation ==="
sed -n '113,150p' src/http_server/flb_hs.c
echo ""
echo "=== View flb_hs_buf_release implementation ==="
sed -n '135,160p' src/http_server/flb_hs.c
echo ""
echo "=== Check if users field is accessed anywhere else ==="
grep -rn "\.users\+\+\|\.users--\|\.users =" src/http_server/ --include="*.c"
echo ""
echo "=== Verify that use_caller_event_loop = FLB_FALSE means worker threads ==="
grep -B 10 -A 10 "use_caller_event_loop == FLB_FALSE" src/http_server/flb_http_server.c | head -40
Repository: fluent/fluent-bit
Length of output: 2507
Data race on shared hs buffers after moving HTTP server to worker thread
After this change the HTTP server runs on worker threads instead of integrating into the main event loop. The endpoint request handlers (e.g., cb_metrics, cb_storage in the API endpoints) now run concurrently with the engine's metric-push functions (flb_hs_push_metrics, flb_hs_push_pipeline_metrics, flb_hs_push_storage_metrics, flb_hs_push_health_metrics), with no synchronization:
- Request handlers increment `buf->users++` and read `buf->data`/`buf->raw_data` pointers (worker thread)
- Push functions call `flb_hs_buf_cleanup()`, which checks/writes `users`, `pending_free`, and nulls `data`/`raw_data` (main engine thread)
- The `users` field in `struct flb_hs_buf` is a plain `int`, not atomic, so concurrent increment/decrement can silently miscount, leading to use-after-free or leaks
- No mutex or rwlock protects these fields
Add synchronization (e.g., pthread_mutex_t) guarding access to flb_hs_buf structures and hs->health_metrics list mutations before merge.
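The kind of locking suggested above could look roughly like the following sketch. It is not Fluent Bit code: `struct demo_buf` and the `demo_buf_*` helpers are hypothetical stand-ins for `struct flb_hs_buf` and its accessors, and the policy of skipping a push while readers are active is an assumption made to keep the example small.

```c
#include <assert.h>
#include <pthread.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for struct flb_hs_buf. Field names follow the
 * review comment; the lock and the layout are assumptions. */
struct demo_buf {
    pthread_mutex_t lock;
    int users;         /* readers currently holding `data` */
    int pending_free;  /* a push arrived while readers were active */
    char *data;        /* current payload, or NULL */
};

/* Reader side (worker thread): take a reference under the lock. */
static const char *demo_buf_acquire(struct demo_buf *b)
{
    const char *p;

    pthread_mutex_lock(&b->lock);
    p = b->data;
    if (p != NULL) {
        b->users++;
    }
    pthread_mutex_unlock(&b->lock);
    return p;
}

/* Reader side: drop the reference; the last reader frees a stale payload. */
static void demo_buf_release(struct demo_buf *b)
{
    pthread_mutex_lock(&b->lock);
    b->users--;
    if (b->users == 0 && b->pending_free) {
        free(b->data);
        b->data = NULL;
        b->pending_free = 0;
    }
    pthread_mutex_unlock(&b->lock);
}

/* Writer side (engine thread): replace the payload only when unused. */
static int demo_buf_push(struct demo_buf *b, const char *payload)
{
    size_t len = strlen(payload) + 1;
    char *copy = malloc(len);

    if (copy == NULL) {
        return -1;
    }
    memcpy(copy, payload, len);

    pthread_mutex_lock(&b->lock);
    if (b->users > 0) {
        b->pending_free = 1;   /* defer cleanup to the last reader */
        pthread_mutex_unlock(&b->lock);
        free(copy);
        return -1;
    }
    free(b->data);
    b->data = copy;
    b->pending_free = 0;
    pthread_mutex_unlock(&b->lock);
    return 0;
}

#define DEMO_ROUNDS 10000

static void *demo_reader(void *arg)
{
    struct demo_buf *b = arg;
    int i;

    for (i = 0; i < DEMO_ROUNDS; i++) {
        const char *p = demo_buf_acquire(b);
        if (p != NULL) {
            size_t n = strlen(p);  /* payload stays valid while users > 0 */
            (void) n;
            demo_buf_release(b);
        }
    }
    return NULL;
}

/* Runs one reader thread against a pushing thread; returns the final
 * reader count (0 when every acquire was matched by a release). */
static int demo_race_run(void)
{
    struct demo_buf b;
    pthread_t reader;
    int i, users;

    pthread_mutex_init(&b.lock, NULL);
    b.users = 0;
    b.pending_free = 0;
    b.data = NULL;

    demo_buf_push(&b, "metrics v0");
    if (pthread_create(&reader, NULL, demo_reader, &b) != 0) {
        free(b.data);
        pthread_mutex_destroy(&b.lock);
        return -1;
    }
    for (i = 0; i < DEMO_ROUNDS; i++) {
        demo_buf_push(&b, "metrics vN");
    }
    pthread_join(reader, NULL);

    users = b.users;
    free(b.data);
    pthread_mutex_destroy(&b.lock);
    return users;
}
```

Calling `demo_race_run()` from a `main` and building with `-pthread` exercises both sides concurrently. Because every read and write of `users`, `pending_free`, and `data` happens under the same mutex, the reference count cannot be miscounted and a payload is never freed while a reader still holds it.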
A deadlock was introduced in 0cf4edd ("http_server: unify core listener API and built-in server"), which replaced Monkey with the new `flb_http_server` for the internal HTTP server. Monkey ran on its own thread via `mk_start()`, but the new implementation is hardcoded to run on the main engine event loop (`use_caller_event_loop = FLB_TRUE`).
When `in_exec` spawns `curl` against port 2020, the main event loop blocks waiting for the child process to exit. The HTTP server can't process the request because it's on the same blocked event loop.
Four days later, 2c32a6f ("http_server: add worker runtime support") added threading to `flb_http_server` for HTTP input plugins, but the internal HTTP server wasn't updated to use it.
The fix is two lines in `flb_hs_create()`: give the internal HTTP server its own worker thread, the same as HTTP input plugins already get.
Fixes #11769
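The blocking pattern described above can be illustrated with a small model. This is a sketch only and uses no Fluent Bit APIs: pipes stand in for the HTTP socket, `demo_blocking_request()` stands in for `in_exec` waiting on `curl`, and all `demo_*` names are hypothetical. A timeout is used so the shared-loop case reports a failure instead of hanging, whereas the real deadlock waits on the child process with no server able to answer.

```c
#include <assert.h>
#include <poll.h>
#include <pthread.h>
#include <unistd.h>

/* Hypothetical stand-in for the internal HTTP server: two pipe ends. */
struct demo_server {
    int req_rd;   /* server reads requests here */
    int resp_wr;  /* server writes replies here */
};

/* Serve exactly one request: read one byte, reply with 'K'. */
static void *demo_serve_once(void *arg)
{
    struct demo_server *s = arg;
    char c;

    if (read(s->req_rd, &c, 1) == 1) {
        c = 'K';
        if (write(s->resp_wr, &c, 1) != 1) {
            return NULL;
        }
    }
    return NULL;
}

/* Blocking call made from the "event loop" thread, like exec waiting on
 * curl. Returns 1 if a reply arrived within timeout_ms, else 0. */
static int demo_blocking_request(int req_wr, int resp_rd, int timeout_ms)
{
    struct pollfd pfd;
    char c = '?';

    if (write(req_wr, &c, 1) != 1) {
        return 0;
    }
    pfd.fd = resp_rd;
    pfd.events = POLLIN;
    pfd.revents = 0;
    if (poll(&pfd, 1, timeout_ms) <= 0) {
        return 0;
    }
    return read(resp_rd, &c, 1) == 1 && c == 'K';
}

/* use_worker_thread != 0: server runs on its own thread.
 * use_worker_thread == 0: server shares the caller's thread, so it can
 * only run after the blocking call has already given up. */
static int demo_deadlock_run(int use_worker_thread)
{
    int req[2], resp[2], ok;
    struct demo_server srv;
    pthread_t tid;

    if (pipe(req) != 0) {
        return -1;
    }
    if (pipe(resp) != 0) {
        close(req[0]);
        close(req[1]);
        return -1;
    }
    srv.req_rd = req[0];
    srv.resp_wr = resp[1];

    if (use_worker_thread) {
        if (pthread_create(&tid, NULL, demo_serve_once, &srv) != 0) {
            close(req[0]); close(req[1]);
            close(resp[0]); close(resp[1]);
            return -1;
        }
        ok = demo_blocking_request(req[1], resp[0], 2000);
        close(req[1]);            /* lets the server's read() finish */
        pthread_join(tid, NULL);
    }
    else {
        ok = demo_blocking_request(req[1], resp[0], 200);
        demo_serve_once(&srv);    /* too late: the caller timed out */
        close(req[1]);
    }

    close(req[0]);
    close(resp[0]);
    close(resp[1]);
    return ok;
}
```

In this model `demo_deadlock_run(1)` gets its reply because the server thread makes progress while the caller is blocked, and `demo_deadlock_run(0)` times out because nothing can serve the request until the blocking call returns.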
Enter `[N/A]` in the box, if an item is not applicable to your change.
Testing
Before we can approve your change, please submit the following in a comment:
If this is a change to packaging of containers or native binaries then please confirm it works for all targets.
`ok-package-test` label to test for all targets (requires maintainer to do).
Documentation
Backporting
Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.
Summary by CodeRabbit
Bug Fixes
Tests