diff --git a/go/adk/pkg/models/bedrock.go b/go/adk/pkg/models/bedrock.go index d9db5a842..fd0463711 100644 --- a/go/adk/pkg/models/bedrock.go +++ b/go/adk/pkg/models/bedrock.go @@ -243,9 +243,10 @@ func (m *BedrockModel) generateStreaming(ctx context.Context, modelId string, me var finishReason genai.FinishReason var usageMetadata *genai.GenerateContentResponseUsageMetadata - // Track tool calls during streaming - // Map of content block index -> tool call being built + // Track tool calls and reasoning blocks during streaming. + // Maps of content block index -> in-flight block being built. toolCalls := make(map[int32]*streamingToolCall) + reasoningBlocks := make(map[int32]*streamingReasoningBlock) var completedToolCalls []*genai.Part // Get the event stream and read events from the channel @@ -254,19 +255,20 @@ func (m *BedrockModel) generateStreaming(ctx context.Context, modelId string, me // Read events from the channel for event := range stream.Events() { - // Handle content block start (tool use start) + // Handle content block start (tool use or reasoning start) if start, ok := event.(*types.ConverseStreamOutputMemberContentBlockStart); ok { + blockIdx := aws.ToInt32(start.Value.ContentBlockIndex) if toolStart, ok := start.Value.Start.(*types.ContentBlockStartMemberToolUse); ok { - // A new tool use block is starting - initialize tracking - blockIdx := aws.ToInt32(start.Value.ContentBlockIndex) toolCalls[blockIdx] = &streamingToolCall{ ID: aws.ToString(toolStart.Value.ToolUseId), Name: aws.ToString(toolStart.Value.Name), } } + // Reasoning blocks have no start payload; we initialize on first delta. + _ = blockIdx } - // Handle content block delta (streaming text or tool input) + // Handle content block delta (streaming text, tool input, or reasoning) if chunk, ok := event.(*types.ConverseStreamOutputMemberContentBlockDelta); ok { blockIdx := aws.ToInt32(chunk.Value.ContentBlockIndex) @@ -295,10 +297,25 @@ func (m *BedrockModel) generateStreaming(ctx context.Context, modelId string, me if tc, ok := toolCalls[blockIdx]; ok && delta.Value.Input != nil { tc.InputJSON += aws.ToString(delta.Value.Input) } + + case *types.ContentBlockDeltaMemberReasoningContent: + // Reasoning (thinking) delta — accumulate text and signature. + if _, exists := reasoningBlocks[blockIdx]; !exists { + reasoningBlocks[blockIdx] = &streamingReasoningBlock{} + } + rb := reasoningBlocks[blockIdx] + switch rd := delta.Value.(type) { + case *types.ReasoningContentBlockDeltaMemberText: + rb.Text += rd.Value + case *types.ReasoningContentBlockDeltaMemberSignature: + rb.Signature = rd.Value + case *types.ReasoningContentBlockDeltaMemberRedactedContent: + rb.Redacted = rd.Value + } } } - // Handle content block stop (tool use complete) + // Handle content block stop (tool use or reasoning complete) if stop, ok := event.(*types.ConverseStreamOutputMemberContentBlockStop); ok { blockIdx := aws.ToInt32(stop.Value.ContentBlockIndex) if tc, ok := toolCalls[blockIdx]; ok { @@ -316,8 +333,10 @@ func (m *BedrockModel) generateStreaming(ctx context.Context, modelId string, me Args: args, } completedToolCalls = append(completedToolCalls, &genai.Part{FunctionCall: functionCall}) - delete(toolCalls, blockIdx) // Clean up + delete(toolCalls, blockIdx) } + // Reasoning blocks are finalized at message-stop (collected in reasoningBlocks map). + _ = stop } // Handle message stop (includes stop reason) @@ -337,8 +356,15 @@ func (m *BedrockModel) generateStreaming(ctx context.Context, modelId string, me } } - // Build final response + // Build final response — reasoning parts first so they precede toolUse blocks, + // matching the order Bedrock requires when echoing them back in tool-result turns. finalParts := []*genai.Part{} + for _, rb := range reasoningBlocks { + part := rb.toPart() + if part != nil { + finalParts = append(finalParts, part) + } + } text := aggregatedText.String() if text != "" { finalParts = append(finalParts, &genai.Part{Text: text}) @@ -379,6 +405,27 @@ func (tc *streamingToolCall) parseArgs() map[string]any { return args } +// streamingReasoningBlock tracks a reasoning (thinking) block being built during streaming. +type streamingReasoningBlock struct { + Text string + Signature string + Redacted []byte +} + +func (rb *streamingReasoningBlock) toPart() *genai.Part { + if len(rb.Redacted) > 0 { + return &genai.Part{Thought: true, ThoughtSignature: rb.Redacted} + } + if rb.Text == "" && rb.Signature == "" { + return nil + } + part := &genai.Part{Thought: true, Text: rb.Text} + if rb.Signature != "" { + part.ThoughtSignature = []byte(rb.Signature) + } + return part +} + // generateNonStreaming handles non-streaming responses from Bedrock Converse. // reverseNameMap maps sanitized Bedrock tool names back to their original names. func (m *BedrockModel) generateNonStreaming(ctx context.Context, modelId string, messages []types.Message, systemPrompt []types.SystemContentBlock, inferenceConfig *types.InferenceConfiguration, toolConfig *types.ToolConfiguration, additionalFields document.Interface, reverseNameMap map[string]string, yield func(*model.LLMResponse, error) bool) { @@ -403,6 +450,26 @@ func (m *BedrockModel) generateNonStreaming(ctx context.Context, modelId string, parts := []*genai.Part{} if message, ok := output.Output.(*types.ConverseOutputMemberMessage); ok { for _, block := range message.Value.Content { + // Handle reasoning (thinking) content — must be preserved and echoed back + // in subsequent tool-result turns or Bedrock returns ValidationException. + if reasoningBlock, ok := block.(*types.ContentBlockMemberReasoningContent); ok { + if textBlock, ok := reasoningBlock.Value.(*types.ReasoningContentBlockMemberReasoningText); ok { + part := &genai.Part{ + Thought: true, + Text: aws.ToString(textBlock.Value.Text), + } + if textBlock.Value.Signature != nil { + part.ThoughtSignature = []byte(aws.ToString(textBlock.Value.Signature)) + } + parts = append(parts, part) + } else if redacted, ok := reasoningBlock.Value.(*types.ReasoningContentBlockMemberRedactedContent); ok { + parts = append(parts, &genai.Part{ + Thought: true, + ThoughtSignature: redacted.Value, + }) + } + continue + } // Handle text content if textBlock, ok := block.(*types.ContentBlockMemberText); ok { parts = append(parts, &genai.Part{Text: textBlock.Value}) @@ -473,6 +540,22 @@ func documentToMap(doc document.Interface) map[string]any { return result } +const ( + // historyToolResultMaxLen is the maximum character length of a tool result + // in any turn that is NOT the most recent assistant+tool-result pair. + // Older tool results are truncated to keep total prompt tokens manageable. + // ~4 chars per token → 2000 chars ≈ 500 tokens per historical tool call. + historyToolResultMaxLen = 2000 +) + +// truncateToolResult truncates s to maxLen characters, appending a note when truncated. +func truncateToolResult(s string, maxLen int) string { + if len(s) <= maxLen { + return s + } + return s[:maxLen] + fmt.Sprintf("\n... [truncated, %d chars omitted from history]", len(s)-maxLen) +} + // convertGenaiContentsToBedrockMessages converts genai.Content to Bedrock Converse API message format. // nameMap is the original->sanitized tool name map produced by convertGenaiToolsToBedrock. // Any FunctionCall found in the conversation history is written with the sanitized name so @@ -486,7 +569,43 @@ func convertGenaiContentsToBedrockMessages(contents []*genai.Content, nameMap ma idMap := make(map[string]string) idCounter := 0 - for _, content := range contents { + // AWS requires thinking blocks only for the last assistant turn before tool results. + // Preserving them in earlier turns causes token counts to explode across multi-turn + // conversations. Find the last assistant content index that contains thinking parts. + lastThinkingAssistantIdx := -1 + for i, content := range contents { + if content == nil { + continue + } + if content.Role == "model" || content.Role == "assistant" { + for _, part := range content.Parts { + if part != nil && part.Thought { + lastThinkingAssistantIdx = i + break + } + } + } + } + + // Tool responses (kubectl output, YAML blobs, etc.) can be enormous. + // Only the most recent user turn carrying tool results needs full fidelity; + // older ones are truncated to prevent token count explosion across sessions. + lastToolResultIdx := -1 + for i, content := range contents { + if content == nil { + continue + } + if content.Role == "user" { + for _, part := range content.Parts { + if part != nil && part.FunctionResponse != nil { + lastToolResultIdx = i + break + } + } + } + } + + for i, content := range contents { if content == nil || len(content.Parts) == 0 { continue } @@ -497,6 +616,11 @@ func convertGenaiContentsToBedrockMessages(contents []*genai.Content, nameMap ma role = types.ConversationRoleAssistant } + // Only echo thinking blocks for the last assistant turn that contains them. + emitThinking := i == lastThinkingAssistantIdx + // Truncate tool results in all but the most recent user turn that contains them. + truncateTools := i != lastToolResultIdx + var contentBlocks []types.ContentBlock for _, part := range content.Parts { @@ -517,6 +641,39 @@ func convertGenaiContentsToBedrockMessages(contents []*genai.Content, nameMap ma continue } + // Handle reasoning (thinking) parts — echo them back unmodified so Bedrock + // can maintain reasoning continuity across tool-result turns. + // AWS docs: "you must pass thinking blocks back to the API for the last + // assistant message … include the complete unmodified block." + // Earlier turns have thinking stripped to prevent token explosion. + if part.Thought { + if !emitThinking { + continue + } + if len(part.ThoughtSignature) > 0 && part.Text == "" { + // Redacted block + contentBlocks = append(contentBlocks, &types.ContentBlockMemberReasoningContent{ + Value: &types.ReasoningContentBlockMemberRedactedContent{ + Value: part.ThoughtSignature, + }, + }) + } else { + textBlock := &types.ReasoningTextBlock{ + Text: aws.String(part.Text), + } + if len(part.ThoughtSignature) > 0 { + sig := string(part.ThoughtSignature) + textBlock.Signature = aws.String(sig) + } + contentBlocks = append(contentBlocks, &types.ContentBlockMemberReasoningContent{ + Value: &types.ReasoningContentBlockMemberReasoningText{ + Value: *textBlock, + }, + }) + } + continue + } + // Handle function call (tool use in Bedrock terminology). // Use the sanitized name from nameMap so Bedrock can correlate the // tool call with the tool spec sent in the same request. @@ -538,9 +695,12 @@ func convertGenaiContentsToBedrockMessages(contents []*genai.Content, nameMap ma // Handle function response (tool result in Bedrock terminology) if part.FunctionResponse != nil { - // Extract response content result := extractFunctionResponseContent(part.FunctionResponse.Response) + if truncateTools { + result = truncateToolResult(result, historyToolResultMaxLen) + } toolResult := types.ToolResultBlock{ + ToolUseId: aws.String(sanitizeBedrockToolID(part.FunctionResponse.ID, idMap, &idCounter)), Content: []types.ToolResultContentBlock{ &types.ToolResultContentBlockMemberText{ diff --git a/go/adk/pkg/telemetry/tracing.go b/go/adk/pkg/telemetry/tracing.go index 695ec0711..81e06d9ae 100644 --- a/go/adk/pkg/telemetry/tracing.go +++ b/go/adk/pkg/telemetry/tracing.go @@ -156,7 +156,7 @@ func newTracerProvider(ctx context.Context, res *resource.Resource) (*sdktrace.T return sdktrace.NewTracerProvider( sdktrace.WithSpanProcessor(kagentAttributesSpanProcessor{}), - sdktrace.WithBatcher(exporter), + sdktrace.WithBatcher(newTruncatingExporter(exporter)), sdktrace.WithResource(res), ), nil } diff --git a/go/adk/pkg/telemetry/truncating_exporter.go b/go/adk/pkg/telemetry/truncating_exporter.go new file mode 100644 index 000000000..5c723db28 --- /dev/null +++ b/go/adk/pkg/telemetry/truncating_exporter.go @@ -0,0 +1,89 @@ +package telemetry + +import ( + "context" + "fmt" + + "go.opentelemetry.io/otel/attribute" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/sdk/trace/tracetest" +) + +const ( + // maxSpanAttributeBytes is the maximum size of any single string span attribute. + // Tempo's default gRPC message limit is 4MB; a single large tool response can + // exceed that. Cap individual attributes to keep spans well within the limit. + maxSpanAttributeBytes = 16 * 1024 // 16 KB +) + +// truncatingExporter wraps a SpanExporter and truncates oversized string attributes +// before forwarding spans. This prevents large tool responses (kubectl output, YAML +// blobs, etc.) from producing spans that exceed Tempo's gRPC message size limit. +type truncatingExporter struct { + inner sdktrace.SpanExporter +} + +func newTruncatingExporter(inner sdktrace.SpanExporter) sdktrace.SpanExporter { + return &truncatingExporter{inner: inner} +} + +func (e *truncatingExporter) ExportSpans(ctx context.Context, spans []sdktrace.ReadOnlySpan) error { + truncated := make([]sdktrace.ReadOnlySpan, len(spans)) + for i, s := range spans { + truncated[i] = truncateSpanAttributes(s) + } + return e.inner.ExportSpans(ctx, truncated) +} + +func (e *truncatingExporter) Shutdown(ctx context.Context) error { + return e.inner.Shutdown(ctx) +} + +// truncateSpanAttributes returns a copy of the span with string attributes +// longer than maxSpanAttributeBytes replaced by a truncated version. +func truncateSpanAttributes(s sdktrace.ReadOnlySpan) sdktrace.ReadOnlySpan { + attrs := s.Attributes() + needsTruncation := false + for _, a := range attrs { + if a.Value.Type() == attribute.STRING && len(a.Value.AsString()) > maxSpanAttributeBytes { + needsTruncation = true + break + } + } + if !needsTruncation { + return s + } + + newAttrs := make([]attribute.KeyValue, len(attrs)) + for i, a := range attrs { + if a.Value.Type() == attribute.STRING { + s := a.Value.AsString() + if len(s) > maxSpanAttributeBytes { + newAttrs[i] = attribute.String(string(a.Key), s[:maxSpanAttributeBytes]+ + fmt.Sprintf(" ...[truncated, %d bytes omitted]", len(s)-maxSpanAttributeBytes)) + continue + } + } + newAttrs[i] = a + } + + stub := tracetest.SpanStub{ + Name: s.Name(), + SpanContext: s.SpanContext(), + Parent: s.Parent(), + SpanKind: s.SpanKind(), + StartTime: s.StartTime(), + EndTime: s.EndTime(), + Attributes: newAttrs, + Events: s.Events(), + Links: s.Links(), + Status: s.Status(), + DroppedAttributes: s.DroppedAttributes(), + DroppedEvents: s.DroppedEvents(), + DroppedLinks: s.DroppedLinks(), + ChildSpanCount: s.ChildSpanCount(), + Resource: s.Resource(), + InstrumentationLibrary: s.InstrumentationScope(), + } + return stub.Snapshot() +}