From 92cc69f4bfe3d73d2f09f4cef21a20a5afe994c8 Mon Sep 17 00:00:00 2001 From: Haffi Mazhar Date: Sun, 17 May 2026 16:28:44 +0100 Subject: [PATCH] add support for packet trailer features --- cmd/lk/agent.go | 5 ++-- cmd/lk/join.go | 78 +++++++++++++++++++++++++++++++++++++++++-------- cmd/lk/room.go | 18 ++++++++++-- go.mod | 8 ++--- go.sum | 8 +++++ 5 files changed, 96 insertions(+), 21 deletions(-) diff --git a/cmd/lk/agent.go b/cmd/lk/agent.go index 84b6f462..afc51e46 100644 --- a/cmd/lk/agent.go +++ b/cmd/lk/agent.go @@ -672,7 +672,7 @@ func createAgent(ctx context.Context, cmd *cli.Command) error { return err } else if viewLogs { fmt.Println("Tailing runtime logs...safe to exit at any time") - return agentsClient.StreamLogs(ctx, "deploy", lkConfig.Agent.ID, os.Stdout, resp.ServerRegions[0]) + return agentsClient.StreamLogs(ctx, "deploy", lkConfig.Agent.ID, "", os.Stdout, resp.ServerRegions[0]) } } return nil @@ -1022,7 +1022,8 @@ func getLogs(ctx context.Context, cmd *cli.Command) error { return fmt.Errorf("no agent deployments found") } - return agentsClient.StreamLogs(ctx, cmd.String("log-type"), agentID, os.Stdout, response.Agents[0].AgentDeployments[0].ServerRegion) + deployment := response.Agents[0].AgentDeployments[0] + return agentsClient.StreamLogs(ctx, cmd.String("log-type"), agentID, deployment.Environment, os.Stdout, deployment.ServerRegion) } func deleteAgent(ctx context.Context, cmd *cli.Command) error { diff --git a/cmd/lk/join.go b/cmd/lk/join.go index fb534f30..166db4e7 100644 --- a/cmd/lk/join.go +++ b/cmd/lk/join.go @@ -57,7 +57,7 @@ var ( &cli.StringSliceFlag{ Name: "publish", TakesFile: true, - Usage: "`FILES` to publish as tracks to room (supports .h264, .ivf, .ogg). " + + Usage: "`FILES` to publish as tracks to room (supports .h264, .h265, .ivf, .ogg). " + "can be used multiple times to publish multiple files. " + "can publish from Unix or TCP socket using the format '://' or '://' respectively. Valid codecs are \"h264\", \"h265\", \"vp8\", \"opus\"", }, @@ -76,6 +76,15 @@ var ( const mimeDelimiter = "://" +type packetTrailerPublishOptions struct { + AttachUserTimestamp bool + AttachFrameID bool +} + +func (o packetTrailerPublishOptions) Enabled() bool { + return o.AttachUserTimestamp || o.AttachFrameID +} + func _deprecatedJoinRoom(ctx context.Context, cmd *cli.Command) error { pc, err := loadProjectDetails(cmd) if err != nil { @@ -182,7 +191,7 @@ func _deprecatedJoinRoom(ctx context.Context, cmd *cli.Command) error { _ = room.LocalParticipant.UnpublishTrack(pub.SID()) } } - if err = handlePublish(room, pub, fps, h26xStreamingFormat, onPublishComplete); err != nil { + if err = handlePublish(room, pub, fps, h26xStreamingFormat, packetTrailerPublishOptions{}, onPublishComplete); err != nil { return err } } @@ -196,6 +205,7 @@ func handlePublish(room *lksdk.Room, name string, fps float64, h26xStreamingFormat string, + packetTrailerOpts packetTrailerPublishOptions, onPublishComplete func(pub *lksdk.LocalTrackPublication), ) error { if isSocketFormat(name) { @@ -203,9 +213,9 @@ func handlePublish(room *lksdk.Room, if err != nil { return err } - return publishSocket(room, mimeType, socketType, address, fps, h26xStreamingFormat, onPublishComplete) + return publishSocket(room, mimeType, socketType, address, fps, h26xStreamingFormat, packetTrailerOpts, onPublishComplete) } - return publishFile(room, name, fps, h26xStreamingFormat, onPublishComplete) + return publishFile(room, name, fps, h26xStreamingFormat, packetTrailerOpts, onPublishComplete) } func publishDemo(room *lksdk.Room) error { @@ -238,8 +248,11 @@ func publishFile(room *lksdk.Room, filename string, fps float64, h26xStreamingFormat string, + packetTrailerOpts packetTrailerPublishOptions, onPublishComplete func(pub *lksdk.LocalTrackPublication), ) error { + isH26x := isH26xFile(filename) + // Configure provider opts := []lksdk.ReaderSampleProviderOption{ lksdk.ReaderTrackWithRTCPHandler(func(packet rtcp.Packet) { @@ -255,15 +268,20 @@ func publishFile(room *lksdk.Room, onPublishComplete(pub) })) } + if isH26x && packetTrailerOpts.Enabled() { + opts = append(opts, lksdk.ReaderTrackWithPacketTrailer(true)) + } // Set frame rate if it's a video stream and FPS is set ext := filepath.Ext(filename) - if ext == ".h264" || ext == ".ivf" { + if isH26x || ext == ".ivf" { if fps != 0 { frameDuration := time.Second / time.Duration(fps) opts = append(opts, lksdk.ReaderTrackWithFrameDuration(frameDuration)) } + } + if isH26x { switch h26xStreamingFormat { case "annex-b": opts = append(opts, lksdk.ReaderTrackWithH26xStreamingFormat(lksdk.H26xStreamingFormatAnnexB)) @@ -280,7 +298,9 @@ func publishFile(room *lksdk.Room, return err } pub, err = room.LocalParticipant.PublishTrack(track, &lksdk.TrackPublicationOptions{ - Name: filename, + Name: filename, + AttachUserTimestamp: isH26x && packetTrailerOpts.AttachUserTimestamp, + AttachFrameId: isH26x && packetTrailerOpts.AttachFrameID, }) return err } @@ -325,6 +345,7 @@ func publishSocket(room *lksdk.Room, address string, fps float64, h26xStreamingFormat string, + packetTrailerOpts packetTrailerPublishOptions, onPublishComplete func(pub *lksdk.LocalTrackPublication), ) error { var mime string @@ -348,7 +369,7 @@ func publishSocket(room *lksdk.Room, } // Publish to room - err = publishReader(room, sock, mime, fps, h26xStreamingFormat, onPublishComplete) + err = publishReader(room, sock, mime, fps, h26xStreamingFormat, packetTrailerOpts, onPublishComplete) return err } @@ -357,16 +378,21 @@ func publishReader(room *lksdk.Room, mime string, fps float64, h26xStreamingFormat string, + packetTrailerOpts packetTrailerPublishOptions, onPublishComplete func(pub *lksdk.LocalTrackPublication), ) error { // Configure provider var opts []lksdk.ReaderSampleProviderOption + isH26x := isH26xMime(mime) var pub *lksdk.LocalTrackPublication if onPublishComplete != nil { opts = append(opts, lksdk.ReaderTrackWithOnWriteComplete(func() { onPublishComplete(pub) })) } + if isH26x && packetTrailerOpts.Enabled() { + opts = append(opts, lksdk.ReaderTrackWithPacketTrailer(true)) + } // Set frame rate if it's a video stream and FPS is set if strings.HasPrefix(strings.ToLower(mime), "video") { @@ -374,7 +400,9 @@ func publishReader(room *lksdk.Room, frameDuration := time.Second / time.Duration(fps) opts = append(opts, lksdk.ReaderTrackWithFrameDuration(frameDuration)) } + } + if isH26x { switch h26xStreamingFormat { case "annex-b": opts = append(opts, lksdk.ReaderTrackWithH26xStreamingFormat(lksdk.H26xStreamingFormatAnnexB)) @@ -390,13 +418,34 @@ func publishReader(room *lksdk.Room, if err != nil { return err } - pub, err = room.LocalParticipant.PublishTrack(track, &lksdk.TrackPublicationOptions{}) + pub, err = room.LocalParticipant.PublishTrack(track, &lksdk.TrackPublicationOptions{ + AttachUserTimestamp: isH26x && packetTrailerOpts.AttachUserTimestamp, + AttachFrameId: isH26x && packetTrailerOpts.AttachFrameID, + }) if err != nil { return err } return nil } +func isH26xFile(filename string) bool { + switch filepath.Ext(filename) { + case ".h264", ".h265": + return true + default: + return false + } +} + +func isH26xMime(mime string) bool { + switch strings.ToLower(mime) { + case strings.ToLower(webrtc.MimeTypeH264), strings.ToLower(webrtc.MimeTypeH265): + return true + default: + return false + } +} + // simulcastURLParts represents the parsed components of a simulcast URL type simulcastURLParts struct { codec string // "h264" or "h265" @@ -442,13 +491,16 @@ func parseSimulcastURL(url string) (*simulcastURLParts, error) { } // createSimulcastVideoTrack creates a simulcast video track from a TCP or Unix socket H.264/H.265 streams -func createSimulcastVideoTrack(urlParts *simulcastURLParts, quality livekit.VideoQuality, fps float64, h26xStreamingFormat string, onComplete func()) (*lksdk.LocalTrack, error) { +func createSimulcastVideoTrack(urlParts *simulcastURLParts, quality livekit.VideoQuality, fps float64, h26xStreamingFormat string, packetTrailerOpts packetTrailerPublishOptions, onComplete func()) (*lksdk.LocalTrack, error) { conn, err := net.Dial(urlParts.network, urlParts.address) if err != nil { return nil, fmt.Errorf("failed to connect to %s://%s: %w", urlParts.network, urlParts.address, err) } var opts []lksdk.ReaderSampleProviderOption + if packetTrailerOpts.Enabled() { + opts = append(opts, lksdk.ReaderTrackWithPacketTrailer(true)) + } // Add completion handler if provided if onComplete != nil { @@ -493,7 +545,7 @@ type simulcastLayer struct { } // handleSimulcastPublish handles publishing multiple H.264 streams as a simulcast track -func handleSimulcastPublish(room *lksdk.Room, urls []string, fps float64, h26xStreamingFormat string, onPublishComplete func(*lksdk.LocalTrackPublication)) error { +func handleSimulcastPublish(room *lksdk.Room, urls []string, fps float64, h26xStreamingFormat string, packetTrailerOpts packetTrailerPublishOptions, onPublishComplete func(*lksdk.LocalTrackPublication)) error { // Parse all URLs var layers []simulcastLayer for _, url := range urls { @@ -564,7 +616,7 @@ func handleSimulcastPublish(room *lksdk.Room, urls []string, fps float64, h26xSt } for _, layer := range layers { - track, err := createSimulcastVideoTrack(layer.parts, layer.quality, fps, h26xStreamingFormat, signalCompletion) + track, err := createSimulcastVideoTrack(layer.parts, layer.quality, fps, h26xStreamingFormat, packetTrailerOpts, signalCompletion) if err != nil { // Clean up any tracks we've already created for _, t := range tracks { @@ -580,7 +632,9 @@ func handleSimulcastPublish(room *lksdk.Room, urls []string, fps float64, h26xSt // Publish simulcast track var err error pub, err = room.LocalParticipant.PublishSimulcastTrack(tracks, &lksdk.TrackPublicationOptions{ - Name: "simulcast", + Name: "simulcast", + AttachUserTimestamp: packetTrailerOpts.AttachUserTimestamp, + AttachFrameId: packetTrailerOpts.AttachFrameID, }) if err != nil { // Clean up tracks on publish failure diff --git a/cmd/lk/room.go b/cmd/lk/room.go index 105bb772..48f8db60 100644 --- a/cmd/lk/room.go +++ b/cmd/lk/room.go @@ -155,7 +155,7 @@ var ( &cli.StringSliceFlag{ Name: "publish", TakesFile: true, - Usage: "`FILES` to publish as tracks to room (supports .h264, .ivf, .ogg). " + + Usage: "`FILES` to publish as tracks to room (supports .h264, .h265, .ivf, .ogg). " + "Can be used multiple times to publish multiple files. " + "Can publish from Unix or TCP socket using the format ':///' or '://' respectively. Valid codecs are \"h264\", \"h265\", \"vp8\", \"opus\". " + "For simulcast: use 2-3 h264:// or h265:// URLs with format ':///x' or ':///path/to//x' (all layers must use the same codec; quality determined by width order)", @@ -177,6 +177,10 @@ var ( Usage: "Format to use when reading H.264 from file or socket, \"annex-b\" OR \"length-prefixed\"", Value: "annex-b", }, + &cli.BoolFlag{ + Name: "publish-user-timestamp", + Usage: "When publishing H.264/H.265, attach LKTS user timestamp and frame ID packet trailers from embedded SEI metadata", + }, &cli.BoolFlag{ Name: "exit-after-publish", Usage: "When publishing, exit after file or stream is complete", @@ -1004,6 +1008,10 @@ func joinRoom(ctx context.Context, cmd *cli.Command) error { // Handle simulcast publishing fps := cmd.Float("fps") h26xStreamingFormat := cmd.String("h26x-streaming-format") + packetTrailerOpts := packetTrailerPublishOptions{ + AttachUserTimestamp: cmd.Bool("publish-user-timestamp"), + AttachFrameID: cmd.Bool("publish-user-timestamp"), + } onPublishComplete := func(pub *lksdk.LocalTrackPublication) { if exitAfterPublish { close(done) @@ -1015,13 +1023,17 @@ func joinRoom(ctx context.Context, cmd *cli.Command) error { } } - if err = handleSimulcastPublish(room, publishUrls, fps, h26xStreamingFormat, onPublishComplete); err != nil { + if err = handleSimulcastPublish(room, publishUrls, fps, h26xStreamingFormat, packetTrailerOpts, onPublishComplete); err != nil { return err } } else { // Handle single publish fps := cmd.Float("fps") h26xStreamingFormat := cmd.String("h26x-streaming-format") + packetTrailerOpts := packetTrailerPublishOptions{ + AttachUserTimestamp: cmd.Bool("publish-user-timestamp"), + AttachFrameID: cmd.Bool("publish-user-timestamp"), + } for _, pub := range publishUrls { onPublishComplete := func(pub *lksdk.LocalTrackPublication) { if exitAfterPublish { @@ -1033,7 +1045,7 @@ func joinRoom(ctx context.Context, cmd *cli.Command) error { _ = room.LocalParticipant.UnpublishTrack(pub.SID()) } } - if err = handlePublish(room, pub, fps, h26xStreamingFormat, onPublishComplete); err != nil { + if err = handlePublish(room, pub, fps, h26xStreamingFormat, packetTrailerOpts, onPublishComplete); err != nil { return err } } diff --git a/go.mod b/go.mod index 28027440..c30e99f0 100644 --- a/go.mod +++ b/go.mod @@ -19,14 +19,14 @@ require ( github.com/google/go-querystring v1.2.0 github.com/joho/godotenv v1.5.1 github.com/livekit/protocol v1.45.9-0.20260508203311-a249893d6a5d - github.com/livekit/server-sdk-go/v2 v2.16.2 + github.com/livekit/server-sdk-go/v2 v2.16.3 github.com/mattn/go-isatty v0.0.21 github.com/moby/patternmatcher v0.6.1 github.com/modelcontextprotocol/go-sdk v1.4.1 github.com/pelletier/go-toml v1.9.5 github.com/pion/rtcp v1.2.16 github.com/pion/rtp v1.10.1 - github.com/pion/webrtc/v4 v4.2.9 + github.com/pion/webrtc/v4 v4.2.11 github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c github.com/pkg/errors v0.9.1 github.com/stretchr/testify v1.11.1 @@ -189,12 +189,12 @@ require ( github.com/pierrec/lz4/v4 v4.1.26 // indirect github.com/pion/datachannel v1.6.0 // indirect github.com/pion/dtls/v3 v3.1.2 // indirect - github.com/pion/ice/v4 v4.2.1 // indirect + github.com/pion/ice/v4 v4.2.2 // indirect github.com/pion/interceptor v0.1.44 // indirect github.com/pion/logging v0.2.4 // indirect github.com/pion/mdns/v2 v2.1.0 // indirect github.com/pion/randutil v0.1.0 // indirect - github.com/pion/sctp v1.9.3 // indirect + github.com/pion/sctp v1.9.4 // indirect github.com/pion/sdp/v3 v3.0.18 // indirect github.com/pion/srtp/v3 v3.0.10 // indirect github.com/pion/stun/v3 v3.1.1 // indirect diff --git a/go.sum b/go.sum index 18dd9cfa..13b8f5ec 100644 --- a/go.sum +++ b/go.sum @@ -372,6 +372,8 @@ github.com/livekit/psrpc v0.7.1 h1:ms37az0QTD3UXIWuUC5D/SkmKOlRMVRsI261eBWu/Vw= github.com/livekit/psrpc v0.7.1/go.mod h1:bZ4iHFQptTkbPnB0LasvRNu/OBYXEu1NA6O5BMFo9kk= github.com/livekit/server-sdk-go/v2 v2.16.2 h1:eQe24cka3X+5zUivezyL72nwtAJTWFXgibeiyJ/Jm+Y= github.com/livekit/server-sdk-go/v2 v2.16.2/go.mod h1:/HOUG9AXJeCbMCdtw0dr37AB+3xXUlj/OLeXS/0p7rA= +github.com/livekit/server-sdk-go/v2 v2.16.3 h1:WFR7TQDNTVFZX0UIvZInYggC0dvcbLLwC0/BOHH89+E= +github.com/livekit/server-sdk-go/v2 v2.16.3/go.mod h1:Ua6WRLYw8U+27pm+FPN68ogW+KsMXTQ9tPVGfTPjDCg= github.com/lucasb-eyer/go-colorful v1.4.0 h1:UtrWVfLdarDgc44HcS7pYloGHJUjHV/4FwW4TvVgFr4= github.com/lucasb-eyer/go-colorful v1.4.0/go.mod h1:R4dSotOR9KMtayYi1e77YzuveK+i7ruzyGqttikkLy0= github.com/magefile/mage v1.17.0 h1:dS4tkq997Ism03akafC8509iqDjeE7TNTexI25Y7sXM= @@ -454,6 +456,8 @@ github.com/pion/dtls/v3 v3.1.2 h1:gqEdOUXLtCGW+afsBLO0LtDD8GnuBBjEy6HRtyofZTc= github.com/pion/dtls/v3 v3.1.2/go.mod h1:Hw/igcX4pdY69z1Hgv5x7wJFrUkdgHwAn/Q/uo7YHRo= github.com/pion/ice/v4 v4.2.1 h1:XPRYXaLiFq3LFDG7a7bMrmr3mFr27G/gtXN3v/TVfxY= github.com/pion/ice/v4 v4.2.1/go.mod h1:2quLV1S5v1tAx3VvAJaH//KGitRXvo4RKlX6D3tnN+c= +github.com/pion/ice/v4 v4.2.2 h1:dQJzzcgTFHDYyV3BoCfjPeX+JEtr58BWPi4PGyo6Vjg= +github.com/pion/ice/v4 v4.2.2/go.mod h1:2quLV1S5v1tAx3VvAJaH//KGitRXvo4RKlX6D3tnN+c= github.com/pion/interceptor v0.1.44 h1:sNlZwM8dWXU9JQAkJh8xrarC0Etn8Oolcniukmuy0/I= github.com/pion/interceptor v0.1.44/go.mod h1:4atVlBkcgXuUP+ykQF0qOCGU2j7pQzX2ofvPRFsY5RY= github.com/pion/logging v0.2.4 h1:tTew+7cmQ+Mc1pTBLKH2puKsOvhm32dROumOZ655zB8= @@ -468,6 +472,8 @@ github.com/pion/rtp v1.10.1 h1:xP1prZcCTUuhO2c83XtxyOHJteISg6o8iPsE2acaMtA= github.com/pion/rtp v1.10.1/go.mod h1:rF5nS1GqbR7H/TCpKwylzeq6yDM+MM6k+On5EgeThEM= github.com/pion/sctp v1.9.3 h1:tjuOX9K/U4udMR2I7QDqr4sLE0tFzegtou7OF4a7p8A= github.com/pion/sctp v1.9.3/go.mod h1:N20Dq6LY+JvJDAh9VVh1JELngb2rQ8dPgds5yBWiPgw= +github.com/pion/sctp v1.9.4 h1:cMxEu0F5tbP4qH07bKf1Zjf4rUih9LIo0qQt424e258= +github.com/pion/sctp v1.9.4/go.mod h1:N20Dq6LY+JvJDAh9VVh1JELngb2rQ8dPgds5yBWiPgw= github.com/pion/sdp/v3 v3.0.18 h1:l0bAXazKHpepazVdp+tPYnrsy9dfh7ZbT8DxesH5ZnI= github.com/pion/sdp/v3 v3.0.18/go.mod h1:ZREGo6A9ZygQ9XkqAj5xYCQtQpif0i6Pa81HOiAdqQ8= github.com/pion/srtp/v3 v3.0.10 h1:tFirkpBb3XccP5VEXLi50GqXhv5SKPxqrdlhDCJlZrQ= @@ -482,6 +488,8 @@ github.com/pion/turn/v4 v4.1.4 h1:EU11yMXKIsK43FhcUnjLlrhE4nboHZq+TXBIi3QpcxQ= github.com/pion/turn/v4 v4.1.4/go.mod h1:ES1DXVFKnOhuDkqn9hn5VJlSWmZPaRJLyBXoOeO/BmQ= github.com/pion/webrtc/v4 v4.2.9 h1:DZIh1HAhPIL3RvwEDFsmL5hfPSLEpxsQk9/Jir2vkJE= github.com/pion/webrtc/v4 v4.2.9/go.mod h1:9EmLZve0H76eTzf8v2FmchZ6tcBXtDgpfTEu+drW6SY= +github.com/pion/webrtc/v4 v4.2.11 h1:QUX1QZKlNIn4O7U5JxLPGP0sV5RTncZkzu9SPR3jVNU= +github.com/pion/webrtc/v4 v4.2.11/go.mod h1:s/rAiyy77GyRFrZMx+Ls6aua26dIBPudH8/ZHYbIRWY= github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c h1:+mdjkGKdHQG3305AYmdv1U2eRNDiU2ErMBj1gwrq8eQ= github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c/go.mod h1:7rwL4CYBLnjLxUqIJNnCWiEdr3bn6IUYi15bNlnbCCU= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=