diff --git a/backoff.go b/backoff.go new file mode 100644 index 0000000..f5c4601 --- /dev/null +++ b/backoff.go @@ -0,0 +1,116 @@ +package loadgen + +import ( + "context" + "errors" + "fmt" + "math/rand/v2" + "sync" + "time" +) + +// Reconnect backoff bounds: the first retry waits ~reconnectBackoffMin, +// doubling per consecutive failure up to reconnectBackoffMax. Sleeps are +// jittered uniformly over [d/2, d] so a fleet of workers does not redial in +// lockstep against a recovering server. v3.8 post-mortem: without pacing, +// the ws/sse drivers redialled a dead port at ~386k dials/sec (34.7M errors +// per 90s cell). +const ( + reconnectBackoffMin = 10 * time.Millisecond + reconnectBackoffMax = time.Second +) + +// failFastWindow is how long a streaming client (ws/sse) tolerates +// nothing-but-connect-failures before declaring the target dead. It only +// applies while NO stream was ever established — a client that had live +// streams keeps retrying with backoff indefinitely (the server may come +// back mid-cell). +const failFastWindow = 5 * time.Second + +// ErrNeverConnected marks the fatal fail-fast condition: every connect +// attempt failed and not a single stream was ever established within +// failFastWindow. Benchmarker.Run aborts the whole run when a driver +// surfaces an error wrapping this, so a harness can classify the cell as +// did-not-finish instead of burning the full duration against a dead port. +var ErrNeverConnected = errors.New("no stream ever established") + +// connectBackoff is per-connection (or per-worker) retry pacing state. +// Not safe for concurrent use — each instance is owned by one goroutine. +type connectBackoff struct { + next time.Duration +} + +// sleep blocks for the current backoff interval (jittered down to half) and +// doubles the interval for the following call, capped at +// reconnectBackoffMax. It returns early — reporting false — when ctx is +// cancelled or abort is closed, so Close() and run shutdown are never stuck +// behind a sleep. A nil abort channel is valid and never fires. +func (b *connectBackoff) sleep(ctx context.Context, abort <-chan struct{}) bool { + d := b.next + if d <= 0 { + d = reconnectBackoffMin + } + b.next = min(d*2, reconnectBackoffMax) + + d = d/2 + rand.N(d/2+1) // uniform in [d/2, d] + t := time.NewTimer(d) + defer t.Stop() + select { + case <-ctx.Done(): + return false + case <-abort: + return false + case <-t.C: + return true + } +} + +// reset returns the backoff to its initial interval after a successful +// connect. +func (b *connectBackoff) reset() { b.next = 0 } + +// failFastTracker decides when a streaming client that has NEVER had a live +// stream should give up on the whole run. One instance is shared by all +// workers of a client; any single established stream disarms it permanently. +type failFastTracker struct { + window time.Duration + + mu sync.Mutex + everConnected bool + failingSince time.Time // zero when there is no active failure streak +} + +func newFailFastTracker(window time.Duration) *failFastTracker { + return &failFastTracker{window: window} +} + +// success records an established stream; fail-fast is permanently disarmed. +func (f *failFastTracker) success() { + f.mu.Lock() + f.everConnected = true + f.failingSince = time.Time{} + f.mu.Unlock() +} + +// failure records a connect failure observed at now. When the client never +// had a single live stream and the failure streak spans the configured +// window, it returns a fatal error wrapping both err and ErrNeverConnected; +// otherwise nil (caller backs off and retries). +func (f *failFastTracker) failure(now time.Time, err error) error { + f.mu.Lock() + defer f.mu.Unlock() + if f.everConnected { + return nil + } + if f.failingSince.IsZero() { + f.failingSince = now + return nil + } + if now.Sub(f.failingSince) < f.window { + return nil + } + // Message shape mirrors the h1client pre-dial failure + // ("loadgen: dial: ...: connection refused") so harness-side + // classification treats the two identically. + return fmt.Errorf("loadgen: dial: %w (fail-fast: %w within %v)", err, ErrNeverConnected, f.window) +} diff --git a/backoff_test.go b/backoff_test.go new file mode 100644 index 0000000..a00b2f1 --- /dev/null +++ b/backoff_test.go @@ -0,0 +1,132 @@ +package loadgen + +import ( + "context" + "errors" + "strings" + "testing" + "time" +) + +// TestConnectBackoffDoublesAndCaps walks the ladder with a pre-cancelled +// context so no real sleeping happens — only the interval progression is +// under test: 10ms doubling per call, capped at 1s. +func TestConnectBackoffDoublesAndCaps(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + var bo connectBackoff + want := []time.Duration{ + 20 * time.Millisecond, 40 * time.Millisecond, 80 * time.Millisecond, + 160 * time.Millisecond, 320 * time.Millisecond, 640 * time.Millisecond, + time.Second, time.Second, time.Second, + } + for i, w := range want { + if bo.sleep(ctx, nil) { + t.Fatalf("call %d: sleep returned true with cancelled ctx", i) + } + if bo.next != w { + t.Fatalf("call %d: next = %v, want %v", i, bo.next, w) + } + } + + bo.reset() + if bo.next != 0 { + t.Fatalf("after reset: next = %v, want 0", bo.next) + } +} + +// TestConnectBackoffSleepsJittered pins the interval and checks the actual +// sleep lands in the jitter band [d/2, d] (with CI-noise headroom above). +func TestConnectBackoffSleepsJittered(t *testing.T) { + bo := connectBackoff{next: 80 * time.Millisecond} + start := time.Now() + if !bo.sleep(context.Background(), nil) { + t.Fatal("sleep aborted without cancellation") + } + elapsed := time.Since(start) + if elapsed < 40*time.Millisecond { + t.Fatalf("slept %v, want >= 40ms (lower jitter bound)", elapsed) + } + if elapsed > 300*time.Millisecond { + t.Fatalf("slept %v, want ~<= 80ms (upper jitter bound + CI noise)", elapsed) + } +} + +// TestConnectBackoffAbort verifies both abort paths — context cancellation +// and the abort channel (Close()) — cut a long sleep short. +func TestConnectBackoffAbort(t *testing.T) { + t.Run("context", func(st *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond) + defer cancel() + bo := connectBackoff{next: time.Second} + start := time.Now() + if bo.sleep(ctx, nil) { + st.Fatal("sleep completed despite ctx cancellation") + } + if elapsed := time.Since(start); elapsed > 400*time.Millisecond { + st.Fatalf("aborted sleep took %v, want well under the 0.5-1s interval", elapsed) + } + }) + + t.Run("abort channel", func(st *testing.T) { + abort := make(chan struct{}) + go func() { + time.Sleep(20 * time.Millisecond) + close(abort) + }() + bo := connectBackoff{next: time.Second} + start := time.Now() + if bo.sleep(context.Background(), abort) { + st.Fatal("sleep completed despite abort channel close") + } + if elapsed := time.Since(start); elapsed > 400*time.Millisecond { + st.Fatalf("aborted sleep took %v, want well under the 0.5-1s interval", elapsed) + } + }) +} + +func TestFailFastTracker(t *testing.T) { + t0 := time.Now() + dialErr := errors.New("dial tcp 127.0.0.1:1: connect: connection refused") + + t.Run("trips after window with no success", func(st *testing.T) { + f := newFailFastTracker(5 * time.Second) + if err := f.failure(t0, dialErr); err != nil { + st.Fatalf("first failure must arm, not trip: %v", err) + } + if err := f.failure(t0.Add(4*time.Second), dialErr); err != nil { + st.Fatalf("failure inside window must not trip: %v", err) + } + err := f.failure(t0.Add(5*time.Second), dialErr) + if err == nil { + st.Fatal("expected fatal error once the streak spans the window") + } + if !errors.Is(err, ErrNeverConnected) { + st.Errorf("fatal error must wrap ErrNeverConnected: %v", err) + } + if !errors.Is(err, dialErr) { + st.Errorf("fatal error must wrap the last dial error: %v", err) + } + // Shape must match the h1client pre-dial failure so the harness + // classifies both as dnf. + if !strings.Contains(err.Error(), "loadgen: dial: ") || + !strings.Contains(err.Error(), "connection refused") { + st.Errorf("fatal error shape mismatch: %q", err.Error()) + } + }) + + t.Run("any success disarms permanently", func(st *testing.T) { + f := newFailFastTracker(5 * time.Second) + if err := f.failure(t0, dialErr); err != nil { + st.Fatal(err) + } + f.success() + if err := f.failure(t0.Add(time.Hour), dialErr); err != nil { + st.Fatalf("post-success failure streak must never trip: %v", err) + } + if err := f.failure(t0.Add(2*time.Hour), dialErr); err != nil { + st.Fatalf("post-success failure streak must never trip: %v", err) + } + }) +} diff --git a/bench.go b/bench.go index 1e32592..4a97ef6 100644 --- a/bench.go +++ b/bench.go @@ -70,9 +70,13 @@ type Config struct { Workers int // Warmup is the warmup duration before the measured benchmark begins. - // During warmup, 75% of workers send requests to warm connection pools - // and give the server a realistic load preview. Set to 0 to skip. - // Default: 5s. + // Warmup runs the full worker set so every connection carries traffic + // before measurement starts. In saturation mode (Rate == 0) warmup is + // a calibration phase: the workers it starts keep running across the + // warmup→measure boundary, so the measured window opens at the + // already-converged steady state instead of stepping past the knee at + // t=0. Warmup-phase outcomes are reported on Result.Warmup. Set to 0 + // to skip. Default: 5s. Warmup time.Duration // DisableKeepAlive disables HTTP keep-alive (Connection: close mode). @@ -326,14 +330,71 @@ type Benchmarker struct { // Requests and bytesRead are tracked per-shard in latencies. errors atomic.Int64 - // Latency tracking (also holds per-shard request/bytes counters) - latencies *ShardedLatencyRecorder + // errorsBase is the value of errors at the warmup→measure boundary. + // Measured-phase errors are reported as errors − errorsBase instead + // of resetting the counter, because saturation-mode workers keep + // running across the boundary and a concurrent Add could be lost to + // a racing Store(0). Written once by Run before the timeseries + // ticker starts; read by the ticker and buildResult afterwards. + errorsBase int64 + + // Latency tracking (also holds per-shard request/bytes counters). + // Held behind an atomic pointer because the saturation-mode + // warmup→measure handoff swaps in a fresh recorder while workers are + // still in flight (see Run). Hot-path cost is a single atomic + // pointer load per recorded request — negligible next to the + // windowMu acquisition RecordSuccess already performs. + latencies atomic.Pointer[ShardedLatencyRecorder] + + // warmupRec retains the recorder that accumulated warmup traffic + // after the handoff swap. Per-shard local counters that were still + // unflushed at the boundary stay stranded inside it until the final + // single-threaded FlushLocal at end-of-run, which makes the warmup + // request count exact. Nil when Warmup == 0. + warmupRec *ShardedLatencyRecorder + + // flushInterval mirrors the recorder flush cadence chosen in New so + // the measured-phase recorder created at the warmup boundary matches + // the warmup one. + flushInterval int64 + + // Fatal abort: the first ErrNeverConnected-wrapped failure a worker + // observes lands in fatalErr and closes fatalCh, so Run/warmup stop + // waiting out the clock instead of burning the full duration against + // a dead target. + fatalOnce sync.Once + fatalErr error + fatalCh chan struct{} + + // warmupStats is captured at the end of the warmup phase (before + // counters reset) and surfaced on Result.Warmup. + warmupStats *WarmupStats // Control running atomic.Bool wg sync.WaitGroup } +// recordFatal stores the first fatal error and signals Run/warmup to stop +// waiting out the clock. Later calls are no-ops. +func (b *Benchmarker) recordFatal(err error) { + b.fatalOnce.Do(func() { + b.fatalErr = err + close(b.fatalCh) + }) +} + +// fatalError returns the recorded fatal error, or nil. The channel-close +// check gives the necessary happens-before edge for reading fatalErr. +func (b *Benchmarker) fatalError() error { + select { + case <-b.fatalCh: + return b.fatalErr + default: + return nil + } +} + // parseURL extracts host, port, path, and scheme from a raw URL. func parseURL(rawURL string) (host, port, path, scheme string, err error) { u, err := url.Parse(rawURL) @@ -407,11 +468,14 @@ func New(cfg Config) (*Benchmarker, error) { // Use custom client if provided if cfg.Client != nil { - return &Benchmarker{ - config: cfg, - raw: cfg.Client, - latencies: NewShardedLatencyRecorder(cfg.Workers, flushInterval), - }, nil + b := &Benchmarker{ + config: cfg, + raw: cfg.Client, + flushInterval: flushInterval, + fatalCh: make(chan struct{}), + } + b.latencies.Store(NewShardedLatencyRecorder(cfg.Workers, flushInterval)) + return b, nil } host, port, path, scheme, err := parseURL(cfg.URL) @@ -450,10 +514,12 @@ func New(cfg Config) (*Benchmarker, error) { } b := &Benchmarker{ - config: cfg, - raw: raw, - latencies: NewShardedLatencyRecorder(cfg.Workers, flushInterval), + config: cfg, + raw: raw, + flushInterval: flushInterval, + fatalCh: make(chan struct{}), } + b.latencies.Store(NewShardedLatencyRecorder(cfg.Workers, flushInterval)) if mc, ok := raw.(*mixClient); ok { b.mix = mc } @@ -469,22 +535,44 @@ func New(cfg Config) (*Benchmarker, error) { // outcome. Federation is best-effort: failure to reach the peer is // recorded on Result.Federation but does not fail the whole run. func (b *Benchmarker) Run(ctx context.Context) (*Result, error) { + // Scope the global connect-error counter to this run. + connectErrorsCounter.Store(0) + + // Saturation mode hands the warmup workers over to the measured + // window without stopping them: the same goroutines (and therefore + // the same closed-loop concurrency, connection set, and arrival + // phase) carry straight across the boundary, so the measured window + // opens at the steady state warmup converged to. Rated mode keeps + // its stop/start phases because the measured window swaps in the + // rate scheduler + ratedWorker loop. + continuous := b.config.Rate <= 0 && b.config.Warmup > 0 + + // Create a scoped context for this benchmark run. Workers use this context + // so their in-flight HTTP requests are cancelled when the benchmark ends, + // preventing wg.Wait() from hanging if the server stops responding. + // Allow Warmup + Duration + 60s for in-flight requests to drain. + runCtx, runCancel := context.WithTimeout(ctx, b.config.Warmup+b.config.Duration+60*time.Second) + defer runCancel() + // Warmup phase if b.config.Warmup > 0 { - b.warmup(ctx) - } - - // Reset metrics for actual benchmark - b.errors.Store(0) - b.latencies.Reset() - if b.mix != nil { - // Clear per-protocol counters accumulated during warmup. - b.mix.h1Requests.Store(0) - b.mix.h2Requests.Store(0) - b.mix.upgradeRequests.Store(0) - b.mix.h1Errors.Store(0) - b.mix.h2Errors.Store(0) - b.mix.upgradeErrors.Store(0) + if continuous { + b.warmupHot(ctx, runCtx) + } else { + b.warmup(ctx) + } + if ferr := b.fatalError(); ferr != nil { + // Continuous warmup leaves workers running; tear them down + // before surfacing the abort (rated warmup already drained + // its own). + if continuous { + b.running.Store(false) + runCancel() + b.raw.Close() + b.wg.Wait() + } + return nil, ferr + } } // Federation: dial the peer before workers spin up so the start @@ -529,13 +617,6 @@ func (b *Benchmarker) Run(ctx context.Context) (*Result, error) { selfCPU.Start() } - // Create a scoped context for this benchmark run. Workers use this context - // so their in-flight HTTP requests are cancelled when the benchmark ends, - // preventing wg.Wait() from hanging if the server stops responding. - // Allow Duration + 60s for in-flight requests to drain. - runCtx, runCancel := context.WithTimeout(ctx, b.config.Duration+60*time.Second) - defer runCancel() - // Per-socket recv-Q probe (Linux only). The probe is best-effort: when // it cannot read /proc/net/tcp it stays silent and RecvQHigh remains // false. Caller filters by loadgen's open socket inodes. @@ -545,12 +626,40 @@ func (b *Benchmarker) Run(ctx context.Context) (*Result, error) { recvqProbe.Start(runCtx) } + // Warmup→measure handoff: snapshot warmup outcomes, then swap in a + // fresh recorder so measured percentiles and counters start clean. + // In continuous (saturation) mode workers fly straight through the + // swap — the measured window begins at the already-converged steady + // rate with no concurrency step, no cold connections, and no + // phase-aligned restart burst. The warmup request count is finalised + // at end-of-run (see below) so per-shard local counters that are + // still unflushed here are not lost. + if b.config.Warmup > 0 { + b.warmupRec = b.latencies.Load() + b.errorsBase = b.errors.Load() + b.warmupStats = &WarmupStats{ + Errors: b.errorsBase, + ConnectErrors: snapshotConnectErrors(), + } + b.latencies.Store(NewShardedLatencyRecorder(b.config.Workers, b.flushInterval)) + if b.mix != nil { + b.mix.markMeasureStart() + } + } + // Timeseries collection: 1-second snapshots var timeseries []TimeseriesPoint var prevReqs int64 - var prevErrors int64 + prevErrors := b.errorsBase + var prevConnErrors uint64 - // Start workers — each gets a unique workerID for connection partitioning + // The measured-phase recorder: stable from here on, so the ticker + // and workers may cache the pointer. + rec := b.latencies.Load() + + // Start workers — each gets a unique workerID for connection partitioning. + // In continuous mode the warmup workers are already running and are + // simply adopted by the measured window. b.running.Store(true) start := time.Now() @@ -574,12 +683,14 @@ func (b *Benchmarker) Run(ctx context.Context) (*Result, error) { go b.rateScheduler(runCtx, start, slots, schedDone) } - for i := range b.config.Workers { - workerID := i - if slots != nil { - b.wg.Go(func() { b.ratedWorker(runCtx, workerID, slots) }) - } else { - b.wg.Go(func() { b.worker(runCtx, workerID) }) + if !continuous { + for i := range b.config.Workers { + workerID := i + if slots != nil { + b.wg.Go(func() { b.ratedWorker(runCtx, workerID, slots) }) + } else { + b.wg.Go(func() { b.worker(runCtx, workerID) }) + } } } @@ -592,19 +703,22 @@ func (b *Benchmarker) Run(ctx context.Context) (*Result, error) { for { select { case <-ticker.C: - reqs, _ := b.latencies.Totals() + reqs, _ := rec.Totals() elapsed := time.Since(start).Seconds() deltaReqs := reqs - prevReqs prevReqs = reqs - p99 := b.latencies.SnapshotWindowP99Ms() + p99 := rec.SnapshotWindowP99Ms() curErr := b.errors.Load() + curConnErr := connectErrorsCounter.Load() timeseries = append(timeseries, TimeseriesPoint{ TimestampSec: elapsed, RequestsPerSec: float64(deltaReqs), // 1-second window P99Ms: p99, Errors: curErr - prevErrors, + ConnectErrors: int64(curConnErr - prevConnErrors), }) prevErrors = curErr + prevConnErrors = curConnErr if b.config.OnProgress != nil { snapshot := Result{ Requests: reqs, @@ -619,10 +733,11 @@ func (b *Benchmarker) Run(ctx context.Context) (*Result, error) { } }() - // Wait for duration + // Wait for duration — or a fatal fail-fast abort from a worker. select { case <-ctx.Done(): case <-time.After(b.config.Duration): + case <-b.fatalCh: } b.running.Store(false) @@ -649,7 +764,15 @@ func (b *Benchmarker) Run(ctx context.Context) (*Result, error) { elapsed := time.Since(start) // Flush unflushed local counters to atomics now that workers have stopped. - b.latencies.FlushLocal() + rec.FlushLocal() + + // Finalise the warmup request count: locals that were unflushed at + // the handoff stayed stranded in the warmup recorder; flushing them + // now (single-threaded) makes Result.Warmup.Requests exact. + if b.warmupRec != nil && b.warmupStats != nil { + b.warmupRec.FlushLocal() + b.warmupStats.Requests, _ = b.warmupRec.Totals() + } result := b.buildResult(elapsed) result.ClientCPUPercent = cpuMon.Stop() @@ -661,9 +784,19 @@ func (b *Benchmarker) Run(ctx context.Context) (*Result, error) { result.RecvQHigh = recvqProbe.Stop() } + // A fatal fail-fast abort overrides the (empty) partial result so the + // caller can classify the run as did-not-finish rather than reading a + // zero-request Result as "ran clean with no traffic". + if ferr := b.fatalError(); ferr != nil { + if fed != nil { + _ = fed.Close() + } + return nil, ferr + } + // Collect peer histogram if federation is active. if fed != nil { - local := b.latencies.MergedHistogram() + local := rec.MergedHistogram() peerReqs, peerErrs, merged, err := fed.CollectResult(local) _ = fed.Close() if err != nil { @@ -688,30 +821,75 @@ func (b *Benchmarker) Run(ctx context.Context) (*Result, error) { return result, nil } +// warmup is the stop/start warmup used by rated mode: closed-loop workers +// run for the warmup window, then drain so the measured window can swap in +// the rate scheduler + ratedWorker loop. Saturation mode uses warmupHot +// instead, which keeps its workers running across the boundary. +// +// The full worker set runs so every connection in the pool carries traffic +// before measurement begins. The old 75% heuristic left a quarter of the +// keep-alive conns idle through warmup; their first-ever requests then +// landed inside the measured window, after the server had been free to +// idle-expire them. func (b *Benchmarker) warmup(ctx context.Context) { warmupCtx, cancel := context.WithTimeout(ctx, b.config.Warmup) defer cancel() b.running.Store(true) - // Use 75% of workers for warmup to properly warm up connection pools - // and give the server a realistic preview of the load - warmupWorkers := max((b.config.Workers*3)/4, 4) - // Don't exceed actual worker count - if warmupWorkers > b.config.Workers { - warmupWorkers = b.config.Workers - } - - for i := range warmupWorkers { + for i := range b.config.Workers { workerID := i b.wg.Go(func() { b.worker(warmupCtx, workerID) }) } - <-warmupCtx.Done() + // A fatal fail-fast abort cuts the warmup short; Run surfaces the + // error after capturing warmup stats. + select { + case <-warmupCtx.Done(): + case <-b.fatalCh: + } b.running.Store(false) + cancel() // unblock workers stuck in dials or backoff sleeps b.wg.Wait() } +// warmupHot runs the calibration warmup for saturation mode: the full +// worker set starts immediately — the same closed-loop concurrency the +// measured window uses — and is left RUNNING when this returns. Knee +// discovery happens here: the aggressive initial approach (simultaneous +// first requests, server backpressure while it absorbs the burst) plays +// out during warmup, where any errors land in Result.Warmup. By the time +// the measured window opens, the loop has self-paced to the knee and +// carries across the boundary with no concurrency step, no cold +// connections, and no phase-aligned restart burst — the v3.9 repro showed +// every saturation cell's errors confined to the first measured second +// because warmup drove only 75% of the workers and the boundary +// stopped+restarted them. +// +// If the warmup window is too short for full convergence, the measured +// window simply continues the same closed-loop ramp from below — the +// searcher still reaches the true max inside the window, without a step +// that overshoots the knee. +// +// ctx is the caller's context (aborts the wait); runCtx is the +// whole-run context handed to the workers, since they outlive warmup. +func (b *Benchmarker) warmupHot(ctx context.Context, runCtx context.Context) { + b.running.Store(true) + + for i := range b.config.Workers { + workerID := i + b.wg.Go(func() { b.worker(runCtx, workerID) }) + } + + t := time.NewTimer(b.config.Warmup) + defer t.Stop() + select { + case <-ctx.Done(): + case <-t.C: + case <-b.fatalCh: + } +} + // rateScheduler emits intended-dispatch timestamps at the configured Rate. // It closes slots when the run context is cancelled or when running flips // false, so ratedWorker goroutines can drop out cleanly. @@ -801,13 +979,17 @@ func (b *Benchmarker) ratedWorker(ctx context.Context, workerID int, slots <-cha if !b.running.Load() && ctx.Err() != nil { return } + if errors.Is(err, ErrNeverConnected) { + b.recordFatal(err) + return + } b.errors.Add(1) if b.mix != nil { b.mix.recordError(workerID) } continue } - b.latencies.RecordSuccess(workerID, latency, bytesRead) + b.latencies.Load().RecordSuccess(workerID, latency, bytesRead) } } } @@ -849,19 +1031,32 @@ func (b *Benchmarker) worker(ctx context.Context, workerID int) { if !b.running.Load() && ctx.Err() != nil { return } + // A fail-fast abort from a streaming driver kills the whole + // run; the attempts leading up to it are already counted. + if errors.Is(err, ErrNeverConnected) { + b.recordFatal(err) + return + } b.errors.Add(1) if b.mix != nil { b.mix.recordError(workerID) } } else { - b.latencies.RecordSuccess(workerID, latency, bytesRead) + // The pointer load (not a cached pointer) is deliberate: the + // saturation warmup→measure handoff swaps recorders while this + // loop keeps running, and loading at record time files each + // completion into the phase it finished in. + b.latencies.Load().RecordSuccess(workerID, latency, bytesRead) } } } func (b *Benchmarker) buildResult(elapsed time.Duration) *Result { - reqs, bytesRead := b.latencies.Totals() - errs := b.errors.Load() + rec := b.latencies.Load() + reqs, bytesRead := rec.Totals() + // errorsBase scopes the cumulative counter to the measured window — + // warmup errors are reported separately on Result.Warmup. + errs := b.errors.Load() - b.errorsBase rps := float64(reqs) / elapsed.Seconds() throughput := float64(bytesRead) / elapsed.Seconds() @@ -873,11 +1068,13 @@ func (b *Benchmarker) buildResult(elapsed time.Duration) *Result { Duration: elapsed, RequestsPerSec: rps, ThroughputBPS: throughput, - Latency: b.latencies.Percentiles(), + Latency: rec.Percentiles(), DialRetries: snapshotDialRetries(), + ConnectErrors: snapshotConnectErrors(), + Warmup: b.warmupStats, } - if hist, err := b.latencies.EncodeHistogram(); err == nil { + if hist, err := rec.EncodeHistogram(); err == nil { res.Histogram = hist } diff --git a/bench_handoff_test.go b/bench_handoff_test.go new file mode 100644 index 0000000..34ef465 --- /dev/null +++ b/bench_handoff_test.go @@ -0,0 +1,395 @@ +package loadgen + +import ( + "bufio" + "context" + "net" + "runtime" + "sync" + "sync/atomic" + "testing" + "time" +) + +// goid returns the calling goroutine's id parsed from runtime.Stack. +// Test-only: goroutine ids are never reused by the runtime, which makes +// them a deterministic fingerprint for "did the benchmarker spawn a second +// worker set at the warmup→measure boundary". +func goid() int64 { + var buf [64]byte + n := runtime.Stack(buf[:], false) + // Format: "goroutine 123 [running]:..." + const prefix = len("goroutine ") + var id int64 + for _, c := range buf[prefix:n] { + if c < '0' || c > '9' { + break + } + id = id*10 + int64(c-'0') + } + return id +} + +// handoffClient is a Client that fingerprints every DoRequest call: +// which goroutine issued it, which workerID, and when the worker was +// first seen relative to the run start. +type handoffClient struct { + start time.Time + + mu sync.Mutex + goroutines map[int64]struct{} + firstSeen map[int]time.Duration + + calls atomic.Int64 +} + +func newHandoffClient() *handoffClient { + return &handoffClient{ + start: time.Now(), + goroutines: make(map[int64]struct{}), + firstSeen: make(map[int]time.Duration), + } +} + +func (h *handoffClient) DoRequest(_ context.Context, workerID int) (int, error) { + h.mu.Lock() + h.goroutines[goid()] = struct{}{} + if _, ok := h.firstSeen[workerID]; !ok { + h.firstSeen[workerID] = time.Since(h.start) + } + h.mu.Unlock() + h.calls.Add(1) + time.Sleep(200 * time.Microsecond) + return 2, nil +} + +func (h *handoffClient) Close() {} + +// TestSaturationHandoffContinuity pins the calibrated warmup→measure +// handoff in saturation mode: the commanded rate of an open benchmark is +// its closed-loop concurrency, so "no step discontinuity across the +// boundary" means (a) warmup already runs the full worker set and (b) the +// exact same worker goroutines carry across the boundary instead of being +// stopped and restarted. Before the fix, warmup ran 75% of the workers and +// the boundary respawned everything — the +25% concurrency step plus the +// phase-aligned restart burst put every error of a run into the first +// measured second (v3.9 chain-api repro). +func TestSaturationHandoffContinuity(t *testing.T) { + const workers = 8 + hc := newHandoffClient() + + cfg := Config{ + URL: "http://127.0.0.1:1/", // never dialled: custom Client + Method: "GET", + Duration: 400 * time.Millisecond, + Connections: workers, + Workers: workers, + Warmup: 600 * time.Millisecond, + Client: hc, + CPUMonitor: false, + RecvQProbe: false, + } + b, err := New(cfg) + if err != nil { + t.Fatal(err) + } + result, err := b.Run(context.Background()) + if err != nil { + t.Fatal(err) + } + + hc.mu.Lock() + gor := len(hc.goroutines) + seen := len(hc.firstSeen) + var latestFirst time.Duration + for _, d := range hc.firstSeen { + if d > latestFirst { + latestFirst = d + } + } + hc.mu.Unlock() + + // (a) Full worker set participates in the warmup, and (b) no second + // worker set is spawned at the boundary: exactly `workers` distinct + // goroutines must have issued requests over the whole run. The + // pre-fix behaviour would show warmup (6) + measured (8) = 14. + if gor != workers { + t.Errorf("distinct worker goroutines = %d, want %d (stop/restart at the warmup boundary?)", gor, workers) + } + if seen != workers { + t.Errorf("distinct workerIDs = %d, want %d (warmup did not run the full worker set)", seen, workers) + } + // Every worker must be active early in the warmup — well before the + // boundary at 600ms. 300ms is half the warmup: a worker first seen + // later than that was started by the measured window, i.e. a step. + if latestFirst > 300*time.Millisecond { + t.Errorf("latest worker first seen at %v, want < 300ms (worker joined after warmup start)", latestFirst) + } + + // Accounting across the handoff is exact: every successful request + // lands in exactly one phase, including per-shard local counters + // that were unflushed when the recorder swap happened. + if result.Warmup == nil { + t.Fatal("expected Result.Warmup to be populated") + } + total := hc.calls.Load() + if got := result.Warmup.Requests + result.Requests; got != total { + t.Errorf("warmup(%d) + measured(%d) = %d requests, want exactly %d (client total)", + result.Warmup.Requests, result.Requests, got, total) + } + if result.Warmup.Requests == 0 { + t.Error("expected warmup requests > 0") + } + if result.Requests == 0 { + t.Error("expected measured requests > 0") + } + if result.Errors != 0 || result.Warmup.Errors != 0 { + t.Errorf("unexpected errors: measured=%d warmup=%d", result.Errors, result.Warmup.Errors) + } + t.Logf("handoff: %d goroutines, warmup=%d measured=%d total=%d latestFirstSeen=%v", + gor, result.Warmup.Requests, result.Requests, total, latestFirst) +} + +// TestRatedHandoffKeepsStopStart pins that rated mode keeps its two-phase +// structure: the warmup's closed-loop workers drain at the boundary and a +// fresh ratedWorker set drives the measured window — so the constant-rate +// scheduler semantics are untouched by the saturation handoff change. +func TestRatedHandoffKeepsStopStart(t *testing.T) { + const workers = 4 + hc := newHandoffClient() + + cfg := Config{ + URL: "http://127.0.0.1:1/", + Method: "GET", + Duration: 500 * time.Millisecond, + Connections: workers, + Workers: workers, + Warmup: 300 * time.Millisecond, + Rate: 200, + Client: hc, + CPUMonitor: false, + RecvQProbe: false, + } + b, err := New(cfg) + if err != nil { + t.Fatal(err) + } + result, err := b.Run(context.Background()) + if err != nil { + t.Fatal(err) + } + + hc.mu.Lock() + gor := len(hc.goroutines) + hc.mu.Unlock() + + // Warmup spawns `workers` closed-loop goroutines, the measured window + // spawns `workers` ratedWorker goroutines: 2× distinct ids in total. + if gor != 2*workers { + t.Errorf("distinct worker goroutines = %d, want %d (rated warmup must stop/start at the boundary)", gor, 2*workers) + } + if !result.RatedMode { + t.Error("expected RatedMode result") + } + if result.Warmup == nil || result.Warmup.Requests == 0 { + t.Errorf("expected populated warmup stats, got %+v", result.Warmup) + } + if got := result.Warmup.Requests + result.Requests; got != hc.calls.Load() { + t.Errorf("warmup(%d) + measured(%d) = %d requests, want exactly %d", + result.Warmup.Requests, result.Requests, got, hc.calls.Load()) + } + t.Logf("rated handoff: %d goroutines, warmup=%d measured=%d", gor, result.Warmup.Requests, result.Requests) +} + +// sheddingServer is a raw-TCP HTTP/1.1 server that models the SUT +// behaviours behind the v3.9 t=0 error burst: +// +// - a hard throughput limit: every response waits for a slot from a +// zero-burst pacer (capacity 1), so sustained RPS cannot exceed +// 1/interval no matter how hard the client pushes; +// - cold-start shedding: during the slow-start window after boot, every +// 3rd request gets its connection closed without a response (the +// "server absorbing the initial burst" phase — these errors must land +// in warmup, never in the measured window); +// - idle-connection expiry: a connection idle longer than idleTimeout is +// closed server-side. Before the fix, the 25% of conns that warmup +// never exercised were idle-expired and turned into guaranteed +// measured-window errors at t=0. +type sheddingServer struct { + ln net.Listener + start time.Time + interval int64 // pacer slot width, nanoseconds + slowStart time.Duration + idle time.Duration + + nextSlot atomic.Int64 + reqCount atomic.Int64 + sheds atomic.Int64 +} + +func startSheddingServer(t *testing.T, interval, slowStart, idle time.Duration) *sheddingServer { + t.Helper() + ln, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + s := &sheddingServer{ + ln: ln, + start: time.Now(), + interval: int64(interval), + slowStart: slowStart, + idle: idle, + } + go func() { + for { + conn, err := ln.Accept() + if err != nil { + return + } + go s.serve(conn) + } + }() + t.Cleanup(func() { _ = ln.Close() }) + return s +} + +// acquireSlot blocks until this request's paced slot arrives. Zero-burst: +// the next slot never lags behind "now", so quiet periods do not bank +// tokens that a later burst could spend. +func (s *sheddingServer) acquireSlot() { + for { + now := time.Now().UnixNano() + cur := s.nextSlot.Load() + slot := cur + if now > slot { + slot = now + } + if s.nextSlot.CompareAndSwap(cur, slot+s.interval) { + if d := slot - now; d > 0 { + time.Sleep(time.Duration(d)) + } + return + } + } +} + +var sheddingResponse = []byte("HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nOK") + +func (s *sheddingServer) serve(conn net.Conn) { + defer func() { _ = conn.Close() }() + br := bufio.NewReaderSize(conn, 4096) + for { + // Idle expiry: kill connections that sit quiet, like a server + // keep-alive timeout would. + _ = conn.SetReadDeadline(time.Now().Add(s.idle)) + // Read one request: header lines until the blank line (GETs only). + seenRequestLine := false + for { + line, err := br.ReadSlice('\n') + if err != nil { + return // idle timeout, client close, or mid-request error + } + if len(line) <= 2 { + break + } + seenRequestLine = true + } + if !seenRequestLine { + return + } + // Cold-start shedding: while the server "boots", every 3rd + // request gets its conn dropped instead of a response. + if time.Since(s.start) < s.slowStart && s.reqCount.Add(1)%3 == 0 { + s.sheds.Add(1) + return + } + s.acquireSlot() + if _, err := conn.Write(sheddingResponse); err != nil { + return + } + } +} + +// TestSaturationHandoffZeroMeasuredErrors is the loopback integration test +// for the calibrated handoff: against a rate-limited server that sheds +// during its cold start and expires idle connections, the warmup phase may +// record errors (and reports them on Result.Warmup — honest calibration), +// but the measured window must be structurally clean AND must converge to +// the server's throughput limit rather than understating it. +func TestSaturationHandoffZeroMeasuredErrors(t *testing.T) { + const ( + workers = 16 + slotInterval = 125 * time.Microsecond // 8000 req/s server-side limit + slowStart = 250 * time.Millisecond + idleTimeout = 400 * time.Millisecond + ) + srv := startSheddingServer(t, slotInterval, slowStart, idleTimeout) + + cfg := Config{ + URL: "http://" + srv.ln.Addr().String() + "/", + Method: "GET", + Duration: 1200 * time.Millisecond, + Connections: workers, + Workers: workers, + Warmup: 1200 * time.Millisecond, + CPUMonitor: false, + RecvQProbe: false, + } + b, err := New(cfg) + if err != nil { + t.Fatal(err) + } + result, err := b.Run(context.Background()) + if err != nil { + t.Fatal(err) + } + if result.Warmup == nil { + t.Fatal("expected warmup stats") + } + + // The requirement: measured-window errors are structurally zero. + // Cold-start sheds happen inside the warmup; no connection ever goes + // idle long enough to be expired because the warmup exercises the + // full pool and the handoff never stops the workers. + if result.Errors != 0 { + t.Errorf("measured-window errors = %d, want 0 (calibration burst leaked past the warmup boundary)", result.Errors) + } + for _, p := range result.Timeseries { + if p.Errors != 0 { + t.Errorf("timeseries bucket at t=%.1fs has %d errors, want 0", p.TimestampSec, p.Errors) + } + } + + // The shedding really happened — and landed in warmup where it is + // reported, not discarded. + if srv.sheds.Load() == 0 { + t.Error("server shed no requests — slow-start window never exercised, test is vacuous") + } + if result.Warmup.Errors == 0 { + t.Error("expected warmup errors > 0 from cold-start shedding (honest calibration reporting)") + } + + // Convergence: the measured window must track the server's limit, not + // re-discover it. Upper bound: the pacer is zero-burst, so measured + // RPS cannot exceed the limit by more than scheduling noise. Lower + // bound: half the limit is far above what a botched handoff that + // understates the knee would produce, while staying robust to slow CI. + limit := float64(time.Second) / float64(slotInterval) + if result.RequestsPerSec > 1.15*limit { + t.Errorf("measured RPS %.0f exceeds server limit %.0f — pacer bypassed?", result.RequestsPerSec, limit) + } + if result.RequestsPerSec < 0.5*limit { + t.Errorf("measured RPS %.0f below 50%% of server limit %.0f — max understated", result.RequestsPerSec, limit) + } + + // No understatement across the handoff: the measured window must be + // at least as fast as the warmup average (which includes the slow + // cold-start convergence). + warmupAvg := float64(result.Warmup.Requests) / cfg.Warmup.Seconds() + if result.RequestsPerSec < 0.9*warmupAvg { + t.Errorf("measured RPS %.0f < 90%% of warmup average %.0f — handoff lost the calibrated rate", result.RequestsPerSec, warmupAvg) + } + + t.Logf("shedding server: warmup=%d reqs / %d errors, measured=%.0f RPS (limit %.0f), sheds=%d", + result.Warmup.Requests, result.Warmup.Errors, result.RequestsPerSec, limit, srv.sheds.Load()) +} diff --git a/benchmarker_test.go b/benchmarker_test.go index 0bf948a..c087df7 100644 --- a/benchmarker_test.go +++ b/benchmarker_test.go @@ -2,6 +2,7 @@ package loadgen import ( "context" + "errors" "fmt" "net" "net/http" @@ -95,6 +96,156 @@ func TestBenchmarkerWarmup(t *testing.T) { t.Logf("warmup: elapsed=%v, result.Duration=%v, requests=%d", elapsed, result.Duration, result.Requests) } +func TestBenchmarkerWarmupStats(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(200) + _, _ = w.Write([]byte("OK")) + })) + defer srv.Close() + + cfg := Config{ + URL: srv.URL, + Method: "GET", + Duration: 500 * time.Millisecond, + Connections: 2, + Workers: 2, + Warmup: 500 * time.Millisecond, + CPUMonitor: false, + RecvQProbe: false, + } + b, err := New(cfg) + if err != nil { + t.Fatal(err) + } + result, err := b.Run(context.Background()) + if err != nil { + t.Fatal(err) + } + + if result.Warmup == nil { + t.Fatal("expected Result.Warmup to be populated when Warmup > 0") + } + if result.Warmup.Requests == 0 { + t.Error("expected warmup requests > 0 against a healthy server") + } + if result.Warmup.Errors != 0 { + t.Errorf("unexpected warmup errors: %d", result.Warmup.Errors) + } + // Warmup traffic must not leak into the measured totals. + if result.Requests == 0 { + t.Error("expected measured requests > 0") + } + t.Logf("warmup stats: %d ok, %d errors", result.Warmup.Requests, result.Warmup.Errors) +} + +func TestBenchmarkerNoWarmupNoStats(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(200) + })) + defer srv.Close() + + cfg := Config{ + URL: srv.URL, + Method: "GET", + Duration: 300 * time.Millisecond, + Connections: 1, + Workers: 1, + Warmup: 0, + CPUMonitor: false, + RecvQProbe: false, + } + b, err := New(cfg) + if err != nil { + t.Fatal(err) + } + result, err := b.Run(context.Background()) + if err != nil { + t.Fatal(err) + } + if result.Warmup != nil { + t.Errorf("expected nil Result.Warmup when Warmup == 0, got %+v", result.Warmup) + } +} + +// fatalClient returns an ErrNeverConnected-wrapped error from every +// DoRequest, simulating a streaming driver whose fail-fast tripped. +type fatalClient struct{} + +func (f *fatalClient) DoRequest(_ context.Context, _ int) (int, error) { + return 0, fmt.Errorf("loadgen: dial: dial tcp 127.0.0.1:1: connect: connection refused (fail-fast: %w within 5s)", ErrNeverConnected) +} + +func (f *fatalClient) Close() {} + +// TestBenchmarkerFatalAbortsRun verifies a worker surfacing an +// ErrNeverConnected error aborts the run immediately instead of waiting out +// the full Duration, and that Run reports the fatal error. +func TestBenchmarkerFatalAbortsRun(t *testing.T) { + cfg := Config{ + URL: "http://127.0.0.1:1/", + Method: "GET", + Duration: 30 * time.Second, // must NOT be waited out + Connections: 2, + Workers: 2, + Warmup: 0, + Client: &fatalClient{}, + CPUMonitor: false, + RecvQProbe: false, + } + b, err := New(cfg) + if err != nil { + t.Fatal(err) + } + + start := time.Now() + result, err := b.Run(context.Background()) + elapsed := time.Since(start) + + if err == nil { + t.Fatal("expected Run to return the fatal error") + } + if !errors.Is(err, ErrNeverConnected) { + t.Errorf("Run error must wrap ErrNeverConnected: %v", err) + } + if result != nil { + t.Errorf("expected nil result on fatal abort, got %+v", result) + } + if elapsed > 10*time.Second { + t.Errorf("fatal abort took %v — Run waited out the duration instead of aborting", elapsed) + } +} + +// TestBenchmarkerFatalAbortsWarmup verifies a fatal error during the warmup +// phase aborts the run before the measured phase even starts. +func TestBenchmarkerFatalAbortsWarmup(t *testing.T) { + cfg := Config{ + URL: "http://127.0.0.1:1/", + Method: "GET", + Duration: 30 * time.Second, + Connections: 2, + Workers: 2, + Warmup: 30 * time.Second, // must NOT be waited out either + Client: &fatalClient{}, + CPUMonitor: false, + RecvQProbe: false, + } + b, err := New(cfg) + if err != nil { + t.Fatal(err) + } + + start := time.Now() + _, err = b.Run(context.Background()) + elapsed := time.Since(start) + + if err == nil || !errors.Is(err, ErrNeverConnected) { + t.Fatalf("expected ErrNeverConnected from warmup abort, got: %v", err) + } + if elapsed > 10*time.Second { + t.Errorf("warmup fatal abort took %v — warmup was waited out instead of aborting", elapsed) + } +} + func TestBenchmarkerContextCancellation(t *testing.T) { srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { w.WriteHeader(200) diff --git a/cmd/loadgen/main.go b/cmd/loadgen/main.go index deb2a1d..6561ac9 100644 --- a/cmd/loadgen/main.go +++ b/cmd/loadgen/main.go @@ -185,6 +185,13 @@ func main() { log.Fatalf("benchmark failed: %v", err) } + if result.Warmup != nil { + fmt.Fprintf(os.Stderr, "warmup: %d ok, %d errors (%d connect)\n", + result.Warmup.Requests, result.Warmup.Errors, result.Warmup.ConnectErrors) + if result.Warmup.Requests == 0 && result.Warmup.Errors > 0 { + fmt.Fprintln(os.Stderr, "warmup: WARNING — no request succeeded during warmup; the target may be down") + } + } if result.Upgrade != nil { // UpgradeAttempted == UpgradeSucceeded by construction today: // newH2ClientWithDialer fails the whole run on the first dial error, diff --git a/dial.go b/dial.go index 192d7c8..4219b59 100644 --- a/dial.go +++ b/dial.go @@ -16,6 +16,19 @@ import ( // the run completes. var dialRetriesCounter atomic.Uint64 +// connectErrorsCounter counts dial/handshake failures (TCP connect, TLS, +// WS/SSE upgrade, H1 reconnect dials) as their own error class, separate +// from the per-request errors total. Process-global for the same reason as +// dialRetriesCounter; drained into Result.ConnectErrors / WarmupStats via +// Swap at the end of each phase. +var connectErrorsCounter atomic.Uint64 + +func recordConnectError() { connectErrorsCounter.Add(1) } + +// snapshotConnectErrors returns the current connect-error count and resets +// the global counter, scoping the value to the phase that just ended. +func snapshotConnectErrors() uint64 { return connectErrorsCounter.Swap(0) } + // maxDialAttempts is the total number of TCP SYN attempts per dial, // including the first. Only transient SYN-RST responses trigger a // retry; every other error fails fast. diff --git a/h1client.go b/h1client.go index 1108cfb..30971d1 100644 --- a/h1client.go +++ b/h1client.go @@ -49,6 +49,11 @@ type h1Conn struct { writeBufferSize int scheme string tlsConfig *tls.Config + + // backoff paces reconnect attempts after a failed redial. Only the + // owning worker touches it (reconnect runs on the worker goroutine), + // so it needs no mu. + backoff connectBackoff } // newH1Client creates a new zero-alloc HTTP/1.1 client. @@ -223,8 +228,15 @@ func (c *h1Client) DoRequest(ctx context.Context, workerID int) (int, error) { } // Reconnect and retry once if reconnErr := hc.reconnect(); reconnErr != nil { + recordConnectError() + // Server-death guard: pace this conn's next attempt with + // exponential backoff (ctx-abortable) so a dead server cannot + // induce a redial hot loop (v3.8 crash cell: 33.1M dial errors + // at ~370k/s). Reset below on the next successful reconnect. + hc.backoff.sleep(ctx, nil) return 0, fmt.Errorf("h1client: conn[%d] reconnect failed: %w", connIdx, reconnErr) } + hc.backoff.reset() if _, err = hc.conn.Write(c.reqBuf); err != nil { return 0, fmt.Errorf("h1client: conn[%d] write after reconnect: %w", connIdx, err) } diff --git a/h1client_test.go b/h1client_test.go index 719be33..e8e9233 100644 --- a/h1client_test.go +++ b/h1client_test.go @@ -531,3 +531,128 @@ func TestH1MultipleWorkersConnectionIsolation(t *testing.T) { t.Error(err) } } + +// startKillableH1Server is startH1Server plus a kill function that tears +// down the listener AND every accepted conn, simulating server death +// (plain cleanup() leaves established keep-alive conns serving). +func startKillableH1Server(t *testing.T) (host, port string, kill func()) { + t.Helper() + ln, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + var mu sync.Mutex + var conns []net.Conn + done := make(chan struct{}) + go func() { + defer close(done) + for { + conn, err := ln.Accept() + if err != nil { + return + } + mu.Lock() + conns = append(conns, conn) + mu.Unlock() + go func() { + reader := bufio.NewReader(conn) + for { + if !readH1Request(reader) { + return + } + resp := "HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nOK" + if _, err := conn.Write([]byte(resp)); err != nil { + return + } + } + }() + } + }() + h, p, _ := net.SplitHostPort(ln.Addr().String()) + return h, p, func() { + _ = ln.Close() + <-done + mu.Lock() + defer mu.Unlock() + for _, conn := range conns { + _ = conn.Close() + } + } +} + +// TestH1ReconnectBackoffAfterServerDeath verifies the mid-cell +// reconnect-after-server-death path is backoff-paced (v3.8 crash cell: +// 33.1M dial errors at ~370k/s) and lands in the connect-error class. +func TestH1ReconnectBackoffAfterServerDeath(t *testing.T) { + host, port, kill := startKillableH1Server(t) + + client, err := newH1Client(host, port, "/", testH1Cfg(true, 1)) + if err != nil { + t.Fatal(err) + } + defer client.Close() + + if _, err := client.DoRequest(context.Background(), 0); err != nil { + t.Fatalf("priming request: %v", err) + } + + before := connectErrorsCounter.Swap(0) + defer connectErrorsCounter.Add(before) + + kill() // server dies mid-cell; port now refuses + + deadline := time.Now().Add(700 * time.Millisecond) + errCount := 0 + for time.Now().Before(deadline) { + if _, err := client.DoRequest(context.Background(), 0); err != nil { + errCount++ + } + } + if errCount == 0 { + t.Fatal("expected reconnect errors after server death") + } + // 10ms-doubling backoff allows roughly 7 reconnect attempts in 700ms; + // a hot loop would produce hundreds of thousands. + if errCount > 100 { + t.Errorf("errCount = %d in 700ms — reconnects are not backoff-paced", errCount) + } + if n := connectErrorsCounter.Swap(0); n == 0 { + t.Error("expected reconnect dial failures in the connect-error class") + } + if next := client.conns[0].backoff.next; next <= reconnectBackoffMin { + t.Errorf("backoff.next = %v after repeated failures, want > %v", next, reconnectBackoffMin) + } + t.Logf("reconnect errors in 700ms: %d", errCount) +} + +// TestH1ReconnectBackoffCtxAbort verifies ctx cancellation cuts a pending +// reconnect-backoff sleep short instead of holding the worker hostage. +func TestH1ReconnectBackoffCtxAbort(t *testing.T) { + host, port, kill := startKillableH1Server(t) + + client, err := newH1Client(host, port, "/", testH1Cfg(true, 1)) + if err != nil { + t.Fatal(err) + } + defer client.Close() + + if _, err := client.DoRequest(context.Background(), 0); err != nil { + t.Fatalf("priming request: %v", err) + } + kill() + + // Force the next failure onto a long backoff sleep, then cancel. + client.conns[0].backoff.next = 2 * time.Second + ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond) + defer cancel() + + start := time.Now() + _, derr := client.DoRequest(ctx, 0) + elapsed := time.Since(start) + if derr == nil { + t.Fatal("expected an error against a dead server") + } + if elapsed > time.Second { + t.Errorf("DoRequest took %v — ctx cancellation did not abort the backoff sleep", elapsed) + } +} diff --git a/latency.go b/latency.go index 388ec02..4f34c78 100644 --- a/latency.go +++ b/latency.go @@ -234,7 +234,10 @@ func (s *ShardedLatencyRecorder) SnapshotWindowP99Ms() float64 { return float64(merged.ValueAtQuantile(99)) / 1e6 } -// Reset clears all shards (called between warmup and main benchmark). +// Reset clears all shards. Only safe while no workers are recording — +// the warmup→measure handoff swaps in a fresh recorder instead of +// resetting, because saturation-mode workers keep running across the +// boundary. func (s *ShardedLatencyRecorder) Reset() { for i := range s.shards { sh := &s.shards[i] diff --git a/mix.go b/mix.go index ff1ea40..e8fe846 100644 --- a/mix.go +++ b/mix.go @@ -146,6 +146,18 @@ type mixClient struct { h2Errors atomic.Int64 upgradeErrors atomic.Int64 + // Measured-window baselines captured by markMeasureStart at the + // warmup→measure handoff; stats() subtracts them so the reported + // per-protocol counts cover the measured window only. Plain fields: + // written once by the Run goroutine before the measured phase + // begins, read by stats() after the workers drain. + h1ReqBase int64 + h2ReqBase int64 + upgradeReqBase int64 + h1ErrBase int64 + h2ErrBase int64 + upgradeErrBase int64 + // Per-protocol conn counts (informational, surfaced in Result). h1Conns int h2Conns int @@ -303,6 +315,20 @@ func (c *mixClient) recordError(workerID int) { } } +// markMeasureStart records the current counter values as the measured-window +// baselines. Called once by Benchmarker.Run at the warmup→measure handoff. +// Baselines (rather than Store(0) resets) keep the handoff safe while +// saturation-mode workers are still adding to the counters: a concurrent +// Add can never be lost to a racing reset. +func (c *mixClient) markMeasureStart() { + c.h1ReqBase = c.h1Requests.Load() + c.h2ReqBase = c.h2Requests.Load() + c.upgradeReqBase = c.upgradeRequests.Load() + c.h1ErrBase = c.h1Errors.Load() + c.h2ErrBase = c.h2Errors.Load() + c.upgradeErrBase = c.upgradeErrors.Load() +} + // Close shuts down every sub-client. func (c *mixClient) Close() { if c.h1 != nil { @@ -316,17 +342,19 @@ func (c *mixClient) Close() { } } -// stats returns the per-protocol counters as a MixStats snapshot. +// stats returns the per-protocol counters as a MixStats snapshot, scoped +// to the measured window (counts accumulated before markMeasureStart are +// subtracted out). func (c *mixClient) stats() MixStats { return MixStats{ H1Conns: c.h1Conns, H2Conns: c.h2Conns, UpgradeConns: c.upgradeConns, - H1Requests: c.h1Requests.Load(), - H2Requests: c.h2Requests.Load(), - UpgradeRequests: c.upgradeRequests.Load(), - H1Errors: c.h1Errors.Load(), - H2Errors: c.h2Errors.Load(), - UpgradeErrors: c.upgradeErrors.Load(), + H1Requests: c.h1Requests.Load() - c.h1ReqBase, + H2Requests: c.h2Requests.Load() - c.h2ReqBase, + UpgradeRequests: c.upgradeRequests.Load() - c.upgradeReqBase, + H1Errors: c.h1Errors.Load() - c.h1ErrBase, + H2Errors: c.h2Errors.Load() - c.h2ErrBase, + UpgradeErrors: c.upgradeErrors.Load() - c.upgradeErrBase, } } diff --git a/mix_test.go b/mix_test.go index 032d60b..f1e994c 100644 --- a/mix_test.go +++ b/mix_test.go @@ -352,3 +352,33 @@ func TestMixProtoString(t *testing.T) { } } } + +// TestMixMarkMeasureStartBaselines verifies the per-protocol counters are +// scoped to the measured window via baselines rather than racy resets: in +// saturation mode the warmup→measure handoff happens while workers are +// still adding to the counters, so a Store(0) reset could lose increments. +func TestMixMarkMeasureStartBaselines(t *testing.T) { + mc := &mixClient{} + mc.h1Requests.Store(10) + mc.h2Requests.Store(20) + mc.upgradeRequests.Store(30) + mc.h1Errors.Store(1) + mc.h2Errors.Store(2) + mc.upgradeErrors.Store(3) + + mc.markMeasureStart() + + mc.h1Requests.Add(5) + mc.h2Requests.Add(6) + mc.upgradeRequests.Add(7) + mc.h1Errors.Add(1) + + got := mc.stats() + want := MixStats{ + H1Requests: 5, H2Requests: 6, UpgradeRequests: 7, + H1Errors: 1, H2Errors: 0, UpgradeErrors: 0, + } + if got != want { + t.Errorf("stats after markMeasureStart = %+v, want %+v", got, want) + } +} diff --git a/results.go b/results.go index f452cd3..a342445 100644 --- a/results.go +++ b/results.go @@ -10,6 +10,11 @@ type TimeseriesPoint struct { RequestsPerSec float64 `json:"rps"` P99Ms float64 `json:"p99_ms,omitempty"` Errors int64 `json:"errors,omitempty"` + + // ConnectErrors counts the dial/handshake failures that landed in + // this 1-second window. Lets a consumer tell "server answering with + // errors" from "server gone" per bucket. + ConnectErrors int64 `json:"connect_errors,omitempty"` } // Result holds the benchmark results. @@ -23,7 +28,7 @@ type Result struct { // LoadgenVersion records which loadgen build produced this run, so // downstream consumers (probatorium) can attribute a Result to a - // specific release. Stamped from the Version constant by buildResult. + // specific release. Stamped from Version by buildResult. LoadgenVersion string `json:"loadgen_version,omitempty"` ClientCPUPercent float64 `json:"client_cpu_percent,omitempty"` @@ -48,6 +53,22 @@ type Result struct { // retried — the peer may still want to investigate. DialRetries uint64 `json:"dial_retries,omitempty"` + // ConnectErrors counts dial/handshake failures (TCP connect, TLS, + // WS/SSE upgrade, H1 reconnect dials) observed during the measured + // run. A separate error class from Errors (which lumps every failed + // request together): Errors ≈ ConnectErrors means the server was + // unreachable, not misbehaving. Counted at the driver, so it can + // differ from the matching Errors increments by a few attempts cut + // off at phase boundaries. Additive — existing consumers keep parsing + // Errors unchanged. + ConnectErrors uint64 `json:"connect_errors,omitempty"` + + // Warmup, when non-nil, summarises the warmup phase, whose counters + // are otherwise reset away before the measured run begins. A warmup + // with zero requests and nonzero errors means the target was never + // healthy — previously invisible. Omitted when Warmup is 0. + Warmup *WarmupStats `json:"warmup,omitempty"` + // Histogram is the V2-compressed HdrHistogram payload covering the // full latency distribution recorded during the run. Range covers // 1µs through 30s with 3 significant digits (~0.1% precision). @@ -100,6 +121,16 @@ type FederationStats struct { MergeError string `json:"merge_error,omitempty"` } +// WarmupStats reports what happened during the warmup phase: successful +// requests, total errors, and the dial/handshake-failure subset of those +// errors. Surfaced on Result.Warmup so a 100%-failing warmup is observable +// instead of being silently reset before the measured run. +type WarmupStats struct { + Requests int64 `json:"requests"` + Errors int64 `json:"errors"` + ConnectErrors uint64 `json:"connect_errors,omitempty"` +} + // UpgradeStats summarises the outcome of h2c-upgrade handshakes across all // dialled connections. UpgradeAttempted is the number of conns loadgen tried // to dial; UpgradeSucceeded is the number that completed the 101 Switching diff --git a/sse.go b/sse.go index afb7a3a..0a9e5f9 100755 --- a/sse.go +++ b/sse.go @@ -9,6 +9,7 @@ import ( "net" "strings" "sync" + "time" ) const sseMode = "sse-fanout" @@ -19,14 +20,26 @@ const sseMode = "sse-fanout" // therefore becomes the recorded latency — one DoRequest == one event through // the existing worker/RecordSuccess/timeseries pipeline. type sseClient struct { - host string // host:port to dial - path string // request path, e.g. /events - tls bool - insecure bool - headers map[string]string - - mu sync.Mutex - conns map[int]*sseConn + host string // host:port to dial + path string // request path, e.g. /events + tls bool + insecure bool + headers map[string]string + dialTimeout time.Duration + + // closeCtx is cancelled by Close() so in-flight dials, handshakes and + // backoff sleeps abort instead of outliving the client. + closeCtx context.Context + closeStop context.CancelFunc + + // failFast aborts the run when no stream was EVER established and + // connect attempts have failed for a sustained window. A client that + // had live streams retries with backoff forever. + failFast *failFastTracker + + mu sync.Mutex + conns map[int]*sseConn + backoffs map[int]*connectBackoff // per-worker reconnect pacing } func newSSEClient(host, port, path string, cfg Config) (Client, error) { @@ -36,18 +49,24 @@ func newSSEClient(host, port, path string, cfg Config) (Client, error) { if path == "" { path = "/" } + closeCtx, closeStop := context.WithCancel(context.Background()) return &sseClient{ - host: net.JoinHostPort(host, port), - path: path, - tls: cfg.scheme == "https", - insecure: cfg.InsecureSkipVerify, - headers: cfg.Headers, - conns: make(map[int]*sseConn), + host: net.JoinHostPort(host, port), + path: path, + tls: cfg.scheme == "https", + insecure: cfg.InsecureSkipVerify, + headers: cfg.Headers, + dialTimeout: cfg.DialTimeout, + closeCtx: closeCtx, + closeStop: closeStop, + failFast: newFailFastTracker(failFastWindow), + conns: make(map[int]*sseConn), + backoffs: make(map[int]*connectBackoff), }, nil } func (c *sseClient) DoRequest(ctx context.Context, workerID int) (int, error) { - conn, err := c.conn(workerID) + conn, err := c.conn(ctx, workerID) if err != nil { return 0, err } @@ -63,6 +82,7 @@ func (c *sseClient) DoRequest(ctx context.Context, workerID int) (int, error) { } func (c *sseClient) Close() { + c.closeStop() // abort in-flight dials, handshakes, and backoff sleeps c.mu.Lock() conns := c.conns c.conns = make(map[int]*sseConn) @@ -72,20 +92,45 @@ func (c *sseClient) Close() { } } -func (c *sseClient) conn(workerID int) (*sseConn, error) { +func (c *sseClient) conn(ctx context.Context, workerID int) (*sseConn, error) { c.mu.Lock() if conn, ok := c.conns[workerID]; ok { c.mu.Unlock() return conn, nil } + bo := c.backoffs[workerID] + if bo == nil { + bo = &connectBackoff{} + c.backoffs[workerID] = bo + } c.mu.Unlock() - conn, err := c.dial() + conn, err := c.dial(ctx) if err != nil { + if ctx.Err() != nil || c.closeCtx.Err() != nil { + return nil, err // shutdown, not a server failure + } + recordConnectError() + if fatal := c.failFast.failure(time.Now(), err); fatal != nil { + return nil, fatal + } + // One attempt per DoRequest: sleep the (growing) backoff before + // surfacing the error so a dead server cannot induce a redial hot + // loop, then let the worker loop call back in. + bo.sleep(ctx, c.closeCtx.Done()) return nil, err } + c.failFast.success() + bo.reset() c.mu.Lock() + if c.closeCtx.Err() != nil { + // Close() won the race while we were handshaking; don't leak the + // conn into a map nobody will drain. + c.mu.Unlock() + conn.close() + return nil, c.closeCtx.Err() + } c.conns[workerID] = conn c.mu.Unlock() return conn, nil @@ -103,30 +148,58 @@ func (c *sseClient) drop(workerID int) { } } -func (c *sseClient) dial() (*sseConn, error) { +// dial opens one SSE stream. The whole sequence — TCP/TLS connect plus the +// GET handshake — is bounded by DialTimeout and aborts on ctx cancellation +// or Close(). +func (c *sseClient) dial(ctx context.Context) (*sseConn, error) { + var cancel context.CancelFunc + if c.dialTimeout > 0 { + ctx, cancel = context.WithTimeout(ctx, c.dialTimeout) + } else { + ctx, cancel = context.WithCancel(ctx) + } + defer cancel() + // Propagate Close() into the dial context (and from there into the + // in-flight connect and handshake I/O below). + stopClose := context.AfterFunc(c.closeCtx, cancel) + defer stopClose() + var ( raw net.Conn err error ) if c.tls { - raw, err = tls.Dial("tcp", c.host, &tls.Config{InsecureSkipVerify: c.insecure}) //nolint:gosec // InsecureSkipVerify is opt-in for self-signed test targets + d := &tls.Dialer{Config: &tls.Config{InsecureSkipVerify: c.insecure}} //nolint:gosec // InsecureSkipVerify is opt-in for self-signed test targets + raw, err = d.DialContext(ctx, "tcp", c.host) } else { - raw, err = net.Dial("tcp", c.host) + var d net.Dialer + raw, err = d.DialContext(ctx, "tcp", c.host) } if err != nil { return nil, err } + // The GET handshake is plain conn I/O with no deadline of its own; + // closing the socket when ctx fires keeps it bounded too. + stopIO := context.AfterFunc(ctx, func() { _ = raw.Close() }) + if _, err := raw.Write([]byte(sseGetRequest(c.host, c.path, c.headers))); err != nil { + stopIO() _ = raw.Close() return nil, err } br := bufio.NewReader(raw) if err := sseReadHandshake(br); err != nil { + stopIO() _ = raw.Close() return nil, err } + if !stopIO() { + // ctx fired while the handshake was completing; raw is closed (or + // about to be) — treat as a failed dial. + return nil, ctx.Err() + } return &sseConn{raw: raw, br: br}, nil } diff --git a/sse_test.go b/sse_test.go index de31ab5..ffe857a 100644 --- a/sse_test.go +++ b/sse_test.go @@ -2,7 +2,9 @@ package loadgen import ( "context" + "errors" "fmt" + "net" "net/http" "net/http/httptest" "net/url" @@ -90,6 +92,164 @@ func TestSSERejectsNonStream(t *testing.T) { } } +// TestSSEFailFastNeverConnected drives the driver directly against a +// refusing port with a shrunken window: attempts must be backoff-paced and +// end in an ErrNeverConnected-wrapped fatal error. +func TestSSEFailFastNeverConnected(t *testing.T) { + host, port := refusedAddr(t) + + cfg := Config{URL: "http://" + net.JoinHostPort(host, port) + "/events", Mode: sseMode, scheme: "http"} + c, err := newSSEClient(host, port, "/events", cfg) + if err != nil { + t.Fatal(err) + } + defer c.Close() + window := 300 * time.Millisecond + c.(*sseClient).failFast = newFailFastTracker(window) + + start := time.Now() + attempts := 0 + for { + _, err := c.DoRequest(context.Background(), 0) + if err == nil { + t.Fatal("expected every DoRequest to fail against a refusing port") + } + attempts++ + if errors.Is(err, ErrNeverConnected) { + break + } + if attempts > 200 { + t.Fatalf("no fail-fast after %d attempts — dials are hot-looping", attempts) + } + } + if elapsed := time.Since(start); elapsed < window { + t.Errorf("fail-fast tripped after %v, before the %v window elapsed", elapsed, window) + } + t.Logf("fail-fast after %d attempts in %v", attempts, time.Since(start)) +} + +// TestSSECloseAbortsDial parks a dial inside the GET handshake (the server +// accepts but never responds) and verifies Close() aborts it promptly. +func TestSSECloseAbortsDial(t *testing.T) { + ln, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + defer func() { _ = ln.Close() }() + var held []net.Conn + defer func() { + for _, conn := range held { + _ = conn.Close() + } + }() + acceptDone := make(chan struct{}) + go func() { + defer close(acceptDone) + for { + conn, err := ln.Accept() + if err != nil { + return + } + held = append(held, conn) // hold open, never answer the GET + } + }() + + host, port, err := net.SplitHostPort(ln.Addr().String()) + if err != nil { + t.Fatal(err) + } + cfg := Config{ + URL: "http://" + ln.Addr().String() + "/events", + Mode: sseMode, + scheme: "http", + DialTimeout: 30 * time.Second, // Close, not the timeout, must abort + } + c, err := newSSEClient(host, port, "/events", cfg) + if err != nil { + t.Fatal(err) + } + + done := make(chan error, 1) + go func() { + _, derr := c.DoRequest(context.Background(), 0) + done <- derr + }() + + time.Sleep(100 * time.Millisecond) // let the dial park in the handshake read + start := time.Now() + c.Close() + + select { + case derr := <-done: + if derr == nil { + t.Error("expected an error from the aborted dial") + } + if elapsed := time.Since(start); elapsed > 2*time.Second { + t.Errorf("Close took %v to abort the in-flight dial", elapsed) + } + case <-time.After(5 * time.Second): + t.Fatal("Close() did not abort the in-flight dial") + } + _ = ln.Close() + <-acceptDone +} + +// TestSSEDialHonorsDialTimeout parks the handshake against a silent server +// and verifies DialTimeout bounds it (the v1.4.7 driver ignored it). +func TestSSEDialHonorsDialTimeout(t *testing.T) { + ln, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + defer func() { _ = ln.Close() }() + var held []net.Conn + defer func() { + for _, conn := range held { + _ = conn.Close() + } + }() + acceptDone := make(chan struct{}) + go func() { + defer close(acceptDone) + for { + conn, err := ln.Accept() + if err != nil { + return + } + held = append(held, conn) + } + }() + + host, port, err := net.SplitHostPort(ln.Addr().String()) + if err != nil { + t.Fatal(err) + } + cfg := Config{ + URL: "http://" + ln.Addr().String() + "/events", + Mode: sseMode, + scheme: "http", + DialTimeout: 150 * time.Millisecond, + } + c, err := newSSEClient(host, port, "/events", cfg) + if err != nil { + t.Fatal(err) + } + defer c.Close() + + start := time.Now() + _, derr := c.DoRequest(context.Background(), 0) + elapsed := time.Since(start) + if derr == nil { + t.Fatal("expected the dial to time out against a silent server") + } + // Budget: 150ms timeout + one backoff sleep (≤10ms) + slack. + if elapsed > 2*time.Second { + t.Errorf("DoRequest took %v, want ~150ms (DialTimeout-bounded)", elapsed) + } + _ = ln.Close() + <-acceptDone +} + func TestSSERunIntegration(t *testing.T) { srv := sseTestServer(t, time.Millisecond) defer srv.Close() diff --git a/version.go b/version.go index 26060c4..fe49dc9 100644 --- a/version.go +++ b/version.go @@ -1,6 +1,58 @@ package loadgen +import ( + "runtime/debug" + "strings" +) + +// modulePath must match go.mod; used to locate loadgen in a consumer's +// dependency list when resolving Version from build info. +const modulePath = "github.com/goceleris/loadgen" + +// fallbackVersion is reported when build info carries no usable module +// version for loadgen ((devel) builds, tests, directory replaces). Keep in +// sync with the release tag. +const fallbackVersion = "1.4.7" + // Version is the loadgen release version, stamped into Result.LoadgenVersion // so downstream consumers (probatorium, perfmatrix) can record which loadgen -// build produced a given run. -const Version = "1.4.5" +// build produced a given run. Resolved from runtime/debug build info at +// startup — correct both when loadgen is the main module of a tagged build +// and when it is a dependency — with fallbackVersion as the last resort. +var Version = versionFromBuildInfo(debug.ReadBuildInfo()) + +func versionFromBuildInfo(bi *debug.BuildInfo, ok bool) string { + if !ok || bi == nil { + return fallbackVersion + } + if bi.Main.Path == modulePath { + if v := cleanModuleVersion(bi.Main.Version); v != "" { + return v + } + return fallbackVersion + } + for _, dep := range bi.Deps { + if dep.Path != modulePath { + continue + } + ver := dep.Version + if dep.Replace != nil { + ver = dep.Replace.Version + } + if v := cleanModuleVersion(ver); v != "" { + return v + } + break + } + return fallbackVersion +} + +// cleanModuleVersion normalises a module version ("v1.4.7") to the bare +// form loadgen has always reported ("1.4.7"). Returns "" when the version +// carries no release information. +func cleanModuleVersion(v string) string { + if v == "" || v == "(devel)" { + return "" + } + return strings.TrimPrefix(v, "v") +} diff --git a/version_test.go b/version_test.go new file mode 100644 index 0000000..9673460 --- /dev/null +++ b/version_test.go @@ -0,0 +1,85 @@ +package loadgen + +import ( + "runtime/debug" + "testing" +) + +func TestVersionFromBuildInfo(t *testing.T) { + tests := []struct { + name string + bi *debug.BuildInfo + ok bool + want string + }{ + {"no build info", nil, false, fallbackVersion}, + { + "main module tagged", + &debug.BuildInfo{Main: debug.Module{Path: modulePath, Version: "v1.9.3"}}, + true, + "1.9.3", + }, + { + "main module devel falls back", + &debug.BuildInfo{Main: debug.Module{Path: modulePath, Version: "(devel)"}}, + true, + fallbackVersion, + }, + { + "consumed as dependency", + &debug.BuildInfo{ + Main: debug.Module{Path: "github.com/goceleris/probatorium", Version: "(devel)"}, + Deps: []*debug.Module{ + {Path: "github.com/HdrHistogram/hdrhistogram-go", Version: "v1.2.0"}, + {Path: modulePath, Version: "v1.4.8"}, + }, + }, + true, + "1.4.8", + }, + { + "dependency with versioned replace", + &debug.BuildInfo{ + Main: debug.Module{Path: "github.com/goceleris/probatorium"}, + Deps: []*debug.Module{ + {Path: modulePath, Version: "v1.4.7", Replace: &debug.Module{Path: modulePath, Version: "v1.4.7-hotfix"}}, + }, + }, + true, + "1.4.7-hotfix", + }, + { + "dependency with directory replace falls back", + &debug.BuildInfo{ + Main: debug.Module{Path: "github.com/goceleris/probatorium"}, + Deps: []*debug.Module{ + {Path: modulePath, Version: "v1.4.7", Replace: &debug.Module{Path: "../loadgen"}}, + }, + }, + true, + fallbackVersion, + }, + { + "unrelated build falls back", + &debug.BuildInfo{Main: debug.Module{Path: "example.com/other", Version: "v9.9.9"}}, + true, + fallbackVersion, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(st *testing.T) { + if got := versionFromBuildInfo(tt.bi, tt.ok); got != tt.want { + st.Errorf("versionFromBuildInfo() = %q, want %q", got, tt.want) + } + }) + } +} + +func TestVersionNonEmpty(t *testing.T) { + if Version == "" { + t.Fatal("Version must never be empty") + } + if Version == "(devel)" { + t.Fatalf("Version = %q — (devel) must fall back to the release constant", Version) + } +} diff --git a/ws.go b/ws.go index 8cbe0d7..0303ad5 100644 --- a/ws.go +++ b/ws.go @@ -2,6 +2,7 @@ package loadgen import ( "bufio" + "context" "crypto/rand" "crypto/tls" "encoding/base64" @@ -11,8 +12,7 @@ import ( "net" "strings" "sync" - - "context" + "time" ) const ( @@ -39,16 +39,28 @@ var errWSClosed = errors.New("loadgen: websocket connection closed by peer") // blocking broadcast read — one DoRequest == one unit through the existing // worker/RecordSuccess/timeseries pipeline. type wsClient struct { - host string // host:port to dial - path string // request path, e.g. /ws - tls bool - srvMode string // ?mode= value the server understands - mode string // public Config.Mode - payload []byte // outbound frame payload (nil for hub) - insecure bool - - mu sync.Mutex - conns map[int]*wsConn + host string // host:port to dial + path string // request path, e.g. /ws + tls bool + srvMode string // ?mode= value the server understands + mode string // public Config.Mode + payload []byte // outbound frame payload (nil for hub) + insecure bool + dialTimeout time.Duration + + // closeCtx is cancelled by Close() so in-flight dials, handshakes and + // backoff sleeps abort instead of outliving the client. + closeCtx context.Context + closeStop context.CancelFunc + + // failFast aborts the run when no stream was EVER established and + // connect attempts have failed for a sustained window. A client that + // had live streams retries with backoff forever. + failFast *failFastTracker + + mu sync.Mutex + conns map[int]*wsConn + backoffs map[int]*connectBackoff // per-worker reconnect pacing } func newWSClient(host, port, path string, cfg Config) (Client, error) { @@ -75,20 +87,26 @@ func newWSClient(host, port, path string, cfg Config) (Client, error) { } } + closeCtx, closeStop := context.WithCancel(context.Background()) return &wsClient{ - host: net.JoinHostPort(host, port), - path: path, - tls: secure, - srvMode: srvMode, - mode: cfg.Mode, - payload: payload, - insecure: cfg.InsecureSkipVerify, - conns: make(map[int]*wsConn), + host: net.JoinHostPort(host, port), + path: path, + tls: secure, + srvMode: srvMode, + mode: cfg.Mode, + payload: payload, + insecure: cfg.InsecureSkipVerify, + dialTimeout: cfg.DialTimeout, + closeCtx: closeCtx, + closeStop: closeStop, + failFast: newFailFastTracker(failFastWindow), + conns: make(map[int]*wsConn), + backoffs: make(map[int]*connectBackoff), }, nil } func (c *wsClient) DoRequest(ctx context.Context, workerID int) (int, error) { - conn, err := c.conn(workerID) + conn, err := c.conn(ctx, workerID) if err != nil { return 0, err } @@ -106,6 +124,7 @@ func (c *wsClient) DoRequest(ctx context.Context, workerID int) (int, error) { } func (c *wsClient) Close() { + c.closeStop() // abort in-flight dials, handshakes, and backoff sleeps c.mu.Lock() conns := c.conns c.conns = make(map[int]*wsConn) @@ -144,20 +163,45 @@ func (c *wsClient) exchange(conn *wsConn) (int, error) { } } -func (c *wsClient) conn(workerID int) (*wsConn, error) { +func (c *wsClient) conn(ctx context.Context, workerID int) (*wsConn, error) { c.mu.Lock() if conn, ok := c.conns[workerID]; ok { c.mu.Unlock() return conn, nil } + bo := c.backoffs[workerID] + if bo == nil { + bo = &connectBackoff{} + c.backoffs[workerID] = bo + } c.mu.Unlock() - conn, err := c.dial() + conn, err := c.dial(ctx) if err != nil { + if ctx.Err() != nil || c.closeCtx.Err() != nil { + return nil, err // shutdown, not a server failure + } + recordConnectError() + if fatal := c.failFast.failure(time.Now(), err); fatal != nil { + return nil, fatal + } + // One attempt per DoRequest: sleep the (growing) backoff before + // surfacing the error so a dead server cannot induce a redial hot + // loop, then let the worker loop call back in. + bo.sleep(ctx, c.closeCtx.Done()) return nil, err } + c.failFast.success() + bo.reset() c.mu.Lock() + if c.closeCtx.Err() != nil { + // Close() won the race while we were handshaking; don't leak the + // conn into a map nobody will drain. + c.mu.Unlock() + conn.close() + return nil, c.closeCtx.Err() + } c.conns[workerID] = conn c.mu.Unlock() return conn, nil @@ -175,30 +219,58 @@ func (c *wsClient) drop(workerID int) { } } -func (c *wsClient) dial() (*wsConn, error) { +// dial opens and upgrades one WebSocket connection. The whole sequence — +// TCP/TLS connect plus the 101 handshake — is bounded by DialTimeout and +// aborts on ctx cancellation or Close(). +func (c *wsClient) dial(ctx context.Context) (*wsConn, error) { + var cancel context.CancelFunc + if c.dialTimeout > 0 { + ctx, cancel = context.WithTimeout(ctx, c.dialTimeout) + } else { + ctx, cancel = context.WithCancel(ctx) + } + defer cancel() + // Propagate Close() into the dial context (and from there into the + // in-flight connect and handshake I/O below). + stopClose := context.AfterFunc(c.closeCtx, cancel) + defer stopClose() + var ( raw net.Conn err error ) if c.tls { - raw, err = tls.Dial("tcp", c.host, &tls.Config{InsecureSkipVerify: c.insecure}) //nolint:gosec // InsecureSkipVerify is opt-in for self-signed test targets + d := &tls.Dialer{Config: &tls.Config{InsecureSkipVerify: c.insecure}} //nolint:gosec // InsecureSkipVerify is opt-in for self-signed test targets + raw, err = d.DialContext(ctx, "tcp", c.host) } else { - raw, err = net.Dial("tcp", c.host) + var d net.Dialer + raw, err = d.DialContext(ctx, "tcp", c.host) } if err != nil { return nil, err } + // The upgrade handshake is plain conn I/O with no deadline of its own; + // closing the socket when ctx fires keeps it bounded too. + stopIO := context.AfterFunc(ctx, func() { _ = raw.Close() }) + if _, err := raw.Write([]byte(wsUpgradeRequest(c.host, c.path, c.srvMode))); err != nil { + stopIO() _ = raw.Close() return nil, err } br := bufio.NewReader(raw) if err := wsReadHandshake(br); err != nil { + stopIO() _ = raw.Close() return nil, err } + if !stopIO() { + // ctx fired while the handshake was completing; raw is closed (or + // about to be) — treat as a failed dial. + return nil, ctx.Err() + } return &wsConn{raw: raw, br: br}, nil } diff --git a/ws_test.go b/ws_test.go index 100e8fa..e9dc7aa 100644 --- a/ws_test.go +++ b/ws_test.go @@ -5,6 +5,7 @@ import ( "context" "crypto/sha1" "encoding/base64" + "errors" "net" "net/http" "net/http/httptest" @@ -240,3 +241,243 @@ func withMod(c Config, f func(*Config)) Config { f(&c) return c } + +// refusedAddr returns host, port for an address that actively refuses +// connections (bound then immediately released). +func refusedAddr(t *testing.T) (string, string) { + t.Helper() + ln, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + addr := ln.Addr().String() + _ = ln.Close() + host, port, err := net.SplitHostPort(addr) + if err != nil { + t.Fatal(err) + } + return host, port +} + +// TestWSFailFastAbortsRun drives a full Benchmarker run against a refusing +// port: with no stream ever established, the run must abort with an +// ErrNeverConnected-wrapped dial error well before Duration elapses, and +// dial attempts must be backoff-paced rather than hot-looped. +func TestWSFailFastAbortsRun(t *testing.T) { + host, port := refusedAddr(t) + + cfg := Config{ + URL: "http://" + net.JoinHostPort(host, port) + "/ws", + Mode: wsModeEcho, + scheme: "http", + } + raw, err := newWSClient(host, port, "/ws", cfg) + if err != nil { + t.Fatal(err) + } + // Shrink the fail-fast window so the test doesn't sit through the + // production 5s default. + raw.(*wsClient).failFast = newFailFastTracker(300 * time.Millisecond) + + before := connectErrorsCounter.Swap(0) + defer connectErrorsCounter.Add(before) + + b, err := New(Config{ + URL: cfg.URL, + Duration: 30 * time.Second, // must NOT be waited out + Connections: 2, + Workers: 2, + Warmup: 0, + Client: raw, + CPUMonitor: false, + RecvQProbe: false, + }) + if err != nil { + t.Fatal(err) + } + + start := time.Now() + result, err := b.Run(context.Background()) + elapsed := time.Since(start) + + if err == nil { + t.Fatal("expected fatal error from never-connected run") + } + if !errors.Is(err, ErrNeverConnected) { + t.Errorf("error must wrap ErrNeverConnected: %v", err) + } + if !strings.Contains(err.Error(), "loadgen: dial: ") || + !strings.Contains(err.Error(), "connection refused") { + t.Errorf("fatal error shape mismatch (want h1client pre-dial shape): %q", err.Error()) + } + if result != nil { + t.Errorf("expected nil result on fatal abort, got %+v", result) + } + if elapsed > 10*time.Second { + t.Errorf("abort took %v — run waited out the duration", elapsed) + } + // Hot-loop regression guard: v3.8 burned ~386k dials/sec. With 2 + // workers, a ~300ms window, and 10ms-doubling backoff, the attempt + // count must stay in the tens. + if n := b.errors.Load(); n == 0 || n > 200 { + t.Errorf("dial attempts = %d, want a small nonzero count (backoff-paced)", n) + } +} + +// TestWSReconnectBackoffAfterServerDeath: a client that HAD a live stream +// must keep retrying with backoff after the server dies — never fail fast, +// never hot-loop. +func TestWSReconnectBackoffAfterServerDeath(t *testing.T) { + srv := wsTestServer(t) + + c := wsDialClient(t, srv, wsModeEcho) + defer c.Close() + + if _, err := c.DoRequest(context.Background(), 0); err != nil { + t.Fatalf("priming request: %v", err) + } + + // Server dies mid-cell. srv.Close() releases the port but not the + // hijacked WS conn, so drop the client side too to force redials. + srv.Close() + c.(*wsClient).drop(0) + + deadline := time.Now().Add(700 * time.Millisecond) + errCount := 0 + for time.Now().Before(deadline) { + _, err := c.DoRequest(context.Background(), 0) + if err == nil { + continue + } + if errors.Is(err, ErrNeverConnected) { + t.Fatalf("had a live stream — must never fail fast: %v", err) + } + errCount++ + } + if errCount == 0 { + t.Fatal("expected reconnect errors after server death") + } + // 10ms-doubling backoff allows roughly 7 attempts in 700ms (plus the + // initial dead-conn read error); a hot loop would produce thousands. + if errCount > 100 { + t.Errorf("errCount = %d in 700ms — reconnects are not backoff-paced", errCount) + } + t.Logf("reconnect errors in 700ms: %d", errCount) +} + +// TestWSCloseAbortsDial parks a dial inside the upgrade handshake (the +// server accepts but never responds) and verifies Close() aborts it +// promptly even though neither ctx deadline nor DialTimeout is near. +func TestWSCloseAbortsDial(t *testing.T) { + ln, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + defer func() { _ = ln.Close() }() + var held []net.Conn + defer func() { + for _, conn := range held { + _ = conn.Close() + } + }() + acceptDone := make(chan struct{}) + go func() { + defer close(acceptDone) + for { + conn, err := ln.Accept() + if err != nil { + return + } + held = append(held, conn) // hold open, never answer the upgrade + } + }() + + host, port, err := net.SplitHostPort(ln.Addr().String()) + if err != nil { + t.Fatal(err) + } + cfg := Config{ + URL: "http://" + ln.Addr().String() + "/ws", + Mode: wsModeEcho, + scheme: "http", + DialTimeout: 30 * time.Second, // Close, not the timeout, must abort + } + c, err := newWSClient(host, port, "/ws", cfg) + if err != nil { + t.Fatal(err) + } + + done := make(chan error, 1) + go func() { + _, derr := c.DoRequest(context.Background(), 0) + done <- derr + }() + + time.Sleep(100 * time.Millisecond) // let the dial park in the handshake read + start := time.Now() + c.Close() + + select { + case derr := <-done: + if derr == nil { + t.Error("expected an error from the aborted dial") + } + if elapsed := time.Since(start); elapsed > 2*time.Second { + t.Errorf("Close took %v to abort the in-flight dial", elapsed) + } + case <-time.After(5 * time.Second): + t.Fatal("Close() did not abort the in-flight dial") + } + _ = ln.Close() + <-acceptDone +} + +// TestWSConnectErrorsAccounting runs a short, non-fatal window against a +// refusing port and checks the new error-class plumbing end to end: +// Result.ConnectErrors, Result.Warmup, and bounded (non-hot-loop) totals. +func TestWSConnectErrorsAccounting(t *testing.T) { + host, port := refusedAddr(t) + + before := connectErrorsCounter.Swap(0) + defer connectErrorsCounter.Add(before) + + cfg := Config{ + URL: "http://" + net.JoinHostPort(host, port) + "/ws", + Mode: wsModeEcho, + Duration: 900 * time.Millisecond, // well under the 5s fail-fast window + Connections: 2, + Workers: 2, + Warmup: 400 * time.Millisecond, + CPUMonitor: false, + RecvQProbe: false, + } + b, err := New(cfg) + if err != nil { + t.Fatal(err) + } + result, err := b.Run(context.Background()) + if err != nil { + t.Fatalf("run should not trip fail-fast inside the 5s window: %v", err) + } + + if result.Errors == 0 { + t.Error("expected errors against a refusing port") + } + if result.ConnectErrors == 0 { + t.Error("expected ConnectErrors > 0 (dial failures must land in the connect class)") + } + if result.Warmup == nil { + t.Fatal("expected warmup stats") + } + if result.Warmup.Requests != 0 { + t.Errorf("warmup requests = %d, want 0 against a dead port", result.Warmup.Requests) + } + if result.Warmup.Errors == 0 || result.Warmup.ConnectErrors == 0 { + t.Errorf("100%%-failing warmup must be visible: %+v", result.Warmup) + } + // Hot-loop regression guard (v3.8: ~386k dials/sec). + if result.Errors > 500 { + t.Errorf("errors = %d in ~1.3s — dials are not backoff-paced", result.Errors) + } + t.Logf("errors=%d connect=%d warmup=%+v", result.Errors, result.ConnectErrors, result.Warmup) +}