From c34fb9287e07e6abc7dd781beb7c15f60df9d9ba Mon Sep 17 00:00:00 2001 From: Oliver Ortlieb Date: Wed, 11 Feb 2026 21:15:11 -0500 Subject: [PATCH 1/3] feat: flags to log stats --- cmd/lk/room.go | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/cmd/lk/room.go b/cmd/lk/room.go index 2f206ccd..dcc96707 100644 --- a/cmd/lk/room.go +++ b/cmd/lk/room.go @@ -24,6 +24,7 @@ import ( "regexp" "strings" "syscall" + "time" "github.com/pion/webrtc/v4" "github.com/urfave/cli/v3" @@ -199,6 +200,15 @@ var ( Name: "metadata", Usage: "`JSON` metadata which will be passed to participant", }, + &cli.BoolFlag{ + Name: "stats", + Usage: "Periodically log publisher WebRTC GetStats as one JSON object per line to stderr", + }, + &cli.FloatFlag{ + Name: "stats-interval", + Usage: "Seconds between stats logs when --stats is set", + Value: 5.0, + }, }, }, { @@ -1060,6 +1070,30 @@ func joinRoom(ctx context.Context, cmd *cli.Command) error { } } + if cmd.Bool("stats") { + interval := cmd.Float("stats-interval") + if interval <= 0 { + interval = 5.0 + } + ticker := time.NewTicker(time.Duration(interval * float64(time.Second))) + go func() { + defer ticker.Stop() + for { + select { + case <-done: + return + case <-ticker.C: + pc := room.LocalParticipant.GetPublisherPeerConnection() + if pc == nil { + continue + } + report := pc.GetStats() + logger.Infow("stats", "stats", report) + } + } + }() + } + if cmd.IsSet("open") { switch cmd.String("open") { case string(util.OpenTargetMeet): From 3e88a073adc3c0286f86efebf4669fd5d649ae6a Mon Sep 17 00:00:00 2001 From: Sandeep Jain Date: Wed, 20 May 2026 10:45:33 -0400 Subject: [PATCH 2/3] stats: use pion interceptor to populate remote-inbound-rtp per-track stats pc.GetStats() on a publisher PeerConnection only returns ICE/transport stats because pion's rtpsender has no collectStats implementation. The per-track RTCP Receiver Report data (packetsLost, jitter, RTT, fractionLost) exists in the pion interceptor stats recorder but is never surfaced. Register a stats.InterceptorFactory via WithInterceptors and capture the Getter in OnNewPeerConnection. In the stats ticker, enumerate senders via pc.GetSenders(), look up each SSRC in the Getter, and append remote-inbound-rtp entries to the combined stats map alongside the existing ICE/transport stats. The JSON output now includes entries with type "remote-inbound-rtp" containing packetsLost, jitter, roundTripTime, and fractionLost for each published track's SSRC. --- cmd/lk/room.go | 58 +++++++++++++++++++++++++++++++++++++++++++++++--- go.mod | 2 +- 2 files changed, 56 insertions(+), 4 deletions(-) diff --git a/cmd/lk/room.go b/cmd/lk/room.go index dcc96707..164d97c6 100644 --- a/cmd/lk/room.go +++ b/cmd/lk/room.go @@ -23,9 +23,12 @@ import ( "os/signal" "regexp" "strings" + "sync" "syscall" "time" + "github.com/pion/interceptor" + pionStats "github.com/pion/interceptor/pkg/stats" "github.com/pion/webrtc/v4" "github.com/urfave/cli/v3" "google.golang.org/protobuf/encoding/protojson" @@ -983,6 +986,26 @@ func joinRoom(ctx context.Context, cmd *cli.Command) error { maps.Copy(participantAttributes, fileAttrs) } + var connectOpts []lksdk.ConnectOption + connectOpts = append(connectOpts, lksdk.WithAutoSubscribe(autoSubscribe)) + + var ( + statsGetterMu sync.Mutex + statsGetter pionStats.Getter + ) + if cmd.Bool("stats") { + statsFactory, err := pionStats.NewInterceptor() + if err != nil { + return err + } + statsFactory.OnNewPeerConnection(func(_ string, g pionStats.Getter) { + statsGetterMu.Lock() + statsGetter = g + statsGetterMu.Unlock() + }) + connectOpts = append(connectOpts, lksdk.WithInterceptors([]interceptor.Factory{statsFactory})) + } + room, err := lksdk.ConnectToRoom(project.URL, lksdk.ConnectInfo{ APIKey: project.APIKey, APISecret: project.APISecret, @@ -990,7 +1013,7 @@ func joinRoom(ctx context.Context, cmd *cli.Command) error { ParticipantIdentity: participantIdentity, ParticipantAttributes: participantAttributes, ParticipantMetadata: cmd.String("metadata"), - }, roomCB, lksdk.WithAutoSubscribe(autoSubscribe)) + }, roomCB, connectOpts...) if err != nil { return err } @@ -1087,8 +1110,37 @@ func joinRoom(ctx context.Context, cmd *cli.Command) error { if pc == nil { continue } - report := pc.GetStats() - logger.Infow("stats", "stats", report) + combined := make(map[string]interface{}) + for k, v := range pc.GetStats() { + combined[k] = v + } + statsGetterMu.Lock() + g := statsGetter + statsGetterMu.Unlock() + if g != nil { + for _, sender := range pc.GetSenders() { + for _, enc := range sender.GetParameters().Encodings { + ssrc := uint32(enc.SSRC) + if ssrc == 0 { + continue + } + s := g.Get(ssrc) + if s == nil { + continue + } + id := fmt.Sprintf("RTCRemoteInboundRtp_%d", ssrc) + combined[id] = map[string]interface{}{ + "type": "remote-inbound-rtp", + "ssrc": ssrc, + "packetsLost": s.RemoteInboundRTPStreamStats.PacketsLost, + "jitter": s.RemoteInboundRTPStreamStats.Jitter, + "roundTripTime": s.RemoteInboundRTPStreamStats.RoundTripTime.Seconds(), + "fractionLost": s.RemoteInboundRTPStreamStats.FractionLost, + } + } + } + } + logger.Infow("stats", "stats", combined) } } }() diff --git a/go.mod b/go.mod index d3f65cdd..c941c351 100644 --- a/go.mod +++ b/go.mod @@ -16,6 +16,7 @@ require ( github.com/livekit/server-sdk-go/v2 v2.13.3 github.com/moby/patternmatcher v0.6.0 github.com/pelletier/go-toml v1.9.5 + github.com/pion/interceptor v0.1.43 github.com/pion/rtcp v1.2.16 github.com/pion/rtp v1.10.0 github.com/pion/webrtc/v4 v4.2.3 @@ -137,7 +138,6 @@ require ( github.com/pion/datachannel v1.6.0 // indirect github.com/pion/dtls/v3 v3.0.10 // indirect github.com/pion/ice/v4 v4.2.0 // indirect - github.com/pion/interceptor v0.1.43 // indirect github.com/pion/logging v0.2.4 // indirect github.com/pion/mdns/v2 v2.1.0 // indirect github.com/pion/randutil v0.1.0 // indirect From a4e4be0838e11d41d454c840a0561615fe93c4e7 Mon Sep 17 00:00:00 2001 From: Sandeep Jain Date: Wed, 20 May 2026 14:20:59 -0400 Subject: [PATCH 3/3] fix: collect getters from all PCs; use publisher getter for sender SSRCs OnNewPeerConnection fires for both publisher and subscriber PCs since WithInterceptors applies to both. Previously we overwrote statsGetter with the last (subscriber) getter, so g.Get(ssrc) always returned nil for sender SSRCs. Now we collect all getters and try each one per SSRC, breaking on the first hit (the publisher getter). Co-Authored-By: Claude Sonnet 4.6 --- cmd/lk/room.go | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/cmd/lk/room.go b/cmd/lk/room.go index 164d97c6..6b84e6c6 100644 --- a/cmd/lk/room.go +++ b/cmd/lk/room.go @@ -991,7 +991,7 @@ func joinRoom(ctx context.Context, cmd *cli.Command) error { var ( statsGetterMu sync.Mutex - statsGetter pionStats.Getter + statsGetters []pionStats.Getter ) if cmd.Bool("stats") { statsFactory, err := pionStats.NewInterceptor() @@ -1000,7 +1000,7 @@ func joinRoom(ctx context.Context, cmd *cli.Command) error { } statsFactory.OnNewPeerConnection(func(_ string, g pionStats.Getter) { statsGetterMu.Lock() - statsGetter = g + statsGetters = append(statsGetters, g) statsGetterMu.Unlock() }) connectOpts = append(connectOpts, lksdk.WithInterceptors([]interceptor.Factory{statsFactory})) @@ -1115,15 +1115,16 @@ func joinRoom(ctx context.Context, cmd *cli.Command) error { combined[k] = v } statsGetterMu.Lock() - g := statsGetter + getters := make([]pionStats.Getter, len(statsGetters)) + copy(getters, statsGetters) statsGetterMu.Unlock() - if g != nil { - for _, sender := range pc.GetSenders() { - for _, enc := range sender.GetParameters().Encodings { - ssrc := uint32(enc.SSRC) - if ssrc == 0 { - continue - } + for _, sender := range pc.GetSenders() { + for _, enc := range sender.GetParameters().Encodings { + ssrc := uint32(enc.SSRC) + if ssrc == 0 { + continue + } + for _, g := range getters { s := g.Get(ssrc) if s == nil { continue @@ -1137,6 +1138,7 @@ func joinRoom(ctx context.Context, cmd *cli.Command) error { "roundTripTime": s.RemoteInboundRTPStreamStats.RoundTripTime.Seconds(), "fractionLost": s.RemoteInboundRTPStreamStats.FractionLost, } + break } } }