Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions protocol/triple/triple_protocol/duplex_http_call.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,10 +193,9 @@ func (d *duplexHTTPCall) CloseRead() error {
if d.response == nil {
return nil
}
if err := discard(d.response.Body); err != nil {
return wrapIfRSTError(err)
}
// Return incoming data via context, if set outgoing data.
// Do not read the response body here. CloseRead must return even when the
// peer leaves a streaming response open; callers that need trailers should
// read to EOF before closing.
if ExtractFromOutgoingContext(d.ctx) != nil {
newIncomingContext(d.ctx, d.ResponseTrailer())
}
Expand Down
90 changes: 90 additions & 0 deletions protocol/triple/triple_protocol/duplex_http_call_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,22 @@
package triple_protocol

import (
"context"
"io"
"net/http"
"net/http/httptest"
"net/url"
"sync"
"testing"
"time"
)

import (
"dubbo.apache.org/dubbo-go/v3/protocol/triple/triple_protocol/internal/assert"
)

const closeTestTimeout = time.Second

// newTestDuplexClientCall builds a duplexHTTPCall for a client-streaming RPC
// backed by a server that simply drains the request body and replies 200. It is
// the minimal setup needed to exercise the Write/CloseWrite/CloseRead lifecycle
Expand Down Expand Up @@ -182,3 +186,89 @@ func TestDuplexHTTPCallConcurrentWriteCloseRace(t *testing.T) {
_ = call.CloseRead()
}
}

// TestDuplexHTTPCallCloseReadDoesNotDrainResponseBody verifies that CloseRead
// closes the response body directly instead of reading until EOF. This guards
// the stream-close hang fixed in connect-go#791.
//
// A streaming peer may keep the response open after it stops sending data, so
// draining the body during close can block forever. Callers that need final
// trailers should read to EOF before closing the read side.
func TestDuplexHTTPCallCloseReadDoesNotDrainResponseBody(t *testing.T) {
t.Parallel()
body := newBlockingReadCloser()
call := &duplexHTTPCall{
ctx: context.Background(),
responseReady: make(chan struct{}),
response: &http.Response{
Body: body,
Trailer: make(http.Header),
},
}
close(call.responseReady)

done := make(chan error, 1)
go func() {
done <- call.CloseRead()
}()

select {
case err := <-done:
assert.Nil(t, err)
case <-body.readStarted:
body.unblock()
assert.Nil(t, <-done)
t.Fatal("CloseRead attempted to drain the response body")
case <-time.After(closeTestTimeout):
body.unblock()
assert.Nil(t, <-done)
t.Fatal("CloseRead blocked while closing the response body")
}

select {
case <-body.closed:
default:
t.Fatal("CloseRead did not close the response body")
}
}

// blockingReadCloser records attempted reads and then blocks. It lets the test
// catch drain-before-close behavior without depending on a real stalled stream.
type blockingReadCloser struct {
readStarted chan struct{}
readUnblock chan struct{}
closed chan struct{}

startReadOnce sync.Once
unblockOnce sync.Once
closeOnce sync.Once
}

func newBlockingReadCloser() *blockingReadCloser {
return &blockingReadCloser{
readStarted: make(chan struct{}),
readUnblock: make(chan struct{}),
closed: make(chan struct{}),
}
}

func (b *blockingReadCloser) Read([]byte) (int, error) {
b.startReadOnce.Do(func() {
close(b.readStarted)
})
<-b.readUnblock
return 0, io.EOF
}

func (b *blockingReadCloser) Close() error {
b.closeOnce.Do(func() {
close(b.closed)
})
return nil
}

func (b *blockingReadCloser) unblock() {
b.unblockOnce.Do(func() {
close(b.readUnblock)
})
}
251 changes: 251 additions & 0 deletions protocol/triple/triple_protocol/stream_close_ext_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,251 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package triple_protocol_test

import (
"context"
"errors"
"io"
"net/http"
"net/http/httptest"
"sync"
"testing"
"time"
)

