Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -936,11 +936,9 @@ private Flux<Event> createEventStream(StreamOptions options, Supplier<Mono<Msg>>
finalMsg,
true));
}

// Complete the stream
sink.complete();
},
sink::error);
sink::error,
sink::complete);
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[minor] Consider also emitting a synthetic AGENT_RESULT (or at least a placeholder empty assistant Msg) when the upstream Mono completes empty, so downstream SSE / AG-UI adapters that key off AGENT_RESULT as the terminal event still receive a final marker. The current fix correctly unblocks completion, but consumers that loop until they observe an AGENT_RESULT will now silently terminate without one. If keeping the current behavior intentionally, please document it in the Javadoc of createEventStream (e.g. "if the agent call produces no final message, the stream completes without emitting an AGENT_RESULT event").

},
FluxSink.OverflowStrategy.BUFFER)
.publishOn(Schedulers.boundedElastic()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,13 @@
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import io.agentscope.core.ReActAgent;
import io.agentscope.core.agent.test.MockModel;
import io.agentscope.core.interruption.InterruptContext;
import io.agentscope.core.message.Msg;
import io.agentscope.core.message.MsgRole;
import io.agentscope.core.message.TextBlock;
import io.agentscope.core.model.ChatResponse;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -248,4 +251,38 @@ void testStreamEventCount() {
// Should have at least one event (the agent result)
assertTrue(eventCount.get() >= 1);
}

@Test
void testReActAgentStreamCompletesWhenModelReturnsEmptyText() {
MockModel model =
new MockModel(
messages ->
List.of(
ChatResponse.builder()
.id("empty-msg")
.content(
List.of(
TextBlock.builder()
.text("")
.build()))
.build()));
ReActAgent agent = ReActAgent.builder().name("test-agent").model(model).build();

Msg inputMsg =
Msg.builder()
.name("user")
.role(MsgRole.USER)
.content(List.of(TextBlock.builder().text("hello").build()))
.build();

StepVerifier.create(agent.stream(List.of(inputMsg)))
.expectNextMatches(
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nit] The test relies on at least one REASONING event being emitted by StreamingHook before completion. Per the issue analysis, an empty TextBlock does not satisfy TextAccumulator.hasContent(), so this assumption depends on hook implementation details (e.g. the start-of-reasoning event still firing). Consider making the assertion more defensive, e.g. .thenConsumeWhile(event -> true).expectComplete() without the upfront expectNextMatches, since the fix's contract is purely about completion, not about which events fire. As written the test couples the regression assertion to the StreamingHook event sequence and may become flaky if hook emission semantics change.

event ->
event.getType() == EventType.REASONING
&& !event.isLast()
&& event.getMessage() != null)
.thenConsumeWhile(event -> true)
.expectComplete()
.verify(Duration.ofSeconds(2));
}
}
Loading