Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 116 additions & 0 deletions backoff.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package loadgen

import (
"context"
"errors"
"fmt"
"math/rand/v2"
"sync"
"time"
)

// Bounds for the reconnect backoff ladder. A fresh ladder starts at
// reconnectBackoffMin and doubles on every consecutive failure until it
// reaches reconnectBackoffMax. Each actual sleep is drawn uniformly from
// [d/2, d], which keeps a fleet of workers from redialling a recovering
// server in lockstep.
//
// Background (v3.8 post-mortem): with no pacing at all, the ws/sse drivers
// hammered a dead port at ~386k dials/sec — 34.7M errors per 90s cell.
const (
	reconnectBackoffMin time.Duration = 10 * time.Millisecond
	reconnectBackoffMax time.Duration = 1 * time.Second
)

// failFastWindow bounds how long a streaming client (ws/sse) may see
// nothing but connect failures before the target is declared dead. The
// window matters only while NO stream has ever been established: once a
// client has had a live stream it keeps retrying with backoff indefinitely,
// because the server may come back mid-cell.
const failFastWindow time.Duration = 5 * time.Second

// ErrNeverConnected marks the fatal fail-fast condition: every connect
// attempt failed and not a single stream was ever established within
// failFastWindow. Benchmarker.Run aborts the whole run when a driver
// surfaces an error wrapping this, so a harness can classify the cell as
// did-not-finish instead of burning the full duration against a dead port.
//
// Match it with errors.Is rather than by comparing messages: the fatal
// error built by failFastTracker.failure wraps this sentinel together with
// the underlying connect error.
var ErrNeverConnected = errors.New("no stream ever established")

// connectBackoff is per-connection (or per-worker) retry pacing state.
// Not safe for concurrent use — each instance is owned by one goroutine.
// The zero value is ready to use and starts at reconnectBackoffMin.
type connectBackoff struct {
// next is the un-jittered interval the following sleep call will use.
// Zero or negative means "start from reconnectBackoffMin".
next time.Duration
}

// sleep waits out the current backoff interval and reports whether the wait
// ran to completion.
//
// The interval used is b.next (reconnectBackoffMin when b.next is not
// positive). Before waiting, b.next is advanced to twice that interval,
// capped at reconnectBackoffMax, so the ladder moves forward even when the
// wait is cut short. The actual wait is jittered: it is drawn uniformly from
// the upper half of the interval.
//
// sleep returns true once the timer fires. It returns false as soon as ctx
// is cancelled or abort is closed, so Close() and run shutdown never get
// stuck behind a pending sleep. A nil abort channel is allowed; it simply
// never fires.
func (b *connectBackoff) sleep(ctx context.Context, abort <-chan struct{}) bool {
	interval := b.next
	if interval <= 0 {
		interval = reconnectBackoffMin
	}

	// Advance the ladder up front: the next call sees the doubled interval
	// regardless of how this wait ends.
	b.next = min(2*interval, reconnectBackoffMax)

	half := interval / 2
	wait := half + rand.N(half+1) // uniform in [interval/2, interval]

	timer := time.NewTimer(wait)
	defer timer.Stop()

	select {
	case <-timer.C:
		return true
	case <-abort:
	case <-ctx.Done():
	}
	return false
}

// reset rewinds the ladder after a successful connect: with next cleared,
// the following sleep starts over from the initial interval.
func (b *connectBackoff) reset() {
	b.next = 0
}

// failFastTracker decides when a streaming client that has NEVER had a live
// stream should give up on the whole run. One instance is shared by all
// workers of a client; any single established stream disarms it permanently.
type failFastTracker struct {
// window is the span a failure streak must cover before failure reports
// the fatal error.
window time.Duration

// mu guards everConnected and failingSince.
mu sync.Mutex
// everConnected is set by success and never cleared afterwards.
everConnected bool
failingSince time.Time // zero when there is no active failure streak
}

// newFailFastTracker returns an armed tracker with no recorded connects or
// failures that trips once a failure streak spans window.
func newFailFastTracker(window time.Duration) *failFastTracker {
	tracker := new(failFastTracker)
	tracker.window = window
	return tracker
}

// success notes that a stream was established. It clears any failure streak
// in progress and disarms fail-fast for good: failure returns nil from then
// on.
func (f *failFastTracker) success() {
	f.mu.Lock()
	defer f.mu.Unlock()

	f.failingSince = time.Time{}
	f.everConnected = true
}

// failure registers a connect failure seen at now and decides whether the
// run should be abandoned.
//
// It returns nil — meaning the caller should back off and retry — when a
// stream has ever been established, when this failure is the one that
// starts the streak, or when the streak is still shorter than the
// configured window. Once the streak spans the window with no stream ever
// established, it returns a fatal error wrapping both err and
// ErrNeverConnected.
func (f *failFastTracker) failure(now time.Time, err error) error {
	f.mu.Lock()
	defer f.mu.Unlock()

	switch {
	case f.everConnected:
		// Disarmed: a client that once had a live stream retries forever.
		return nil
	case f.failingSince.IsZero():
		// First failure of the streak only starts the clock.
		f.failingSince = now
		return nil
	case now.Sub(f.failingSince) < f.window:
		return nil
	}

	// The text deliberately follows the h1client pre-dial failure
	// ("loadgen: dial: ...: connection refused") so that harness-side
	// classification handles both the same way.
	return fmt.Errorf("loadgen: dial: %w (fail-fast: %w within %v)", err, ErrNeverConnected, f.window)
}
132 changes: 132 additions & 0 deletions backoff_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
package loadgen

import (
"context"
"errors"
"strings"
"testing"
"time"
)

