From bba62177b55e6cfe645ba941770f3e12a24b65ee Mon Sep 17 00:00:00 2001 From: Marco Franssen Date: Fri, 15 May 2026 16:08:22 +0200 Subject: [PATCH 1/4] fix: Bedrock extended thinking configuration --- go/adk/pkg/models/bedrock.go | 114 ++++++++++++++++++++++++++++++++--- 1 file changed, 105 insertions(+), 9 deletions(-) diff --git a/go/adk/pkg/models/bedrock.go b/go/adk/pkg/models/bedrock.go index d9db5a842..7706c8f38 100644 --- a/go/adk/pkg/models/bedrock.go +++ b/go/adk/pkg/models/bedrock.go @@ -243,9 +243,10 @@ func (m *BedrockModel) generateStreaming(ctx context.Context, modelId string, me var finishReason genai.FinishReason var usageMetadata *genai.GenerateContentResponseUsageMetadata - // Track tool calls during streaming - // Map of content block index -> tool call being built + // Track tool calls and reasoning blocks during streaming. + // Maps of content block index -> in-flight block being built. toolCalls := make(map[int32]*streamingToolCall) + reasoningBlocks := make(map[int32]*streamingReasoningBlock) var completedToolCalls []*genai.Part // Get the event stream and read events from the channel @@ -254,19 +255,20 @@ func (m *BedrockModel) generateStreaming(ctx context.Context, modelId string, me // Read events from the channel for event := range stream.Events() { - // Handle content block start (tool use start) + // Handle content block start (tool use or reasoning start) if start, ok := event.(*types.ConverseStreamOutputMemberContentBlockStart); ok { + blockIdx := aws.ToInt32(start.Value.ContentBlockIndex) if toolStart, ok := start.Value.Start.(*types.ContentBlockStartMemberToolUse); ok { - // A new tool use block is starting - initialize tracking - blockIdx := aws.ToInt32(start.Value.ContentBlockIndex) toolCalls[blockIdx] = &streamingToolCall{ ID: aws.ToString(toolStart.Value.ToolUseId), Name: aws.ToString(toolStart.Value.Name), } } + // Reasoning blocks have no start payload; we initialize on first delta. + _ = blockIdx } - // Handle content block delta (streaming text or tool input) + // Handle content block delta (streaming text, tool input, or reasoning) if chunk, ok := event.(*types.ConverseStreamOutputMemberContentBlockDelta); ok { blockIdx := aws.ToInt32(chunk.Value.ContentBlockIndex) @@ -295,10 +297,25 @@ func (m *BedrockModel) generateStreaming(ctx context.Context, modelId string, me if tc, ok := toolCalls[blockIdx]; ok && delta.Value.Input != nil { tc.InputJSON += aws.ToString(delta.Value.Input) } + + case *types.ContentBlockDeltaMemberReasoningContent: + // Reasoning (thinking) delta — accumulate text and signature. + if _, exists := reasoningBlocks[blockIdx]; !exists { + reasoningBlocks[blockIdx] = &streamingReasoningBlock{} + } + rb := reasoningBlocks[blockIdx] + switch rd := delta.Value.(type) { + case *types.ReasoningContentBlockDeltaMemberText: + rb.Text += rd.Value + case *types.ReasoningContentBlockDeltaMemberSignature: + rb.Signature = rd.Value + case *types.ReasoningContentBlockDeltaMemberRedactedContent: + rb.Redacted = rd.Value + } } } - // Handle content block stop (tool use complete) + // Handle content block stop (tool use or reasoning complete) if stop, ok := event.(*types.ConverseStreamOutputMemberContentBlockStop); ok { blockIdx := aws.ToInt32(stop.Value.ContentBlockIndex) if tc, ok := toolCalls[blockIdx]; ok { @@ -316,8 +333,10 @@ func (m *BedrockModel) generateStreaming(ctx context.Context, modelId string, me Args: args, } completedToolCalls = append(completedToolCalls, &genai.Part{FunctionCall: functionCall}) - delete(toolCalls, blockIdx) // Clean up + delete(toolCalls, blockIdx) } + // Reasoning blocks are finalized at message-stop (collected in reasoningBlocks map). + _ = stop } // Handle message stop (includes stop reason) @@ -337,8 +356,15 @@ func (m *BedrockModel) generateStreaming(ctx context.Context, modelId string, me } } - // Build final response + // Build final response — reasoning parts first so they precede toolUse blocks, + // matching the order Bedrock requires when echoing them back in tool-result turns. finalParts := []*genai.Part{} + for _, rb := range reasoningBlocks { + part := rb.toPart() + if part != nil { + finalParts = append(finalParts, part) + } + } text := aggregatedText.String() if text != "" { finalParts = append(finalParts, &genai.Part{Text: text}) @@ -379,6 +405,27 @@ func (tc *streamingToolCall) parseArgs() map[string]any { return args } +// streamingReasoningBlock tracks a reasoning (thinking) block being built during streaming. +type streamingReasoningBlock struct { + Text string + Signature string + Redacted []byte +} + +func (rb *streamingReasoningBlock) toPart() *genai.Part { + if len(rb.Redacted) > 0 { + return &genai.Part{Thought: true, ThoughtSignature: rb.Redacted} + } + if rb.Text == "" && rb.Signature == "" { + return nil + } + part := &genai.Part{Thought: true, Text: rb.Text} + if rb.Signature != "" { + part.ThoughtSignature = []byte(rb.Signature) + } + return part +} + // generateNonStreaming handles non-streaming responses from Bedrock Converse. // reverseNameMap maps sanitized Bedrock tool names back to their original names. func (m *BedrockModel) generateNonStreaming(ctx context.Context, modelId string, messages []types.Message, systemPrompt []types.SystemContentBlock, inferenceConfig *types.InferenceConfiguration, toolConfig *types.ToolConfiguration, additionalFields document.Interface, reverseNameMap map[string]string, yield func(*model.LLMResponse, error) bool) { @@ -403,6 +450,26 @@ func (m *BedrockModel) generateNonStreaming(ctx context.Context, modelId string, parts := []*genai.Part{} if message, ok := output.Output.(*types.ConverseOutputMemberMessage); ok { for _, block := range message.Value.Content { + // Handle reasoning (thinking) content — must be preserved and echoed back + // in subsequent tool-result turns or Bedrock returns ValidationException. + if reasoningBlock, ok := block.(*types.ContentBlockMemberReasoningContent); ok { + if textBlock, ok := reasoningBlock.Value.(*types.ReasoningContentBlockMemberReasoningText); ok { + part := &genai.Part{ + Thought: true, + Text: aws.ToString(textBlock.Value.Text), + } + if textBlock.Value.Signature != nil { + part.ThoughtSignature = []byte(aws.ToString(textBlock.Value.Signature)) + } + parts = append(parts, part) + } else if redacted, ok := reasoningBlock.Value.(*types.ReasoningContentBlockMemberRedactedContent); ok { + parts = append(parts, &genai.Part{ + Thought: true, + ThoughtSignature: redacted.Value, + }) + } + continue + } // Handle text content if textBlock, ok := block.(*types.ContentBlockMemberText); ok { parts = append(parts, &genai.Part{Text: textBlock.Value}) @@ -517,6 +584,35 @@ func convertGenaiContentsToBedrockMessages(contents []*genai.Content, nameMap ma continue } + // Handle reasoning (thinking) parts — echo them back unmodified so Bedrock + // can maintain reasoning continuity across tool-result turns. + // AWS docs: "you must pass thinking blocks back to the API for the last + // assistant message … include the complete unmodified block." + if part.Thought { + if len(part.ThoughtSignature) > 0 && part.Text == "" { + // Redacted block + contentBlocks = append(contentBlocks, &types.ContentBlockMemberReasoningContent{ + Value: &types.ReasoningContentBlockMemberRedactedContent{ + Value: part.ThoughtSignature, + }, + }) + } else { + textBlock := &types.ReasoningTextBlock{ + Text: aws.String(part.Text), + } + if len(part.ThoughtSignature) > 0 { + sig := string(part.ThoughtSignature) + textBlock.Signature = aws.String(sig) + } + contentBlocks = append(contentBlocks, &types.ContentBlockMemberReasoningContent{ + Value: &types.ReasoningContentBlockMemberReasoningText{ + Value: *textBlock, + }, + }) + } + continue + } + // Handle function call (tool use in Bedrock terminology). // Use the sanitized name from nameMap so Bedrock can correlate the // tool call with the tool spec sent in the same request. From f53dd42be8b08f24e25aa74e97416e6a526b0630 Mon Sep 17 00:00:00 2001 From: Marco Franssen Date: Fri, 15 May 2026 16:47:12 +0200 Subject: [PATCH 2/4] fix: strip thinking blocks from earlier turns to prevent token explosion Bedrock only requires thinking blocks for the last assistant message before tool results. Preserving them in all prior turns causes token counts to compound across multi-turn conversations (1.4M+ tokens seen in practice). Find the last assistant turn containing thinking parts and only emit ReasoningContent blocks there; earlier turns have them stripped. --- go/adk/pkg/models/bedrock.go | 27 ++++++++++++++++++++++++++- 1 file changed, 26 insertions(+), 1 deletion(-) diff --git a/go/adk/pkg/models/bedrock.go b/go/adk/pkg/models/bedrock.go index 7706c8f38..5793e9c3f 100644 --- a/go/adk/pkg/models/bedrock.go +++ b/go/adk/pkg/models/bedrock.go @@ -553,7 +553,25 @@ func convertGenaiContentsToBedrockMessages(contents []*genai.Content, nameMap ma idMap := make(map[string]string) idCounter := 0 - for _, content := range contents { + // AWS requires thinking blocks only for the last assistant turn before tool results. + // Preserving them in earlier turns causes token counts to explode across multi-turn + // conversations. Find the last assistant content index that contains thinking parts. + lastThinkingAssistantIdx := -1 + for i, content := range contents { + if content == nil { + continue + } + if content.Role == "model" || content.Role == "assistant" { + for _, part := range content.Parts { + if part != nil && part.Thought { + lastThinkingAssistantIdx = i + break + } + } + } + } + + for i, content := range contents { if content == nil || len(content.Parts) == 0 { continue } @@ -564,6 +582,9 @@ func convertGenaiContentsToBedrockMessages(contents []*genai.Content, nameMap ma role = types.ConversationRoleAssistant } + // Only echo thinking blocks for the last assistant turn that contains them. + emitThinking := i == lastThinkingAssistantIdx + var contentBlocks []types.ContentBlock for _, part := range content.Parts { @@ -588,7 +609,11 @@ func convertGenaiContentsToBedrockMessages(contents []*genai.Content, nameMap ma // can maintain reasoning continuity across tool-result turns. // AWS docs: "you must pass thinking blocks back to the API for the last // assistant message … include the complete unmodified block." + // Earlier turns have thinking stripped to prevent token explosion. if part.Thought { + if !emitThinking { + continue + } if len(part.ThoughtSignature) > 0 && part.Text == "" { // Redacted block contentBlocks = append(contentBlocks, &types.ContentBlockMemberReasoningContent{ From 2a9dfe0b9735bc331a1756d4512ed9aded6a0d70 Mon Sep 17 00:00:00 2001 From: Marco Franssen Date: Fri, 15 May 2026 17:02:22 +0200 Subject: [PATCH 3/4] fix: truncate historical tool results to prevent token explosion Kubernetes tool responses (kubectl output, YAML, logs) can be many KBs each. With no history limit, long sessions accumulate millions of tokens across replayed tool results. Truncate tool responses in all but the most recent user turn to 2000 chars (~500 tokens), keeping full fidelity only where the model actually needs it for the current reasoning step. --- go/adk/pkg/models/bedrock.go | 41 +++++++++++++++++++++++++++++++++++- 1 file changed, 40 insertions(+), 1 deletion(-) diff --git a/go/adk/pkg/models/bedrock.go b/go/adk/pkg/models/bedrock.go index 5793e9c3f..fd0463711 100644 --- a/go/adk/pkg/models/bedrock.go +++ b/go/adk/pkg/models/bedrock.go @@ -540,6 +540,22 @@ func documentToMap(doc document.Interface) map[string]any { return result } +const ( + // historyToolResultMaxLen is the maximum character length of a tool result + // in any turn that is NOT the most recent assistant+tool-result pair. + // Older tool results are truncated to keep total prompt tokens manageable. + // ~4 chars per token → 2000 chars ≈ 500 tokens per historical tool call. + historyToolResultMaxLen = 2000 +) + +// truncateToolResult truncates s to maxLen characters, appending a note when truncated. +func truncateToolResult(s string, maxLen int) string { + if len(s) <= maxLen { + return s + } + return s[:maxLen] + fmt.Sprintf("\n... [truncated, %d chars omitted from history]", len(s)-maxLen) +} + // convertGenaiContentsToBedrockMessages converts genai.Content to Bedrock Converse API message format. // nameMap is the original->sanitized tool name map produced by convertGenaiToolsToBedrock. // Any FunctionCall found in the conversation history is written with the sanitized name so @@ -571,6 +587,24 @@ func convertGenaiContentsToBedrockMessages(contents []*genai.Content, nameMap ma } } + // Tool responses (kubectl output, YAML blobs, etc.) can be enormous. + // Only the most recent user turn carrying tool results needs full fidelity; + // older ones are truncated to prevent token count explosion across sessions. + lastToolResultIdx := -1 + for i, content := range contents { + if content == nil { + continue + } + if content.Role == "user" { + for _, part := range content.Parts { + if part != nil && part.FunctionResponse != nil { + lastToolResultIdx = i + break + } + } + } + } + for i, content := range contents { if content == nil || len(content.Parts) == 0 { continue @@ -584,6 +618,8 @@ func convertGenaiContentsToBedrockMessages(contents []*genai.Content, nameMap ma // Only echo thinking blocks for the last assistant turn that contains them. emitThinking := i == lastThinkingAssistantIdx + // Truncate tool results in all but the most recent user turn that contains them. + truncateTools := i != lastToolResultIdx var contentBlocks []types.ContentBlock @@ -659,9 +695,12 @@ func convertGenaiContentsToBedrockMessages(contents []*genai.Content, nameMap ma // Handle function response (tool result in Bedrock terminology) if part.FunctionResponse != nil { - // Extract response content result := extractFunctionResponseContent(part.FunctionResponse.Response) + if truncateTools { + result = truncateToolResult(result, historyToolResultMaxLen) + } toolResult := types.ToolResultBlock{ + ToolUseId: aws.String(sanitizeBedrockToolID(part.FunctionResponse.ID, idMap, &idCounter)), Content: []types.ToolResultContentBlock{ &types.ToolResultContentBlockMemberText{ From ee8fc7efd3eb324d5931add6eb3d9f4db837c623 Mon Sep 17 00:00:00 2001 From: Marco Franssen Date: Fri, 15 May 2026 17:17:55 +0200 Subject: [PATCH 4/4] fix: truncate oversized span attributes before OTLP export Tool responses (kubectl output, YAML blobs) are serialized verbatim into span attributes by the upstream ADK. A single large response can exceed Tempo's 4MB gRPC message limit. Wrap the exporter with a truncating layer that caps any string attribute at 16KB before forwarding to the collector. --- go/adk/pkg/telemetry/tracing.go | 2 +- go/adk/pkg/telemetry/truncating_exporter.go | 89 +++++++++++++++++++++ 2 files changed, 90 insertions(+), 1 deletion(-) create mode 100644 go/adk/pkg/telemetry/truncating_exporter.go diff --git a/go/adk/pkg/telemetry/tracing.go b/go/adk/pkg/telemetry/tracing.go index 695ec0711..81e06d9ae 100644 --- a/go/adk/pkg/telemetry/tracing.go +++ b/go/adk/pkg/telemetry/tracing.go @@ -156,7 +156,7 @@ func newTracerProvider(ctx context.Context, res *resource.Resource) (*sdktrace.T return sdktrace.NewTracerProvider( sdktrace.WithSpanProcessor(kagentAttributesSpanProcessor{}), - sdktrace.WithBatcher(exporter), + sdktrace.WithBatcher(newTruncatingExporter(exporter)), sdktrace.WithResource(res), ), nil } diff --git a/go/adk/pkg/telemetry/truncating_exporter.go b/go/adk/pkg/telemetry/truncating_exporter.go new file mode 100644 index 000000000..5c723db28 --- /dev/null +++ b/go/adk/pkg/telemetry/truncating_exporter.go @@ -0,0 +1,89 @@ +package telemetry + +import ( + "context" + "fmt" + + "go.opentelemetry.io/otel/attribute" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/sdk/trace/tracetest" +) + +const ( + // maxSpanAttributeBytes is the maximum size of any single string span attribute. + // Tempo's default gRPC message limit is 4MB; a single large tool response can + // exceed that. Cap individual attributes to keep spans well within the limit. + maxSpanAttributeBytes = 16 * 1024 // 16 KB +) + +// truncatingExporter wraps a SpanExporter and truncates oversized string attributes +// before forwarding spans. This prevents large tool responses (kubectl output, YAML +// blobs, etc.) from producing spans that exceed Tempo's gRPC message size limit. +type truncatingExporter struct { + inner sdktrace.SpanExporter +} + +func newTruncatingExporter(inner sdktrace.SpanExporter) sdktrace.SpanExporter { + return &truncatingExporter{inner: inner} +} + +func (e *truncatingExporter) ExportSpans(ctx context.Context, spans []sdktrace.ReadOnlySpan) error { + truncated := make([]sdktrace.ReadOnlySpan, len(spans)) + for i, s := range spans { + truncated[i] = truncateSpanAttributes(s) + } + return e.inner.ExportSpans(ctx, truncated) +} + +func (e *truncatingExporter) Shutdown(ctx context.Context) error { + return e.inner.Shutdown(ctx) +} + +// truncateSpanAttributes returns a copy of the span with string attributes +// longer than maxSpanAttributeBytes replaced by a truncated version. +func truncateSpanAttributes(s sdktrace.ReadOnlySpan) sdktrace.ReadOnlySpan { + attrs := s.Attributes() + needsTruncation := false + for _, a := range attrs { + if a.Value.Type() == attribute.STRING && len(a.Value.AsString()) > maxSpanAttributeBytes { + needsTruncation = true + break + } + } + if !needsTruncation { + return s + } + + newAttrs := make([]attribute.KeyValue, len(attrs)) + for i, a := range attrs { + if a.Value.Type() == attribute.STRING { + s := a.Value.AsString() + if len(s) > maxSpanAttributeBytes { + newAttrs[i] = attribute.String(string(a.Key), s[:maxSpanAttributeBytes]+ + fmt.Sprintf(" ...[truncated, %d bytes omitted]", len(s)-maxSpanAttributeBytes)) + continue + } + } + newAttrs[i] = a + } + + stub := tracetest.SpanStub{ + Name: s.Name(), + SpanContext: s.SpanContext(), + Parent: s.Parent(), + SpanKind: s.SpanKind(), + StartTime: s.StartTime(), + EndTime: s.EndTime(), + Attributes: newAttrs, + Events: s.Events(), + Links: s.Links(), + Status: s.Status(), + DroppedAttributes: s.DroppedAttributes(), + DroppedEvents: s.DroppedEvents(), + DroppedLinks: s.DroppedLinks(), + ChildSpanCount: s.ChildSpanCount(), + Resource: s.Resource(), + InstrumentationLibrary: s.InstrumentationScope(), + } + return stub.Snapshot() +}