observer-js is a server-side Node.js library for monitoring WebRTC sessions. A WebRTC
application (typically an SFU or a signaling/stats backend) feeds it ClientSample objects —
periodic snapshots of each participant's RTCPeerConnection.getStats() output plus
application events — and observer-js maintains a live, in-memory model of every call,
participant, peer connection, and media stream, derives per-interval and cumulative metrics,
and emits a single, unified stream of typed events the application can react to.
Status: 1.0.0-beta. The API described here is current and intended to be implemented against directly. This document is written to be self-sufficient: an engineer (or an AI agent) should be able to integrate the library, or develop it further, from this file alone. A companion doc, docs/logging.md, covers logging integration in depth.
Packaging: server-side, Node.js ≥ 22, shipped as a dual ESM + CommonJS build, so it works whether your project uses `import` (ESM) or `require()` (CommonJS). Everything, including the built-in file sink, is exported from the single `@observertc/observer-js` entry.
- Installation
- Mental model
- Quick start
- Data flow
- Entity hierarchy
- Ingestion: `accept()`, context & lifecycle
- Update policies
- The event bus ← the core of the API
- API reference
- Schema types (`ClientSample`)
- Detectors (server-side extension point)
- Remote track resolution (mediasoup / SFU)
- Sinks (per-client sample persistence)
- Logging
- Error-handling philosophy
- Public exports
- Development & extension guide
- Not yet implemented / roadmap
npm install @observertc/observer-js
# or
yarn add @observertc/observer-js

Server-side, Node.js ≥ 22, dual ESM + CommonJS. The package ships both module formats, so it works the same whether your project is ESM or CommonJS; your import line is unchanged either way:
import { Observer, ClientSample, createJsonlFileSinkFactory } from '@observertc/observer-js';

In an ESM project this resolves to the .mjs build; in a CommonJS project (where TypeScript
compiles your import down to require()) it resolves to the .js build. Everything is exported
from the single @observertc/observer-js entry. Written in TypeScript; ships type declarations for
both formats (dist/index.d.ts for require, dist/index.d.mts for import). Runtime
dependencies: @bufbuild/protobuf, events, uuid. The library does not bundle a logger or
any transport — see Logging.
ClientSample and friends are re-exported from this package, and are also published as the
shared schema in @observertc/schemas; samples
produced on the client (e.g. by @observertc/client-monitor-js) conform to the same shape.
Five ideas are enough to use the whole library:
- One ingestion method. `observer.accept(sample, context?)` is how data gets in. Calls, clients, and peer connections are created automatically the first time their id appears in a sample.
- A live entity tree. `Observer → ObservedCall → ObservedClient → ObservedPeerConnection → {inbound/outbound RTP, tracks, data channels, ICE, codecs, …}`. Every node holds current and cumulative metrics and is reachable by id through `Map`s on its parent.
- One event bus. Everything worth subscribing to is emitted on the `Observer` itself (it is an `EventEmitter`). Each event payload is an object carrying the full ancestry of the entity it came from. You never have to walk the tree to subscribe.
- Pull or react. You can read fields off the entities at any time (pull), and/or react to events (push). The `*-updated` events fire on each processing tick.
- Warn, don't throw. Operational problems (bad config, duplicate ids, closed entities, malformed samples) never throw; they warn through the pluggable logger and degrade gracefully (returning `undefined` or emitting `sample-rejected`).
import { Observer, ClientSample } from '@observertc/observer-js';
// 1. Create an observer.
const observer = new Observer({
// when the observer aggregates call/client metrics:
updatePolicy: 'update-when-all-call-updated',
// default policy applied to calls created automatically by accept():
defaultCallUpdatePolicy: 'update-on-any-client-updated',
// optional auto-teardown:
closeCallIfEmptyForMs: 20_000,
closeClientIfIdleForMs: 60_000,
});
// 2. Subscribe on the single bus. Every payload is an object with the ancestry.
observer.on('call-added', ({ observedCall }) => {
console.log('new call', observedCall.callId);
});
observer.on('client-issue', ({ observedClient, issue }) => {
console.warn(`[${observedClient.clientId}] ${issue.type}`, issue.payload);
});
observer.on('peer-connection-updated', ({ observedClient, observedPeerConnection }) => {
console.log(observedClient.clientId, 'RTT(ms):', observedPeerConnection.currentRttInMs);
});
observer.on('sample-rejected', ({ reason, sample }) => {
console.warn('dropped a sample:', reason);
});
// 3. Feed samples. `context` (optional) is transient per-accept data, carried to the
// `*-updated` events this accept triggers (never written to appData).
function onClientStats(sample: ClientSample) {
observer.accept(sample, { studioVersion: '1.2.3' });
}
// 4. Tear down.
process.on('SIGINT', () => observer.close());

client getStats() ──► ClientSample ──► observer.accept(sample, ctx?)
│
┌────────────────────────────────┘
▼
get-or-create ObservedCall ──► get-or-create ObservedClient ──► client.accept(sample, ctx)
│
per peerConnections[] in the sample
▼
get-or-create ObservedPeerConnection
.accept(pcSample, ctx) updates all sub-stats,
derives deltas/bitrates/RTT, correlates remote RTP
│
metrics roll up: PeerConnection → Client → Call → Observer
│
events emitted on the Observer bus ──► your handlers
- A sample must have `callId` and `clientId` (the library sets them, or the app does). If either is missing, the sample is dropped and `sample-rejected` is emitted.
- Sub-entities that stop appearing in samples are garbage-collected via a "visited" mark-and-sweep on each `ObservedPeerConnection.accept()`, emitting the corresponding `*-removed` events.
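
The "visited" mark-and-sweep mentioned above can be modelled in a few lines. The following is a standalone sketch of the general technique, not the library's code: `SweepableMap`, `touch`, `sweep`, and `acceptPass` are hypothetical names, and the real implementation may track the mark differently.

```typescript
// Standalone model of a "visited" mark-and-sweep. All names are hypothetical.
type Entry<T> = { value: T; visited: boolean };

class SweepableMap<T> {
  private readonly entries = new Map<string, Entry<T>>();

  // Mark phase: create or refresh the entry and flag it as seen in this pass.
  touch(id: string, value: T): 'added' | 'updated' {
    const existing = this.entries.get(id);
    if (existing) {
      existing.value = value;
      existing.visited = true;
      return 'updated';
    }
    this.entries.set(id, { value, visited: true });
    return 'added';
  }

  // Sweep phase: delete every entry not seen in this pass, then clear the marks.
  sweep(): string[] {
    const removed: string[] = [];
    this.entries.forEach((entry, id) => {
      if (entry.visited) {
        entry.visited = false;
      } else {
        this.entries.delete(id);
        removed.push(id);
      }
    });
    return removed;
  }

  get size(): number {
    return this.entries.size;
  }
}

// One pass: touch everything present in the sample, then sweep the rest.
// The returned ids are the ones a caller would announce as removed.
function acceptPass<T>(map: SweepableMap<T>, stats: Array<{ id: string; value: T }>): string[] {
  for (const stat of stats) map.touch(stat.id, stat.value);
  return map.sweep();
}
```

In this model an entry disappears on the first pass that omits it; whether the library tolerates a missed sample before removing an entity is not stated here.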
| Class | Created by | Keyed on its parent as | Holds |
|---|---|---|---|
| `Observer` | `new Observer(config?)` | (root) | `observedCalls: Map<string, ObservedCall>`, global counters, the event bus |
| `ObservedCall` | `observer.createObservedCall(settings)` / lazily by `accept` | `observedCalls` | `observedClients: Map<string, ObservedClient>`, call-wide metrics, `detectors`, `scoreCalculator` |
| `ObservedClient` | `call.createObservedClient(settings)` / lazily | `observedClients` | `observedPeerConnections: Map<string, ObservedPeerConnection>`, per-client metrics |
| `ObservedPeerConnection` | lazily, from `sample.peerConnections[]` | `observedPeerConnections` | the 15 sub-stat maps below, transport/RTT/bitrate metrics |
| Sub-stats | lazily, from the `PeerConnectionSample` | maps on the PC | individual WebRTC stat objects |
ObservedPeerConnection sub-stat maps (all public readonly):
observedCertificates, observedCodecs, observedDataChannels,
observedIceCandidates, observedIceCandidatesPair, observedIceTransports,
observedInboundRtps, observedInboundTracks, observedMediaPlayouts,
observedMediaSources, observedOutboundRtps, observedOutboundTracks,
observedPeerConnectionTransports, observedRemoteInboundRtps, observedRemoteOutboundRtps
Each sub-stat class (ObservedInboundRtp, ObservedOutboundRtp, ObservedInboundTrack,
ObservedOutboundTrack, ObservedDataChannel, ObservedIceCandidate,
ObservedIceCandidatePair, ObservedIceTransport, ObservedCertificate, ObservedCodec,
ObservedMediaSource, ObservedMediaPlayout, ObservedPeerConnectionTransport,
ObservedRemoteInboundRtp, ObservedRemoteOutboundRtp) mirrors the corresponding stat
fields from the schema plus derived fields (deltas, bitrates).
The single entry point. It:
- drops the sample and emits `sample-rejected` if the observer is closed;
- runs the sample through the global accept-middleware chain (see below);
- (chain terminal) drops the sample and emits `sample-rejected` if `callId`/`clientId` is missing;
- gets or lazily creates the `ObservedCall` and `ObservedClient` (their `appData` comes from the configured factories, never from `context`);
- delegates to `client.accept(sample, context)`, which fans out to each `ObservedPeerConnection.accept(pcSample, context)`.
observer.addAcceptMiddleware(...) registers middlewares run on every sample inside
accept(), in order, before the sample is dispatched to any call or client. Each middleware
gets a { sample, context } payload; it can inspect or mutate the sample (set/normalize
callId/clientId, enrich, redact) or the context, then call next(payload) to continue.
Not calling next drops the sample — nothing is created and no event fires. A throwing
middleware is caught and warns (the sample is dropped), never crashing accept().
import { Observer, AcceptMiddleware } from '@observertc/observer-js';
const observer = new Observer();
// derive callId/clientId from the app's own attachment, before dispatch
const route: AcceptMiddleware = ({ sample }, next) => {
sample.callId ??= sample.attachments?.roomId as string;
sample.clientId ??= sample.attachments?.peerId as string;
next({ sample });
};
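// drop samples from a blocklisted client (never dispatched)
const blocked = new Set<string>(); // hypothetical blocklist of client ids, filled by the app
const filter: AcceptMiddleware = (payload, next) => {
const { clientId } = payload.sample;
if (clientId !== undefined && blocked.has(clientId)) return; // no next() => dropped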
next(payload);
};
observer.addAcceptMiddleware(route, filter);
// observer.removeAcceptMiddleware(route);

This is a lightweight global injection point, distinct from the larger (not-yet-built)
ClientSampleProcessor pipeline in the roadmap. When no middleware is registered, accept()
dispatches directly with no overhead.
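
To make the `next()` semantics concrete, here is a standalone model of such a chain: a middleware that does not call `next` silently drops the payload, and a throwing middleware is caught and logged. This is a sketch of the pattern only; `runChain`, `Payload`, `Next`, and `Middleware` are hypothetical stand-ins, not the library's types or internals.

```typescript
// Standalone model of an accept-middleware chain. Not the library's implementation.
type Payload = {
  sample: { callId?: string; clientId?: string };
  context?: Record<string, unknown>;
};
type Next = (payload: Payload) => void;
type Middleware = (payload: Payload, next: Next) => void;

// Runs the middlewares in registration order. `terminal` only runs when every
// middleware has called next(); otherwise the payload is dropped.
function runChain(middlewares: Middleware[], payload: Payload, terminal: Next): void {
  const dispatch = (index: number, current: Payload): void => {
    const middleware = middlewares[index];
    if (middleware === undefined) {
      terminal(current); // end of the chain: hand the payload to the dispatcher
      return;
    }
    try {
      middleware(current, (nextPayload) => dispatch(index + 1, nextPayload));
    } catch (err) {
      // Anything thrown while this middleware runs is caught here,
      // so the payload is dropped instead of crashing the caller.
      console.warn('middleware threw; sample dropped:', err);
    }
  };
  dispatch(0, payload);
}
```

With an empty middleware list the model calls `terminal` immediately, which mirrors the "dispatches directly" behaviour described above.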
type AcceptContext = Record<string, unknown>;

A single, optional, free-form object threaded down the whole accept chain
(Observer → Client → PeerConnection). It is transient request-scoped data — temporary or
contextual information the application wants available while an update is processed.
context is never written to appData and is not stored on any entity. The two are
deliberately distinct:
- `appData`: application-assigned extra info that identifies/decorates an entity, fixed at creation (via `settings.appData` or the `createCallAppData`/`createClientAppData` factories), or assigned by the app on the `*-added` events. The library never changes it.
- `context`: passed per `accept()`, may differ on every call, and is carried straight through to the `*-updated` events that the `accept()` triggers, then discarded.

client-updated and peer-connection-updated carry the exact context of that sample;
call-updated carries the context of the client accept() that drove the call update (absent
for interval- or teardown-driven call updates). When no context is given, the field is absent.
If you want to create/configure entities yourself before/without samples:
const call = observer.getOrCreateObservedCall({ callId, appData }); // ObservedCall | undefined
const client = call?.getOrCreateObservedClient({ clientId, appData }); // ObservedClient | undefined

These return undefined (and warn) when the parent is closed; createObservedCall/
createObservedClient return the existing instance (and warn) if the id already exists.
- `closeClientIfIdleForMs`: a client with no sample for this long auto-closes.
- `closeCallIfEmptyForMs`: a call with zero clients for this long auto-closes.
- Closing cascades down (call → clients → peer connections → sub-stats), unsubscribing listeners and emitting the `*-closed`/`*-removed` events.

"Update" means recompute aggregated metrics and emit the *-updated event at that level.
Both the observer and each call have a configurable trigger. Updates are event-driven — there
is no built-in timer. An app that wants a fixed cadence can call observer.update() /
call.update() from its own setInterval. With 'none', nothing auto-updates — the level
updates only when the application calls the public update() itself.
Observer-level (ObserverConfig.updatePolicy, default update-when-all-call-updated):
| Policy | Triggers `observer.update()` when… |
|---|---|
| `update-on-any-call-updated` | any call updates |
| `update-when-all-call-updated` | every call has updated since the last observer update |
| `none` | never automatically; only when the app calls `observer.update()` |

Call-level (ObservedCallSettings.updatePolicy, defaulted from
ObserverConfig.defaultCallUpdatePolicy):
| Policy | Triggers `call.update()` when… |
|---|---|
| `update-on-any-client-updated` | any client in the call updates |
| `update-when-all-client-updated` | every client has updated since the last call update |
| `none` | never automatically; only when the app calls `call.update()` |

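
The "every child has updated since the last update" condition behind the `update-when-all-*` policies can be tracked with a set of pending ids. Below is a standalone sketch under that assumption; `AllUpdatedTrigger` and its members are hypothetical, and the library's own bookkeeping may differ (for instance in how it treats children that join mid-round).

```typescript
// Standalone model of a "when all children updated" trigger. Names are hypothetical.
class AllUpdatedTrigger {
  private readonly pending = new Set<string>();
  private readonly getChildIds: () => string[];
  private readonly onAllUpdated: () => void;

  constructor(getChildIds: () => string[], onAllUpdated: () => void) {
    this.getChildIds = getChildIds;
    this.onAllUpdated = onAllUpdated;
    this.reset();
  }

  // Call whenever a child (a client for a call, a call for the observer) updates.
  childUpdated(id: string): void {
    this.pending.delete(id);
    if (this.pending.size === 0) {
      this.onAllUpdated(); // every child has reported since the last round
      this.reset();
    }
  }

  // Start a new round: every current child must update again before the next trigger.
  private reset(): void {
    this.pending.clear();
    this.getChildIds().forEach((id) => this.pending.add(id));
  }
}
```

In this simplified model a child that joins mid-round only becomes required in the next round, and a repeated update from the same child does not count twice.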
This is the primary API. Subscribe on the Observer instance — it is the single emitter
for the entire hierarchy. The ObservedCall / ObservedClient / ObservedPeerConnection
objects are themselves EventEmitters too, but those local events are reserved for internal
lifecycle/teardown wiring (see Local lifecycle events); application
code should use the Observer bus.
Every Observer event delivers exactly one argument: a payload object. The payload always contains the ancestry from the observer down to the entity that raised it, plus any event-specific subject:
type ObserverEventBase = { observer: Observer };
type ObservedCallScope = ObserverEventBase & { observedCall: ObservedCall };
type ObservedClientScope = ObservedCallScope & { observedClient: ObservedClient };
type ObservedPeerConnectionScope = ObservedClientScope & { observedPeerConnection: ObservedPeerConnection };

So a peer-connection-level event hands you the observer, call, client, and peer connection:
observer.on('inbound-rtp-added', ({ observer, observedCall, observedClient, observedPeerConnection, observedInboundRtp }) => {
// all five are present and correctly typed
});

observer.on/off/once/emit are fully typed against the event map: the handler argument is
inferred per event name.
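
For readers unfamiliar with event-map typing, the following standalone sketch shows how a handler argument can be inferred from the event name. `TypedBus` and `DemoEvents` are hypothetical, and the payload shapes are simplified stand-ins (real payloads carry the entity ancestry described above); this is not the library's type machinery.

```typescript
// Standalone model of an emitter typed against an event map. Names are hypothetical.
type Listener<P> = (payload: P) => void;

class TypedBus<M> {
  // Listeners are stored loosely typed; on() and emit() restore the per-event typing.
  private readonly listeners = new Map<keyof M, Array<Listener<unknown>>>();

  on<K extends keyof M>(event: K, listener: Listener<M[K]>): this {
    const list: Array<Listener<unknown>> = this.listeners.get(event) ?? [];
    list.push(listener as Listener<unknown>);
    this.listeners.set(event, list);
    return this;
  }

  // Returns true when at least one listener received the payload.
  emit<K extends keyof M>(event: K, payload: M[K]): boolean {
    const list = this.listeners.get(event);
    if (!list || list.length === 0) return false;
    list.forEach((listener) => listener(payload));
    return true;
  }
}

// Simplified event map: event name -> payload type.
type DemoEvents = {
  'call-added': { callId: string };
  'sample-rejected': { reason: 'observer-closed' | 'missing-callId' | 'missing-clientId' };
};

const demoBus = new TypedBus<DemoEvents>();
// `reason` is inferred as the three-member union; a typo in the event name
// or a wrong payload field would be a compile-time error.
demoBus.on('sample-rejected', ({ reason }) => console.warn('dropped a sample:', reason));
```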
All payloads include the ancestry for their level (above). The Extra column lists the additional field(s) on top of that scope.
| Event | Extra payload | Fires when |
|---|---|---|
| `observer-updated` | (none) | `observer.update()` ran (per the observer update policy) |
| `observer-closed` | (none) | `observer.close()` |
| `sample-rejected` | `{ reason: 'observer-closed' \| 'missing-callId' \| 'missing-clientId', sample: ClientSample }` | a sample was dropped by `accept()` |

| Event | Extra | Fires when |
|---|---|---|
| `call-added` | (none) | a call is created |
| `call-updated` | `{ context?: AcceptContext }` | `call.update()` ran |
| `call-closed` | (none) | the call closed |
| `call-empty` | (none) | last client left the call |
| `call-not-empty` | (none) | first client joined a previously-empty call |
| `call-issue` | `{ issue: ClientIssue }` | `call.addIssue(...)` (server-side detector finding) |

| Event | Extra | Fires when |
|---|---|---|
| `client-added` | (none) | a client is created |
| `client-sink-created` | `{ sink: ClientSampleSink }` | a per-client sink was created (only when `createClientSink` returns one); fires right after `client-added` |
| `client-updated` | `{ sample: ClientSample, elapsedTimeInMs: number, context?: AcceptContext }` | the client processed a sample |
| `client-closed` | (none) | the client closed |
| `client-joined` | (none) | first `CLIENT_JOINED` event seen |
| `client-left` | (none) | `CLIENT_LEFT` seen (or inferred on close) |
| `client-rejoined` | `{ timestamp: number }` | a later `CLIENT_JOINED` after an earlier join |
| `client-issue` | `{ issue: ClientIssue }` | a client-reported issue arrived, or `client.addIssue(...)` |
| `client-metadata` | `{ metaData: ClientMetaData }` | a client meta item arrived |
| `client-extension-stats` | `{ extensionStats: ExtensionStat }` | an app-defined extension stat arrived |
| `client-event` | `{ event: ClientEvent }` | any client event was processed |

| Event | Extra | Notes |
|---|---|---|
| `peer-connection-added` / `peer-connection-closed` | (none) | lifecycle of the PC |
| `peer-connection-updated` | `{ context?: AcceptContext }` | the PC processed a sample |
| `ice-connection-state-changed` / `ice-gathering-state-changed` / `connection-state-changed` | `{ state: string }` | driven by client events |
| `selected-candidate-pair-changed` | (none) | declared; not currently emitted |
| `inbound-track-added` / `-updated` / `-removed` / `-muted` / `-unmuted` | `{ observedInboundTrack }` | |
| `outbound-track-added` / `-updated` / `-removed` / `-muted` / `-unmuted` | `{ observedOutboundTrack }` | |
| `inbound-rtp-added` / `-updated` / `-removed` | `{ observedInboundRtp }` | `-updated` fires every tick |
| `outbound-rtp-added` / `-updated` / `-removed` | `{ observedOutboundRtp }` | `-updated` fires every tick |
| `remote-inbound-rtp-added` / `-updated` / `-removed` | `{ observedRemoteInboundRtp }` | |
| `remote-outbound-rtp-added` / `-updated` / `-removed` | `{ observedRemoteOutboundRtp }` | |
| `data-channel-added` / `-updated` / `-removed` | `{ observedDataChannel }` | |
| `ice-candidate-added` / `-updated` / `-removed` | `{ observedIceCandidate }` | |
| `ice-candidate-pair-added` / `-updated` / `-removed` | `{ observedIceCandidatePair }` | |
| `ice-transport-added` / `-updated` / `-removed` | `{ observedIceTransport }` | |
| `codec-added` / `-updated` / `-removed` | `{ observedCodec }` | |
| `media-source-added` / `-updated` / `-removed` | `{ observedMediaSource }` | |
| `media-playout-added` / `-updated` / `-removed` | `{ observedMediaPlayout }` | |
| `peer-connection-transport-added` / `-updated` / `-removed` | `{ observedPeerConnectionTransport }` | |
| `certificate-added` / `-updated` / `-removed` | `{ observedCertificate }` | |

Volume note. The `*-updated` sub-stat events fire on every peer-connection `accept()` (i.e. per sample, per stream). For high-throughput servers, subscribe only to what you need, or read fields off the entities on `client-updated`/`call-updated` instead.

These remain on the individual entities (not the bus), for teardown/coordination. You can listen to them, but prefer the bus equivalents above for application logic.
| Entity | Local events |
|---|---|
| `ObservedCall` | `update`, `newclient`, `empty`, `not-empty`, `close` |
| `ObservedClient` | `update` (`sample`, `elapsedTimeInMs`), `close`, `joined`, `left` |
| `ObservedPeerConnection` | `removed-inbound-track`, `removed-outbound-track`, `close` |

new Observer<AppData>(config?: ObserverConfig<AppData>)
type ObserverConfig<AppData = Record<string, unknown>> = {
updatePolicy?: 'update-on-any-call-updated' | 'update-when-all-call-updated' | 'none';
defaultCallUpdatePolicy?: ObservedCallSettings['updatePolicy'];
appData?: AppData;
closeClientIfIdleForMs?: number;
closeCallIfEmptyForMs?: number;
// appData factories — run when an entity is created without explicit appData
// (incl. lazily by accept()). appData is application-owned; accept `context` never touches it.
createCallAppData?: (p: { callId: string; observer: Observer }) => Record<string, unknown>;
createClientAppData?: (p: { clientId: string; observedCall: ObservedCall }) => Record<string, unknown>;
// sink factory — produces a per-client sink that receives every accepted sample (see Sinks).
createClientSink?: (p: { clientId: string; observedCall: ObservedCall }) => ClientSampleSink | undefined;
// track-resolver factory — produces a call's RemoteTrackResolver (see Remote track resolution).
createTrackResolver?: (observedCall: ObservedCall) => RemoteTrackResolver | undefined;
};

appData factories. Instead of pre-creating a call/client (or assigning on call-added /
client-added) just to enrich its appData, register a factory once. It runs in the entity's
constructor whenever it's created without an explicit settings.appData — including the lazy
creation inside accept(). The client factory receives the already-created parent
observedCall, so it can derive fields from it. appData is application-owned and is never
modified by the accept() context.
const observer = new Observer({
createCallAppData: ({ callId }) => ({ callId, startedAt: Date.now(), region: 'eu' }),
createClientAppData: ({ clientId, observedCall }) => ({ clientId, region: observedCall.appData.region }),
});

Key members:

- `accept(sample: ClientSample, context?: AcceptContext): void`
- `addAcceptMiddleware(...mw: AcceptMiddleware[]): this` / `removeAcceptMiddleware(...mw): this`: global pre-dispatch sample hooks (see Accept middlewares)
- `getObservedCall<T>(callId): ObservedCall<T> | undefined`
- `createObservedCall<T>(settings): ObservedCall<T> | undefined`
- `getOrCreateObservedCall<T>(settings): ObservedCall<T> | undefined`
- `update(): void`: force an aggregation/`observer-updated` tick
- `close(): void`
- `readonly observedCalls: Map<string, ObservedCall>`
- `readonly observedTURN: ObservedTURN`
- `get appData()`, `get numberOfCalls()`
- counters: `numberOfClients`, `numberOfClientsUsingTurn`, `numberOfInboundRtpStreams`, `numberOfOutboundRtpStreams`, `numberOfDataChannels`, `numberOfPeerConnections`, `totalAddedCall`, `totalRemovedCall`, `closed`
- `on`/`off`/`once`/`emit` typed against the event map

type ObservedCallSettings<AppData = Record<string, unknown>> = {
updatePolicy?: 'update-on-any-client-updated' | 'update-when-all-client-updated' | 'none';
callId: string;
appData?: AppData;
closeCallIfEmptyForMs?: number;
};

Key members:

- `readonly callId: string`, `appData: AppData`
- `readonly observedClients: Map<string, ObservedClient>`, `get numberOfClients()`
- `getObservedClient<T>(clientId)`, `createObservedClient<T>(settings)`, `getOrCreateObservedClient<T>(settings)` (all `… | undefined`)
- `addIssue(issue: ClientIssue): void`: raise a call-level issue → emits `call-issue`
- `readonly detectors: Detectors`: server-side detector registry (empty by default; see Detectors)
- `scoreCalculator: ScoreCalculator`, `get score()`, `readonly calculatedScore`
- `remoteTrackResolver?: RemoteTrackResolver`: set from `ObserverConfig.createTrackResolver` at call creation (see Remote track resolution)
- aggregates: `numberOfIssues`, `numberOfPeerConnections`, `numberOfInboundRtpStreams`, `numberOfOutboundRtpStreams`, `numberOfDataChannels`, `maxNumberOfClients`, `clientsUsedTurn: Set<string>`, `startedAt?`, `endedAt?`, `closedAt?`, `closed`
- `update()`, `close()`

type ObservedClientSettings<AppData = Record<string, unknown>> = {
clientId: string;
appData?: AppData;
closeClientIfIdleForMs?: number;
};

Key members:

- `readonly clientId: string`, `appData: AppData`, `readonly call: ObservedCall`
- `readonly observedPeerConnections: Map<string, ObservedPeerConnection>`
- `readonly sink?: ClientSampleSink`: the per-client sink (see Sinks), if `createClientSink` is configured; listen on it for `close`/`error`
- Injection API (queue app data to be merged into the next sample processing): `injectEvent(ClientEvent)`, `injectIssue(ClientIssue)`, `injectMetaData(ClientMetaData)`, `injectExtensionStat(ExtensionStat)`, `injectAttachment(key, value)`
- Direct add API (process immediately): `addIssue(ClientIssue)`, `addMetadata(ClientMetaData)`, `addExtensionStats(ExtensionStat)`
- Metrics (current/derived): `currentAvgRttInMs?`, `currentMinRttInMs?`, `currentMaxRttInMs?`, `receivingAudioBitrate`, `receivingVideoBitrate`, `sendingAudioBitrate`, `sendingVideoBitrate`, `usingTURN`, `usingTCP`, `availableIncomingBitrate`, `availableOutgoingBitrate`
- Counts: `numberOfInboundRtpStreams`, `numberOfOutboundRtpStreams`, `numberOfInbundTracks`, `numberOfOutboundTracks`, `numberOfDataChannels`, `numberOfPeerConnections`
- Per-tick deltas: `deltaReceivedAudioBytes`, `deltaSentAudioBytes`, … (see source for the full set)
- Lifecycle: `joinedAt?`, `leftAt?`, `closedAt?`, `closed`, `get score()`
- Metadata: `browser?`, `engine?`, `platform?`, `operationSystem?`, `mediaDevices`, `mediaConstraints`
- `accept(sample, context?)`, `close()`

Key members:

- `readonly peerConnectionId: string`, `readonly client: ObservedClient`, `appData?`
- The 15 `observed*` sub-stat `Map`s (listed above), plus array getters: `codecs`, `inboundRtps`, `outboundRtps`, `remoteInboundRtps`, `remoteOutboundRtps`, `mediaSources`, `mediaPlayouts`, `dataChannels`, `peerConnectionTransports`, `iceTransports`, `iceCandidates`, `iceCandidatePairs`, `certificates`, `selectedIceCandidatePairs`, `selectedIceCandiadtePairForTurn`
- State: `connectionState?`, `iceConnectionState?`, `iceGatheringState?`, `usingTURN`, `usingTCP`
- Metrics: `currentRttInMs?`, `currentJitter?`, `availableIncomingBitrate`, `availableOutgoingBitrate`, sending/receiving bitrates, packet rates, and `total*`/`delta*` byte/packet counters
- `accept(pcSample, context?)`, `close()`, `get score()`

Remote-RTP correlation (derived). During accept(), receiver/sender reports are linked
to the local streams by remoteId (fallback SSRC) and surfaced as fields:
- on `ObservedOutboundRtp`: `remoteRttInMs?`, `remoteFractionLost?`, `remoteJitter?`, `remotePacketsLost?`
- on `ObservedInboundRtp`: `remoteRttInMs?`, `remoteBytesSent?`, `remotePacketsSent?`, `remoteTimestamp?`

These are reset each tick and only set when the matching remote report is present.
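
The "`remoteId`, fallback SSRC" lookup can be pictured as follows. This is a standalone sketch, not the library's code: the `*Like` types and both functions are hypothetical, the field names follow the standard `getStats()` dictionaries, and the sketch assumes `roundTripTime` is reported in seconds as in the W3C statistics API.

```typescript
// Standalone model of remote-report correlation. Types and functions are hypothetical.
type OutboundRtpLike = { id: string; ssrc: number; remoteId?: string };
type RemoteInboundRtpLike = { id: string; ssrc: number; roundTripTime?: number; fractionLost?: number };
type RemoteFields = { remoteRttInMs?: number; remoteFractionLost?: number };

function findRemoteReport(
  outbound: OutboundRtpLike,
  reports: RemoteInboundRtpLike[],
): RemoteInboundRtpLike | undefined {
  // Preferred: the explicit remoteId link between the two stat objects.
  if (outbound.remoteId !== undefined) {
    const byId = reports.find((report) => report.id === outbound.remoteId);
    if (byId) return byId;
  }
  // Fallback: a report describing the same SSRC.
  return reports.find((report) => report.ssrc === outbound.ssrc);
}

// Derives the remote* fields for one tick; with no matching report nothing is set.
function deriveRemoteFields(outbound: OutboundRtpLike, reports: RemoteInboundRtpLike[]): RemoteFields {
  const fields: RemoteFields = {};
  const report = findRemoteReport(outbound, reports);
  if (!report) return fields;
  // Assumption: roundTripTime is in seconds, so convert to milliseconds.
  if (report.roundTripTime !== undefined) fields.remoteRttInMs = report.roundTripTime * 1000;
  if (report.fractionLost !== undefined) fields.remoteFractionLost = report.fractionLost;
  return fields;
}
```

Building a fresh `RemoteFields` object on every call is one simple way to get the "reset each tick" behaviour described above.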
The shape of an accepted sample (re-exported from this package; identical to
@observertc/schemas). Only the top level is shown — each stat object mirrors the standard
WebRTC getStats() dictionaries plus a few extensions.
type ClientSample = {
timestamp: number; // client wall-clock (ms epoch)
callId?: string; // set by you or the library
clientId?: string; // set by you or the library
score?: number; // optional client-computed score (0..5)
attachments?: Record<string, unknown>;
peerConnections?: PeerConnectionSample[];
clientEvents?: ClientEvent[];
clientIssues?: ClientIssue[];
clientMetaItems?: ClientMetaData[];
extensionStats?: ExtensionStat[];
};
type PeerConnectionSample = {
peerConnectionId: string;
attachments?: Record<string, unknown>; // e.g. { direction: 'send'|'recv', producerId, consumerId, label }
score?: number;
inboundTracks?; outboundTracks?;
codecs?;
inboundRtps?; remoteInboundRtps?;
outboundRtps?; remoteOutboundRtps?;
mediaSources?; mediaPlayouts?;
peerConnectionTransports?; dataChannels?;
iceTransports?; iceCandidates?; iceCandidatePairs?;
certificates?;
};
type ClientEvent = { type: string; payload?: string; timestamp?: number; /* +ids */ };
type ClientIssue = { type: string; payload?: string; timestamp?: number }; // also used for call-issue
type ClientMetaData = { type: string; payload?: string; timestamp?: number; /* +ids */ };
type ExtensionStat = { type: string; payload?: string };

payload fields are JSON strings; the library parses the ones it understands.
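
Because a `payload` is a JSON string, application handlers that read it (for example in a `client-issue` listener) need to parse it defensively. A minimal sketch, assuming a hypothetical `parsePayload` helper and a local `PayloadCarrier` type that mirrors the shape shown above; neither is exported by the library.

```typescript
// Hypothetical helper; PayloadCarrier mirrors the ClientIssue / ClientEvent shape above.
type PayloadCarrier = { type: string; payload?: string; timestamp?: number };

// Returns the parsed payload, or undefined when it is absent or not valid JSON.
function parsePayload(item: PayloadCarrier): unknown {
  if (item.payload === undefined) return undefined;
  try {
    return JSON.parse(item.payload);
  } catch {
    return undefined; // malformed payloads are ignored rather than thrown
  }
}
```

Returning `unknown` forces the caller to check the parsed shape before using it, which suits payloads whose structure depends on `type`.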
ClientEventTypes (enum of known event.type values): CLIENT_JOINED, CLIENT_LEFT,
PEER_CONNECTION_OPENED/CLOSED/STATE_CHANGED, MEDIA_TRACK_ADDED/REMOVED/MUTED/UNMUTED/RESUMED,
ICE_GATHERING_STATE_CHANGED, ICE_CONNECTION_STATE_CHANGED, DATA_CHANNEL_OPEN/CLOSED/ERROR,
NEGOTIATION_NEEDED, SIGNALING_STATE_CHANGE, ICE_CANDIDATE, ICE_CANDIDATE_ERROR, and the
mediasoup set PRODUCER_* / CONSUMER_* / DATA_PRODUCER_* / DATA_CONSUMER_*.
ClientMetaTypes (enum of known meta type values): MEDIA_CONSTRAINT, MEDIA_DEVICE,
MEDIA_DEVICES_SUPPORTED_CONSTRAINTS, USER_MEDIA_ERROR, LOCAL_SDP, OPERATION_SYSTEM,
ENGINE, PLATFORM, BROWSER.
observer-js deliberately ships no built-in detectors. Per-client signals — packet loss,
jitter, RTT, freezes, etc. — are already detectable on the client and arrive on samples as
clientIssues (surfaced via client-issue). Server-side detection should focus on what only
the server can see by correlating data across the clients of a call.
The hook lives on ObservedCall:
import { Observer, Detector } from '@observertc/observer-js';
class MyCrossClientDetector implements Detector {
readonly name = 'my-detector';
constructor(private readonly call /* : ObservedCall */) {}
update() { // called on every call.update()
// …inspect this.call.observedClients across participants…
if (/* condition only visible server-side */ false) {
this.call.addIssue({ type: this.name, payload: JSON.stringify({ /* … */ }), timestamp: Date.now() });
// → emitted on the bus as 'call-issue'
}
}
}
const observer = new Observer();
observer.on('call-added', ({ observedCall }) => {
observedCall.detectors.add(new MyCrossClientDetector(observedCall));
});
observer.on('call-issue', ({ observedCall, issue }) => { /* react */ });

Detector interface and the registry:
interface Detector { readonly name: string; update(): void; }
class Detectors {
add(d: Detector): void;
remove(d: Detector): void;
clear(): void;
update(): void; // called by ObservedCall.update(); guards each detector in try/catch
get listOfNames(): string[];
}

In an SFU, one participant's outbound track is delivered to other participants as inbound
tracks (one publisher → many subscribers). Correlation is opt-in per observer: set
ObserverConfig.createTrackResolver, a factory invoked when each call is created that returns the
call's RemoteTrackResolver (or undefined for none).
RemoteTrackResolver is a generic, strategy-driven class. It subscribes to the bus (filtered to
its call) and links tracks by publisher id — the link key — maintaining the links directly on
the tracks: inboundTrack.remoteOutboundTrack and outboundTrack.remoteInboundTracks: Set.
import { Observer, createDefaultMediasoupRemoteTrackResolverFactory } from '@observertc/observer-js';
const observer = new Observer({
createTrackResolver: createDefaultMediasoupRemoteTrackResolverFactory(),
});
// later, given tracks (links are kept up to date as tracks come and go):
const source = inboundTrack.remoteOutboundTrack; // the publishing ObservedOutboundTrack
const receivers = [ ...outboundTrack.remoteInboundTracks ]; // the subscribing ObservedInboundTrack[]

Two built-in factories ship: createDefaultMediasoupRemoteTrackResolverFactory() (publisher =
attachments.producerId, subscriber = attachments.consumerId) and
createP2pRemoteTrackResolverFactory() (matches by RTP SSRC, preserved end-to-end in p2p).
For any other topology, build a RemoteTrackResolver with your own key resolvers — the publisher
id is just whatever links a subscribed track to the published one:
import { Observer, RemoteTrackResolver } from '@observertc/observer-js';
const observer = new Observer({
createTrackResolver: (observedCall) => new RemoteTrackResolver(observedCall, {
resolveOutboundTrackPublisherId: (out) => out.attachments?.mediaId as string | undefined,
resolveInboundTrackPublisherId: (inb) => inb.attachments?.mediaId as string | undefined,
resolveInboundTrackSubscriberId: (inb) => inb.attachments?.subId as string | undefined, // optional
}),
});

For the mediasoup factory, the application puts producerId / consumerId (and optionally
direction, label) into the track attachments.
A sink receives the samples a client accepts — for archival, streaming, or later offline
replay. Each ObservedClient gets its own sink, produced by the
ObserverConfig.createClientSink factory when the client is created (return undefined for no
sink). The client pushes every accepted sample to its sink, and end()s it on close.
ClientSampleSink is an abstract base class (a typed EventEmitter). You create a sink by
subclassing it and implementing write and end. It is object-mode: write receives
the ClientSample object, so each sink decides how (or whether) to serialize it — JSON line,
protobuf, a remote POST body, an in-memory push, etc.
import { ClientSampleSink, ClientSample } from '@observertc/observer-js';
abstract class ClientSampleSink /* extends EventEmitter */ {
abstract write(sample: ClientSample): boolean; // accept one sample; `false` = backpressure
abstract end(): void; // flush; emit `close` when the destination is ready
// typed events (inherited): the listener signature is inferred from the event name
on(event: 'close' | 'finish' | 'drain', listener: () => void): this;
on(event: 'error', listener: (err: Error) => void): this;
// ...and the matching `once` / `off` / `emit`
}

| Event | Meaning |
|---|---|
| `close` | the destination is fully written and closed (e.g. a file flushed and its fd closed); this is the "ready" signal |
| `error` | the destination failed |
| `finish` | `end()` was processed and queued data flushed (before `close`) |
| `drain` | the buffer drained after backpressure; safe to write more |

The library calls write(sample) synchronously per accepted sample (it is not awaited),
end()s the sink when the client closes, and attaches an error listener so a failing sink
can't crash the process (it also catches throws from write/end). The application — which
created the sink — listens for close (destination ready) and error. Because write isn't
awaited in the accept() hot path, backpressure and batching are the sink's concern.
import { Observer, createJsonlFileSinkFactory } from '@observertc/observer-js';
const observer = new Observer({
// one ./stats/<callId>__<clientId>.jsonl per client
createClientSink: createJsonlFileSinkFactory({ directory: './stats' }),
});
// React when a sink is created for a client:
observer.on('client-sink-created', ({ observedClient, sink }) => {
sink.on('close', () => {
// the file is fully flushed and its fd closed — ready to upload, move, etc.
});
});

| Export | Signature | Notes |
|---|---|---|
| createJsonlFileSinkFactory | ({ directory, flags?, getFileName?, serializeSample? }) => ClientSampleSinkFactory | per-client JSONL files; path defaults to ${callId}__${clientId}.jsonl under directory (which must exist) |
| createJsonlFileSink | ({ path, flags?, serializeSample? }) => ClientSampleSink | a single JSONL file; wraps fs.WriteStream and re-emits its close/finish/drain/error |
| JsonlFileSink | class extends ClientSampleSink | the underlying class; exposes readonly path so a close handler knows which file is ready |
| createInMemorySink / InMemorySink | (samples?: ClientSample[]) => InMemorySink | collects the accepted sample objects into .samples: ClientSample[]; emits close on end() |

serializeSample?: (sample: ClientSample) => string overrides the default JSON.stringify for
the JSONL sinks (e.g. to redact or reshape before writing).
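
As an illustration of the serializeSample override, a redacting serializer could look like the sketch below. SampleLike is a local stand-in for ClientSample, the presence of a top-level attachments field is an assumption made for the example, and serializeWithoutAttachments is a hypothetical name.

```typescript
// Local stand-in for the library's ClientSample type (assumption); the real type
// has many more fields. A top-level `attachments` field is assumed for illustration.
type SampleLike = {
  callId?: string;
  clientId?: string;
  attachments?: unknown;
  [key: string]: unknown;
};

// Hypothetical serializer: drops `attachments` and returns one JSON line.
// JSON.stringify without an indent argument never emits raw newlines,
// so the result stays a single JSONL record.
function serializeWithoutAttachments(sample: SampleLike): string {
  const { attachments: _dropped, ...rest } = sample;
  return JSON.stringify(rest);
}

// Intended wiring, per the option described above (not executed here):
// createJsonlFileSinkFactory({ directory: './stats', serializeSample: serializeWithoutAttachments })
```

The serializer never mutates the sample object, so other consumers of the same sample still see the original.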
The bus hands you the sink as the base ClientSampleSink. To read information specific to a sink
type — for a file sink, where it was written — narrow with instanceof and read the sink's
public fields. JsonlFileSink exposes path:
import { JsonlFileSink } from '@observertc/observer-js';
observer.on('client-sink-created', ({ observedClient, sink }) => {
if (sink instanceof JsonlFileSink) {
const { path } = sink; // the file this client's samples go to
sink.once('close', () => uploadFile(path)); // close = flushed & fd closed → ready
}
});

The general pattern: each concrete sink exposes whatever it wants as public readonly fields, and
consumers narrow (instanceof YourSink) to read them. Your own sinks do the same.

Subclass ClientSampleSink and emit the lifecycle events yourself — for any non-file
destination (a remote endpoint, a message queue, an object store, …):
import { ClientSampleSink, ClientSample, ClientSampleSinkFactory } from '@observertc/observer-js';
class HttpSink extends ClientSampleSink {
private buffer: ClientSample[] = [];
constructor(private readonly url: string) { super(); }
write(sample: ClientSample): boolean {
this.buffer.push(sample); // batch; decide your own backpressure
return true;
}
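end(): void {
fetch(this.url, {
method: 'POST',
headers: { 'content-type': 'application/json' },
body: JSON.stringify(this.buffer),
})
.then((res) => {
// fetch only rejects on network failure, so treat non-2xx responses as errors too
if (!res.ok) throw new Error(`POST ${this.url} failed: HTTP ${res.status}`);
this.emit('close'); // signal "destination ready"
})
.catch((err) => this.emit('error', err));
}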
}
const createClientSink: ClientSampleSinkFactory = ({ clientId, observedCall }) =>
new HttpSink(`https://stats.example.com/${observedCall.callId}/${clientId}`);
const observer = new Observer({ createClientSink });

observedClient.sink? exposes the created sink; the client-sink-created event delivers it on
the bus with full ancestry. ClientSampleSinkFactory is
(p: { clientId: string; observedCall: ObservedCall }) => ClientSampleSink | undefined.

observer-js logs through a single, swappable sink. Out of the box it writes debug and
above to console (verbose — install your own sink for production). Funnel everything into your
logger:
import { setObserverLogger, type ObserverLogger } from '@observertc/observer-js';
setObserverLogger({
trace: (m, ...a) => myLogger.trace(`[${m}]`, ...a),
debug: (m, ...a) => myLogger.debug(`[${m}]`, ...a),
info: (m, ...a) => myLogger.info(`[${m}]`, ...a),
warn: (m, ...a) => myLogger.warn(`[${m}]`, ...a),
error: (m, ...a) => myLogger.error(`[${m}]`, ...a),
});

createLogger(moduleName) is also exported for your own modules. See
docs/logging.md for pino / winston / console recipes, level
filtering, per-module routing, and full silencing.
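
One possible shape for level filtering is a wrapper that silences everything below a threshold before the logger is handed to setObserverLogger. This is a sketch, not the recipe from docs/logging.md: LoggerLike mirrors the five-method shape used in the example above and stands in for ObserverLogger, and withMinLevel is a hypothetical helper.

```typescript
// Local stand-in for ObserverLogger (assumption), mirroring the five methods
// and the (moduleName, ...args) call shape used in the example above.
type LogFn = (moduleName: string, ...args: unknown[]) => void;
interface LoggerLike {
  trace: LogFn;
  debug: LogFn;
  info: LogFn;
  warn: LogFn;
  error: LogFn;
}

const LEVELS = ['trace', 'debug', 'info', 'warn', 'error'] as const;
type Level = (typeof LEVELS)[number];

// Hypothetical helper: returns a logger that forwards only messages at or
// above `minLevel` to `target` and drops the rest.
function withMinLevel(target: LoggerLike, minLevel: Level): LoggerLike {
  const threshold = LEVELS.indexOf(minLevel);
  const noop: LogFn = () => {};
  const pick = (level: Level): LogFn =>
    LEVELS.indexOf(level) >= threshold ? (m, ...a) => target[level](m, ...a) : noop;
  return {
    trace: pick('trace'),
    debug: pick('debug'),
    info: pick('info'),
    warn: pick('warn'),
    error: pick('error'),
  };
}

// Intended wiring (not executed here): setObserverLogger(withMinLevel(myAdapter, 'warn'))
```
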
The library warns and degrades; it does not throw on operational problems:
- createObservedCall / createObservedClient on a closed parent → warn + return undefined.
- Duplicate id → warn + return the existing instance.
- accept() on a closed client → warn + no-op.
- Sample missing callId / clientId, or observer closed → sample-rejected event.
- A throwing accept-middleware → warn + drop that sample (never crashes accept()).

Therefore create* and getOrCreate* return T | undefined; guard the result. The
Middleware utility's internal invariants (e.g. calling next() twice) throw, but those throws
are caught by accept() and surfaced as a warning.
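
The guard this implies on the caller's side can be sketched as follows. Nothing here is library API: CallLike and createCallLike are stand-ins that only mimic the documented warn-and-return-undefined contract, and trackCall is a hypothetical consumer.

```typescript
// Stand-in for an observed entity (assumption); only the id matters here.
interface CallLike {
  readonly callId: string;
}

// Stand-in factory mimicking the documented contract: on a closed parent it
// warns and returns undefined instead of throwing.
function createCallLike(
  parentClosed: boolean,
  callId: string,
  warn: (msg: string) => void = console.warn,
): CallLike | undefined {
  if (parentClosed) {
    warn(`cannot create call ${callId}: parent is closed`);
    return undefined;
  }
  return { callId };
}

// Hypothetical consumer: guards the T | undefined result before using it.
function trackCall(call: CallLike | undefined, tracked: Set<string>): boolean {
  if (!call) return false; // creation was refused; the factory already warned
  tracked.add(call.callId);
  return true;
}
```
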
// Entry: src/index.ts
export { Observer } from './Observer';
export type { ObserverEvents, SampleRejectedReason, AcceptContext, CallAppDataFactory, ClientAppDataFactory } from './Observer';
export type { ObserverEventBase, ObservedCallScope, ObservedClientScope, ObservedPeerConnectionScope } from './ObserverEvents';
export { ObservedCall, ObservedClient, ObservedPeerConnection } from './…';
export { ObservedInboundTrack, ObservedOutboundTrack } from './…';
export { ObservedInboundRtp, ObservedOutboundRtp, ObservedRemoteInboundRtp, ObservedRemoteOutboundRtp } from './…';
export { ObservedMediaSource, ObservedMediaPlayout, ObservedCodec, ObservedCertificate, ObservedDataChannel } from './…';
export { ObservedIceCandidate, ObservedIceCandidatePair, ObservedIceTransport, ObservedPeerConnectionTransport } from './…';
export { ClientSample, ClientIssue, ClientEvent, ClientMetaData } from './schema/ClientSample';
export { ClientEventTypes } from './schema/ClientEventTypes';
export { ClientMetaTypes } from './schema/ClientMetaTypes';
export { ScoreCalculator } from './scores/ScoreCalculator';
export { Detectors } from './detectors/Detectors';
export type { Detector } from './detectors/Detector';
export { createLogger, setObserverLogger } from './common/logger';
export type { Logger, ObserverLogger } from './common/logger';
// sinks: base class (subclass it for a custom destination) + built-ins
export { ClientSampleSink } from './sinks/ClientSampleSink';
export type { ClientSampleSinkEvents, ClientSampleSinkFactory } from './sinks/ClientSampleSink';
export { JsonlFileSink, createJsonlFileSink, createJsonlFileSinkFactory } from './sinks/JsonlFileSink';
export type { JsonlFileSinkOptions, JsonlFileSinkFactoryOptions } from './sinks/JsonlFileSink';
export { InMemorySink, createInMemorySink } from './sinks/InMemorySink';
export { Middleware } from './common/Middleware';
// remote track correlation
export { RemoteTrackResolver } from './utils/RemoteTrackResolver';
export type { RemoteTrackResolvers, RemoteTrackResolverFactory } from './utils/RemoteTrackResolver';
export { createDefaultMediasoupRemoteTrackResolverFactory, createP2pRemoteTrackResolverFactory } from './utils/RemoteTrackResolverFactories';

yarn install
yarn build # tsup → dist/ (dual ESM .mjs + CJS .js, single entry, .d.ts/.d.mts + sourcemaps)
yarn lint # eslint -c .eslintrc.json "src/**/*.ts"
yarn typecheck # tsc --noEmit
yarn test # jest

The build is driven by tsup (config in tsup.config.ts): a single
entry (src/index.ts), dual ESM + CommonJS output to dist/ (index.mjs / index.js) with
.d.mts / .d.ts types and sourcemaps, targeting Node 20. CI (.github/workflows/ci.yml) runs
lint + typecheck + build + test on every push/PR.
Project layout (src/): Observer.ts, ObservedCall.ts, ObservedClient.ts,
ObservedPeerConnection.ts, the Observed* sub-stat classes, ObserverEvents.ts (the typed
event map + scope types), detectors/ (Detector, Detectors), scores/, updaters/
(update-policy strategies), utils/ (remote-track resolvers), common/ (logger, utils,
Middleware), schema/ (sample/event/meta types), and sinks/ (the ClientSampleSink base +
JsonlFileSink / InMemorySink, re-exported from the package root).
Conventions to follow when developing further:
- Single event bus. New consumer-facing events go in ObserverEvents.ts with an object payload [<Scope> & { …subject }], and are emitted via the component's _notify(type, { ...this.eventScope, …subject }). Each component has a precomputed eventScope field and a thin _notify wrapper around the right emitter. Keep purely internal coordination as local EventEmitter events (and remember to off them on close).
- Warn, don't throw on operational/edge conditions; return undefined where a value can't be produced.
- Counter-reset-safe deltas. When computing a delta from a cumulative counter, never emit a negative value (guard curr >= prev), to survive counter resets / SSRC reuse.
- Explicit accumulation. The per-sample metric accumulation in accept() is intentionally explicit and not abstracted; match that style.
- Detectors are server-side. Add cross-client detectors on ObservedCall.detectors; don't re-implement client-detectable signals.

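
The counter-reset-safe delta convention can be illustrated with a small helper. This is a sketch, not code from the library: safeDelta is a hypothetical name, and reporting 0 for a reset (rather than the new counter value) is an assumption made for the example.

```typescript
// Hypothetical helper: delta between two readings of a cumulative counter.
// Returns 0 when either reading is missing or when the counter went backwards
// (counter reset / SSRC reuse), so a negative delta is never emitted.
function safeDelta(prev: number | undefined, curr: number | undefined): number {
  if (prev === undefined || curr === undefined) return 0;
  return curr >= prev ? curr - prev : 0;
}
```
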
Recipes:
- Add an event: add the key + payload to ObserverEvents; in the owning component call this._notify('my-event', { ...this.eventScope, subject }).
- Add a per-stream metric: add the field to the relevant Observed*Rtp / track class, populate it in its update() (reset at the top of update() if it's per-tick), and read it from a *-updated handler.
- Add a detector: implement Detector, register it on call-added via observedCall.detectors.add(...), surface findings with observedCall.addIssue(...).

For an agent continuing the work, these are explicitly not present yet:
- Tests. There is currently only a placeholder spec. The two accept() methods (ObservedClient, ObservedPeerConnection) are the priority for characterization tests. (A CI gate running lint + typecheck + test is already in place; see .github/workflows/ci.yml.)
- Built-in detectors / quality classifier. The registry exists; concrete server-side detectors (e.g. producer→consumer delivery mismatch, quality outlier, asymmetric media) are to be designed.
- Per-tick snapshot API. A serializable snapshot per *-updated tick to replace consuming the fine-grained *-updated events (under consideration).
- Monotonic-timestamp / clock-skew handling for client clock jumps.
- Batch processSamples() entry point for offline analysis of recorded sample streams.
- Expanded derived metrics (jitter-buffer delay, concealment, freeze fraction, encode/decode CPU, quality-limitation breakdown, etc.) and first-class producer/consumer & PC-direction fields beyond attachments.

Apache-2.0. Part of the ObserverTC ecosystem.