import (
triple "dubbo.apache.org/dubbo-go/v3/protocol/triple/triple_protocol"
"dubbo.apache.org/dubbo-go/v3/protocol/triple/triple_protocol/internal/assert"
pingv1 "dubbo.apache.org/dubbo-go/v3/protocol/triple/triple_protocol/internal/gen/proto/connect/ping/v1"
"dubbo.apache.org/dubbo-go/v3/protocol/triple/triple_protocol/internal/gen/proto/connect/ping/v1/pingv1connect"
)

const closeTestTimeout = time.Second

// TestServerStreamCloseDoesNotDrainResponse verifies that Close returns
// promptly after the client stops reading a server-streaming response.
//
// The server sends one message and then keeps the stream open. Close should
// release the response body without waiting for the stream to reach EOF.
func TestServerStreamCloseDoesNotDrainResponse(t *testing.T) {
release, unblock := newCloseRelease()
t.Cleanup(unblock)
sent := make(chan struct{})
client := newCloseLifecyclePingClient(t, &pluggablePingServer{
countUp: func(ctx context.Context, req *triple.Request, stream *triple.ServerStream) error {
if err := stream.Send(&pingv1.CountUpResponse{Number: 1}); err != nil {
return err
}
close(sent)
<-release
return nil
},
})

stream, err := client.CountUp(context.Background(), triple.NewRequest(&pingv1.CountUpRequest{}))
assert.Nil(t, err)

select {
case <-sent:
case <-time.After(closeTestTimeout):
t.Fatal("server did not send the first response before close verification")
}

if !stream.Receive(&pingv1.CountUpResponse{}) {
t.Fatalf("failed to receive first response before close verification: %v", stream.Err())
}
msg := stream.Msg().(*pingv1.CountUpResponse)
assert.Equal(t, msg.Number, int64(1))

done := make(chan error, 1)
go func() {
done <- stream.Close()
}()
assert.Nil(t, assertCloseReturnsPromptly(t, done, unblock, "ServerStreamForClient.Close blocked while draining the response"))
}

// TestBidiStreamCloseResponseDoesNotDrainResponse verifies that CloseResponse
// does not wait for EOF on a bidi response stream.
//
// After one successful receive, the server keeps the response side open.
// Closing the client receive side should still return promptly.
func TestBidiStreamCloseResponseDoesNotDrainResponse(t *testing.T) {
release, unblock := newCloseRelease()
t.Cleanup(unblock)
received := make(chan struct{})
sent := make(chan struct{})
client := newCloseLifecyclePingClient(t, &pluggablePingServer{
cumSum: func(ctx context.Context, stream *triple.BidiStream) error {
req := &pingv1.CumSumRequest{}
if err := stream.Receive(req); err != nil {
return err
}
close(received)
if err := stream.Send(&pingv1.CumSumResponse{Sum: req.Number}); err != nil {
return err
}
close(sent)
<-release
return nil
},
})

stream, err := client.CumSum(context.Background())
assert.Nil(t, err)
if err := stream.Send(&pingv1.CumSumRequest{Number: 2}); err != nil {
t.Fatalf("failed to send setup request before close verification: %v", err)
}

select {
case <-received:
case <-time.After(closeTestTimeout):
t.Fatal("server did not receive the setup request before close verification")
}
select {
case <-sent:
case <-time.After(closeTestTimeout):
t.Fatal("server did not send the setup response before close verification")
}

res := &pingv1.CumSumResponse{}
if err := stream.Receive(res); err != nil {
t.Fatalf("failed to receive setup response before close verification: %v", err)
}
assert.Equal(t, res.Sum, int64(2))

done := make(chan error, 1)
go func() {
done <- stream.CloseResponse()
}()
assert.Nil(t, assertCloseReturnsPromptly(t, done, unblock, "BidiStreamForClient.CloseResponse blocked while draining the response"))
assert.Nil(t, stream.CloseRequest())
}

