From ae78c64f49ee3da77cb4244de8dc09faeaa7312a Mon Sep 17 00:00:00 2001 From: Albert Bausili Date: Thu, 11 Jun 2026 22:43:13 +0200 Subject: [PATCH 1/8] fix(version): resolve self-reported version from build info (stale 1.4.5 constant) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The Version constant was never bumped past 1.4.5, so Results from v1.4.6+ builds misreport their producer. Resolve the module version via runtime/debug.ReadBuildInfo — correct both when loadgen is the main module of a tagged build and when consumed as a dependency — falling back to a corrected 1.4.7 constant for (devel)/test builds. --- version.go | 56 ++++++++++++++++++++++++++++++-- version_test.go | 85 +++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 139 insertions(+), 2 deletions(-) create mode 100644 version_test.go diff --git a/version.go b/version.go index 26060c4..fe49dc9 100644 --- a/version.go +++ b/version.go @@ -1,6 +1,58 @@ package loadgen +import ( + "runtime/debug" + "strings" +) + +// modulePath must match go.mod; used to locate loadgen in a consumer's +// dependency list when resolving Version from build info. +const modulePath = "github.com/goceleris/loadgen" + +// fallbackVersion is reported when build info carries no usable module +// version for loadgen ((devel) builds, tests, directory replaces). Keep in +// sync with the release tag. +const fallbackVersion = "1.4.7" + // Version is the loadgen release version, stamped into Result.LoadgenVersion // so downstream consumers (probatorium, perfmatrix) can record which loadgen -// build produced a given run. -const Version = "1.4.5" +// build produced a given run. Resolved from runtime/debug build info at +// startup — correct both when loadgen is the main module of a tagged build +// and when it is a dependency — with fallbackVersion as the last resort. 
+var Version = versionFromBuildInfo(debug.ReadBuildInfo()) + +func versionFromBuildInfo(bi *debug.BuildInfo, ok bool) string { + if !ok || bi == nil { + return fallbackVersion + } + if bi.Main.Path == modulePath { + if v := cleanModuleVersion(bi.Main.Version); v != "" { + return v + } + return fallbackVersion + } + for _, dep := range bi.Deps { + if dep.Path != modulePath { + continue + } + ver := dep.Version + if dep.Replace != nil { + ver = dep.Replace.Version + } + if v := cleanModuleVersion(ver); v != "" { + return v + } + break + } + return fallbackVersion +} + +// cleanModuleVersion normalises a module version ("v1.4.7") to the bare +// form loadgen has always reported ("1.4.7"). Returns "" when the version +// carries no release information. +func cleanModuleVersion(v string) string { + if v == "" || v == "(devel)" { + return "" + } + return strings.TrimPrefix(v, "v") +} diff --git a/version_test.go b/version_test.go new file mode 100644 index 0000000..9673460 --- /dev/null +++ b/version_test.go @@ -0,0 +1,85 @@ +package loadgen + +import ( + "runtime/debug" + "testing" +) + +func TestVersionFromBuildInfo(t *testing.T) { + tests := []struct { + name string + bi *debug.BuildInfo + ok bool + want string + }{ + {"no build info", nil, false, fallbackVersion}, + { + "main module tagged", + &debug.BuildInfo{Main: debug.Module{Path: modulePath, Version: "v1.9.3"}}, + true, + "1.9.3", + }, + { + "main module devel falls back", + &debug.BuildInfo{Main: debug.Module{Path: modulePath, Version: "(devel)"}}, + true, + fallbackVersion, + }, + { + "consumed as dependency", + &debug.BuildInfo{ + Main: debug.Module{Path: "github.com/goceleris/probatorium", Version: "(devel)"}, + Deps: []*debug.Module{ + {Path: "github.com/HdrHistogram/hdrhistogram-go", Version: "v1.2.0"}, + {Path: modulePath, Version: "v1.4.8"}, + }, + }, + true, + "1.4.8", + }, + { + "dependency with versioned replace", + &debug.BuildInfo{ + Main: debug.Module{Path: "github.com/goceleris/probatorium"}, + 
Deps: []*debug.Module{ + {Path: modulePath, Version: "v1.4.7", Replace: &debug.Module{Path: modulePath, Version: "v1.4.7-hotfix"}}, + }, + }, + true, + "1.4.7-hotfix", + }, + { + "dependency with directory replace falls back", + &debug.BuildInfo{ + Main: debug.Module{Path: "github.com/goceleris/probatorium"}, + Deps: []*debug.Module{ + {Path: modulePath, Version: "v1.4.7", Replace: &debug.Module{Path: "../loadgen"}}, + }, + }, + true, + fallbackVersion, + }, + { + "unrelated build falls back", + &debug.BuildInfo{Main: debug.Module{Path: "example.com/other", Version: "v9.9.9"}}, + true, + fallbackVersion, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(st *testing.T) { + if got := versionFromBuildInfo(tt.bi, tt.ok); got != tt.want { + st.Errorf("versionFromBuildInfo() = %q, want %q", got, tt.want) + } + }) + } +} + +func TestVersionNonEmpty(t *testing.T) { + if Version == "" { + t.Fatal("Version must never be empty") + } + if Version == "(devel)" { + t.Fatalf("Version = %q — (devel) must fall back to the release constant", Version) + } +} From 4a6c18ad23cc5685ec909540ee4502943dd186aa Mon Sep 17 00:00:00 2001 From: Albert Bausili Date: Thu, 11 Jun 2026 22:44:27 +0200 Subject: [PATCH 2/8] feat(dial): shared reconnect backoff, fail-fast tracker, connect-error class counter Groundwork for the v3.8 dial-storm fixes: a jittered exponential backoff (10ms doubling to a 1s cap, abortable via ctx or a close channel), a fail-fast tracker that turns a never-connected 5s failure streak into a fatal ErrNeverConnected error shaped like the h1client pre-dial failure ("loadgen: dial: ...: connection refused"), and a process-global connect-error counter so dial/handshake failures are countable as their own class alongside the untyped errors total. 
--- backoff.go | 116 ++++++++++++++++++++++++++++++++++++++++++ backoff_test.go | 132 ++++++++++++++++++++++++++++++++++++++++++++++++ dial.go | 13 +++++ 3 files changed, 261 insertions(+) create mode 100644 backoff.go create mode 100644 backoff_test.go diff --git a/backoff.go b/backoff.go new file mode 100644 index 0000000..f5c4601 --- /dev/null +++ b/backoff.go @@ -0,0 +1,116 @@ +package loadgen + +import ( + "context" + "errors" + "fmt" + "math/rand/v2" + "sync" + "time" +) + +// Reconnect backoff bounds: the first retry waits ~reconnectBackoffMin, +// doubling per consecutive failure up to reconnectBackoffMax. Sleeps are +// jittered uniformly over [d/2, d] so a fleet of workers does not redial in +// lockstep against a recovering server. v3.8 post-mortem: without pacing, +// the ws/sse drivers redialled a dead port at ~386k dials/sec (34.7M errors +// per 90s cell). +const ( + reconnectBackoffMin = 10 * time.Millisecond + reconnectBackoffMax = time.Second +) + +// failFastWindow is how long a streaming client (ws/sse) tolerates +// nothing-but-connect-failures before declaring the target dead. It only +// applies while NO stream was ever established — a client that had live +// streams keeps retrying with backoff indefinitely (the server may come +// back mid-cell). +const failFastWindow = 5 * time.Second + +// ErrNeverConnected marks the fatal fail-fast condition: every connect +// attempt failed and not a single stream was ever established within +// failFastWindow. Benchmarker.Run aborts the whole run when a driver +// surfaces an error wrapping this, so a harness can classify the cell as +// did-not-finish instead of burning the full duration against a dead port. +var ErrNeverConnected = errors.New("no stream ever established") + +// connectBackoff is per-connection (or per-worker) retry pacing state. +// Not safe for concurrent use — each instance is owned by one goroutine. 
+type connectBackoff struct { + next time.Duration +} + +// sleep blocks for the current backoff interval (jittered down to half) and +// doubles the interval for the following call, capped at +// reconnectBackoffMax. It returns early — reporting false — when ctx is +// cancelled or abort is closed, so Close() and run shutdown are never stuck +// behind a sleep. A nil abort channel is valid and never fires. +func (b *connectBackoff) sleep(ctx context.Context, abort <-chan struct{}) bool { + d := b.next + if d <= 0 { + d = reconnectBackoffMin + } + b.next = min(d*2, reconnectBackoffMax) + + d = d/2 + rand.N(d/2+1) // uniform in [d/2, d] + t := time.NewTimer(d) + defer t.Stop() + select { + case <-ctx.Done(): + return false + case <-abort: + return false + case <-t.C: + return true + } +} + +// reset returns the backoff to its initial interval after a successful +// connect. +func (b *connectBackoff) reset() { b.next = 0 } + +// failFastTracker decides when a streaming client that has NEVER had a live +// stream should give up on the whole run. One instance is shared by all +// workers of a client; any single established stream disarms it permanently. +type failFastTracker struct { + window time.Duration + + mu sync.Mutex + everConnected bool + failingSince time.Time // zero when there is no active failure streak +} + +func newFailFastTracker(window time.Duration) *failFastTracker { + return &failFastTracker{window: window} +} + +// success records an established stream; fail-fast is permanently disarmed. +func (f *failFastTracker) success() { + f.mu.Lock() + f.everConnected = true + f.failingSince = time.Time{} + f.mu.Unlock() +} + +// failure records a connect failure observed at now. When the client never +// had a single live stream and the failure streak spans the configured +// window, it returns a fatal error wrapping both err and ErrNeverConnected; +// otherwise nil (caller backs off and retries). 
+func (f *failFastTracker) failure(now time.Time, err error) error { + f.mu.Lock() + defer f.mu.Unlock() + if f.everConnected { + return nil + } + if f.failingSince.IsZero() { + f.failingSince = now + return nil + } + if now.Sub(f.failingSince) < f.window { + return nil + } + // Message shape mirrors the h1client pre-dial failure + // ("loadgen: dial: ...: connection refused") so harness-side + // classification treats the two identically. + return fmt.Errorf("loadgen: dial: %w (fail-fast: %w within %v)", err, ErrNeverConnected, f.window) +} diff --git a/backoff_test.go b/backoff_test.go new file mode 100644 index 0000000..a00b2f1 --- /dev/null +++ b/backoff_test.go @@ -0,0 +1,132 @@ +package loadgen + +import ( + "context" + "errors" + "strings" + "testing" + "time" +) + +// TestConnectBackoffDoublesAndCaps walks the ladder with a pre-cancelled +// context so no real sleeping happens — only the interval progression is +// under test: 10ms doubling per call, capped at 1s. +func TestConnectBackoffDoublesAndCaps(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + var bo connectBackoff + want := []time.Duration{ + 20 * time.Millisecond, 40 * time.Millisecond, 80 * time.Millisecond, + 160 * time.Millisecond, 320 * time.Millisecond, 640 * time.Millisecond, + time.Second, time.Second, time.Second, + } + for i, w := range want { + if bo.sleep(ctx, nil) { + t.Fatalf("call %d: sleep returned true with cancelled ctx", i) + } + if bo.next != w { + t.Fatalf("call %d: next = %v, want %v", i, bo.next, w) + } + } + + bo.reset() + if bo.next != 0 { + t.Fatalf("after reset: next = %v, want 0", bo.next) + } +} + +// TestConnectBackoffSleepsJittered pins the interval and checks the actual +// sleep lands in the jitter band [d/2, d] (with CI-noise headroom above). 
+func TestConnectBackoffSleepsJittered(t *testing.T) { + bo := connectBackoff{next: 80 * time.Millisecond} + start := time.Now() + if !bo.sleep(context.Background(), nil) { + t.Fatal("sleep aborted without cancellation") + } + elapsed := time.Since(start) + if elapsed < 40*time.Millisecond { + t.Fatalf("slept %v, want >= 40ms (lower jitter bound)", elapsed) + } + if elapsed > 300*time.Millisecond { + t.Fatalf("slept %v, want ~<= 80ms (upper jitter bound + CI noise)", elapsed) + } +} + +// TestConnectBackoffAbort verifies both abort paths — context cancellation +// and the abort channel (Close()) — cut a long sleep short. +func TestConnectBackoffAbort(t *testing.T) { + t.Run("context", func(st *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond) + defer cancel() + bo := connectBackoff{next: time.Second} + start := time.Now() + if bo.sleep(ctx, nil) { + st.Fatal("sleep completed despite ctx cancellation") + } + if elapsed := time.Since(start); elapsed > 400*time.Millisecond { + st.Fatalf("aborted sleep took %v, want well under the 0.5-1s interval", elapsed) + } + }) + + t.Run("abort channel", func(st *testing.T) { + abort := make(chan struct{}) + go func() { + time.Sleep(20 * time.Millisecond) + close(abort) + }() + bo := connectBackoff{next: time.Second} + start := time.Now() + if bo.sleep(context.Background(), abort) { + st.Fatal("sleep completed despite abort channel close") + } + if elapsed := time.Since(start); elapsed > 400*time.Millisecond { + st.Fatalf("aborted sleep took %v, want well under the 0.5-1s interval", elapsed) + } + }) +} + +func TestFailFastTracker(t *testing.T) { + t0 := time.Now() + dialErr := errors.New("dial tcp 127.0.0.1:1: connect: connection refused") + + t.Run("trips after window with no success", func(st *testing.T) { + f := newFailFastTracker(5 * time.Second) + if err := f.failure(t0, dialErr); err != nil { + st.Fatalf("first failure must arm, not trip: %v", err) + } + if err := 
f.failure(t0.Add(4*time.Second), dialErr); err != nil { + st.Fatalf("failure inside window must not trip: %v", err) + } + err := f.failure(t0.Add(5*time.Second), dialErr) + if err == nil { + st.Fatal("expected fatal error once the streak spans the window") + } + if !errors.Is(err, ErrNeverConnected) { + st.Errorf("fatal error must wrap ErrNeverConnected: %v", err) + } + if !errors.Is(err, dialErr) { + st.Errorf("fatal error must wrap the last dial error: %v", err) + } + // Shape must match the h1client pre-dial failure so the harness + // classifies both as dnf. + if !strings.Contains(err.Error(), "loadgen: dial: ") || + !strings.Contains(err.Error(), "connection refused") { + st.Errorf("fatal error shape mismatch: %q", err.Error()) + } + }) + + t.Run("any success disarms permanently", func(st *testing.T) { + f := newFailFastTracker(5 * time.Second) + if err := f.failure(t0, dialErr); err != nil { + st.Fatal(err) + } + f.success() + if err := f.failure(t0.Add(time.Hour), dialErr); err != nil { + st.Fatalf("post-success failure streak must never trip: %v", err) + } + if err := f.failure(t0.Add(2*time.Hour), dialErr); err != nil { + st.Fatalf("post-success failure streak must never trip: %v", err) + } + }) +} diff --git a/dial.go b/dial.go index 192d7c8..4219b59 100644 --- a/dial.go +++ b/dial.go @@ -16,6 +16,19 @@ import ( // the run completes. var dialRetriesCounter atomic.Uint64 +// connectErrorsCounter counts dial/handshake failures (TCP connect, TLS, +// WS/SSE upgrade, H1 reconnect dials) as their own error class, separate +// from the per-request errors total. Process-global for the same reason as +// dialRetriesCounter; drained into Result.ConnectErrors / WarmupStats via +// Swap at the end of each phase. +var connectErrorsCounter atomic.Uint64 + +func recordConnectError() { connectErrorsCounter.Add(1) } + +// snapshotConnectErrors returns the current connect-error count and resets +// the global counter, scoping the value to the phase that just ended. 
+func snapshotConnectErrors() uint64 { return connectErrorsCounter.Swap(0) } + // maxDialAttempts is the total number of TCP SYN attempts per dial, // including the first. Only transient SYN-RST responses trigger a // retry; every other error fails fast. From e26bc797e1205cb81906c7a07076f8a46218c1a6 Mon Sep 17 00:00:00 2001 From: Albert Bausili Date: Thu, 11 Jun 2026 22:49:29 +0200 Subject: [PATCH 3/8] feat(bench): fatal fail-fast abort, ConnectErrors surfacing, warmup visibility MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Benchmarker now aborts the whole run (warmup or measured phase) the moment a worker surfaces an ErrNeverConnected-wrapped error, instead of burning the configured Duration against a dead target; Run returns the fatal dial error so callers can classify the cell as dnf. - Result.ConnectErrors and TimeseriesPoint.ConnectErrors expose the dial/handshake-failure error class (additive JSON; existing consumers keep parsing Errors unchanged). Producers wired in follow-up commits. - Result.Warmup (WarmupStats) snapshots warmup requests/errors/connect errors before the pre-run counter reset, and the CLI logs a warmup summary plus an explicit warning when zero warmup requests succeeded — a 100%-failing warmup was previously invisible. 
--- bench.go | 86 ++++++++++++++++++++++++- benchmarker_test.go | 151 ++++++++++++++++++++++++++++++++++++++++++++ cmd/loadgen/main.go | 7 ++ results.go | 31 ++++++++- 4 files changed, 272 insertions(+), 3 deletions(-) diff --git a/bench.go b/bench.go index 1e32592..a09e928 100644 --- a/bench.go +++ b/bench.go @@ -329,11 +329,43 @@ type Benchmarker struct { // Latency tracking (also holds per-shard request/bytes counters) latencies *ShardedLatencyRecorder + // Fatal abort: the first ErrNeverConnected-wrapped failure a worker + // observes lands in fatalErr and closes fatalCh, so Run/warmup stop + // waiting out the clock instead of burning the full duration against + // a dead target. + fatalOnce sync.Once + fatalErr error + fatalCh chan struct{} + + // warmupStats is captured at the end of the warmup phase (before + // counters reset) and surfaced on Result.Warmup. + warmupStats *WarmupStats + // Control running atomic.Bool wg sync.WaitGroup } +// recordFatal stores the first fatal error and signals Run/warmup to stop +// waiting out the clock. Later calls are no-ops. +func (b *Benchmarker) recordFatal(err error) { + b.fatalOnce.Do(func() { + b.fatalErr = err + close(b.fatalCh) + }) +} + +// fatalError returns the recorded fatal error, or nil. The channel-close +// check gives the necessary happens-before edge for reading fatalErr. +func (b *Benchmarker) fatalError() error { + select { + case <-b.fatalCh: + return b.fatalErr + default: + return nil + } +} + // parseURL extracts host, port, path, and scheme from a raw URL. 
func parseURL(rawURL string) (host, port, path, scheme string, err error) { u, err := url.Parse(rawURL) @@ -411,6 +443,7 @@ func New(cfg Config) (*Benchmarker, error) { config: cfg, raw: cfg.Client, latencies: NewShardedLatencyRecorder(cfg.Workers, flushInterval), + fatalCh: make(chan struct{}), }, nil } @@ -453,6 +486,7 @@ func New(cfg Config) (*Benchmarker, error) { config: cfg, raw: raw, latencies: NewShardedLatencyRecorder(cfg.Workers, flushInterval), + fatalCh: make(chan struct{}), } if mc, ok := raw.(*mixClient); ok { b.mix = mc @@ -469,9 +503,24 @@ func New(cfg Config) (*Benchmarker, error) { // outcome. Federation is best-effort: failure to reach the peer is // recorded on Result.Federation but does not fail the whole run. func (b *Benchmarker) Run(ctx context.Context) (*Result, error) { + // Scope the global connect-error counter to this run. + connectErrorsCounter.Store(0) + // Warmup phase if b.config.Warmup > 0 { b.warmup(ctx) + // Snapshot warmup outcomes before the counters reset for the + // measured run — a 100%-failing warmup must stay observable. 
+ b.latencies.FlushLocal() + warmupReqs, _ := b.latencies.Totals() + b.warmupStats = &WarmupStats{ + Requests: warmupReqs, + Errors: b.errors.Load(), + ConnectErrors: snapshotConnectErrors(), + } + if ferr := b.fatalError(); ferr != nil { + return nil, ferr + } } // Reset metrics for actual benchmark @@ -549,6 +598,7 @@ func (b *Benchmarker) Run(ctx context.Context) (*Result, error) { var timeseries []TimeseriesPoint var prevReqs int64 var prevErrors int64 + var prevConnErrors uint64 // Start workers — each gets a unique workerID for connection partitioning b.running.Store(true) @@ -598,13 +648,16 @@ func (b *Benchmarker) Run(ctx context.Context) (*Result, error) { prevReqs = reqs p99 := b.latencies.SnapshotWindowP99Ms() curErr := b.errors.Load() + curConnErr := connectErrorsCounter.Load() timeseries = append(timeseries, TimeseriesPoint{ TimestampSec: elapsed, RequestsPerSec: float64(deltaReqs), // 1-second window P99Ms: p99, Errors: curErr - prevErrors, + ConnectErrors: int64(curConnErr - prevConnErrors), }) prevErrors = curErr + prevConnErrors = curConnErr if b.config.OnProgress != nil { snapshot := Result{ Requests: reqs, @@ -619,10 +672,11 @@ func (b *Benchmarker) Run(ctx context.Context) (*Result, error) { } }() - // Wait for duration + // Wait for duration — or a fatal fail-fast abort from a worker. select { case <-ctx.Done(): case <-time.After(b.config.Duration): + case <-b.fatalCh: } b.running.Store(false) @@ -661,6 +715,16 @@ func (b *Benchmarker) Run(ctx context.Context) (*Result, error) { result.RecvQHigh = recvqProbe.Stop() } + // A fatal fail-fast abort overrides the (empty) partial result so the + // caller can classify the run as did-not-finish rather than reading a + // zero-request Result as "ran clean with no traffic". + if ferr := b.fatalError(); ferr != nil { + if fed != nil { + _ = fed.Close() + } + return nil, ferr + } + // Collect peer histogram if federation is active. 
if fed != nil { local := b.latencies.MergedHistogram() @@ -707,8 +771,14 @@ func (b *Benchmarker) warmup(ctx context.Context) { b.wg.Go(func() { b.worker(warmupCtx, workerID) }) } - <-warmupCtx.Done() + // A fatal fail-fast abort cuts the warmup short; Run surfaces the + // error after capturing warmup stats. + select { + case <-warmupCtx.Done(): + case <-b.fatalCh: + } b.running.Store(false) + cancel() // unblock workers stuck in dials or backoff sleeps b.wg.Wait() } @@ -801,6 +871,10 @@ func (b *Benchmarker) ratedWorker(ctx context.Context, workerID int, slots <-cha if !b.running.Load() && ctx.Err() != nil { return } + if errors.Is(err, ErrNeverConnected) { + b.recordFatal(err) + return + } b.errors.Add(1) if b.mix != nil { b.mix.recordError(workerID) @@ -849,6 +923,12 @@ func (b *Benchmarker) worker(ctx context.Context, workerID int) { if !b.running.Load() && ctx.Err() != nil { return } + // A fail-fast abort from a streaming driver kills the whole + // run; the attempts leading up to it are already counted. 
+ if errors.Is(err, ErrNeverConnected) { + b.recordFatal(err) + return + } b.errors.Add(1) if b.mix != nil { b.mix.recordError(workerID) @@ -875,6 +955,8 @@ func (b *Benchmarker) buildResult(elapsed time.Duration) *Result { ThroughputBPS: throughput, Latency: b.latencies.Percentiles(), DialRetries: snapshotDialRetries(), + ConnectErrors: snapshotConnectErrors(), + Warmup: b.warmupStats, } if hist, err := b.latencies.EncodeHistogram(); err == nil { diff --git a/benchmarker_test.go b/benchmarker_test.go index 0bf948a..c087df7 100644 --- a/benchmarker_test.go +++ b/benchmarker_test.go @@ -2,6 +2,7 @@ package loadgen import ( "context" + "errors" "fmt" "net" "net/http" @@ -95,6 +96,156 @@ func TestBenchmarkerWarmup(t *testing.T) { t.Logf("warmup: elapsed=%v, result.Duration=%v, requests=%d", elapsed, result.Duration, result.Requests) } +func TestBenchmarkerWarmupStats(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(200) + _, _ = w.Write([]byte("OK")) + })) + defer srv.Close() + + cfg := Config{ + URL: srv.URL, + Method: "GET", + Duration: 500 * time.Millisecond, + Connections: 2, + Workers: 2, + Warmup: 500 * time.Millisecond, + CPUMonitor: false, + RecvQProbe: false, + } + b, err := New(cfg) + if err != nil { + t.Fatal(err) + } + result, err := b.Run(context.Background()) + if err != nil { + t.Fatal(err) + } + + if result.Warmup == nil { + t.Fatal("expected Result.Warmup to be populated when Warmup > 0") + } + if result.Warmup.Requests == 0 { + t.Error("expected warmup requests > 0 against a healthy server") + } + if result.Warmup.Errors != 0 { + t.Errorf("unexpected warmup errors: %d", result.Warmup.Errors) + } + // Warmup traffic must not leak into the measured totals. 
+ if result.Requests == 0 { + t.Error("expected measured requests > 0") + } + t.Logf("warmup stats: %d ok, %d errors", result.Warmup.Requests, result.Warmup.Errors) +} + +func TestBenchmarkerNoWarmupNoStats(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(200) + })) + defer srv.Close() + + cfg := Config{ + URL: srv.URL, + Method: "GET", + Duration: 300 * time.Millisecond, + Connections: 1, + Workers: 1, + Warmup: 0, + CPUMonitor: false, + RecvQProbe: false, + } + b, err := New(cfg) + if err != nil { + t.Fatal(err) + } + result, err := b.Run(context.Background()) + if err != nil { + t.Fatal(err) + } + if result.Warmup != nil { + t.Errorf("expected nil Result.Warmup when Warmup == 0, got %+v", result.Warmup) + } +} + +// fatalClient returns an ErrNeverConnected-wrapped error from every +// DoRequest, simulating a streaming driver whose fail-fast tripped. +type fatalClient struct{} + +func (f *fatalClient) DoRequest(_ context.Context, _ int) (int, error) { + return 0, fmt.Errorf("loadgen: dial: dial tcp 127.0.0.1:1: connect: connection refused (fail-fast: %w within 5s)", ErrNeverConnected) +} + +func (f *fatalClient) Close() {} + +// TestBenchmarkerFatalAbortsRun verifies a worker surfacing an +// ErrNeverConnected error aborts the run immediately instead of waiting out +// the full Duration, and that Run reports the fatal error. 
+func TestBenchmarkerFatalAbortsRun(t *testing.T) { + cfg := Config{ + URL: "http://127.0.0.1:1/", + Method: "GET", + Duration: 30 * time.Second, // must NOT be waited out + Connections: 2, + Workers: 2, + Warmup: 0, + Client: &fatalClient{}, + CPUMonitor: false, + RecvQProbe: false, + } + b, err := New(cfg) + if err != nil { + t.Fatal(err) + } + + start := time.Now() + result, err := b.Run(context.Background()) + elapsed := time.Since(start) + + if err == nil { + t.Fatal("expected Run to return the fatal error") + } + if !errors.Is(err, ErrNeverConnected) { + t.Errorf("Run error must wrap ErrNeverConnected: %v", err) + } + if result != nil { + t.Errorf("expected nil result on fatal abort, got %+v", result) + } + if elapsed > 10*time.Second { + t.Errorf("fatal abort took %v — Run waited out the duration instead of aborting", elapsed) + } +} + +// TestBenchmarkerFatalAbortsWarmup verifies a fatal error during the warmup +// phase aborts the run before the measured phase even starts. +func TestBenchmarkerFatalAbortsWarmup(t *testing.T) { + cfg := Config{ + URL: "http://127.0.0.1:1/", + Method: "GET", + Duration: 30 * time.Second, + Connections: 2, + Workers: 2, + Warmup: 30 * time.Second, // must NOT be waited out either + Client: &fatalClient{}, + CPUMonitor: false, + RecvQProbe: false, + } + b, err := New(cfg) + if err != nil { + t.Fatal(err) + } + + start := time.Now() + _, err = b.Run(context.Background()) + elapsed := time.Since(start) + + if err == nil || !errors.Is(err, ErrNeverConnected) { + t.Fatalf("expected ErrNeverConnected from warmup abort, got: %v", err) + } + if elapsed > 10*time.Second { + t.Errorf("warmup fatal abort took %v — warmup was waited out instead of aborting", elapsed) + } +} + func TestBenchmarkerContextCancellation(t *testing.T) { srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { w.WriteHeader(200) diff --git a/cmd/loadgen/main.go b/cmd/loadgen/main.go index deb2a1d..6561ac9 100644 --- 
a/cmd/loadgen/main.go +++ b/cmd/loadgen/main.go @@ -185,6 +185,13 @@ func main() { log.Fatalf("benchmark failed: %v", err) } + if result.Warmup != nil { + fmt.Fprintf(os.Stderr, "warmup: %d ok, %d errors (%d connect)\n", + result.Warmup.Requests, result.Warmup.Errors, result.Warmup.ConnectErrors) + if result.Warmup.Requests == 0 && result.Warmup.Errors > 0 { + fmt.Fprintln(os.Stderr, "warmup: WARNING — no request succeeded during warmup; the target may be down") + } + } if result.Upgrade != nil { // UpgradeAttempted == UpgradeSucceeded by construction today: // newH2ClientWithDialer fails the whole run on the first dial error, diff --git a/results.go b/results.go index f452cd3..703b660 100644 --- a/results.go +++ b/results.go @@ -10,6 +10,11 @@ type TimeseriesPoint struct { RequestsPerSec float64 `json:"rps"` P99Ms float64 `json:"p99_ms,omitempty"` Errors int64 `json:"errors,omitempty"` + + // ConnectErrors is the dial/handshake-failure subset of Errors that + // landed in this 1-second window. Lets a consumer tell "server + // answering with errors" from "server gone" per bucket. + ConnectErrors int64 `json:"connect_errors,omitempty"` } // Result holds the benchmark results. @@ -23,7 +28,7 @@ type Result struct { // LoadgenVersion records which loadgen build produced this run, so // downstream consumers (probatorium) can attribute a Result to a - // specific release. Stamped from the Version constant by buildResult. + // specific release. Stamped from Version by buildResult. LoadgenVersion string `json:"loadgen_version,omitempty"` ClientCPUPercent float64 `json:"client_cpu_percent,omitempty"` @@ -48,6 +53,20 @@ type Result struct { // retried — the peer may still want to investigate. DialRetries uint64 `json:"dial_retries,omitempty"` + // ConnectErrors counts dial/handshake failures (TCP connect, TLS, + // WS/SSE upgrade, H1 reconnect dials) observed during the measured + // run. 
A separate error class from Errors (which lumps every failed + // request together): Errors ≈ ConnectErrors means the server was + // unreachable, not misbehaving. Additive — existing consumers keep + // parsing Errors unchanged. + ConnectErrors uint64 `json:"connect_errors,omitempty"` + + // Warmup, when non-nil, summarises the warmup phase, whose counters + // are otherwise reset away before the measured run begins. A warmup + // with zero requests and nonzero errors means the target was never + // healthy — previously invisible. Omitted when Warmup is 0. + Warmup *WarmupStats `json:"warmup,omitempty"` + // Histogram is the V2-compressed HdrHistogram payload covering the // full latency distribution recorded during the run. Range covers // 1µs through 30s with 3 significant digits (~0.1% precision). @@ -100,6 +119,16 @@ type FederationStats struct { MergeError string `json:"merge_error,omitempty"` } +// WarmupStats reports what happened during the warmup phase: successful +// requests, total errors, and the dial/handshake-failure subset of those +// errors. Surfaced on Result.Warmup so a 100%-failing warmup is observable +// instead of being silently reset before the measured run. +type WarmupStats struct { + Requests int64 `json:"requests"` + Errors int64 `json:"errors"` + ConnectErrors uint64 `json:"connect_errors,omitempty"` +} + // UpgradeStats summarises the outcome of h2c-upgrade handshakes across all // dialled connections. 
UpgradeAttempted is the number of conns loadgen tried // to dial; UpgradeSucceeded is the number that completed the 101 Switching From 7bcd3109d9070624e0cb348c44db7eca4c7b443d Mon Sep 17 00:00:00 2001 From: Albert Bausili Date: Thu, 11 Jun 2026 22:55:26 +0200 Subject: [PATCH 4/8] fix(ws,sse): backoff-paced, abortable dials with never-connected fail-fast v3.8 post-mortem: when the SUT died, the ws/sse drivers redialled the dead port in a hot loop (~386k dials/sec, 34.7M errors per 90s cell) and a cell that never had a server burned its full duration before being misclassified. - Connect failures now pace per-worker via the shared jittered exponential backoff (10ms doubling to 1s) instead of redialling at full speed. - Dials and the upgrade/GET handshake honor ctx cancellation and Config.DialTimeout (both previously ignored mid-dial), and Close() now aborts in-flight dials and backoff sleeps via a client close context. - If NO stream was ever established and connect attempts have failed for failFastWindow (5s), the driver returns an ErrNeverConnected-wrapped error shaped like the h1client pre-dial failure; Benchmarker.Run aborts so the harness can classify dnf. A client that had live streams keeps retrying with backoff indefinitely. - Each dial failure is recorded in the connect-error class (Result.ConnectErrors / timeseries / warmup stats). --- results.go | 12 +-- sse.go | 113 +++++++++++++++++++----- sse_test.go | 160 ++++++++++++++++++++++++++++++++++ ws.go | 124 +++++++++++++++++++++------ ws_test.go | 241 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 599 insertions(+), 51 deletions(-) diff --git a/results.go b/results.go index 703b660..a342445 100644 --- a/results.go +++ b/results.go @@ -11,9 +11,9 @@ type TimeseriesPoint struct { P99Ms float64 `json:"p99_ms,omitempty"` Errors int64 `json:"errors,omitempty"` - // ConnectErrors is the dial/handshake-failure subset of Errors that - // landed in this 1-second window. 
Lets a consumer tell "server - // answering with errors" from "server gone" per bucket. + // ConnectErrors counts the dial/handshake failures that landed in + // this 1-second window. Lets a consumer tell "server answering with + // errors" from "server gone" per bucket. ConnectErrors int64 `json:"connect_errors,omitempty"` } @@ -57,8 +57,10 @@ type Result struct { // WS/SSE upgrade, H1 reconnect dials) observed during the measured // run. A separate error class from Errors (which lumps every failed // request together): Errors ≈ ConnectErrors means the server was - // unreachable, not misbehaving. Additive — existing consumers keep - // parsing Errors unchanged. + // unreachable, not misbehaving. Counted at the driver, so it can + // differ from the matching Errors increments by a few attempts cut + // off at phase boundaries. Additive — existing consumers keep parsing + // Errors unchanged. ConnectErrors uint64 `json:"connect_errors,omitempty"` // Warmup, when non-nil, summarises the warmup phase, whose counters diff --git a/sse.go b/sse.go index afb7a3a..0a9e5f9 100755 --- a/sse.go +++ b/sse.go @@ -9,6 +9,7 @@ import ( "net" "strings" "sync" + "time" ) const sseMode = "sse-fanout" @@ -19,14 +20,26 @@ const sseMode = "sse-fanout" // therefore becomes the recorded latency — one DoRequest == one event through // the existing worker/RecordSuccess/timeseries pipeline. type sseClient struct { - host string // host:port to dial - path string // request path, e.g. /events - tls bool - insecure bool - headers map[string]string - - mu sync.Mutex - conns map[int]*sseConn + host string // host:port to dial + path string // request path, e.g. /events + tls bool + insecure bool + headers map[string]string + dialTimeout time.Duration + + // closeCtx is cancelled by Close() so in-flight dials, handshakes and + // backoff sleeps abort instead of outliving the client. 
+ closeCtx context.Context + closeStop context.CancelFunc + + // failFast aborts the run when no stream was EVER established and + // connect attempts have failed for a sustained window. A client that + // had live streams retries with backoff forever. + failFast *failFastTracker + + mu sync.Mutex + conns map[int]*sseConn + backoffs map[int]*connectBackoff // per-worker reconnect pacing } func newSSEClient(host, port, path string, cfg Config) (Client, error) { @@ -36,18 +49,24 @@ func newSSEClient(host, port, path string, cfg Config) (Client, error) { if path == "" { path = "/" } + closeCtx, closeStop := context.WithCancel(context.Background()) return &sseClient{ - host: net.JoinHostPort(host, port), - path: path, - tls: cfg.scheme == "https", - insecure: cfg.InsecureSkipVerify, - headers: cfg.Headers, - conns: make(map[int]*sseConn), + host: net.JoinHostPort(host, port), + path: path, + tls: cfg.scheme == "https", + insecure: cfg.InsecureSkipVerify, + headers: cfg.Headers, + dialTimeout: cfg.DialTimeout, + closeCtx: closeCtx, + closeStop: closeStop, + failFast: newFailFastTracker(failFastWindow), + conns: make(map[int]*sseConn), + backoffs: make(map[int]*connectBackoff), }, nil } func (c *sseClient) DoRequest(ctx context.Context, workerID int) (int, error) { - conn, err := c.conn(workerID) + conn, err := c.conn(ctx, workerID) if err != nil { return 0, err } @@ -63,6 +82,7 @@ func (c *sseClient) DoRequest(ctx context.Context, workerID int) (int, error) { } func (c *sseClient) Close() { + c.closeStop() // abort in-flight dials, handshakes, and backoff sleeps c.mu.Lock() conns := c.conns c.conns = make(map[int]*sseConn) @@ -72,20 +92,45 @@ func (c *sseClient) Close() { } } -func (c *sseClient) conn(workerID int) (*sseConn, error) { +func (c *sseClient) conn(ctx context.Context, workerID int) (*sseConn, error) { c.mu.Lock() if conn, ok := c.conns[workerID]; ok { c.mu.Unlock() return conn, nil } + bo := c.backoffs[workerID] + if bo == nil { + bo = &connectBackoff{} + 
c.backoffs[workerID] = bo + } c.mu.Unlock() - conn, err := c.dial() + conn, err := c.dial(ctx) if err != nil { + if ctx.Err() != nil || c.closeCtx.Err() != nil { + return nil, err // shutdown, not a server failure + } + recordConnectError() + if fatal := c.failFast.failure(time.Now(), err); fatal != nil { + return nil, fatal + } + // One attempt per DoRequest: sleep the (growing) backoff before + // surfacing the error so a dead server cannot induce a redial hot + // loop, then let the worker loop call back in. + bo.sleep(ctx, c.closeCtx.Done()) return nil, err } + c.failFast.success() + bo.reset() c.mu.Lock() + if c.closeCtx.Err() != nil { + // Close() won the race while we were handshaking; don't leak the + // conn into a map nobody will drain. + c.mu.Unlock() + conn.close() + return nil, c.closeCtx.Err() + } c.conns[workerID] = conn c.mu.Unlock() return conn, nil @@ -103,30 +148,58 @@ func (c *sseClient) drop(workerID int) { } } -func (c *sseClient) dial() (*sseConn, error) { +// dial opens one SSE stream. The whole sequence — TCP/TLS connect plus the +// GET handshake — is bounded by DialTimeout and aborts on ctx cancellation +// or Close(). +func (c *sseClient) dial(ctx context.Context) (*sseConn, error) { + var cancel context.CancelFunc + if c.dialTimeout > 0 { + ctx, cancel = context.WithTimeout(ctx, c.dialTimeout) + } else { + ctx, cancel = context.WithCancel(ctx) + } + defer cancel() + // Propagate Close() into the dial context (and from there into the + // in-flight connect and handshake I/O below). 
+ stopClose := context.AfterFunc(c.closeCtx, cancel) + defer stopClose() + var ( raw net.Conn err error ) if c.tls { - raw, err = tls.Dial("tcp", c.host, &tls.Config{InsecureSkipVerify: c.insecure}) //nolint:gosec // InsecureSkipVerify is opt-in for self-signed test targets + d := &tls.Dialer{Config: &tls.Config{InsecureSkipVerify: c.insecure}} //nolint:gosec // InsecureSkipVerify is opt-in for self-signed test targets + raw, err = d.DialContext(ctx, "tcp", c.host) } else { - raw, err = net.Dial("tcp", c.host) + var d net.Dialer + raw, err = d.DialContext(ctx, "tcp", c.host) } if err != nil { return nil, err } + // The GET handshake is plain conn I/O with no deadline of its own; + // closing the socket when ctx fires keeps it bounded too. + stopIO := context.AfterFunc(ctx, func() { _ = raw.Close() }) + if _, err := raw.Write([]byte(sseGetRequest(c.host, c.path, c.headers))); err != nil { + stopIO() _ = raw.Close() return nil, err } br := bufio.NewReader(raw) if err := sseReadHandshake(br); err != nil { + stopIO() _ = raw.Close() return nil, err } + if !stopIO() { + // ctx fired while the handshake was completing; raw is closed (or + // about to be) — treat as a failed dial. + return nil, ctx.Err() + } return &sseConn{raw: raw, br: br}, nil } diff --git a/sse_test.go b/sse_test.go index de31ab5..ffe857a 100644 --- a/sse_test.go +++ b/sse_test.go @@ -2,7 +2,9 @@ package loadgen import ( "context" + "errors" "fmt" + "net" "net/http" "net/http/httptest" "net/url" @@ -90,6 +92,164 @@ func TestSSERejectsNonStream(t *testing.T) { } } +// TestSSEFailFastNeverConnected drives the driver directly against a +// refusing port with a shrunken window: attempts must be backoff-paced and +// end in an ErrNeverConnected-wrapped fatal error. 
+func TestSSEFailFastNeverConnected(t *testing.T) { + host, port := refusedAddr(t) + + cfg := Config{URL: "http://" + net.JoinHostPort(host, port) + "/events", Mode: sseMode, scheme: "http"} + c, err := newSSEClient(host, port, "/events", cfg) + if err != nil { + t.Fatal(err) + } + defer c.Close() + window := 300 * time.Millisecond + c.(*sseClient).failFast = newFailFastTracker(window) + + start := time.Now() + attempts := 0 + for { + _, err := c.DoRequest(context.Background(), 0) + if err == nil { + t.Fatal("expected every DoRequest to fail against a refusing port") + } + attempts++ + if errors.Is(err, ErrNeverConnected) { + break + } + if attempts > 200 { + t.Fatalf("no fail-fast after %d attempts — dials are hot-looping", attempts) + } + } + if elapsed := time.Since(start); elapsed < window { + t.Errorf("fail-fast tripped after %v, before the %v window elapsed", elapsed, window) + } + t.Logf("fail-fast after %d attempts in %v", attempts, time.Since(start)) +} + +// TestSSECloseAbortsDial parks a dial inside the GET handshake (the server +// accepts but never responds) and verifies Close() aborts it promptly. 
+func TestSSECloseAbortsDial(t *testing.T) { + ln, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + defer func() { _ = ln.Close() }() + var held []net.Conn + defer func() { + for _, conn := range held { + _ = conn.Close() + } + }() + acceptDone := make(chan struct{}) + go func() { + defer close(acceptDone) + for { + conn, err := ln.Accept() + if err != nil { + return + } + held = append(held, conn) // hold open, never answer the GET + } + }() + + host, port, err := net.SplitHostPort(ln.Addr().String()) + if err != nil { + t.Fatal(err) + } + cfg := Config{ + URL: "http://" + ln.Addr().String() + "/events", + Mode: sseMode, + scheme: "http", + DialTimeout: 30 * time.Second, // Close, not the timeout, must abort + } + c, err := newSSEClient(host, port, "/events", cfg) + if err != nil { + t.Fatal(err) + } + + done := make(chan error, 1) + go func() { + _, derr := c.DoRequest(context.Background(), 0) + done <- derr + }() + + time.Sleep(100 * time.Millisecond) // let the dial park in the handshake read + start := time.Now() + c.Close() + + select { + case derr := <-done: + if derr == nil { + t.Error("expected an error from the aborted dial") + } + if elapsed := time.Since(start); elapsed > 2*time.Second { + t.Errorf("Close took %v to abort the in-flight dial", elapsed) + } + case <-time.After(5 * time.Second): + t.Fatal("Close() did not abort the in-flight dial") + } + _ = ln.Close() + <-acceptDone +} + +// TestSSEDialHonorsDialTimeout parks the handshake against a silent server +// and verifies DialTimeout bounds it (the v1.4.7 driver ignored it). 
+func TestSSEDialHonorsDialTimeout(t *testing.T) { + ln, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + defer func() { _ = ln.Close() }() + var held []net.Conn + defer func() { + for _, conn := range held { + _ = conn.Close() + } + }() + acceptDone := make(chan struct{}) + go func() { + defer close(acceptDone) + for { + conn, err := ln.Accept() + if err != nil { + return + } + held = append(held, conn) + } + }() + + host, port, err := net.SplitHostPort(ln.Addr().String()) + if err != nil { + t.Fatal(err) + } + cfg := Config{ + URL: "http://" + ln.Addr().String() + "/events", + Mode: sseMode, + scheme: "http", + DialTimeout: 150 * time.Millisecond, + } + c, err := newSSEClient(host, port, "/events", cfg) + if err != nil { + t.Fatal(err) + } + defer c.Close() + + start := time.Now() + _, derr := c.DoRequest(context.Background(), 0) + elapsed := time.Since(start) + if derr == nil { + t.Fatal("expected the dial to time out against a silent server") + } + // Budget: 150ms timeout + one backoff sleep (≤10ms) + slack. + if elapsed > 2*time.Second { + t.Errorf("DoRequest took %v, want ~150ms (DialTimeout-bounded)", elapsed) + } + _ = ln.Close() + <-acceptDone +} + func TestSSERunIntegration(t *testing.T) { srv := sseTestServer(t, time.Millisecond) defer srv.Close() diff --git a/ws.go b/ws.go index 8cbe0d7..0303ad5 100644 --- a/ws.go +++ b/ws.go @@ -2,6 +2,7 @@ package loadgen import ( "bufio" + "context" "crypto/rand" "crypto/tls" "encoding/base64" @@ -11,8 +12,7 @@ import ( "net" "strings" "sync" - - "context" + "time" ) const ( @@ -39,16 +39,28 @@ var errWSClosed = errors.New("loadgen: websocket connection closed by peer") // blocking broadcast read — one DoRequest == one unit through the existing // worker/RecordSuccess/timeseries pipeline. type wsClient struct { - host string // host:port to dial - path string // request path, e.g. 
/ws - tls bool - srvMode string // ?mode= value the server understands - mode string // public Config.Mode - payload []byte // outbound frame payload (nil for hub) - insecure bool - - mu sync.Mutex - conns map[int]*wsConn + host string // host:port to dial + path string // request path, e.g. /ws + tls bool + srvMode string // ?mode= value the server understands + mode string // public Config.Mode + payload []byte // outbound frame payload (nil for hub) + insecure bool + dialTimeout time.Duration + + // closeCtx is cancelled by Close() so in-flight dials, handshakes and + // backoff sleeps abort instead of outliving the client. + closeCtx context.Context + closeStop context.CancelFunc + + // failFast aborts the run when no stream was EVER established and + // connect attempts have failed for a sustained window. A client that + // had live streams retries with backoff forever. + failFast *failFastTracker + + mu sync.Mutex + conns map[int]*wsConn + backoffs map[int]*connectBackoff // per-worker reconnect pacing } func newWSClient(host, port, path string, cfg Config) (Client, error) { @@ -75,20 +87,26 @@ func newWSClient(host, port, path string, cfg Config) (Client, error) { } } + closeCtx, closeStop := context.WithCancel(context.Background()) return &wsClient{ - host: net.JoinHostPort(host, port), - path: path, - tls: secure, - srvMode: srvMode, - mode: cfg.Mode, - payload: payload, - insecure: cfg.InsecureSkipVerify, - conns: make(map[int]*wsConn), + host: net.JoinHostPort(host, port), + path: path, + tls: secure, + srvMode: srvMode, + mode: cfg.Mode, + payload: payload, + insecure: cfg.InsecureSkipVerify, + dialTimeout: cfg.DialTimeout, + closeCtx: closeCtx, + closeStop: closeStop, + failFast: newFailFastTracker(failFastWindow), + conns: make(map[int]*wsConn), + backoffs: make(map[int]*connectBackoff), }, nil } func (c *wsClient) DoRequest(ctx context.Context, workerID int) (int, error) { - conn, err := c.conn(workerID) + conn, err := c.conn(ctx, workerID) if err != 
nil { return 0, err } @@ -106,6 +124,7 @@ func (c *wsClient) DoRequest(ctx context.Context, workerID int) (int, error) { } func (c *wsClient) Close() { + c.closeStop() // abort in-flight dials, handshakes, and backoff sleeps c.mu.Lock() conns := c.conns c.conns = make(map[int]*wsConn) @@ -144,20 +163,45 @@ func (c *wsClient) exchange(conn *wsConn) (int, error) { } } -func (c *wsClient) conn(workerID int) (*wsConn, error) { +func (c *wsClient) conn(ctx context.Context, workerID int) (*wsConn, error) { c.mu.Lock() if conn, ok := c.conns[workerID]; ok { c.mu.Unlock() return conn, nil } + bo := c.backoffs[workerID] + if bo == nil { + bo = &connectBackoff{} + c.backoffs[workerID] = bo + } c.mu.Unlock() - conn, err := c.dial() + conn, err := c.dial(ctx) if err != nil { + if ctx.Err() != nil || c.closeCtx.Err() != nil { + return nil, err // shutdown, not a server failure + } + recordConnectError() + if fatal := c.failFast.failure(time.Now(), err); fatal != nil { + return nil, fatal + } + // One attempt per DoRequest: sleep the (growing) backoff before + // surfacing the error so a dead server cannot induce a redial hot + // loop, then let the worker loop call back in. + bo.sleep(ctx, c.closeCtx.Done()) return nil, err } + c.failFast.success() + bo.reset() c.mu.Lock() + if c.closeCtx.Err() != nil { + // Close() won the race while we were handshaking; don't leak the + // conn into a map nobody will drain. + c.mu.Unlock() + conn.close() + return nil, c.closeCtx.Err() + } c.conns[workerID] = conn c.mu.Unlock() return conn, nil @@ -175,30 +219,58 @@ func (c *wsClient) drop(workerID int) { } } -func (c *wsClient) dial() (*wsConn, error) { +// dial opens and upgrades one WebSocket connection. The whole sequence — +// TCP/TLS connect plus the 101 handshake — is bounded by DialTimeout and +// aborts on ctx cancellation or Close(). 
+func (c *wsClient) dial(ctx context.Context) (*wsConn, error) { + var cancel context.CancelFunc + if c.dialTimeout > 0 { + ctx, cancel = context.WithTimeout(ctx, c.dialTimeout) + } else { + ctx, cancel = context.WithCancel(ctx) + } + defer cancel() + // Propagate Close() into the dial context (and from there into the + // in-flight connect and handshake I/O below). + stopClose := context.AfterFunc(c.closeCtx, cancel) + defer stopClose() + var ( raw net.Conn err error ) if c.tls { - raw, err = tls.Dial("tcp", c.host, &tls.Config{InsecureSkipVerify: c.insecure}) //nolint:gosec // InsecureSkipVerify is opt-in for self-signed test targets + d := &tls.Dialer{Config: &tls.Config{InsecureSkipVerify: c.insecure}} //nolint:gosec // InsecureSkipVerify is opt-in for self-signed test targets + raw, err = d.DialContext(ctx, "tcp", c.host) } else { - raw, err = net.Dial("tcp", c.host) + var d net.Dialer + raw, err = d.DialContext(ctx, "tcp", c.host) } if err != nil { return nil, err } + // The upgrade handshake is plain conn I/O with no deadline of its own; + // closing the socket when ctx fires keeps it bounded too. + stopIO := context.AfterFunc(ctx, func() { _ = raw.Close() }) + if _, err := raw.Write([]byte(wsUpgradeRequest(c.host, c.path, c.srvMode))); err != nil { + stopIO() _ = raw.Close() return nil, err } br := bufio.NewReader(raw) if err := wsReadHandshake(br); err != nil { + stopIO() _ = raw.Close() return nil, err } + if !stopIO() { + // ctx fired while the handshake was completing; raw is closed (or + // about to be) — treat as a failed dial. 
+ return nil, ctx.Err() + } return &wsConn{raw: raw, br: br}, nil } diff --git a/ws_test.go b/ws_test.go index 100e8fa..e9dc7aa 100644 --- a/ws_test.go +++ b/ws_test.go @@ -5,6 +5,7 @@ import ( "context" "crypto/sha1" "encoding/base64" + "errors" "net" "net/http" "net/http/httptest" @@ -240,3 +241,243 @@ func withMod(c Config, f func(*Config)) Config { f(&c) return c } + +// refusedAddr returns host, port for an address that actively refuses +// connections (bound then immediately released). +func refusedAddr(t *testing.T) (string, string) { + t.Helper() + ln, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + addr := ln.Addr().String() + _ = ln.Close() + host, port, err := net.SplitHostPort(addr) + if err != nil { + t.Fatal(err) + } + return host, port +} + +// TestWSFailFastAbortsRun drives a full Benchmarker run against a refusing +// port: with no stream ever established, the run must abort with an +// ErrNeverConnected-wrapped dial error well before Duration elapses, and +// dial attempts must be backoff-paced rather than hot-looped. +func TestWSFailFastAbortsRun(t *testing.T) { + host, port := refusedAddr(t) + + cfg := Config{ + URL: "http://" + net.JoinHostPort(host, port) + "/ws", + Mode: wsModeEcho, + scheme: "http", + } + raw, err := newWSClient(host, port, "/ws", cfg) + if err != nil { + t.Fatal(err) + } + // Shrink the fail-fast window so the test doesn't sit through the + // production 5s default. 
+ raw.(*wsClient).failFast = newFailFastTracker(300 * time.Millisecond) + + before := connectErrorsCounter.Swap(0) + defer connectErrorsCounter.Add(before) + + b, err := New(Config{ + URL: cfg.URL, + Duration: 30 * time.Second, // must NOT be waited out + Connections: 2, + Workers: 2, + Warmup: 0, + Client: raw, + CPUMonitor: false, + RecvQProbe: false, + }) + if err != nil { + t.Fatal(err) + } + + start := time.Now() + result, err := b.Run(context.Background()) + elapsed := time.Since(start) + + if err == nil { + t.Fatal("expected fatal error from never-connected run") + } + if !errors.Is(err, ErrNeverConnected) { + t.Errorf("error must wrap ErrNeverConnected: %v", err) + } + if !strings.Contains(err.Error(), "loadgen: dial: ") || + !strings.Contains(err.Error(), "connection refused") { + t.Errorf("fatal error shape mismatch (want h1client pre-dial shape): %q", err.Error()) + } + if result != nil { + t.Errorf("expected nil result on fatal abort, got %+v", result) + } + if elapsed > 10*time.Second { + t.Errorf("abort took %v — run waited out the duration", elapsed) + } + // Hot-loop regression guard: v3.8 burned ~386k dials/sec. With 2 + // workers, a ~300ms window, and 10ms-doubling backoff, the attempt + // count must stay in the tens. + if n := b.errors.Load(); n == 0 || n > 200 { + t.Errorf("dial attempts = %d, want a small nonzero count (backoff-paced)", n) + } +} + +// TestWSReconnectBackoffAfterServerDeath: a client that HAD a live stream +// must keep retrying with backoff after the server dies — never fail fast, +// never hot-loop. +func TestWSReconnectBackoffAfterServerDeath(t *testing.T) { + srv := wsTestServer(t) + + c := wsDialClient(t, srv, wsModeEcho) + defer c.Close() + + if _, err := c.DoRequest(context.Background(), 0); err != nil { + t.Fatalf("priming request: %v", err) + } + + // Server dies mid-cell. srv.Close() releases the port but not the + // hijacked WS conn, so drop the client side too to force redials. 
+ srv.Close() + c.(*wsClient).drop(0) + + deadline := time.Now().Add(700 * time.Millisecond) + errCount := 0 + for time.Now().Before(deadline) { + _, err := c.DoRequest(context.Background(), 0) + if err == nil { + continue + } + if errors.Is(err, ErrNeverConnected) { + t.Fatalf("had a live stream — must never fail fast: %v", err) + } + errCount++ + } + if errCount == 0 { + t.Fatal("expected reconnect errors after server death") + } + // 10ms-doubling backoff allows roughly 7 attempts in 700ms (plus the + // initial dead-conn read error); a hot loop would produce thousands. + if errCount > 100 { + t.Errorf("errCount = %d in 700ms — reconnects are not backoff-paced", errCount) + } + t.Logf("reconnect errors in 700ms: %d", errCount) +} + +// TestWSCloseAbortsDial parks a dial inside the upgrade handshake (the +// server accepts but never responds) and verifies Close() aborts it +// promptly even though neither ctx deadline nor DialTimeout is near. +func TestWSCloseAbortsDial(t *testing.T) { + ln, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + defer func() { _ = ln.Close() }() + var held []net.Conn + defer func() { + for _, conn := range held { + _ = conn.Close() + } + }() + acceptDone := make(chan struct{}) + go func() { + defer close(acceptDone) + for { + conn, err := ln.Accept() + if err != nil { + return + } + held = append(held, conn) // hold open, never answer the upgrade + } + }() + + host, port, err := net.SplitHostPort(ln.Addr().String()) + if err != nil { + t.Fatal(err) + } + cfg := Config{ + URL: "http://" + ln.Addr().String() + "/ws", + Mode: wsModeEcho, + scheme: "http", + DialTimeout: 30 * time.Second, // Close, not the timeout, must abort + } + c, err := newWSClient(host, port, "/ws", cfg) + if err != nil { + t.Fatal(err) + } + + done := make(chan error, 1) + go func() { + _, derr := c.DoRequest(context.Background(), 0) + done <- derr + }() + + time.Sleep(100 * time.Millisecond) // let the dial park in the handshake read + 
start := time.Now() + c.Close() + + select { + case derr := <-done: + if derr == nil { + t.Error("expected an error from the aborted dial") + } + if elapsed := time.Since(start); elapsed > 2*time.Second { + t.Errorf("Close took %v to abort the in-flight dial", elapsed) + } + case <-time.After(5 * time.Second): + t.Fatal("Close() did not abort the in-flight dial") + } + _ = ln.Close() + <-acceptDone +} + +// TestWSConnectErrorsAccounting runs a short, non-fatal window against a +// refusing port and checks the new error-class plumbing end to end: +// Result.ConnectErrors, Result.Warmup, and bounded (non-hot-loop) totals. +func TestWSConnectErrorsAccounting(t *testing.T) { + host, port := refusedAddr(t) + + before := connectErrorsCounter.Swap(0) + defer connectErrorsCounter.Add(before) + + cfg := Config{ + URL: "http://" + net.JoinHostPort(host, port) + "/ws", + Mode: wsModeEcho, + Duration: 900 * time.Millisecond, // well under the 5s fail-fast window + Connections: 2, + Workers: 2, + Warmup: 400 * time.Millisecond, + CPUMonitor: false, + RecvQProbe: false, + } + b, err := New(cfg) + if err != nil { + t.Fatal(err) + } + result, err := b.Run(context.Background()) + if err != nil { + t.Fatalf("run should not trip fail-fast inside the 5s window: %v", err) + } + + if result.Errors == 0 { + t.Error("expected errors against a refusing port") + } + if result.ConnectErrors == 0 { + t.Error("expected ConnectErrors > 0 (dial failures must land in the connect class)") + } + if result.Warmup == nil { + t.Fatal("expected warmup stats") + } + if result.Warmup.Requests != 0 { + t.Errorf("warmup requests = %d, want 0 against a dead port", result.Warmup.Requests) + } + if result.Warmup.Errors == 0 || result.Warmup.ConnectErrors == 0 { + t.Errorf("100%%-failing warmup must be visible: %+v", result.Warmup) + } + // Hot-loop regression guard (v3.8: ~386k dials/sec). 
+ if result.Errors > 500 { + t.Errorf("errors = %d in ~1.3s — dials are not backoff-paced", result.Errors) + } + t.Logf("errors=%d connect=%d warmup=%+v", result.Errors, result.ConnectErrors, result.Warmup) +} From c902b92e1bc3ac92b17650ce5f07a9cdd3eaeef2 Mon Sep 17 00:00:00 2001 From: Albert Bausili Date: Thu, 11 Jun 2026 22:56:58 +0200 Subject: [PATCH 5/8] fix(h1client): backoff-paced reconnects after mid-cell server death MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The reconnect-and-retry path failed fast on a refused dial and returned, letting the worker loop redial immediately — v3.8's crash cell logged 33.1M dial errors at ~370k/s. Failed reconnects now sleep the shared jittered exponential backoff (10ms doubling to 1s, ctx-abortable) before surfacing the error, reset on the next successful reconnect, and count into the connect-error class. --- h1client.go | 12 +++++ h1client_test.go | 125 +++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 137 insertions(+) diff --git a/h1client.go b/h1client.go index 1108cfb..30971d1 100644 --- a/h1client.go +++ b/h1client.go @@ -49,6 +49,11 @@ type h1Conn struct { writeBufferSize int scheme string tlsConfig *tls.Config + + // backoff paces reconnect attempts after a failed redial. Only the + // owning worker touches it (reconnect runs on the worker goroutine), + // so it needs no mu. + backoff connectBackoff } // newH1Client creates a new zero-alloc HTTP/1.1 client. @@ -223,8 +228,15 @@ func (c *h1Client) DoRequest(ctx context.Context, workerID int) (int, error) { } // Reconnect and retry once if reconnErr := hc.reconnect(); reconnErr != nil { + recordConnectError() + // Server-death guard: pace this conn's next attempt with + // exponential backoff (ctx-abortable) so a dead server cannot + // induce a redial hot loop (v3.8 crash cell: 33.1M dial errors + // at ~370k/s). Reset below on the next successful reconnect. 
+ hc.backoff.sleep(ctx, nil) return 0, fmt.Errorf("h1client: conn[%d] reconnect failed: %w", connIdx, reconnErr) } + hc.backoff.reset() if _, err = hc.conn.Write(c.reqBuf); err != nil { return 0, fmt.Errorf("h1client: conn[%d] write after reconnect: %w", connIdx, err) } diff --git a/h1client_test.go b/h1client_test.go index 719be33..e8e9233 100644 --- a/h1client_test.go +++ b/h1client_test.go @@ -531,3 +531,128 @@ func TestH1MultipleWorkersConnectionIsolation(t *testing.T) { t.Error(err) } } + +// startKillableH1Server is startH1Server plus a kill function that tears +// down the listener AND every accepted conn, simulating server death +// (plain cleanup() leaves established keep-alive conns serving). +func startKillableH1Server(t *testing.T) (host, port string, kill func()) { + t.Helper() + ln, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + var mu sync.Mutex + var conns []net.Conn + done := make(chan struct{}) + go func() { + defer close(done) + for { + conn, err := ln.Accept() + if err != nil { + return + } + mu.Lock() + conns = append(conns, conn) + mu.Unlock() + go func() { + reader := bufio.NewReader(conn) + for { + if !readH1Request(reader) { + return + } + resp := "HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nOK" + if _, err := conn.Write([]byte(resp)); err != nil { + return + } + } + }() + } + }() + h, p, _ := net.SplitHostPort(ln.Addr().String()) + return h, p, func() { + _ = ln.Close() + <-done + mu.Lock() + defer mu.Unlock() + for _, conn := range conns { + _ = conn.Close() + } + } +} + +// TestH1ReconnectBackoffAfterServerDeath verifies the mid-cell +// reconnect-after-server-death path is backoff-paced (v3.8 crash cell: +// 33.1M dial errors at ~370k/s) and lands in the connect-error class. 
+func TestH1ReconnectBackoffAfterServerDeath(t *testing.T) { + host, port, kill := startKillableH1Server(t) + + client, err := newH1Client(host, port, "/", testH1Cfg(true, 1)) + if err != nil { + t.Fatal(err) + } + defer client.Close() + + if _, err := client.DoRequest(context.Background(), 0); err != nil { + t.Fatalf("priming request: %v", err) + } + + before := connectErrorsCounter.Swap(0) + defer connectErrorsCounter.Add(before) + + kill() // server dies mid-cell; port now refuses + + deadline := time.Now().Add(700 * time.Millisecond) + errCount := 0 + for time.Now().Before(deadline) { + if _, err := client.DoRequest(context.Background(), 0); err != nil { + errCount++ + } + } + if errCount == 0 { + t.Fatal("expected reconnect errors after server death") + } + // 10ms-doubling backoff allows roughly 7 reconnect attempts in 700ms; + // a hot loop would produce hundreds of thousands. + if errCount > 100 { + t.Errorf("errCount = %d in 700ms — reconnects are not backoff-paced", errCount) + } + if n := connectErrorsCounter.Swap(0); n == 0 { + t.Error("expected reconnect dial failures in the connect-error class") + } + if next := client.conns[0].backoff.next; next <= reconnectBackoffMin { + t.Errorf("backoff.next = %v after repeated failures, want > %v", next, reconnectBackoffMin) + } + t.Logf("reconnect errors in 700ms: %d", errCount) +} + +// TestH1ReconnectBackoffCtxAbort verifies ctx cancellation cuts a pending +// reconnect-backoff sleep short instead of holding the worker hostage. +func TestH1ReconnectBackoffCtxAbort(t *testing.T) { + host, port, kill := startKillableH1Server(t) + + client, err := newH1Client(host, port, "/", testH1Cfg(true, 1)) + if err != nil { + t.Fatal(err) + } + defer client.Close() + + if _, err := client.DoRequest(context.Background(), 0); err != nil { + t.Fatalf("priming request: %v", err) + } + kill() + + // Force the next failure onto a long backoff sleep, then cancel. 
+ client.conns[0].backoff.next = 2 * time.Second + ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond) + defer cancel() + + start := time.Now() + _, derr := client.DoRequest(ctx, 0) + elapsed := time.Since(start) + if derr == nil { + t.Fatal("expected an error against a dead server") + } + if elapsed > time.Second { + t.Errorf("DoRequest took %v — ctx cancellation did not abort the backoff sleep", elapsed) + } +} From 057f4bfe8310fbe69076ebffd8fa81305b54dd58 Mon Sep 17 00:00:00 2001 From: Albert Bausili Date: Fri, 12 Jun 2026 17:45:09 +0200 Subject: [PATCH 6/8] feat(mix): scope per-protocol counters to the measured window via baselines The warmup->measure boundary used to Store(0) the six mix counters while no workers were running. The calibrated saturation handoff (next commit) keeps workers in flight across that boundary, where a reset would race concurrent Adds and lose increments. markMeasureStart captures baselines instead; stats() subtracts them, so reported counts still cover the measured window only. --- mix.go | 42 +++++++++++++++++++++++++++++++++++------- mix_test.go | 30 ++++++++++++++++++++++++++++++ 2 files changed, 65 insertions(+), 7 deletions(-) diff --git a/mix.go b/mix.go index ff1ea40..e8fe846 100644 --- a/mix.go +++ b/mix.go @@ -146,6 +146,18 @@ type mixClient struct { h2Errors atomic.Int64 upgradeErrors atomic.Int64 + // Measured-window baselines captured by markMeasureStart at the + // warmup→measure handoff; stats() subtracts them so the reported + // per-protocol counts cover the measured window only. Plain fields: + // written once by the Run goroutine before the measured phase + // begins, read by stats() after the workers drain. + h1ReqBase int64 + h2ReqBase int64 + upgradeReqBase int64 + h1ErrBase int64 + h2ErrBase int64 + upgradeErrBase int64 + // Per-protocol conn counts (informational, surfaced in Result). 
h1Conns int h2Conns int @@ -303,6 +315,20 @@ func (c *mixClient) recordError(workerID int) { } } +// markMeasureStart records the current counter values as the measured-window +// baselines. Called once by Benchmarker.Run at the warmup→measure handoff. +// Baselines (rather than Store(0) resets) keep the handoff safe while +// saturation-mode workers are still adding to the counters: a concurrent +// Add can never be lost to a racing reset. +func (c *mixClient) markMeasureStart() { + c.h1ReqBase = c.h1Requests.Load() + c.h2ReqBase = c.h2Requests.Load() + c.upgradeReqBase = c.upgradeRequests.Load() + c.h1ErrBase = c.h1Errors.Load() + c.h2ErrBase = c.h2Errors.Load() + c.upgradeErrBase = c.upgradeErrors.Load() +} + // Close shuts down every sub-client. func (c *mixClient) Close() { if c.h1 != nil { @@ -316,17 +342,19 @@ func (c *mixClient) Close() { } } -// stats returns the per-protocol counters as a MixStats snapshot. +// stats returns the per-protocol counters as a MixStats snapshot, scoped +// to the measured window (counts accumulated before markMeasureStart are +// subtracted out). 
func (c *mixClient) stats() MixStats { return MixStats{ H1Conns: c.h1Conns, H2Conns: c.h2Conns, UpgradeConns: c.upgradeConns, - H1Requests: c.h1Requests.Load(), - H2Requests: c.h2Requests.Load(), - UpgradeRequests: c.upgradeRequests.Load(), - H1Errors: c.h1Errors.Load(), - H2Errors: c.h2Errors.Load(), - UpgradeErrors: c.upgradeErrors.Load(), + H1Requests: c.h1Requests.Load() - c.h1ReqBase, + H2Requests: c.h2Requests.Load() - c.h2ReqBase, + UpgradeRequests: c.upgradeRequests.Load() - c.upgradeReqBase, + H1Errors: c.h1Errors.Load() - c.h1ErrBase, + H2Errors: c.h2Errors.Load() - c.h2ErrBase, + UpgradeErrors: c.upgradeErrors.Load() - c.upgradeErrBase, } } diff --git a/mix_test.go b/mix_test.go index 032d60b..f1e994c 100644 --- a/mix_test.go +++ b/mix_test.go @@ -352,3 +352,33 @@ func TestMixProtoString(t *testing.T) { } } } + +// TestMixMarkMeasureStartBaselines verifies the per-protocol counters are +// scoped to the measured window via baselines rather than racy resets: in +// saturation mode the warmup→measure handoff happens while workers are +// still adding to the counters, so a Store(0) reset could lose increments. 
+func TestMixMarkMeasureStartBaselines(t *testing.T) { + mc := &mixClient{} + mc.h1Requests.Store(10) + mc.h2Requests.Store(20) + mc.upgradeRequests.Store(30) + mc.h1Errors.Store(1) + mc.h2Errors.Store(2) + mc.upgradeErrors.Store(3) + + mc.markMeasureStart() + + mc.h1Requests.Add(5) + mc.h2Requests.Add(6) + mc.upgradeRequests.Add(7) + mc.h1Errors.Add(1) + + got := mc.stats() + want := MixStats{ + H1Requests: 5, H2Requests: 6, UpgradeRequests: 7, + H1Errors: 1, H2Errors: 0, UpgradeErrors: 0, + } + if got != want { + t.Errorf("stats after markMeasureStart = %+v, want %+v", got, want) + } +} From 48bd709856359b320e6fb7e452a9b69d5cc7850f Mon Sep 17 00:00:00 2001 From: Albert Bausili Date: Fri, 12 Jun 2026 17:45:26 +0200 Subject: [PATCH 7/8] =?UTF-8?q?fix(bench):=20calibrated=20warmup->measure?= =?UTF-8?q?=20handoff=20=E2=80=94=20no=20t=3D0=20step=20in=20saturation=20?= =?UTF-8?q?mode?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit v3.9 repro (chain-api-*/celeris-iouring-h1-async, 90s cells): every saturation cell's errors landed exclusively in the first measured second (get-json 22, get-json-1c 53, post-4k 271) with connect_errors=0, while the 20s warmup ran clean at ~508k RPS and the measured window settled at ~595k. Mechanism: warmup drove only 75% of the workers (so the closed loop never explored the knee and 25% of the keep-alive conns sat idle, free for the server to expire), then the boundary stopped every worker and restarted the full set at t=0 — a +33% concurrency step plus a phase-aligned restart burst that the server absorbed by resetting established conns. Steady state never repeats any of that, hence the one-second burst. The handoff is now calibrated: - Saturation mode (Rate==0): warmupHot starts the FULL worker set and leaves it running across the boundary. 
Knee discovery happens during warmup (errors land in Result.Warmup — honest calibration); the measured window adopts the same goroutines mid-flight, so it opens at the converged steady rate with no concurrency step, no cold conns, no herd. Short warmups simply continue the same ramp from below inside the window — converging to the true max without overshoot. - Boundary bookkeeping is swap/baseline based since workers stay hot: the latency recorder is held behind an atomic.Pointer and a fresh one is swapped in at the boundary (one pointer load per recorded request; negligible next to the existing windowMu acquisition), measured errors are reported as errors-errorsBase instead of resetting the counter, and warmup request totals are finalised at end-of-run so per-shard locals unflushed at the swap are not lost. - Rated mode keeps its stop/start phases (the measured window swaps in the rate scheduler), but its warmup now also runs the full worker set so no conn enters the paced window cold. Measured-window errors on a healthy server are now structurally zero — no samples are trimmed or edited, the burst simply happens where the calibration phase reports it. --- bench.go | 257 ++++++++++++++++++++++++++++++++++++++--------------- latency.go | 5 +- 2 files changed, 190 insertions(+), 72 deletions(-) diff --git a/bench.go b/bench.go index a09e928..4a97ef6 100644 --- a/bench.go +++ b/bench.go @@ -70,9 +70,13 @@ type Config struct { Workers int // Warmup is the warmup duration before the measured benchmark begins. - // During warmup, 75% of workers send requests to warm connection pools - // and give the server a realistic load preview. Set to 0 to skip. - // Default: 5s. + // Warmup runs the full worker set so every connection carries traffic + // before measurement starts. 
In saturation mode (Rate == 0) warmup is + // a calibration phase: the workers it starts keep running across the + // warmup→measure boundary, so the measured window opens at the + // already-converged steady state instead of stepping past the knee at + // t=0. Warmup-phase outcomes are reported on Result.Warmup. Set to 0 + // to skip. Default: 5s. Warmup time.Duration // DisableKeepAlive disables HTTP keep-alive (Connection: close mode). @@ -326,8 +330,33 @@ type Benchmarker struct { // Requests and bytesRead are tracked per-shard in latencies. errors atomic.Int64 - // Latency tracking (also holds per-shard request/bytes counters) - latencies *ShardedLatencyRecorder + // errorsBase is the value of errors at the warmup→measure boundary. + // Measured-phase errors are reported as errors − errorsBase instead + // of resetting the counter, because saturation-mode workers keep + // running across the boundary and a concurrent Add could be lost to + // a racing Store(0). Written once by Run before the timeseries + // ticker starts; read by the ticker and buildResult afterwards. + errorsBase int64 + + // Latency tracking (also holds per-shard request/bytes counters). + // Held behind an atomic pointer because the saturation-mode + // warmup→measure handoff swaps in a fresh recorder while workers are + // still in flight (see Run). Hot-path cost is a single atomic + // pointer load per recorded request — negligible next to the + // windowMu acquisition RecordSuccess already performs. + latencies atomic.Pointer[ShardedLatencyRecorder] + + // warmupRec retains the recorder that accumulated warmup traffic + // after the handoff swap. Per-shard local counters that were still + // unflushed at the boundary stay stranded inside it until the final + // single-threaded FlushLocal at end-of-run, which makes the warmup + // request count exact. Nil when Warmup == 0. 
+ warmupRec *ShardedLatencyRecorder + + // flushInterval mirrors the recorder flush cadence chosen in New so + // the measured-phase recorder created at the warmup boundary matches + // the warmup one. + flushInterval int64 // Fatal abort: the first ErrNeverConnected-wrapped failure a worker // observes lands in fatalErr and closes fatalCh, so Run/warmup stop @@ -439,12 +468,14 @@ func New(cfg Config) (*Benchmarker, error) { // Use custom client if provided if cfg.Client != nil { - return &Benchmarker{ - config: cfg, - raw: cfg.Client, - latencies: NewShardedLatencyRecorder(cfg.Workers, flushInterval), - fatalCh: make(chan struct{}), - }, nil + b := &Benchmarker{ + config: cfg, + raw: cfg.Client, + flushInterval: flushInterval, + fatalCh: make(chan struct{}), + } + b.latencies.Store(NewShardedLatencyRecorder(cfg.Workers, flushInterval)) + return b, nil } host, port, path, scheme, err := parseURL(cfg.URL) @@ -483,11 +514,12 @@ func New(cfg Config) (*Benchmarker, error) { } b := &Benchmarker{ - config: cfg, - raw: raw, - latencies: NewShardedLatencyRecorder(cfg.Workers, flushInterval), - fatalCh: make(chan struct{}), + config: cfg, + raw: raw, + flushInterval: flushInterval, + fatalCh: make(chan struct{}), } + b.latencies.Store(NewShardedLatencyRecorder(cfg.Workers, flushInterval)) if mc, ok := raw.(*mixClient); ok { b.mix = mc } @@ -506,36 +538,43 @@ func (b *Benchmarker) Run(ctx context.Context) (*Result, error) { // Scope the global connect-error counter to this run. connectErrorsCounter.Store(0) + // Saturation mode hands the warmup workers over to the measured + // window without stopping them: the same goroutines (and therefore + // the same closed-loop concurrency, connection set, and arrival + // phase) carry straight across the boundary, so the measured window + // opens at the steady state warmup converged to. Rated mode keeps + // its stop/start phases because the measured window swaps in the + // rate scheduler + ratedWorker loop. 
+ continuous := b.config.Rate <= 0 && b.config.Warmup > 0 + + // Create a scoped context for this benchmark run. Workers use this context + // so their in-flight HTTP requests are cancelled when the benchmark ends, + // preventing wg.Wait() from hanging if the server stops responding. + // Allow Warmup + Duration + 60s for in-flight requests to drain. + runCtx, runCancel := context.WithTimeout(ctx, b.config.Warmup+b.config.Duration+60*time.Second) + defer runCancel() + // Warmup phase if b.config.Warmup > 0 { - b.warmup(ctx) - // Snapshot warmup outcomes before the counters reset for the - // measured run — a 100%-failing warmup must stay observable. - b.latencies.FlushLocal() - warmupReqs, _ := b.latencies.Totals() - b.warmupStats = &WarmupStats{ - Requests: warmupReqs, - Errors: b.errors.Load(), - ConnectErrors: snapshotConnectErrors(), + if continuous { + b.warmupHot(ctx, runCtx) + } else { + b.warmup(ctx) } if ferr := b.fatalError(); ferr != nil { + // Continuous warmup leaves workers running; tear them down + // before surfacing the abort (rated warmup already drained + // its own). + if continuous { + b.running.Store(false) + runCancel() + b.raw.Close() + b.wg.Wait() + } return nil, ferr } } - // Reset metrics for actual benchmark - b.errors.Store(0) - b.latencies.Reset() - if b.mix != nil { - // Clear per-protocol counters accumulated during warmup. - b.mix.h1Requests.Store(0) - b.mix.h2Requests.Store(0) - b.mix.upgradeRequests.Store(0) - b.mix.h1Errors.Store(0) - b.mix.h2Errors.Store(0) - b.mix.upgradeErrors.Store(0) - } - // Federation: dial the peer before workers spin up so the start // signal lands at a well-defined moment relative to local launch. var fed *FederationCoordinator @@ -578,13 +617,6 @@ func (b *Benchmarker) Run(ctx context.Context) (*Result, error) { selfCPU.Start() } - // Create a scoped context for this benchmark run. 
Workers use this context - // so their in-flight HTTP requests are cancelled when the benchmark ends, - // preventing wg.Wait() from hanging if the server stops responding. - // Allow Duration + 60s for in-flight requests to drain. - runCtx, runCancel := context.WithTimeout(ctx, b.config.Duration+60*time.Second) - defer runCancel() - // Per-socket recv-Q probe (Linux only). The probe is best-effort: when // it cannot read /proc/net/tcp it stays silent and RecvQHigh remains // false. Caller filters by loadgen's open socket inodes. @@ -594,13 +626,40 @@ func (b *Benchmarker) Run(ctx context.Context) (*Result, error) { recvqProbe.Start(runCtx) } + // Warmup→measure handoff: snapshot warmup outcomes, then swap in a + // fresh recorder so measured percentiles and counters start clean. + // In continuous (saturation) mode workers fly straight through the + // swap — the measured window begins at the already-converged steady + // rate with no concurrency step, no cold connections, and no + // phase-aligned restart burst. The warmup request count is finalised + // at end-of-run (see below) so per-shard local counters that are + // still unflushed here are not lost. + if b.config.Warmup > 0 { + b.warmupRec = b.latencies.Load() + b.errorsBase = b.errors.Load() + b.warmupStats = &WarmupStats{ + Errors: b.errorsBase, + ConnectErrors: snapshotConnectErrors(), + } + b.latencies.Store(NewShardedLatencyRecorder(b.config.Workers, b.flushInterval)) + if b.mix != nil { + b.mix.markMeasureStart() + } + } + // Timeseries collection: 1-second snapshots var timeseries []TimeseriesPoint var prevReqs int64 - var prevErrors int64 + prevErrors := b.errorsBase var prevConnErrors uint64 - // Start workers — each gets a unique workerID for connection partitioning + // The measured-phase recorder: stable from here on, so the ticker + // and workers may cache the pointer. + rec := b.latencies.Load() + + // Start workers — each gets a unique workerID for connection partitioning. 
+ // In continuous mode the warmup workers are already running and are + // simply adopted by the measured window. b.running.Store(true) start := time.Now() @@ -624,12 +683,14 @@ func (b *Benchmarker) Run(ctx context.Context) (*Result, error) { go b.rateScheduler(runCtx, start, slots, schedDone) } - for i := range b.config.Workers { - workerID := i - if slots != nil { - b.wg.Go(func() { b.ratedWorker(runCtx, workerID, slots) }) - } else { - b.wg.Go(func() { b.worker(runCtx, workerID) }) + if !continuous { + for i := range b.config.Workers { + workerID := i + if slots != nil { + b.wg.Go(func() { b.ratedWorker(runCtx, workerID, slots) }) + } else { + b.wg.Go(func() { b.worker(runCtx, workerID) }) + } } } @@ -642,11 +703,11 @@ func (b *Benchmarker) Run(ctx context.Context) (*Result, error) { for { select { case <-ticker.C: - reqs, _ := b.latencies.Totals() + reqs, _ := rec.Totals() elapsed := time.Since(start).Seconds() deltaReqs := reqs - prevReqs prevReqs = reqs - p99 := b.latencies.SnapshotWindowP99Ms() + p99 := rec.SnapshotWindowP99Ms() curErr := b.errors.Load() curConnErr := connectErrorsCounter.Load() timeseries = append(timeseries, TimeseriesPoint{ @@ -703,7 +764,15 @@ func (b *Benchmarker) Run(ctx context.Context) (*Result, error) { elapsed := time.Since(start) // Flush unflushed local counters to atomics now that workers have stopped. - b.latencies.FlushLocal() + rec.FlushLocal() + + // Finalise the warmup request count: locals that were unflushed at + // the handoff stayed stranded in the warmup recorder; flushing them + // now (single-threaded) makes Result.Warmup.Requests exact. + if b.warmupRec != nil && b.warmupStats != nil { + b.warmupRec.FlushLocal() + b.warmupStats.Requests, _ = b.warmupRec.Totals() + } result := b.buildResult(elapsed) result.ClientCPUPercent = cpuMon.Stop() @@ -727,7 +796,7 @@ func (b *Benchmarker) Run(ctx context.Context) (*Result, error) { // Collect peer histogram if federation is active. 
if fed != nil { - local := b.latencies.MergedHistogram() + local := rec.MergedHistogram() peerReqs, peerErrs, merged, err := fed.CollectResult(local) _ = fed.Close() if err != nil { @@ -752,21 +821,23 @@ func (b *Benchmarker) Run(ctx context.Context) (*Result, error) { return result, nil } +// warmup is the stop/start warmup used by rated mode: closed-loop workers +// run for the warmup window, then drain so the measured window can swap in +// the rate scheduler + ratedWorker loop. Saturation mode uses warmupHot +// instead, which keeps its workers running across the boundary. +// +// The full worker set runs so every connection in the pool carries traffic +// before measurement begins. The old 75% heuristic left a quarter of the +// keep-alive conns idle through warmup; their first-ever requests then +// landed inside the measured window, after the server had been free to +// idle-expire them. func (b *Benchmarker) warmup(ctx context.Context) { warmupCtx, cancel := context.WithTimeout(ctx, b.config.Warmup) defer cancel() b.running.Store(true) - // Use 75% of workers for warmup to properly warm up connection pools - // and give the server a realistic preview of the load - warmupWorkers := max((b.config.Workers*3)/4, 4) - // Don't exceed actual worker count - if warmupWorkers > b.config.Workers { - warmupWorkers = b.config.Workers - } - - for i := range warmupWorkers { + for i := range b.config.Workers { workerID := i b.wg.Go(func() { b.worker(warmupCtx, workerID) }) } @@ -782,6 +853,43 @@ func (b *Benchmarker) warmup(ctx context.Context) { b.wg.Wait() } +// warmupHot runs the calibration warmup for saturation mode: the full +// worker set starts immediately — the same closed-loop concurrency the +// measured window uses — and is left RUNNING when this returns. 
Knee +// discovery happens here: the aggressive initial approach (simultaneous +// first requests, server backpressure while it absorbs the burst) plays +// out during warmup, where any errors land in Result.Warmup. By the time +// the measured window opens, the loop has self-paced to the knee and +// carries across the boundary with no concurrency step, no cold +// connections, and no phase-aligned restart burst — the v3.9 repro showed +// every saturation cell's errors confined to the first measured second +// because warmup drove only 75% of the workers and the boundary +// stopped+restarted them. +// +// If the warmup window is too short for full convergence, the measured +// window simply continues the same closed-loop ramp from below — the +// searcher still reaches the true max inside the window, without a step +// that overshoots the knee. +// +// ctx is the caller's context (aborts the wait); runCtx is the +// whole-run context handed to the workers, since they outlive warmup. +func (b *Benchmarker) warmupHot(ctx context.Context, runCtx context.Context) { + b.running.Store(true) + + for i := range b.config.Workers { + workerID := i + b.wg.Go(func() { b.worker(runCtx, workerID) }) + } + + t := time.NewTimer(b.config.Warmup) + defer t.Stop() + select { + case <-ctx.Done(): + case <-t.C: + case <-b.fatalCh: + } +} + // rateScheduler emits intended-dispatch timestamps at the configured Rate. // It closes slots when the run context is cancelled or when running flips // false, so ratedWorker goroutines can drop out cleanly. 
@@ -881,7 +989,7 @@ func (b *Benchmarker) ratedWorker(ctx context.Context, workerID int, slots <-cha } continue } - b.latencies.RecordSuccess(workerID, latency, bytesRead) + b.latencies.Load().RecordSuccess(workerID, latency, bytesRead) } } } @@ -934,14 +1042,21 @@ func (b *Benchmarker) worker(ctx context.Context, workerID int) { b.mix.recordError(workerID) } } else { - b.latencies.RecordSuccess(workerID, latency, bytesRead) + // The pointer load (not a cached pointer) is deliberate: the + // saturation warmup→measure handoff swaps recorders while this + // loop keeps running, and loading at record time files each + // completion into the phase it finished in. + b.latencies.Load().RecordSuccess(workerID, latency, bytesRead) } } } func (b *Benchmarker) buildResult(elapsed time.Duration) *Result { - reqs, bytesRead := b.latencies.Totals() - errs := b.errors.Load() + rec := b.latencies.Load() + reqs, bytesRead := rec.Totals() + // errorsBase scopes the cumulative counter to the measured window — + // warmup errors are reported separately on Result.Warmup. + errs := b.errors.Load() - b.errorsBase rps := float64(reqs) / elapsed.Seconds() throughput := float64(bytesRead) / elapsed.Seconds() @@ -953,13 +1068,13 @@ func (b *Benchmarker) buildResult(elapsed time.Duration) *Result { Duration: elapsed, RequestsPerSec: rps, ThroughputBPS: throughput, - Latency: b.latencies.Percentiles(), + Latency: rec.Percentiles(), DialRetries: snapshotDialRetries(), ConnectErrors: snapshotConnectErrors(), Warmup: b.warmupStats, } - if hist, err := b.latencies.EncodeHistogram(); err == nil { + if hist, err := rec.EncodeHistogram(); err == nil { res.Histogram = hist } diff --git a/latency.go b/latency.go index 388ec02..4f34c78 100644 --- a/latency.go +++ b/latency.go @@ -234,7 +234,10 @@ func (s *ShardedLatencyRecorder) SnapshotWindowP99Ms() float64 { return float64(merged.ValueAtQuantile(99)) / 1e6 } -// Reset clears all shards (called between warmup and main benchmark). 
+// Reset clears all shards. Only safe while no workers are recording — +// the warmup→measure handoff swaps in a fresh recorder instead of +// resetting, because saturation-mode workers keep running across the +// boundary. func (s *ShardedLatencyRecorder) Reset() { for i := range s.shards { sh := &s.shards[i] From b19424235b784e116761abd028fd9a0c45e76413 Mon Sep 17 00:00:00 2001 From: Albert Bausili Date: Fri, 12 Jun 2026 17:45:42 +0200 Subject: [PATCH 8/8] test(bench): handoff continuity unit tests + shedding-server integration test MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - TestSaturationHandoffContinuity: goroutine-identity fingerprinting proves the measured window adopts the warmup workers (no second worker set, full set active from warmup start) and that warmup+measured request accounting is exact across the recorder swap. - TestRatedHandoffKeepsStopStart: rated mode still drains warmup workers and spawns a fresh ratedWorker set — scheduler semantics untouched. - TestSaturationHandoffZeroMeasuredErrors: loopback raw-TCP server with a zero-burst 8k RPS pacer, cold-start shedding, and idle-conn expiry (the three SUT behaviours behind the v3.9 burst). Asserts measured window has zero errors in every timeseries bucket while warmup reports the shedding, and that measured RPS converges to the server limit without understating the warmup-calibrated rate. --- bench_handoff_test.go | 395 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 395 insertions(+) create mode 100644 bench_handoff_test.go diff --git a/bench_handoff_test.go b/bench_handoff_test.go new file mode 100644 index 0000000..34ef465 --- /dev/null +++ b/bench_handoff_test.go @@ -0,0 +1,395 @@ +package loadgen + +import ( + "bufio" + "context" + "net" + "runtime" + "sync" + "sync/atomic" + "testing" + "time" +) + +// goid returns the calling goroutine's id parsed from runtime.Stack. 
+// Test-only: goroutine ids are never reused by the runtime, which makes +// them a deterministic fingerprint for "did the benchmarker spawn a second +// worker set at the warmup→measure boundary". +func goid() int64 { + var buf [64]byte + n := runtime.Stack(buf[:], false) + // Format: "goroutine 123 [running]:..." + const prefix = len("goroutine ") + var id int64 + for _, c := range buf[prefix:n] { + if c < '0' || c > '9' { + break + } + id = id*10 + int64(c-'0') + } + return id +} + +// handoffClient is a Client that fingerprints every DoRequest call: +// which goroutine issued it, which workerID, and when the worker was +// first seen relative to the run start. +type handoffClient struct { + start time.Time + + mu sync.Mutex + goroutines map[int64]struct{} + firstSeen map[int]time.Duration + + calls atomic.Int64 +} + +func newHandoffClient() *handoffClient { + return &handoffClient{ + start: time.Now(), + goroutines: make(map[int64]struct{}), + firstSeen: make(map[int]time.Duration), + } +} + +func (h *handoffClient) DoRequest(_ context.Context, workerID int) (int, error) { + h.mu.Lock() + h.goroutines[goid()] = struct{}{} + if _, ok := h.firstSeen[workerID]; !ok { + h.firstSeen[workerID] = time.Since(h.start) + } + h.mu.Unlock() + h.calls.Add(1) + time.Sleep(200 * time.Microsecond) + return 2, nil +} + +func (h *handoffClient) Close() {} + +// TestSaturationHandoffContinuity pins the calibrated warmup→measure +// handoff in saturation mode: the commanded rate of an open benchmark is +// its closed-loop concurrency, so "no step discontinuity across the +// boundary" means (a) warmup already runs the full worker set and (b) the +// exact same worker goroutines carry across the boundary instead of being +// stopped and restarted. 
Before the fix, warmup ran 75% of the workers and +// the boundary respawned everything — the +33% concurrency step plus the +// phase-aligned restart burst put every error of a run into the first +// measured second (v3.9 chain-api repro). +func TestSaturationHandoffContinuity(t *testing.T) { + const workers = 8 + hc := newHandoffClient() + + cfg := Config{ + URL: "http://127.0.0.1:1/", // never dialled: custom Client + Method: "GET", + Duration: 400 * time.Millisecond, + Connections: workers, + Workers: workers, + Warmup: 600 * time.Millisecond, + Client: hc, + CPUMonitor: false, + RecvQProbe: false, + } + b, err := New(cfg) + if err != nil { + t.Fatal(err) + } + result, err := b.Run(context.Background()) + if err != nil { + t.Fatal(err) + } + + hc.mu.Lock() + gor := len(hc.goroutines) + seen := len(hc.firstSeen) + var latestFirst time.Duration + for _, d := range hc.firstSeen { + if d > latestFirst { + latestFirst = d + } + } + hc.mu.Unlock() + + // (a) Full worker set participates in the warmup, and (b) no second + // worker set is spawned at the boundary: exactly `workers` distinct + // goroutines must have issued requests over the whole run. The + // pre-fix behaviour would show warmup (6) + measured (8) = 14. + if gor != workers { + t.Errorf("distinct worker goroutines = %d, want %d (stop/restart at the warmup boundary?)", gor, workers) + } + if seen != workers { + t.Errorf("distinct workerIDs = %d, want %d (warmup did not run the full worker set)", seen, workers) + } + // Every worker must be active early in the warmup — well before the + // boundary at 600ms. 300ms is half the warmup: a worker first seen + // later than that was started by the measured window, i.e. a step.
+ if latestFirst > 300*time.Millisecond { + t.Errorf("latest worker first seen at %v, want < 300ms (worker joined after warmup start)", latestFirst) + } + + // Accounting across the handoff is exact: every successful request + // lands in exactly one phase, including per-shard local counters + // that were unflushed when the recorder swap happened. + if result.Warmup == nil { + t.Fatal("expected Result.Warmup to be populated") + } + total := hc.calls.Load() + if got := result.Warmup.Requests + result.Requests; got != total { + t.Errorf("warmup(%d) + measured(%d) = %d requests, want exactly %d (client total)", + result.Warmup.Requests, result.Requests, got, total) + } + if result.Warmup.Requests == 0 { + t.Error("expected warmup requests > 0") + } + if result.Requests == 0 { + t.Error("expected measured requests > 0") + } + if result.Errors != 0 || result.Warmup.Errors != 0 { + t.Errorf("unexpected errors: measured=%d warmup=%d", result.Errors, result.Warmup.Errors) + } + t.Logf("handoff: %d goroutines, warmup=%d measured=%d total=%d latestFirstSeen=%v", + gor, result.Warmup.Requests, result.Requests, total, latestFirst) +} + +// TestRatedHandoffKeepsStopStart pins that rated mode keeps its two-phase +// structure: the warmup's closed-loop workers drain at the boundary and a +// fresh ratedWorker set drives the measured window — so the constant-rate +// scheduler semantics are untouched by the saturation handoff change. 
+func TestRatedHandoffKeepsStopStart(t *testing.T) { + const workers = 4 + hc := newHandoffClient() + + cfg := Config{ + URL: "http://127.0.0.1:1/", + Method: "GET", + Duration: 500 * time.Millisecond, + Connections: workers, + Workers: workers, + Warmup: 300 * time.Millisecond, + Rate: 200, + Client: hc, + CPUMonitor: false, + RecvQProbe: false, + } + b, err := New(cfg) + if err != nil { + t.Fatal(err) + } + result, err := b.Run(context.Background()) + if err != nil { + t.Fatal(err) + } + + hc.mu.Lock() + gor := len(hc.goroutines) + hc.mu.Unlock() + + // Warmup spawns `workers` closed-loop goroutines, the measured window + // spawns `workers` ratedWorker goroutines: 2× distinct ids in total. + if gor != 2*workers { + t.Errorf("distinct worker goroutines = %d, want %d (rated warmup must stop/start at the boundary)", gor, 2*workers) + } + if !result.RatedMode { + t.Error("expected RatedMode result") + } + if result.Warmup == nil || result.Warmup.Requests == 0 { + t.Errorf("expected populated warmup stats, got %+v", result.Warmup) + } + if got := result.Warmup.Requests + result.Requests; got != hc.calls.Load() { + t.Errorf("warmup(%d) + measured(%d) = %d requests, want exactly %d", + result.Warmup.Requests, result.Requests, got, hc.calls.Load()) + } + t.Logf("rated handoff: %d goroutines, warmup=%d measured=%d", gor, result.Warmup.Requests, result.Requests) +} + +// sheddingServer is a raw-TCP HTTP/1.1 server that models the SUT +// behaviours behind the v3.9 t=0 error burst: +// +// - a hard throughput limit: every response waits for a slot from a +// zero-burst pacer (capacity 1), so sustained RPS cannot exceed +// 1/interval no matter how hard the client pushes; +// - cold-start shedding: during the slow-start window after boot, every +// 3rd request gets its connection closed without a response (the +// "server absorbing the initial burst" phase — these errors must land +// in warmup, never in the measured window); +// - idle-connection expiry: a connection idle 
longer than idleTimeout is +// closed server-side. Before the fix, the 25% of conns that warmup +// never exercised were idle-expired and turned into guaranteed +// measured-window errors at t=0. +type sheddingServer struct { + ln net.Listener + start time.Time + interval int64 // pacer slot width, nanoseconds + slowStart time.Duration + idle time.Duration + + nextSlot atomic.Int64 + reqCount atomic.Int64 + sheds atomic.Int64 +} + +func startSheddingServer(t *testing.T, interval, slowStart, idle time.Duration) *sheddingServer { + t.Helper() + ln, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + s := &sheddingServer{ + ln: ln, + start: time.Now(), + interval: int64(interval), + slowStart: slowStart, + idle: idle, + } + go func() { + for { + conn, err := ln.Accept() + if err != nil { + return + } + go s.serve(conn) + } + }() + t.Cleanup(func() { _ = ln.Close() }) + return s +} + +// acquireSlot blocks until this request's paced slot arrives. Zero-burst: +// the next slot never lags behind "now", so quiet periods do not bank +// tokens that a later burst could spend. +func (s *sheddingServer) acquireSlot() { + for { + now := time.Now().UnixNano() + cur := s.nextSlot.Load() + slot := cur + if now > slot { + slot = now + } + if s.nextSlot.CompareAndSwap(cur, slot+s.interval) { + if d := slot - now; d > 0 { + time.Sleep(time.Duration(d)) + } + return + } + } +} + +var sheddingResponse = []byte("HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nOK") + +func (s *sheddingServer) serve(conn net.Conn) { + defer func() { _ = conn.Close() }() + br := bufio.NewReaderSize(conn, 4096) + for { + // Idle expiry: kill connections that sit quiet, like a server + // keep-alive timeout would. + _ = conn.SetReadDeadline(time.Now().Add(s.idle)) + // Read one request: header lines until the blank line (GETs only). 
+ seenRequestLine := false + for { + line, err := br.ReadSlice('\n') + if err != nil { + return // idle timeout, client close, or mid-request error + } + if len(line) <= 2 { + break + } + seenRequestLine = true + } + if !seenRequestLine { + return + } + // Cold-start shedding: while the server "boots", every 3rd + // request gets its conn dropped instead of a response. + if time.Since(s.start) < s.slowStart && s.reqCount.Add(1)%3 == 0 { + s.sheds.Add(1) + return + } + s.acquireSlot() + if _, err := conn.Write(sheddingResponse); err != nil { + return + } + } +} + +// TestSaturationHandoffZeroMeasuredErrors is the loopback integration test +// for the calibrated handoff: against a rate-limited server that sheds +// during its cold start and expires idle connections, the warmup phase may +// record errors (and reports them on Result.Warmup — honest calibration), +// but the measured window must be structurally clean AND must converge to +// the server's throughput limit rather than understating it. +func TestSaturationHandoffZeroMeasuredErrors(t *testing.T) { + const ( + workers = 16 + slotInterval = 125 * time.Microsecond // 8000 req/s server-side limit + slowStart = 250 * time.Millisecond + idleTimeout = 400 * time.Millisecond + ) + srv := startSheddingServer(t, slotInterval, slowStart, idleTimeout) + + cfg := Config{ + URL: "http://" + srv.ln.Addr().String() + "/", + Method: "GET", + Duration: 1200 * time.Millisecond, + Connections: workers, + Workers: workers, + Warmup: 1200 * time.Millisecond, + CPUMonitor: false, + RecvQProbe: false, + } + b, err := New(cfg) + if err != nil { + t.Fatal(err) + } + result, err := b.Run(context.Background()) + if err != nil { + t.Fatal(err) + } + if result.Warmup == nil { + t.Fatal("expected warmup stats") + } + + // The requirement: measured-window errors are structurally zero. 
+ // Cold-start sheds happen inside the warmup; no connection ever goes + // idle long enough to be expired because the warmup exercises the + // full pool and the handoff never stops the workers. + if result.Errors != 0 { + t.Errorf("measured-window errors = %d, want 0 (calibration burst leaked past the warmup boundary)", result.Errors) + } + for _, p := range result.Timeseries { + if p.Errors != 0 { + t.Errorf("timeseries bucket at t=%.1fs has %d errors, want 0", p.TimestampSec, p.Errors) + } + } + + // The shedding really happened — and landed in warmup where it is + // reported, not discarded. + if srv.sheds.Load() == 0 { + t.Error("server shed no requests — slow-start window never exercised, test is vacuous") + } + if result.Warmup.Errors == 0 { + t.Error("expected warmup errors > 0 from cold-start shedding (honest calibration reporting)") + } + + // Convergence: the measured window must track the server's limit, not + // re-discover it. Upper bound: the pacer is zero-burst, so measured + // RPS cannot exceed the limit by more than scheduling noise. Lower + // bound: half the limit is far above what a botched handoff that + // understates the knee would produce, while staying robust to slow CI. + limit := float64(time.Second) / float64(slotInterval) + if result.RequestsPerSec > 1.15*limit { + t.Errorf("measured RPS %.0f exceeds server limit %.0f — pacer bypassed?", result.RequestsPerSec, limit) + } + if result.RequestsPerSec < 0.5*limit { + t.Errorf("measured RPS %.0f below 50%% of server limit %.0f — max understated", result.RequestsPerSec, limit) + } + + // No understatement across the handoff: the measured window must be + // at least as fast as the warmup average (which includes the slow + // cold-start convergence). 
+ warmupAvg := float64(result.Warmup.Requests) / cfg.Warmup.Seconds() + if result.RequestsPerSec < 0.9*warmupAvg { + t.Errorf("measured RPS %.0f < 90%% of warmup average %.0f — handoff lost the calibrated rate", result.RequestsPerSec, warmupAvg) + } + + t.Logf("shedding server: warmup=%d reqs / %d errors, measured=%.0f RPS (limit %.0f), sheds=%d", + result.Warmup.Requests, result.Warmup.Errors, result.RequestsPerSec, limit, srv.sheds.Load()) +}