Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 19 additions & 2 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import type {
import { LEVELS, type Level, type HandlerContext } from "./types.ts"
import { loadConfig, parseAttributePairs, resolveHelperPath, resolveLogLevel } from "./config.ts"
import { probeEndpoint } from "./probe.ts"
import { setupOtel, createInstruments } from "./otel.ts"
import { setupOtel, createInstruments, forceFlushOtel } from "./otel.ts"
import { remoteParentContext } from "./trace-context.ts"
import { handleSessionCreated, handleSessionIdle, handleSessionError, handleSessionStatus, handleRunStarted } from "./handlers/session.ts"
import { handleMessageUpdated, handleMessagePartUpdated, startMessageSpan } from "./handlers/message.ts"
Expand Down Expand Up @@ -77,7 +77,7 @@ export const OtelPlugin: Plugin = async ({ project, client, directory, worktree
})
}

const { meterProvider, loggerProvider, tracerProvider } = await setupOtel(
const providers = await setupOtel(
config.endpoint,
config.protocol,
config.metricsInterval,
Expand All @@ -86,6 +86,7 @@ export const OtelPlugin: Plugin = async ({ project, client, directory, worktree
config.otlpHeaders,
otlpHeadersHelper,
)
const { meterProvider, loggerProvider, tracerProvider } = providers
await log("info", "OTel SDK initialized")

const instruments = createInstruments(config.metricPrefix)
Expand Down Expand Up @@ -158,7 +159,18 @@ export const OtelPlugin: Plugin = async ({ project, client, directory, worktree
messageOutputs,
}

let shuttingDown = false

async function flushTelemetry(reason: string) {
if (shuttingDown) return
await forceFlushOtel(providers)
await log("debug", "otel: telemetry flushed", { reason })
}

async function shutdown() {
if (shuttingDown) return
shuttingDown = true
await forceFlushOtel(providers)
await Promise.allSettled([meterProvider.shutdown(), loggerProvider.shutdown(), tracerProvider.shutdown()])
}

Expand Down Expand Up @@ -272,9 +284,11 @@ export const OtelPlugin: Plugin = async ({ project, client, directory, worktree
break
case "session.idle":
handleSessionIdle(event as EventSessionIdle, ctx)
await flushTelemetry("session.idle")
break
case "session.error":
handleSessionError(event as EventSessionError, ctx)
await flushTelemetry("session.error")
break
case "session.status":
handleSessionStatus(event as EventSessionStatus, ctx)
Expand Down Expand Up @@ -321,6 +335,9 @@ export const OtelPlugin: Plugin = async ({ project, client, directory, worktree
)
}
await handleMessageUpdated(msgEvt, ctx)
if (info.role === "assistant" && info.time?.completed) {
await flushTelemetry("message.completed")
}
break
}
case "message.part.updated":
Expand Down
8 changes: 8 additions & 0 deletions src/otel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,14 @@ export type OtelProviders = {
tracerProvider: BasicTracerProvider
}

export async function forceFlushOtel(providers: OtelProviders) {
await Promise.allSettled([
providers.meterProvider.forceFlush(),
providers.loggerProvider.forceFlush(),
providers.tracerProvider.forceFlush(),
])
}

export function buildHttpSignalUrl(endpoint: string, signal: "traces" | "metrics" | "logs") {
const url = new URL(endpoint)
const normalizedPath = url.pathname.endsWith("/") ? url.pathname.slice(0, -1) : url.pathname
Expand Down
17 changes: 16 additions & 1 deletion tests/otel.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { OTLPMetricExporter as OTLPProtoMetricExporter } from "@opentelemetry/ex
import { OTLPTraceExporter } from "@opentelemetry/exporter-trace-otlp-grpc"
import { OTLPTraceExporter as OTLPHttpTraceExporter } from "@opentelemetry/exporter-trace-otlp-http"
import { OTLPTraceExporter as OTLPProtoTraceExporter } from "@opentelemetry/exporter-trace-otlp-proto"
import { buildResource, setupOtel, type OtelProviders } from "../src/otel.ts"
import { buildResource, forceFlushOtel, setupOtel, type OtelProviders } from "../src/otel.ts"

let providers: OtelProviders | undefined

Expand Down Expand Up @@ -123,3 +123,18 @@ describe("setupOtel", () => {
expect(exporters.trace).toBeInstanceOf(OTLPHttpTraceExporter)
})
})

describe("forceFlushOtel", () => {
test("flushes metrics, logs, and traces", async () => {
const calls: string[] = []
const fakeProviders = {
meterProvider: { forceFlush: async () => { calls.push("metrics") } },
loggerProvider: { forceFlush: async () => { calls.push("logs") } },
tracerProvider: { forceFlush: async () => { calls.push("traces") } },
} as unknown as OtelProviders

await forceFlushOtel(fakeProviders)

expect(calls.sort()).toEqual(["logs", "metrics", "traces"])
})
})
Loading