From 98c041526cb74a824048c4a3bf08f467daf45946 Mon Sep 17 00:00:00 2001 From: liuhy Date: Wed, 17 Jun 2026 15:17:37 +0800 Subject: [PATCH] fix(remoting): clean up pendingResponses on error paths to prevent map leak The global pendingResponses sync.Map is cleaned up on the success path via Response.Handle() -> removePendingResponse(), but entries were never removed when requests fail: - ExchangeClient.Request(): AddPendingResponse called before client.Request(), but on error the entry was left in the map - ExchangeClient.AsyncRequest(): same pattern, same leak - getty heartbeat(): timeout branch never called removePendingResponse, leaving the entry to accumulate indefinitely Add removePendingResponse calls in all three error/timeout paths. Export RemovePendingResponse so the getty package (external to remoting) can call it from its heartbeat timeout handler. --- remoting/exchange.go | 6 ++++++ remoting/exchange_client.go | 2 ++ remoting/getty/listener.go | 1 + 3 files changed, 9 insertions(+) diff --git a/remoting/exchange.go b/remoting/exchange.go index 60dd11b473..0647f8ac1a 100644 --- a/remoting/exchange.go +++ b/remoting/exchange.go @@ -184,6 +184,12 @@ func removePendingResponse(seq SequenceType) *PendingResponse { return nil } +// RemovePendingResponse removes and returns the pending response for the given sequence ID. +// It is the exported version of removePendingResponse for use by external packages. +func RemovePendingResponse(seq SequenceType) *PendingResponse { + return removePendingResponse(seq) +} + // GetPendingResponse gets the response func GetPendingResponse(seq SequenceType) *PendingResponse { if presp, ok := pendingResponses.Load(seq); ok { diff --git a/remoting/exchange_client.go b/remoting/exchange_client.go index a7cb53cf24..cb535ec758 100644 --- a/remoting/exchange_client.go +++ b/remoting/exchange_client.go @@ -132,6 +132,7 @@ func (client *ExchangeClient) Request(invocation *base.Invocation, url *common.U err := client.client.Request(request, timeout, rsp) // request error if err != nil { + removePendingResponse(SequenceType(request.ID)) res.Err = err return err } @@ -165,6 +166,7 @@ func (client *ExchangeClient) AsyncRequest(invocation *base.Invocation, url *com err := client.client.Request(request, timeout, rsp) if err != nil { + removePendingResponse(SequenceType(request.ID)) result.Err = err return err } diff --git a/remoting/getty/listener.go b/remoting/getty/listener.go index ed3a691145..4964463aa7 100644 --- a/remoting/getty/listener.go +++ b/remoting/getty/listener.go @@ -354,6 +354,7 @@ func heartbeat(session getty.Session, timeout time.Duration, callBack func(err e select { case <-gxtime.After(timeout): err1 = errHeartbeatReadTimeout + remoting.RemovePendingResponse(remoting.SequenceType(req.ID)) case <-resp.Done: err1 = resp.Err }