// TestBidiStreamCloseResponseAfterServerStopsReading covers the case where the
// server returns an error before consuming the rest of the request stream.
//
// Once the client has received that error, CloseResponse should be local cleanup
// and should not wait for the server to continue the stream.
func TestBidiStreamCloseResponseAfterServerStopsReading(t *testing.T) {
serverReceived := make(chan struct{})
serverReturn := make(chan struct{})
client := newCloseLifecyclePingClient(t, &pluggablePingServer{
cumSum: func(ctx context.Context, stream *triple.BidiStream) error {
req := &pingv1.CumSumRequest{}
if err := stream.Receive(req); err != nil {
return err
}
close(serverReceived)
<-serverReturn
return triple.NewError(triple.CodeUnavailable, errors.New("server stopped reading"))
},
})

stream, err := client.CumSum(context.Background())
assert.Nil(t, err)
if sendErr := stream.Send(&pingv1.CumSumRequest{Number: 1}); sendErr != nil {
t.Fatalf("failed to send setup request before close verification: %v", sendErr)
}

select {
case <-serverReceived:
case <-time.After(closeTestTimeout):
t.Fatal("server did not receive the setup request before close verification")
}
close(serverReturn)

err = stream.Receive(&pingv1.CumSumResponse{})
assert.NotNil(t, err)
assert.Equal(t, triple.CodeOf(err), triple.CodeUnavailable)

done := make(chan error, 1)
go func() {
done <- stream.CloseResponse()
}()
assert.Nil(t, assertCloseReturnsPromptly(t, done, nil, "BidiStreamForClient.CloseResponse blocked after server stopped reading"))
assert.Nil(t, stream.CloseRequest())
}

// TestClientStreamCloseAndReceiveAfterServerReturnsError covers a client-stream
// RPC where the server returns an error before the client finishes sending all
// messages. CloseAndReceive should surface the server error and finish cleanup
// without hanging in CloseResponse.
func TestClientStreamCloseAndReceiveAfterServerReturnsError(t *testing.T) {
client := newCloseLifecyclePingClient(t, &pluggablePingServer{
sum: func(ctx context.Context, stream *triple.ClientStream) (*triple.Response, error) {
return nil, triple.NewError(triple.CodeUnavailable, errors.New("server returned early"))
},
})

stream, err := client.Sum(context.Background())
assert.Nil(t, err)
if err = stream.Send(&pingv1.SumRequest{Number: 1}); err != nil {
assert.ErrorIs(t, err, io.EOF)
}

done := make(chan error, 1)
go func() {
done <- stream.CloseAndReceive(triple.NewResponse(&pingv1.SumResponse{}))
}()
err = assertCloseReturnsPromptly(t, done, nil, "ClientStreamForClient.CloseAndReceive blocked after server returned early")
assert.NotNil(t, err)
assert.Equal(t, triple.CodeOf(err), triple.CodeUnavailable)
}

func newCloseLifecyclePingClient(t *testing.T, pingServer pingv1connect.PingServiceHandler) pingv1connect.PingServiceClient {
t.Helper()
mux := http.NewServeMux()
mux.Handle(pingv1connect.NewPingServiceHandler(pingServer))
server := httptest.NewUnstartedServer(mux)
server.EnableHTTP2 = true
server.StartTLS()
t.Cleanup(server.Close)
return pingv1connect.NewPingServiceClient(server.Client(), server.URL)
}

// newCloseRelease returns a channel that lets tests keep the server handler
// alive until the client-side close call has completed. The unblock function is
// idempotent so cleanup can safely release the handler after failures.
func newCloseRelease() (<-chan struct{}, func()) {
release := make(chan struct{})
var once sync.Once
return release, func() {
once.Do(func() {
close(release)
})
}
}

// assertCloseReturnsPromptly treats a timeout as evidence that close attempted
// to drain the response stream. On failure it releases the server first so the
// goroutine running the close call can finish before the test exits.
func assertCloseReturnsPromptly(t *testing.T, done <-chan error, unblock func(), failure string) error {
t.Helper()
select {
case err := <-done:
return err
case <-time.After(closeTestTimeout):
if unblock != nil {
unblock()
err := <-done
assert.Nil(t, err)
}
t.Fatal(failure)
return nil
}
}
Loading