From df37dd053b8386b95f1fba7a2042f4a83fce0539 Mon Sep 17 00:00:00 2001 From: Marc Seiler Date: Thu, 2 Jul 2026 19:00:17 -0400 Subject: [PATCH] fix: flush telemetry for short-lived runs --- src/index.ts | 21 +++++++++++++++++++-- src/otel.ts | 8 ++++++++ tests/otel.test.ts | 17 ++++++++++++++++- 3 files changed, 43 insertions(+), 3 deletions(-) diff --git a/src/index.ts b/src/index.ts index 1ca32cf..e192e2b 100644 --- a/src/index.ts +++ b/src/index.ts @@ -19,7 +19,7 @@ import type { import { LEVELS, type Level, type HandlerContext } from "./types.ts" import { loadConfig, parseAttributePairs, resolveHelperPath, resolveLogLevel } from "./config.ts" import { probeEndpoint } from "./probe.ts" -import { setupOtel, createInstruments } from "./otel.ts" +import { setupOtel, createInstruments, forceFlushOtel } from "./otel.ts" import { remoteParentContext } from "./trace-context.ts" import { handleSessionCreated, handleSessionIdle, handleSessionError, handleSessionStatus, handleRunStarted } from "./handlers/session.ts" import { handleMessageUpdated, handleMessagePartUpdated, startMessageSpan } from "./handlers/message.ts" @@ -77,7 +77,7 @@ export const OtelPlugin: Plugin = async ({ project, client, directory, worktree }) } - const { meterProvider, loggerProvider, tracerProvider } = await setupOtel( + const providers = await setupOtel( config.endpoint, config.protocol, config.metricsInterval, @@ -86,6 +86,7 @@ export const OtelPlugin: Plugin = async ({ project, client, directory, worktree config.otlpHeaders, otlpHeadersHelper, ) + const { meterProvider, loggerProvider, tracerProvider } = providers await log("info", "OTel SDK initialized") const instruments = createInstruments(config.metricPrefix) @@ -158,7 +159,18 @@ export const OtelPlugin: Plugin = async ({ project, client, directory, worktree messageOutputs, } + let shuttingDown = false + + async function flushTelemetry(reason: string) { + if (shuttingDown) return + await forceFlushOtel(providers) + await log("debug", "otel: telemetry flushed", { reason }) + } + async function shutdown() { + if (shuttingDown) return + shuttingDown = true + await forceFlushOtel(providers) await Promise.allSettled([meterProvider.shutdown(), loggerProvider.shutdown(), tracerProvider.shutdown()]) } @@ -272,9 +284,11 @@ export const OtelPlugin: Plugin = async ({ project, client, directory, worktree break case "session.idle": handleSessionIdle(event as EventSessionIdle, ctx) + await flushTelemetry("session.idle") break case "session.error": handleSessionError(event as EventSessionError, ctx) + await flushTelemetry("session.error") break case "session.status": handleSessionStatus(event as EventSessionStatus, ctx) @@ -321,6 +335,9 @@ export const OtelPlugin: Plugin = async ({ project, client, directory, worktree ) } await handleMessageUpdated(msgEvt, ctx) + if (info.role === "assistant" && info.time?.completed) { + await flushTelemetry("message.completed") + } break } case "message.part.updated": diff --git a/src/otel.ts b/src/otel.ts index f5a9775..27a2bcf 100644 --- a/src/otel.ts +++ b/src/otel.ts @@ -50,6 +50,14 @@ export type OtelProviders = { tracerProvider: BasicTracerProvider } +export async function forceFlushOtel(providers: OtelProviders) { + await Promise.allSettled([ + providers.meterProvider.forceFlush(), + providers.loggerProvider.forceFlush(), + providers.tracerProvider.forceFlush(), + ]) +} + export function buildHttpSignalUrl(endpoint: string, signal: "traces" | "metrics" | "logs") { const url = new URL(endpoint) const normalizedPath = url.pathname.endsWith("/") ? url.pathname.slice(0, -1) : url.pathname diff --git a/tests/otel.test.ts b/tests/otel.test.ts index 1f0d177..e49fdd2 100644 --- a/tests/otel.test.ts +++ b/tests/otel.test.ts @@ -8,7 +8,7 @@ import { OTLPMetricExporter as OTLPProtoMetricExporter } from "@opentelemetry/ex import { OTLPTraceExporter } from "@opentelemetry/exporter-trace-otlp-grpc" import { OTLPTraceExporter as OTLPHttpTraceExporter } from "@opentelemetry/exporter-trace-otlp-http" import { OTLPTraceExporter as OTLPProtoTraceExporter } from "@opentelemetry/exporter-trace-otlp-proto" -import { buildResource, setupOtel, type OtelProviders } from "../src/otel.ts" +import { buildResource, forceFlushOtel, setupOtel, type OtelProviders } from "../src/otel.ts" let providers: OtelProviders | undefined @@ -123,3 +123,18 @@ describe("setupOtel", () => { expect(exporters.trace).toBeInstanceOf(OTLPHttpTraceExporter) }) }) + +describe("forceFlushOtel", () => { + test("flushes metrics, logs, and traces", async () => { + const calls: string[] = [] + const fakeProviders = { + meterProvider: { forceFlush: async () => { calls.push("metrics") } }, + loggerProvider: { forceFlush: async () => { calls.push("logs") } }, + tracerProvider: { forceFlush: async () => { calls.push("traces") } }, + } as unknown as OtelProviders + + await forceFlushOtel(fakeProviders) + + expect(calls.sort()).toEqual(["logs", "metrics", "traces"]) + }) +})