diff --git a/h1client.go b/h1client.go index 30971d1..7d3ad73 100644 --- a/h1client.go +++ b/h1client.go @@ -248,6 +248,18 @@ func (c *h1Client) DoRequest(ctx context.Context, workerID int) (int, error) { if ctx.Err() != nil { return 0, ctx.Err() } + // Close-after-response / churn guard (mirrors the write-error path + // above): a Connection:close server — or one closing mid-response under + // churn-close — yields a read EOF here. Reconnect so the NEXT request + // gets a fresh conn, and back off ONLY if the server is genuinely down. + // Without this, a close-after-one server (drogon collapsed to 0 req from + // exactly this) spins read-EOFs with no pacing and never recovers. + if reconnErr := hc.reconnect(); reconnErr != nil { + recordConnectError() + hc.backoff.sleep(ctx, nil) + return 0, fmt.Errorf("h1client: conn[%d] read status (reconnect failed): %w", connIdx, reconnErr) + } + hc.backoff.reset() return 0, fmt.Errorf("h1client: conn[%d] read status: %w", connIdx, err) } @@ -267,6 +279,18 @@ func (c *h1Client) DoRequest(ctx context.Context, workerID int) (int, error) { for { line, err := hc.reader.ReadSlice('\n') if err != nil { + if ctx.Err() != nil { + return 0, ctx.Err() + } + // Same close-after-response / churn guard as the status read: + // reconnect for the next request, back off only if the server is + // genuinely down. + if reconnErr := hc.reconnect(); reconnErr != nil { + recordConnectError() + hc.backoff.sleep(ctx, nil) + return 0, fmt.Errorf("h1client: conn[%d] read header (reconnect failed): %w", connIdx, reconnErr) + } + hc.backoff.reset() return 0, fmt.Errorf("h1client: conn[%d] read header: %w", connIdx, err) } if len(line) <= 2 { diff --git a/h2client.go b/h2client.go index aefdc82..3da09d8 100644 --- a/h2client.go +++ b/h2client.go @@ -88,11 +88,20 @@ func resetError(code uint32) error { // Uses pre-encoded HPACK headers, dedicated writer goroutine per connection, // lock-free stream slot dispatch, and batched WINDOW_UPDATE writes. type h2Client struct { - conns []*h2Conn + conns []*h2ConnSlot headerBlock []byte // pre-encoded HPACK header block (immutable) dataPayload []byte // body bytes for POST (nil for GET) hasBody bool + // redial re-establishes a single connection (prior-knowledge or h2c + // upgrade, whichever this client was built with). A server that closes + // or GOAWAYs a connection mid-cell — hypercorn does this periodically — + // would otherwise strand the slot dead and the worker would spin + // closed-conn errors with no recovery (fastapi-h2 logged ~1.1B errors / + // 0 requests from exactly this). reconnectSlot calls it under the slot + // lock, paced by the slot's backoff. + redial func() (*h2Conn, error) + // dialedViaUpgrade reports whether this client's connections were // established via the h2c upgrade handshake vs prior-knowledge H2. // Populated by newH2CUpgradeClient so the benchmark report can show @@ -105,6 +114,17 @@ type h2Client struct { upgradeAttempted int } +// h2ConnSlot owns one logical connection that can be re-dialed in place. cur +// holds the live *h2Conn (swapped atomically on reconnect so other workers +// observe the new one without locking); mu single-flights the redial so a +// fleet of workers sharing the slot dials once, not N times; backoff paces +// retries against a server that stays down. +type h2ConnSlot struct { + cur atomic.Pointer[h2Conn] + mu sync.Mutex + backoff connectBackoff +} + // h2WriteReq is a frame write request submitted to the writer goroutine. // For HEADERS: writeLoop allocates the streamID and registers the stream slot, // eliminating the need for a mutex on the worker side. @@ -113,7 +133,6 @@ type h2WriteReq struct { block []byte // HEADERS: header block fragment data []byte // HEADERS w/ body: data payload hasBody bool // HEADERS: has data frames to follow - maxFrame uint32 // HEADERS w/ body: max frame size pingData [8]byte // PING: response data respCh *chan h2Response // HEADERS: writeLoop registers this in the stream slot } @@ -150,9 +169,27 @@ type h2Conn struct { // Flow control — readLoop accumulates via atomic add, // writeLoop flushes between processing worker requests. + // + // maxFrameSize is the SERVER's SETTINGS_MAX_FRAME_SIZE (default 16384): + // DATA frames we SEND must not exceed it or the server replies + // FRAME_SIZE_ERROR and tears down the conn (the post-64k-h2 failure — + // a 64KiB frame against a 16384-default server). maxFrameSize uint32 pendingConnWindow atomic.Uint32 + // SEND-side flow control (data WE send to the server). The conn window + // starts at 65535 (RFC 7540 §6.9.2, NOT affected by SETTINGS); the + // per-stream window starts at the server's SETTINGS_INITIAL_WINDOW_SIZE. + // The writer goroutine is sequential (one body at a time), so the + // currently-writing stream's window lives in curStreamWindow keyed by + // curStreamID; readLoop replenishes both on WINDOW_UPDATE. Without this a + // 64KiB body (post-64k-h2 = 65536 B) exceeds the 65535 window by one byte + // and the request hangs to the 5-min deadline. + connSendWindow atomic.Int64 + curStreamID atomic.Uint32 + curStreamWindow atomic.Int64 + serverInitWindow uint32 + // Concurrency limit streamSem chan struct{} @@ -302,22 +339,30 @@ func newH2ClientWithDialer(host, port, path string, cfg Config, upgrade bool) (* } } - conns := make([]*h2Conn, numConns) - for i := range numConns { - var hc *h2Conn - var err error + // redial captures the dial parameters so a slot can re-establish its + // connection after the server closes/GOAWAYs it mid-cell. Used for both + // the initial dials below and reconnectSlot. + redial := func() (*h2Conn, error) { if upgrade { - hc, err = dialH2CUpgrade(addr, scheme, path, host, port, maxStreams, cfg.DialTimeout, cfg.ReadBufferSize, cfg.WriteBufferSize, tlsCfg) - } else { - hc, err = dialH2(addr, scheme, maxStreams, cfg.DialTimeout, cfg.ReadBufferSize, cfg.WriteBufferSize, tlsCfg) + return dialH2CUpgrade(addr, scheme, path, host, port, maxStreams, cfg.DialTimeout, cfg.ReadBufferSize, cfg.WriteBufferSize, tlsCfg) } + return dialH2(addr, scheme, maxStreams, cfg.DialTimeout, cfg.ReadBufferSize, cfg.WriteBufferSize, tlsCfg) + } + + conns := make([]*h2ConnSlot, numConns) + for i := range numConns { + hc, err := redial() if err != nil { for j := range i { - conns[j].closeConn() + if c := conns[j].cur.Load(); c != nil { + c.closeConn() + } } return nil, fmt.Errorf("h2client: dial conn[%d]: %w", i, err) } - conns[i] = hc + slot := &h2ConnSlot{} + slot.cur.Store(hc) + conns[i] = slot } var payload []byte @@ -331,11 +376,42 @@ func newH2ClientWithDialer(host, port, path string, cfg Config, upgrade bool) (* headerBlock: headerBlock, dataPayload: payload, hasBody: hasBody, + redial: redial, dialedViaUpgrade: upgrade, upgradeAttempted: numConns, }, nil } +// reconnectSlot re-establishes a dead slot's connection, single-flighted under +// the slot lock and paced by the slot's backoff. Returns the live conn, or nil +// if the redial failed (caller surfaces a connect error) or ctx ended. A +// concurrent caller that finds the slot already healed returns immediately. +func (c *h2Client) reconnectSlot(ctx context.Context, slot *h2ConnSlot) *h2Conn { + slot.mu.Lock() + defer slot.mu.Unlock() + + if hc := slot.cur.Load(); hc != nil && !hc.closed.Load() { + return hc // another worker already re-dialed this slot + } + if old := slot.cur.Load(); old != nil { + old.closeConn() // release fds/goroutines of the dead conn + } + + // Pace retries so a server that stays down can't be hot-redialled. + if !slot.backoff.sleep(ctx, nil) { + return nil // ctx cancelled mid-backoff + } + hc, err := c.redial() + if err != nil { + recordConnectError() + slot.cur.Store(nil) + return nil + } + slot.backoff.reset() + slot.cur.Store(hc) + return hc +} + // h2HopByHopHeaders lists HTTP/1.1 connection-specific headers that are // forbidden in HTTP/2 (RFC 9113 Section 8.2.2). These must be stripped // when encoding request headers for H2. @@ -479,10 +555,25 @@ func completeH2Handshake(conn net.Conn, br *bufio.Reader, addr string, maxStream } serverMaxStreams := uint32(maxStreams) + // RFC 7540 defaults: SETTINGS_MAX_FRAME_SIZE 16384, INITIAL_WINDOW_SIZE 65535. + serverMaxFrame := uint32(16384) + serverInitWin := uint32(65535) if serverSettings.Type == frameSettings { serverSettings.ForeachSetting(func(id uint16, val uint32) { - if id == settingMaxConcurrentStreams && val > 0 { - serverMaxStreams = val + switch id { + case settingMaxConcurrentStreams: + if val > 0 { + serverMaxStreams = val + } + case settingMaxFrameSize: + // Spec range 16384..16777215; clamp to be safe. + if val >= 16384 && val <= 16777215 { + serverMaxFrame = val + } + case settingInitialWindowSize: + if val <= 0x7FFFFFFF { + serverInitWin = val + } } }) if err := framer.WriteSettingsAck(); err != nil { @@ -522,15 +613,17 @@ func completeH2Handshake(conn net.Conn, br *bufio.Reader, addr string, maxStream numSlots := 2 * effectiveStreams hc := &h2Conn{ - conn: conn, - framer: framer, - bufWriter: bw, - writeCh: make(chan h2WriteReq, 4096), - streamSlots: make([]h2StreamSlot, numSlots), - maxFrameSize: h2MaxFrameSize, - streamSem: make(chan struct{}, effectiveStreams), - done: make(chan struct{}), - addr: addr, + conn: conn, + framer: framer, + bufWriter: bw, + writeCh: make(chan h2WriteReq, 4096), + streamSlots: make([]h2StreamSlot, numSlots), + // DATA-send split = the SERVER's max frame size (not our 64KiB receive cap). + maxFrameSize: serverMaxFrame, + serverInitWindow: serverInitWin, + streamSem: make(chan struct{}, effectiveStreams), + done: make(chan struct{}), + addr: addr, chanPool: sync.Pool{ New: func() any { ch := make(chan h2Response, 1) @@ -539,6 +632,7 @@ func completeH2Handshake(conn net.Conn, br *bufio.Reader, addr string, maxStream }, } hc.nextStreamID.Store(initialStreamID) + hc.connSendWindow.Store(65535) // RFC 7540 §6.9.2: conn window starts at 65535 for range effectiveStreams { hc.streamSem <- struct{}{} @@ -625,7 +719,7 @@ func (hc *h2Conn) processWriteReq(req h2WriteReq) { err := hc.framer.WriteHeaders(streamID, req.block, !req.hasBody) if err == nil && req.hasBody { - err = writeDataFrames(hc.framer, streamID, req.data, req.maxFrame) + err = hc.writeBodyFlowControlled(streamID, req.data) } if err != nil { hc.streamSlots[slotIdx].ch.Store(nil) @@ -733,7 +827,16 @@ func (hc *h2Conn) readLoop() { return case frameWindowUpdate: - // Ignore server-side window updates + // Replenish our SEND window so writeBodyFlowControlled can finish a + // body larger than the initial window (post-64k-h2). streamID 0 = + // connection-level; otherwise it targets a stream — apply it to the + // currently-writing stream's window (the writer is sequential). + incr := int64(frame.WindowUpdateIncrement()) + if frame.StreamID == 0 { + hc.connSendWindow.Add(incr) + } else if frame.StreamID == hc.curStreamID.Load() { + hc.curStreamWindow.Add(incr) + } } } } @@ -743,10 +846,20 @@ func (hc *h2Conn) readLoop() { // which receives from either writeLoop (on error) or readLoop (on response). func (c *h2Client) DoRequest(ctx context.Context, workerID int) (int, error) { idx := workerID % len(c.conns) - hc := c.conns[idx] - - if hc.closed.Load() { - return 0, fmt.Errorf("h2client: conn[%d] connection closed", idx) + slot := c.conns[idx] + hc := slot.cur.Load() + + // A server that closed/GOAWAYed this connection mid-cell leaves the slot + // dead; re-dial (paced) so the next request gets a fresh conn instead of + // spinning closed-conn errors. backoff lives in reconnectSlot. + if hc == nil || hc.closed.Load() { + hc = c.reconnectSlot(ctx, slot) + if hc == nil { + if ctx.Err() != nil { + return 0, ctx.Err() + } + return 0, fmt.Errorf("h2client: conn[%d] reconnect failed", idx) + } } // Acquire stream semaphore @@ -769,12 +882,11 @@ func (c *h2Client) DoRequest(ctx context.Context, workerID int) (int, error) { // On write error, writeLoop sends to *respCh directly. select { case hc.writeCh <- h2WriteReq{ - kind: h2WriteHeaders, - block: c.headerBlock, - data: c.dataPayload, - hasBody: c.hasBody, - maxFrame: hc.maxFrameSize, - respCh: chPtr, + kind: h2WriteHeaders, + block: c.headerBlock, + data: c.dataPayload, + hasBody: c.hasBody, + respCh: chPtr, }: case <-hc.done: hc.chanPool.Put(chPtr) @@ -811,18 +923,55 @@ func (c *h2Client) DoRequest(ctx context.Context, workerID int) (int, error) { } } -// writeDataFrames writes body data, splitting into frames if needed. -func writeDataFrames(framer *h2Framer, streamID uint32, data []byte, maxFrameSize uint32) error { +// writeBodyFlowControlled sends a request body as DATA frames, respecting BOTH +// the server's SETTINGS_MAX_FRAME_SIZE and its connection + per-stream send +// windows (RFC 7540 §6.9). The writer goroutine is sequential (one body at a +// time), so the active stream's window lives in hc.curStreamWindow keyed by +// hc.curStreamID; readLoop replenishes connSendWindow/curStreamWindow on +// WINDOW_UPDATE. When a window is exhausted we flush the buffered frames (so the +// server can consume + replenish) and poll — no extra goroutine/channel. The +// conn's 5-min deadline (completeH2Handshake) is the deadlock backstop. +// +// Without this, a 64KiB body (post-64k-h2 = 65536 B) either trips FRAME_SIZE_ERROR +// (64KiB DATA frame vs a 16384-default server) or overruns the 65535 window. +func (hc *h2Conn) writeBodyFlowControlled(streamID uint32, data []byte) error { + hc.curStreamWindow.Store(int64(hc.serverInitWindow)) + hc.curStreamID.Store(streamID) + + maxFrame := int(hc.maxFrameSize) + if maxFrame < 16384 { + maxFrame = 16384 + } for len(data) > 0 { - chunk := data - endStream := true - if uint32(len(chunk)) > maxFrameSize { - chunk = data[:maxFrameSize] - endStream = false + avail := maxFrame + if cw := int(hc.connSendWindow.Load()); cw < avail { + avail = cw + } + if sw := int(hc.curStreamWindow.Load()); sw < avail { + avail = sw } - if err := framer.WriteData(streamID, endStream, chunk); err != nil { + if avail <= 0 { + // Window exhausted: flush so the peer receives what we've sent and + // can send WINDOW_UPDATE, then wait for readLoop to replenish. + _ = hc.bufWriter.Flush() + select { + case <-hc.done: + return fmt.Errorf("h2client: conn closed mid-body (flow-control wait)") + default: + } + time.Sleep(50 * time.Microsecond) + continue + } + if avail > len(data) { + avail = len(data) + } + chunk := data[:avail] + endStream := len(chunk) == len(data) + if err := hc.framer.WriteData(streamID, endStream, chunk); err != nil { return err } + hc.connSendWindow.Add(-int64(len(chunk))) + hc.curStreamWindow.Add(-int64(len(chunk))) data = data[len(chunk):] } return nil @@ -830,8 +979,12 @@ func writeDataFrames(framer *h2Framer, streamID uint32, data []byte, maxFrameSiz // Close closes all connections. func (c *h2Client) Close() { - for _, hc := range c.conns { - hc.closeConn() + for _, slot := range c.conns { + slot.mu.Lock() + if hc := slot.cur.Load(); hc != nil { + hc.closeConn() + } + slot.mu.Unlock() } } diff --git a/h2client_test.go b/h2client_test.go index a716382..895a475 100644 --- a/h2client_test.go +++ b/h2client_test.go @@ -3,6 +3,7 @@ package loadgen import ( "context" "fmt" + "io" "net" "net/http" "sync" @@ -209,6 +210,132 @@ func TestH2StressXLargeResponse(t *testing.T) { t.Logf("xlarge (2MB): %d OK, %d errors", ok, errs) } +// startStrictUploadH2CServer runs an h2c server that advertises the RFC 7540 +// default limits real bench targets (celeris, etc.) use — SETTINGS_MAX_FRAME_SIZE +// 16384 and a 65535-byte connection + per-stream receive window — rather than +// x/net's lenient 1 MiB defaults. Its /drain endpoint reads the whole request +// body and 200s only if every declared byte arrived. A client that sends an +// oversized DATA frame trips FRAME_SIZE_ERROR; one that sends past the window +// trips FLOW_CONTROL_ERROR — both surface as a DoRequest error. +func startStrictUploadH2CServer(tb testing.TB) (host, port string, cleanup func()) { + tb.Helper() + mux := http.NewServeMux() + mux.HandleFunc("/drain", func(w http.ResponseWriter, r *http.Request) { + n, err := io.Copy(io.Discard, r.Body) + if err != nil || (r.ContentLength >= 0 && n != r.ContentLength) { + w.WriteHeader(400) + return + } + w.WriteHeader(200) + }) + + h2s := &http2.Server{ + MaxConcurrentStreams: 100, + MaxReadFrameSize: 16384, // advertised SETTINGS_MAX_FRAME_SIZE + MaxUploadBufferPerStream: 65535, // advertised SETTINGS_INITIAL_WINDOW_SIZE + MaxUploadBufferPerConnection: 65535, // no extra connection-window WINDOW_UPDATE + } + handler := h2c.NewHandler(mux, h2s) //nolint:staticcheck // see import-line comment + + ln, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + tb.Fatal(err) + } + srv := &http.Server{Handler: handler, ReadHeaderTimeout: 5 * time.Second} + go func() { _ = srv.Serve(ln) }() + h, p, _ := net.SplitHostPort(ln.Addr().String()) + return h, p, func() { _ = srv.Close(); _ = ln.Close() } +} + +// TestH2CPostBodyExceedsWindowAndFrameSize is the upload counterpart to the +// download flow-control tests above. It POSTs a body larger than BOTH +// SETTINGS_MAX_FRAME_SIZE (16384) and the initial connection/stream send +// window (65535), so a correct client must split the body into frame-sized +// DATA frames and pace them against the peer's advertised window +// (RFC 7540 §6.9). The strict server enforces both limits, so an oversized +// frame trips FRAME_SIZE_ERROR and an over-window send trips +// FLOW_CONTROL_ERROR, each surfacing as a DoRequest error; the /drain handler +// additionally 400s unless every declared byte arrived. This exercises the +// post-64k-h2 scenario that previously sent a single 64 KiB DATA frame. +func TestH2CPostBodyExceedsWindowAndFrameSize(t *testing.T) { + host, port, cleanup := startStrictUploadH2CServer(t) + defer cleanup() + + body := make([]byte, 200000) // > 65535 window and > 16384 max-frame + for i := range body { + body[i] = byte(i) + } + + client, err := newH2Client(host, port, "/drain", testH2Cfg("POST", nil, body, 1, 100)) + if err != nil { + t.Fatalf("newH2Client: %v", err) + } + defer client.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + // Repeat so a one-shot window mishap (e.g. window not replenished between + // requests) surfaces on a later iteration. + for i := range 10 { + if _, err := client.DoRequest(ctx, 0); err != nil { + t.Fatalf("DoRequest %d with %d-byte body: %v", i, len(body), err) + } + } +} + +// TestH2ReconnectAfterConnClosed verifies that a connection the server closes +// or GOAWAYs mid-cell is transparently re-dialed, instead of stranding the slot +// dead and spinning closed-conn errors (fastapi-h2 under hypercorn logged ~1.1B +// errors / 0 requests from exactly this — hypercorn GOAWAYs connections +// periodically). We close the live conn out from under the client, then assert +// the next requests recover against the still-running server and the slot now +// holds a fresh, live conn. +func TestH2ReconnectAfterConnClosed(t *testing.T) { + host, port, cleanup := startH2CServer(t, 100) + defer cleanup() + + client, err := newH2Client(host, port, "/simple", testH2Cfg("GET", nil, nil, 1, 100)) + if err != nil { + t.Fatal(err) + } + defer client.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + if _, err := client.DoRequest(ctx, 0); err != nil { + t.Fatalf("initial DoRequest: %v", err) + } + + // Simulate the server dropping the connection (GOAWAY + close). + old := client.conns[0].cur.Load() + old.closeConn() + if !old.closed.Load() { + t.Fatal("conn not marked closed after closeConn") + } + + // Subsequent requests must re-dial the (still-running) server and succeed. + var lastErr error + recovered := false + for range 50 { + _, err := client.DoRequest(ctx, 0) + if err == nil { + recovered = true + break + } + lastErr = err + } + if !recovered { + t.Fatalf("client did not recover after conn close; last err: %v", lastErr) + } + + // The slot now holds a fresh, live conn — not the dead one. + if cur := client.conns[0].cur.Load(); cur == nil || cur == old || cur.closed.Load() { + t.Fatalf("slot not healed after reconnect (cur==nil:%v, cur==old:%v)", cur == nil, cur == old) + } +} + // TestH2StressMultiConn tests with many connections and high stream counts, // mimicking the real benchmark configuration. func TestH2StressMultiConn(t *testing.T) { @@ -380,7 +507,7 @@ func TestH2StreamIDExhaustion(t *testing.T) { defer client.Close() // Fast-forward stream ID close to max - client.conns[0].nextStreamID.Store(0x7FFFFFF0) + client.conns[0].cur.Load().nextStreamID.Store(0x7FFFFFF0) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() diff --git a/h2framer.go b/h2framer.go index 3f0d338..f006358 100644 --- a/h2framer.go +++ b/h2framer.go @@ -73,6 +73,15 @@ func (f *h2Frame) GoAwayErrCode() uint32 { return 0 } +// WindowUpdateIncrement returns the 31-bit window-size increment from a +// WINDOW_UPDATE frame (payload[0:4], reserved high bit masked off). +func (f *h2Frame) WindowUpdateIncrement() uint32 { + if len(f.payload) >= 4 { + return binary.BigEndian.Uint32(f.payload[:4]) & 0x7FFFFFFF + } + return 0 +} + // PingData returns the 8-byte ping data. func (f *h2Frame) PingData() [8]byte { var d [8]byte