Fix issues in langfuse observability #468
Open
nuwangeek wants to merge 8 commits into
get update from llm-461
Get update from llm-464
Sync wip branches
Get update from wip
Pull request overview
This PR improves Langfuse observability and inference/budget tracking by standardizing on vault_uuid (instead of internal DB IDs), adding reusable observation helpers for streaming/non-streaming flows, and expanding usage/cost capture across tool-classifier, guardrails, and response generation.
Changes:
- Refactor LLM connection identification to use vault_uuid across inference result storage and budget updates (incl. Resql + Ruuter DSL updates).
- Add src/utils/observation_utils.py and integrate Langfuse observation updates across streaming + non-streaming LLM call sites.
- Improve streaming pipeline handling (budget updates based on DSPy history deltas; unified streaming inference storage hooks across workflows).
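To illustrate what a "safe" observation helper for streaming call sites might look like, here is a minimal sketch using only the standard library. It is not the code in src/utils/observation_utils.py: the names SafeObservation, safe_observation, stream_answer, and the sink callback are all hypothetical, and the sink stands in for whatever call actually forwards fields to Langfuse. The idea shown is that a failure in the tracing backend is logged and swallowed, while a failure in the wrapped LLM call is recorded and re-raised.

```python
import logging
from contextlib import contextmanager
from typing import Any, Callable, Dict, Iterable, Iterator, List, Optional

logger = logging.getLogger(__name__)

# Hypothetical type: a callback that forwards observation fields to a tracer.
Sink = Callable[[Dict[str, Any]], None]


class SafeObservation:
    """Forwards updates to an optional sink; sink errors are logged, not raised."""

    def __init__(self, sink: Optional[Sink]) -> None:
        self._sink = sink
        self.failed_updates = 0

    def update(self, **fields: Any) -> None:
        if self._sink is None:
            return  # tracing disabled: behave as a no-op
        try:
            self._sink(dict(fields))
        except Exception:
            # Observability must never break the request path.
            self.failed_updates += 1
            logger.warning("observation update failed", exc_info=True)


@contextmanager
def safe_observation(sink: Optional[Sink] = None) -> Iterator[SafeObservation]:
    obs = SafeObservation(sink)
    try:
        yield obs
    except Exception as exc:
        # Record the failure of the wrapped call, then let it propagate.
        obs.update(level="ERROR", status_message=str(exc))
        raise


def stream_answer(chunks: Iterable[str], sink: Optional[Sink] = None) -> Iterator[str]:
    """Yield chunks to the caller and report the full output once at the end."""
    parts: List[str] = []
    with safe_observation(sink) as obs:
        for chunk in chunks:
            parts.append(chunk)
            yield chunk
        obs.update(output="".join(parts))
```

In this sketch the streaming path reports the output once, after the last chunk, rather than on every chunk; a non-streaming call site would call obs.update directly after the LLM call returns.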
Reviewed changes
Copilot reviewed 28 out of 28 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| src/utils/production_store.py | Switch inference storage payload from llm_connection_id to vault_uuid; remove connection ID fetcher usage. |
| src/utils/observation_utils.py | New canonical helpers for safe Langfuse observation context + updates (streaming/non-streaming patterns). |
| src/utils/cost_utils.py | Add streaming-safe cost extraction fallbacks (including token-based estimation fallback). |
| src/utils/budget_tracker.py | Update budget tracking to post vault_uuid to Resql update endpoint. |
| src/tool_classifier/workflows/service_workflow.py | Add Langfuse @observe spans and attach observation metadata/usage to workflow steps. |
| src/tool_classifier/workflows/context_workflow.py | Add Langfuse observation updates and store streaming inference outputs for context workflow. |
| src/tool_classifier/workflows/api_tool_workflow.py | Ensure final streamed answer (incl. guardrail-blocked output) is what gets stored as inference. |
| src/tool_classifier/param_extractor.py | Add Langfuse generation tracing and usage capture for param extraction (sync + streaming). |
| src/tool_classifier/multi_response_formatter.py | Add Langfuse generation tracing + safe observation context for streaming formatter. |
| src/tool_classifier/intent_detector.py | Add Langfuse generation tracing + usage capture for service intent detection LLM calls. |
| src/tool_classifier/intent_decomposer.py | Add safe observation context + generation updates for async decomposition step. |
| src/tool_classifier/context_analyzer.py | Add Langfuse tracing across context detection/summary/streaming generation paths. |
| src/tool_classifier/classifier.py | Pass shared context through streaming fallback chain to preserve metadata. |
| src/tool_classifier/api_semantic_searcher.py | Wrap disambiguation with safe observation context and usage capture; add result wrapper for DSPy callback compatibility. |
| src/tool_classifier/api_response_formatter.py | Add Langfuse tracing + safe observation context for streaming response formatter. |
| src/response_generator/response_generate.py | Add Langfuse tracing for streaming/non-streaming response generation and quick scope checks. |
| src/llm_orchestrator_config/llm_manager.py | Expose connection_id property (vault UUID) and enable DSPy usage tracking (track_usage=True). |
| src/llm_orchestration_service.py | Cache retrievers, improve streaming budget updates via DSPy history deltas, and unify streaming inference storage calls. |
| src/llm_orchestration_service_api.py | Wrap streaming endpoint with safe Langfuse observation context; improve error logging context. |
| src/guardrails/dspy_nemo_adapter.py | Add Langfuse generation tracing + usage capture for NeMo guardrails LLM calls (sync + async). |
| pyproject.toml | Add Ruff ANN401 exemption for the new observation utils helper module. |
| DSL/Ruuter.public/rag-search/POST/inference/results/store.yml | Replace llm_connection_id with vault_uuid in public inference storage contract. |
| DSL/Ruuter.private/rag-search/POST/inference/results/test/store.yml | Add private testing inference storage endpoint using vault_uuid. |
| DSL/Ruuter.private/rag-search/POST/inference/results/production/store.yml | Add private production inference storage endpoint (comprehensive payload). |
| DSL/Resql/rag-search/POST/update-llm-connection-used-budget.sql | Update budget update query to target connections by vault_uuid. |
| DSL/Resql/rag-search/POST/store-testing-inference-result.sql | Store testing inference results by resolving DB connection id via vault_uuid. |
| DSL/Resql/rag-search/POST/store-inference-result.sql | Store inference results by resolving DB connection id via vault_uuid. |
| DSL/Resql/rag-search/POST/deactivate-llm-connection-budget-exceed.sql | Update deactivation query to target connections by vault_uuid. |
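The table mentions budget updates based on DSPy history deltas and a token-based estimation fallback for streaming. A rough sketch of that idea follows, under stated assumptions: the history is modeled as a plain list of dicts with optional "cost" and "usage" keys, the prices in FALLBACK_PRICE_PER_1K are invented placeholders, and entry_cost and cost_since are hypothetical names rather than functions from src/utils/cost_utils.py.

```python
from typing import Any, Dict, Sequence

# Hypothetical prices in USD per 1,000 tokens, used only when no cost is reported.
FALLBACK_PRICE_PER_1K = {"prompt": 0.0005, "completion": 0.0015}


def entry_cost(entry: Dict[str, Any]) -> float:
    """Return the reported cost of one history entry, or estimate it from tokens."""
    cost = entry.get("cost")
    if cost is not None:
        return float(cost)
    # Assumed fallback for streaming calls, where the cost may be missing.
    usage = entry.get("usage") or {}
    prompt_tokens = usage.get("prompt_tokens") or 0
    completion_tokens = usage.get("completion_tokens") or 0
    return (
        prompt_tokens * FALLBACK_PRICE_PER_1K["prompt"]
        + completion_tokens * FALLBACK_PRICE_PER_1K["completion"]
    ) / 1000.0


def cost_since(history: Sequence[Dict[str, Any]], start_index: int) -> float:
    """Sum the cost of entries appended after a snapshot taken at start_index."""
    return sum(entry_cost(entry) for entry in history[start_index:])
```

A caller would record len(history) before the request, run the pipeline, and then pass that snapshot index to cost_since so that only the calls made during this request count against the budget.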
…odelabs/RAG-Module into llm/langfuse-observability get update from remote branch
This pull request changes how LLM connection identifiers are handled across the codebase, replacing internal database IDs with vault_uuid for consistency and external reference. It also adds new Ruuter endpoints for storing inference results in both production and testing environments, and enhances observability and usage tracking for LLM calls, especially in the NeMo Guardrails integration. Additionally, there are improvements to streaming error handling and code quality configuration.

Key changes:

LLM Connection Identifier Refactor
- Use vault_uuid instead of the internal id for LLM connection identification. This affects updating connection status, budget usage, and storing inference results. (DSL/Resql/rag-search/POST/deactivate-llm-connection-budget-exceed.sql, DSL/Resql/rag-search/POST/update-llm-connection-used-budget.sql, DSL/Resql/rag-search/POST/store-inference-result.sql, DSL/Resql/rag-search/POST/store-testing-inference-result.sql, DSL/Ruuter.public/rag-search/POST/inference/results/store.yml)

New Ruuter Endpoints for Inference Result Storage
- Add a private endpoint for storing production inference results. (DSL/Ruuter.private/rag-search/POST/inference/results/production/store.yml)
- Add a private endpoint for storing testing inference results using the vault_uuid identifier. (DSL/Ruuter.private/rag-search/POST/inference/results/test/store.yml)

Observability and Usage Tracking Enhancements
- Add the Langfuse observe decorator and improved usage tracking in the NeMo Guardrails LLM adapter, capturing prompt/response previews, usage stats, and metadata for both sync and async calls. (src/guardrails/dspy_nemo_adapter.py, src/utils/observation_utils.py)
- Enable DSPy usage tracking (track_usage=True) in the LLM manager. (src/llm_orchestrator_config/llm_manager.py)

Streaming and Error Handling Improvements
- Wrap the streaming endpoint with a safe Langfuse observation context and improve error logging context. (src/llm_orchestration_service_api.py)

Code Quality and Minor Improvements
- Add a Ruff ANN401 exemption for the new observation utils module and expose connection_id as a property in the LLM manager for easier access. (pyproject.toml, src/llm_orchestrator_config/llm_manager.py)

These changes collectively improve the maintainability, observability, and consistency of LLM connection management and result tracking throughout the system.
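As a rough illustration of the identifier change on the Python side, the sketch below builds a storage payload keyed by vault_uuid rather than llm_connection_id. It makes several assumptions that the PR text does not confirm: that vault_uuid is a standard UUID string, and that the payload carries question and answer fields. build_inference_payload is a hypothetical name, not a function from src/utils/production_store.py.

```python
from typing import Any, Dict
from uuid import UUID


def build_inference_payload(vault_uuid: str, question: str, answer: str) -> Dict[str, Any]:
    """Build a storage payload identified by vault_uuid (field names are assumed)."""
    # Validate early so a malformed identifier never reaches the storage endpoint.
    # UUID() raises ValueError on input that is not a valid UUID string.
    normalized = str(UUID(vault_uuid))
    return {
        "vault_uuid": normalized,  # replaces the former llm_connection_id key
        "question": question,      # hypothetical field
        "answer": answer,          # hypothetical field
    }
```

Validating the identifier before the request is a design choice of this sketch; the Resql queries described above resolve the internal connection id from vault_uuid on the database side.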