// TestConnectBackoffDoublesAndCaps checks only the interval progression:
// starting from the 10ms floor, next doubles on every call and saturates at
// 1s. The context is cancelled up front so each sleep returns immediately
// and the test never actually waits.
func TestConnectBackoffDoublesAndCaps(t *testing.T) {
	cancelled, cancel := context.WithCancel(context.Background())
	cancel()

	// Expected value of next after each successive call.
	ladder := []time.Duration{
		20 * time.Millisecond,
		40 * time.Millisecond,
		80 * time.Millisecond,
		160 * time.Millisecond,
		320 * time.Millisecond,
		640 * time.Millisecond,
		time.Second,
		time.Second,
		time.Second,
	}

	var bo connectBackoff
	for step := range ladder {
		completed := bo.sleep(cancelled, nil)
		if completed {
			t.Fatalf("call %d: sleep returned true with cancelled ctx", step)
		}
		if got, want := bo.next, ladder[step]; got != want {
			t.Fatalf("call %d: next = %v, want %v", step, got, want)
		}
	}

	if bo.reset(); bo.next != 0 {
		t.Fatalf("after reset: next = %v, want 0", bo.next)
	}
}

// TestConnectBackoffSleepsJittered fixes the interval at 80ms and verifies
// that a real, uninterrupted sleep falls inside the jitter band [d/2, d].
// The upper limit is padded generously to absorb CI scheduling noise.
func TestConnectBackoffSleepsJittered(t *testing.T) {
	const (
		lowerBound = 40 * time.Millisecond
		upperBound = 300 * time.Millisecond
	)

	bo := connectBackoff{next: 80 * time.Millisecond}

	began := time.Now()
	completed := bo.sleep(context.Background(), nil)
	took := time.Since(began)

	if !completed {
		t.Fatal("sleep aborted without cancellation")
	}
	switch {
	case took < lowerBound:
		t.Fatalf("slept %v, want >= 40ms (lower jitter bound)", took)
	case took > upperBound:
		t.Fatalf("slept %v, want ~<= 80ms (upper jitter bound + CI noise)", took)
	}
}

// TestConnectBackoffAbort checks that a long (0.5-1s) sleep is interrupted
// promptly through either escape hatch: the context being cancelled, or the
// abort channel (the Close() path) being closed.
func TestConnectBackoffAbort(t *testing.T) {
	const (
		trigger = 20 * time.Millisecond  // when the interruption fires
		limit   = 400 * time.Millisecond // latest acceptable return
	)

	cases := []struct {
		name string
		// arm prepares the interruption and returns what to hand to sleep,
		// plus a cleanup to run when the subtest ends.
		arm       func() (context.Context, <-chan struct{}, func())
		completed string // failure message if sleep ran to completion
	}{
		{
			name: "context",
			arm: func() (context.Context, <-chan struct{}, func()) {
				ctx, cancel := context.WithTimeout(context.Background(), trigger)
				return ctx, nil, cancel
			},
			completed: "sleep completed despite ctx cancellation",
		},
		{
			name: "abort channel",
			arm: func() (context.Context, <-chan struct{}, func()) {
				abort := make(chan struct{})
				go func() {
					time.Sleep(trigger)
					close(abort)
				}()
				return context.Background(), abort, func() {}
			},
			completed: "sleep completed despite abort channel close",
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(st *testing.T) {
			ctx, abort, cleanup := tc.arm()
			defer cleanup()

			bo := connectBackoff{next: time.Second}
			began := time.Now()
			if bo.sleep(ctx, abort) {
				st.Fatal(tc.completed)
			}
			if took := time.Since(began); took > limit {
				st.Fatalf("aborted sleep took %v, want well under the 0.5-1s interval", took)
			}
		})
	}
}

// TestFailFastTracker drives the tracker with synthetic timestamps (no real
// waiting) and covers its two regimes: tripping once a failure streak spans
// the window with no stream ever established, and staying silent forever
// after any success.
func TestFailFastTracker(t *testing.T) {
	base := time.Now()
	dialErr := errors.New("dial tcp 127.0.0.1:1: connect: connection refused")

	t.Run("trips after window with no success", func(st *testing.T) {
		tracker := newFailFastTracker(5 * time.Second)

		if err := tracker.failure(base, dialErr); err != nil {
			st.Fatalf("first failure must arm, not trip: %v", err)
		}
		if err := tracker.failure(base.Add(4*time.Second), dialErr); err != nil {
			st.Fatalf("failure inside window must not trip: %v", err)
		}

		// Exactly at the window boundary the streak counts as spanning it.
		fatal := tracker.failure(base.Add(5*time.Second), dialErr)
		if fatal == nil {
			st.Fatal("expected fatal error once the streak spans the window")
		}
		if !errors.Is(fatal, ErrNeverConnected) {
			st.Errorf("fatal error must wrap ErrNeverConnected: %v", fatal)
		}
		if !errors.Is(fatal, dialErr) {
			st.Errorf("fatal error must wrap the last dial error: %v", fatal)
		}

		// The text has to look like the h1client pre-dial failure so the
		// harness files both under dnf.
		msg := fatal.Error()
		if !(strings.Contains(msg, "loadgen: dial: ") && strings.Contains(msg, "connection refused")) {
			st.Errorf("fatal error shape mismatch: %q", msg)
		}
	})

	t.Run("any success disarms permanently", func(st *testing.T) {
		tracker := newFailFastTracker(5 * time.Second)

		if err := tracker.failure(base, dialErr); err != nil {
			st.Fatal(err)
		}
		tracker.success()

		for _, later := range []time.Duration{time.Hour, 2 * time.Hour} {
			if err := tracker.failure(base.Add(later), dialErr); err != nil {
				st.Fatalf("post-success failure streak must never trip: %v", err)
			}
		}
	})
}
Loading