From 1824b2cfe2124ca15d436facf9be26d0bd881ad0 Mon Sep 17 00:00:00 2001 From: Ben Cairns Date: Mon, 29 Jun 2026 23:15:56 -0500 Subject: [PATCH 01/13] feat(pim): add PIM Register message serialization --- client/doublezerod/internal/pim/pim.go | 77 +++++++++++++++++++ .../doublezerod/internal/pim/register_test.go | 49 ++++++++++++ 2 files changed, 126 insertions(+) create mode 100644 client/doublezerod/internal/pim/register_test.go diff --git a/client/doublezerod/internal/pim/pim.go b/client/doublezerod/internal/pim/pim.go index 15e6f21e05..99549d6e12 100644 --- a/client/doublezerod/internal/pim/pim.go +++ b/client/doublezerod/internal/pim/pim.go @@ -5,6 +5,7 @@ import ( "errors" "log/slog" "net" + "time" "github.com/google/gopacket" "github.com/google/gopacket/layers" @@ -740,3 +741,79 @@ func bytesFromNetIP(ip net.IP, addrFamily uint8) (bytes []byte, addrLen uint8) { } return bytes, addrLen } + +// DefaultRegisterInterval is the default PIM Register beacon interval. It is +// well under the RP's register keepalive (~210s) so the source stays +// originated, and slow enough to keep control-plane (CoPP) load negligible. +const DefaultRegisterInterval = 60 * time.Second + +// RegisterMessage is a PIM-SM Register (type 1): 4 bytes of flags (Border, +// Null, reserved) followed by the encapsulated original multicast IP datagram. +type RegisterMessage struct { + layers.BaseLayer + Border bool + Null bool + Data []byte // the full encapsulated IP datagram +} + +func (p *RegisterMessage) LayerType() gopacket.LayerType { return gopacket.LayerTypePayload } + +func (p *RegisterMessage) SerializeTo(b gopacket.SerializeBuffer, opts gopacket.SerializeOptions) error { + flags, err := b.PrependBytes(4) + if err != nil { + return err + } + var word uint32 + if p.Border { + word |= 1 << 31 + } + if p.Null { + word |= 1 << 30 + } + binary.BigEndian.PutUint32(flags, word) + + data, err := b.AppendBytes(len(p.Data)) + if err != nil { + return err + } + copy(data, p.Data) + return nil +} + +// constructRegisterMessage builds a PIM Register encapsulating a UDP datagram +// from innerSrc to group on dport carrying payload. The checksum field is left +// zero; the sender fills it over the first 8 bytes only (RFC 7761 4.9.1). +func constructRegisterMessage(innerSrc, group net.IP, dport int, payload []byte) (gopacket.SerializeBuffer, error) { + inner := gopacket.NewSerializeBuffer() + ip := &layers.IPv4{ + Version: 4, + IHL: 5, + TTL: 32, + Protocol: layers.IPProtocolUDP, + SrcIP: innerSrc.To4(), + DstIP: group.To4(), + } + udp := &layers.UDP{ + SrcPort: layers.UDPPort(dport), + DstPort: layers.UDPPort(dport), + } + if err := udp.SetNetworkLayerForChecksum(ip); err != nil { + return nil, err + } + if err := gopacket.SerializeLayers(inner, + gopacket.SerializeOptions{FixLengths: true, ComputeChecksums: true}, + ip, udp, gopacket.Payload(payload)); err != nil { + return nil, err + } + + buf := gopacket.NewSerializeBuffer() + reg := &RegisterMessage{Data: inner.Bytes()} + if err := reg.SerializeTo(buf, gopacket.SerializeOptions{}); err != nil { + return nil, err + } + pimHeader := &PIMMessage{Header: PIMHeader{Version: 2, Type: Register, Checksum: 0x0000}} + if err := pimHeader.SerializeTo(buf, gopacket.SerializeOptions{}); err != nil { + return nil, err + } + return buf, nil +} diff --git a/client/doublezerod/internal/pim/register_test.go b/client/doublezerod/internal/pim/register_test.go new file mode 100644 index 0000000000..521694eb4c --- /dev/null +++ b/client/doublezerod/internal/pim/register_test.go @@ -0,0 +1,49 @@ +package pim + +import ( + "net" + "testing" + + "github.com/google/gopacket" + "github.com/google/gopacket/layers" +) + +func TestConstructRegisterMessage(t *testing.T) { + innerSrc := net.IPv4(148, 51, 122, 203) + group := net.IPv4(233, 84, 178, 5) + payload := []byte{0x44, 0x5A, 0x00, 0x01} + + buf, err := constructRegisterMessage(innerSrc, group, 5765, payload) + if err != nil { + t.Fatalf("constructRegisterMessage: %v", err) + } + b := buf.Bytes() + + // PIM common header: version 2, type Register (0x01) => first byte 0x21. + if b[0] != 0x21 { + t.Fatalf("pim header byte0 = 0x%02x, want 0x21", b[0]) + } + // Register flags word (bytes 4..7): Border=0, Null=0. + if b[4] != 0x00 { + t.Fatalf("register flags high byte = 0x%02x, want 0x00", b[4]) + } + // Bytes 8.. are the encapsulated IPv4/UDP datagram. + pkt := gopacket.NewPacket(b[8:], layers.LayerTypeIPv4, gopacket.Default) + ip, ok := pkt.Layer(layers.LayerTypeIPv4).(*layers.IPv4) + if !ok { + t.Fatal("no encapsulated IPv4 layer") + } + if !ip.SrcIP.Equal(innerSrc) || !ip.DstIP.Equal(group) { + t.Fatalf("inner IP src/dst = %s/%s, want %s/%s", ip.SrcIP, ip.DstIP, innerSrc, group) + } + if ip.Protocol != layers.IPProtocolUDP { + t.Fatalf("inner proto = %v, want UDP", ip.Protocol) + } + udp, ok := pkt.Layer(layers.LayerTypeUDP).(*layers.UDP) + if !ok { + t.Fatal("no encapsulated UDP layer") + } + if udp.DstPort != 5765 { + t.Fatalf("inner UDP dport = %d, want 5765", udp.DstPort) + } +} From 51a81bcc7749210c30c09a055cdab90f94b914fc Mon Sep 17 00:00:00 2001 From: Ben Cairns Date: Mon, 29 Jun 2026 23:20:57 -0500 Subject: [PATCH 02/13] feat(pim): add RegisterSender beacon Implements RegisterSender: periodic PIM Register message sender that beacons publisher groups to the RP so the RP can originate the source into MSDP. Includes mock-based unit test exercising sendRegister directly. Co-Authored-By: Claude Sonnet 4.6 --- client/doublezerod/internal/pim/register.go | 145 ++++++++++++++++++ .../doublezerod/internal/pim/register_test.go | 63 ++++++++ 2 files changed, 208 insertions(+) create mode 100644 client/doublezerod/internal/pim/register.go diff --git a/client/doublezerod/internal/pim/register.go b/client/doublezerod/internal/pim/register.go new file mode 100644 index 0000000000..15354669d8 --- /dev/null +++ b/client/doublezerod/internal/pim/register.go @@ -0,0 +1,145 @@ +package pim + +import ( + "encoding/binary" + "fmt" + "log/slog" + "math/rand" + "net" + "sync" + "time" + + "golang.org/x/net/ipv4" +) + +// RegisterSender periodically sends PIM Register messages (a beacon) for the +// publisher's groups to the RP, so the RP originates the source into MSDP. +// It ignores Register-Stop: there is no inbound path. +type RegisterSender struct { + done chan struct{} + wg *sync.WaitGroup + mu sync.Mutex + conn RawConner + iface string + srcOverlay net.IP + innerSrc net.IP + rp net.IP + dport int + payload []byte + groups []net.IP + updateCh chan []net.IP +} + +func NewRegisterSender() *RegisterSender { + return &RegisterSender{done: make(chan struct{}), updateCh: make(chan []net.IP)} +} + +func (s *RegisterSender) Start(iface string, srcOverlay, innerSrc net.IP, groups []net.IP, rp net.IP, dport int, payload []byte, interval time.Duration) error { + c, err := net.ListenPacket("ip4:103", "0.0.0.0") + if err != nil { + return fmt.Errorf("register: failed to listen: %v", err) + } + r, err := ipv4.NewRawConn(c) + if err != nil { + return fmt.Errorf("register: failed to create raw conn: %v", err) + } + if err := r.SetControlMessage(ipv4.FlagInterface, true); err != nil { + return fmt.Errorf("register: failed to enable control message: %v", err) + } + intf, err := net.InterfaceByName(iface) + if err != nil { + return fmt.Errorf("register: failed to get interface: %v", err) + } + s.srcOverlay = srcOverlay + s.innerSrc = innerSrc + s.rp = rp + s.dport = dport + s.payload = payload + return s.startWithConn(r, intf, groups, interval) +} + +func (s *RegisterSender) startWithConn(conn RawConner, intf *net.Interface, groups []net.IP, interval time.Duration) error { + s.conn = conn + s.iface = intf.Name + s.groups = groups + s.wg = &sync.WaitGroup{} + s.wg.Add(1) + go func() { + defer conn.Close() + defer s.wg.Done() + + // Stagger the first send within the interval so publishers across the + // fleet do not synchronize into CoPP-stressing bursts. + jitter := time.Duration(rand.Int63n(int64(interval))) + select { + case <-time.After(jitter): + case <-s.done: + return + } + + s.sendAll(intf) + ticker := time.NewTicker(interval) + defer ticker.Stop() + for { + select { + case <-ticker.C: + s.sendAll(intf) + case g := <-s.updateCh: + s.mu.Lock() + s.groups = g + s.mu.Unlock() + case <-s.done: + return + } + } + }() + return nil +} + +func (s *RegisterSender) sendAll(intf *net.Interface) { + s.mu.Lock() + groups := s.groups + s.mu.Unlock() + for _, g := range groups { + if err := s.sendRegister(intf, g); err != nil { + slog.Error("failed to send pim register", "group", g, "error", err) + } + } +} + +func (s *RegisterSender) sendRegister(intf *net.Interface, group net.IP) error { + buf, err := constructRegisterMessage(s.innerSrc, group, s.dport, s.payload) + if err != nil { + return err + } + b := buf.Bytes() + // PIM Register checksum is computed over the first 8 bytes only + // (PIM header + flags), excluding the encapsulated datagram (RFC 7761 4.9.1). + binary.BigEndian.PutUint16(b[2:4], Checksum(b[:8])) + + iph := &ipv4.Header{ + Version: 4, + Len: ipv4.HeaderLen, + TTL: 32, + Protocol: 103, + Src: s.srcOverlay, + Dst: s.rp, + TotalLen: ipv4.HeaderLen + len(b), + } + // Pin egress to the GRE tunnel interface so no route for the RP is needed. + cm := &ipv4.ControlMessage{IfIndex: intf.Index} + return s.conn.WriteTo(iph, b, cm) +} + +func (s *RegisterSender) UpdateGroups(groups []net.IP) error { + s.updateCh <- groups + return nil +} + +func (s *RegisterSender) Close() error { + close(s.done) + if s.wg != nil { + s.wg.Wait() + } + return nil +} diff --git a/client/doublezerod/internal/pim/register_test.go b/client/doublezerod/internal/pim/register_test.go index 521694eb4c..f566900910 100644 --- a/client/doublezerod/internal/pim/register_test.go +++ b/client/doublezerod/internal/pim/register_test.go @@ -2,10 +2,12 @@ package pim import ( "net" + "sync" "testing" "github.com/google/gopacket" "github.com/google/gopacket/layers" + "golang.org/x/net/ipv4" ) func TestConstructRegisterMessage(t *testing.T) { @@ -47,3 +49,64 @@ func TestConstructRegisterMessage(t *testing.T) { t.Fatalf("inner UDP dport = %d, want 5765", udp.DstPort) } } + +type mockRawConn struct { + mu sync.Mutex + calls []writeCall +} + +type writeCall struct { + h *ipv4.Header + b []byte + cm *ipv4.ControlMessage +} + +func (m *mockRawConn) WriteTo(h *ipv4.Header, b []byte, cm *ipv4.ControlMessage) error { + m.mu.Lock() + defer m.mu.Unlock() + cp := make([]byte, len(b)) + copy(cp, b) + m.calls = append(m.calls, writeCall{h: h, b: cp, cm: cm}) + return nil +} +func (m *mockRawConn) Close() error { return nil } +func (m *mockRawConn) SetMulticastInterface(*net.Interface) error { return nil } +func (m *mockRawConn) SetControlMessage(ipv4.ControlFlags, bool) error { return nil } + +func TestRegisterSenderSendsRegisterToRP(t *testing.T) { + mock := &mockRawConn{} + s := NewRegisterSender() + s.conn = mock + s.innerSrc = net.IPv4(148, 51, 122, 203) + s.srcOverlay = net.IPv4(169, 254, 4, 58) + s.rp = RpAddress + s.dport = 5765 + s.payload = []byte{0x44, 0x5A, 0x00, 0x01} + + intf := &net.Interface{Index: 7, Name: "doublezero1"} + group := net.IPv4(233, 84, 178, 5) + + if err := s.sendRegister(intf, group); err != nil { + t.Fatalf("sendRegister: %v", err) + } + if len(mock.calls) != 1 { + t.Fatalf("got %d writes, want 1", len(mock.calls)) + } + c := mock.calls[0] + if !c.h.Dst.Equal(RpAddress) { + t.Fatalf("dst = %s, want RP %s", c.h.Dst, RpAddress) + } + if c.h.Protocol != 103 { + t.Fatalf("proto = %d, want 103", c.h.Protocol) + } + if c.cm.IfIndex != 7 { + t.Fatalf("ifindex = %d, want 7 (egress pinned to tunnel)", c.cm.IfIndex) + } + if c.b[0] != 0x21 { + t.Fatalf("pim byte0 = 0x%02x, want 0x21", c.b[0]) + } + // Checksum is non-zero and computed over the first 8 bytes only. + if c.b[2] == 0 && c.b[3] == 0 { + t.Fatal("pim checksum not set") + } +} From 9f8326607c7e4c59cb7a042c945e414dea9a9484 Mon Sep 17 00:00:00 2001 From: Ben Cairns Date: Mon, 29 Jun 2026 23:24:48 -0500 Subject: [PATCH 03/13] fix(pim): harden RegisterSender Close/UpdateGroups, conn cleanup, checksum test Co-Authored-By: Claude Sonnet 4.6 --- client/doublezerod/internal/pim/register.go | 11 +++++++++-- client/doublezerod/internal/pim/register_test.go | 12 +++++++++--- 2 files changed, 18 insertions(+), 5 deletions(-) diff --git a/client/doublezerod/internal/pim/register.go b/client/doublezerod/internal/pim/register.go index 15354669d8..e6ad509cea 100644 --- a/client/doublezerod/internal/pim/register.go +++ b/client/doublezerod/internal/pim/register.go @@ -17,6 +17,7 @@ import ( // It ignores Register-Stop: there is no inbound path. type RegisterSender struct { done chan struct{} + closeOnce sync.Once wg *sync.WaitGroup mu sync.Mutex conn RawConner @@ -41,13 +42,16 @@ func (s *RegisterSender) Start(iface string, srcOverlay, innerSrc net.IP, groups } r, err := ipv4.NewRawConn(c) if err != nil { + c.Close() return fmt.Errorf("register: failed to create raw conn: %v", err) } if err := r.SetControlMessage(ipv4.FlagInterface, true); err != nil { + r.Close() return fmt.Errorf("register: failed to enable control message: %v", err) } intf, err := net.InterfaceByName(iface) if err != nil { + r.Close() return fmt.Errorf("register: failed to get interface: %v", err) } s.srcOverlay = srcOverlay @@ -132,12 +136,15 @@ func (s *RegisterSender) sendRegister(intf *net.Interface, group net.IP) error { } func (s *RegisterSender) UpdateGroups(groups []net.IP) error { - s.updateCh <- groups + select { + case s.updateCh <- groups: + case <-s.done: + } return nil } func (s *RegisterSender) Close() error { - close(s.done) + s.closeOnce.Do(func() { close(s.done) }) if s.wg != nil { s.wg.Wait() } diff --git a/client/doublezerod/internal/pim/register_test.go b/client/doublezerod/internal/pim/register_test.go index f566900910..718d313b61 100644 --- a/client/doublezerod/internal/pim/register_test.go +++ b/client/doublezerod/internal/pim/register_test.go @@ -1,6 +1,7 @@ package pim import ( + "encoding/binary" "net" "sync" "testing" @@ -105,8 +106,13 @@ func TestRegisterSenderSendsRegisterToRP(t *testing.T) { if c.b[0] != 0x21 { t.Fatalf("pim byte0 = 0x%02x, want 0x21", c.b[0]) } - // Checksum is non-zero and computed over the first 8 bytes only. - if c.b[2] == 0 && c.b[3] == 0 { - t.Fatal("pim checksum not set") + // Checksum is computed over the first 8 bytes only (RFC 7761 4.9.1), + // with the checksum field itself zeroed during computation. + hdr := make([]byte, 8) + copy(hdr, c.b[:8]) + hdr[2], hdr[3] = 0, 0 + want := Checksum(hdr) + if got := binary.BigEndian.Uint16(c.b[2:4]); got != want { + t.Fatalf("pim checksum = 0x%04x, want 0x%04x", got, want) } } From e2281548ebea2b95ff8326605e1329cff0e74cff Mon Sep 17 00:00:00 2001 From: Ben Cairns Date: Mon, 29 Jun 2026 23:28:34 -0500 Subject: [PATCH 04/13] feat(api): add MulticastRpAddress to ProvisionRequest --- client/doublezerod/internal/api/requests.go | 7 +++++++ client/doublezerod/internal/api/requests_test.go | 11 +++++++++++ 2 files changed, 18 insertions(+) diff --git a/client/doublezerod/internal/api/requests.go b/client/doublezerod/internal/api/requests.go index 73611aa1f1..647dd120b2 100644 --- a/client/doublezerod/internal/api/requests.go +++ b/client/doublezerod/internal/api/requests.go @@ -84,6 +84,7 @@ type ProvisionRequest struct { DoubleZeroPrefixes []*net.IPNet `json:"doublezero_prefixes"` MulticastSubGroups []net.IP `json:"mcast_sub_groups"` MulticastPubGroups []net.IP `json:"mcast_pub_groups"` + MulticastRpAddress net.IP `json:"mcast_rp_address"` BgpLocalAsn uint32 `json:"bgp_local_asn"` BgpRemoteAsn uint32 `json:"bgp_remote_asn"` } @@ -191,6 +192,9 @@ func (p *ProvisionRequest) InfraEqual(other *ProvisionRequest) bool { if p.BgpLocalAsn != other.BgpLocalAsn || p.BgpRemoteAsn != other.BgpRemoteAsn { return false } + if !p.MulticastRpAddress.Equal(other.MulticastRpAddress) { + return false + } return ipNetSlicesEqual(p.DoubleZeroPrefixes, other.DoubleZeroPrefixes) } @@ -270,6 +274,9 @@ func (p *ProvisionRequest) Validate() error { if p.BgpRemoteAsn == 0 { p.BgpRemoteAsn = 65001 } + if p.MulticastRpAddress == nil { + p.MulticastRpAddress = net.IPv4(10, 0, 0, 0) + } return nil } diff --git a/client/doublezerod/internal/api/requests_test.go b/client/doublezerod/internal/api/requests_test.go index 84b3299646..355c286cb8 100644 --- a/client/doublezerod/internal/api/requests_test.go +++ b/client/doublezerod/internal/api/requests_test.go @@ -161,6 +161,7 @@ func testFullProvisionRequest() ProvisionRequest { BgpRemoteAsn: 65001, MulticastPubGroups: []net.IP{net.IPv4(239, 0, 0, 1)}, MulticastSubGroups: []net.IP{net.IPv4(239, 0, 0, 2)}, + MulticastRpAddress: net.IPv4(10, 0, 0, 0), } } @@ -198,6 +199,16 @@ func fieldIndex(typ reflect.Type, name string) int { return -1 } +func TestProvisionRequestDefaultsRpAddress(t *testing.T) { + p := &ProvisionRequest{} + if err := p.Validate(); err != nil { + t.Fatalf("Validate: %v", err) + } + if !p.MulticastRpAddress.Equal(net.IPv4(10, 0, 0, 0)) { + t.Fatalf("MulticastRpAddress = %v, want 10.0.0.0", p.MulticastRpAddress) + } +} + func TestIPSetDiff(t *testing.T) { tests := []struct { name string From 835e2bf300c6c24ee2a5e6fd868e6142696924cd Mon Sep 17 00:00:00 2001 From: Ben Cairns Date: Mon, 29 Jun 2026 23:37:28 -0500 Subject: [PATCH 05/13] feat(services): start PIM Register beacon for multicast publishers - Export heartbeat.HeartbeatPayload (was unexported heartbeatPayload) - Add RegisterWriter interface to services/base.go - Wire register field into MulticastService: ctor param, Setup start, Teardown close, UpdateGroups update - Add mockRegister + TestMulticastSetupStartsRegisterForPublisher; remove manager import from test file to avoid arity breakage in Task 5 Co-Authored-By: Claude Sonnet 4.6 --- .../internal/multicast/heartbeat.go | 6 +- client/doublezerod/internal/services/base.go | 6 + .../internal/services/multicast.go | 14 +- .../internal/services/services_test.go | 142 +++++++++++++----- 4 files changed, 125 insertions(+), 43 deletions(-) diff --git a/client/doublezerod/internal/multicast/heartbeat.go b/client/doublezerod/internal/multicast/heartbeat.go index 2553022936..59d3d7eb87 100644 --- a/client/doublezerod/internal/multicast/heartbeat.go +++ b/client/doublezerod/internal/multicast/heartbeat.go @@ -22,9 +22,9 @@ const ( DefaultHeartbeatInterval = 10 * time.Second ) -// heartbeatPayload is the fixed payload sent in each heartbeat packet. +// HeartbeatPayload is the fixed payload sent in each heartbeat packet. // 0x44, 0x5A = "DZ", followed by a 2-byte version (0x00, 0x01). -var heartbeatPayload = []byte{0x44, 0x5A, 0x00, 0x01} +var HeartbeatPayload = []byte{0x44, 0x5A, 0x00, 0x01} // PacketConner abstracts the UDP multicast connection for testing. type PacketConner interface { @@ -135,7 +135,7 @@ func (h *HeartbeatSender) startWithConn(p PacketConner, intf *net.Interface, gro func sendHeartbeats(p PacketConner, dsts []*net.UDPAddr) { for _, dst := range dsts { - if _, err := p.WriteTo(heartbeatPayload, nil, dst); err != nil { + if _, err := p.WriteTo(HeartbeatPayload, nil, dst); err != nil { slog.Error("failed to send heartbeat", "dst", dst, "error", err) } } diff --git a/client/doublezerod/internal/services/base.go b/client/doublezerod/internal/services/base.go index e7b02c5eec..51da7dbc28 100644 --- a/client/doublezerod/internal/services/base.go +++ b/client/doublezerod/internal/services/base.go @@ -27,6 +27,12 @@ type HeartbeatWriter interface { Close() error } +type RegisterWriter interface { + Start(iface string, srcOverlay, innerSrc net.IP, groups []net.IP, rp net.IP, dport int, payload []byte, interval time.Duration) error + UpdateGroups(groups []net.IP) error + Close() error +} + type BGPReaderWriter interface { AddPeer(*bgp.PeerConfig, []bgp.NLRI) error DeletePeer(net.IP) error diff --git a/client/doublezerod/internal/services/multicast.go b/client/doublezerod/internal/services/multicast.go index 70228fe5be..f10fd288b5 100644 --- a/client/doublezerod/internal/services/multicast.go +++ b/client/doublezerod/internal/services/multicast.go @@ -11,6 +11,7 @@ import ( "github.com/malbeclabs/doublezero/client/doublezerod/internal/api" "github.com/malbeclabs/doublezero/client/doublezerod/internal/bgp" "github.com/malbeclabs/doublezero/client/doublezerod/internal/multicast" + "github.com/malbeclabs/doublezero/client/doublezerod/internal/pim" "github.com/malbeclabs/doublezero/client/doublezerod/internal/routing" "golang.org/x/net/ipv4" "golang.org/x/sys/unix" @@ -21,6 +22,7 @@ type MulticastService struct { nl routing.Netlinker pim PIMWriter heartbeat HeartbeatWriter + register RegisterWriter Tunnel *routing.Tunnel DoubleZeroAddr net.IP MulticastPubGroups []net.IP @@ -31,12 +33,13 @@ type MulticastService struct { func (s *MulticastService) UserType() api.UserType { return api.UserTypeMulticast } func (s *MulticastService) ServiceType() ServiceType { return ServiceTypeMulticast } -func NewMulticastService(bgp BGPReaderWriter, nl routing.Netlinker, pim PIMWriter, heartbeat HeartbeatWriter) *MulticastService { +func NewMulticastService(bgp BGPReaderWriter, nl routing.Netlinker, pim PIMWriter, heartbeat HeartbeatWriter, register RegisterWriter) *MulticastService { return &MulticastService{ bgp: bgp, nl: nl, pim: pim, heartbeat: heartbeat, + register: register, } } @@ -92,6 +95,9 @@ func (s *MulticastService) Setup(p *api.ProvisionRequest) error { if err := s.heartbeat.Start(tun.Name, p.DoubleZeroIP, p.MulticastPubGroups, multicast.DefaultHeartbeatTTL, multicast.DefaultHeartbeatInterval); err != nil { return fmt.Errorf("error starting heartbeat sender: %v", err) } + if err := s.register.Start(tun.Name, s.Tunnel.LocalOverlay, p.DoubleZeroIP, p.MulticastPubGroups, p.MulticastRpAddress, multicast.HeartbeatPort, multicast.HeartbeatPayload, pim.DefaultRegisterInterval); err != nil { + return fmt.Errorf("error starting register sender: %v", err) + } } if isSubscriber { @@ -162,6 +168,9 @@ func (s *MulticastService) Teardown() error { if err := s.heartbeat.Close(); err != nil { slog.Error("error stopping heartbeat sender", "error", err) } + if err := s.register.Close(); err != nil { + slog.Error("error stopping register sender", "error", err) + } } if s.isSubscriber() { @@ -266,6 +275,9 @@ func (s *MulticastService) UpdateGroups(newPR *api.ProvisionRequest) error { if err := s.heartbeat.UpdateGroups(newPR.MulticastPubGroups); err != nil { return fmt.Errorf("error updating heartbeat groups: %v", err) } + if err := s.register.UpdateGroups(newPR.MulticastPubGroups); err != nil { + return fmt.Errorf("error updating register groups: %v", err) + } } // Update subscriber routes diff --git a/client/doublezerod/internal/services/services_test.go b/client/doublezerod/internal/services/services_test.go index 2de0960303..550c04d17b 100644 --- a/client/doublezerod/internal/services/services_test.go +++ b/client/doublezerod/internal/services/services_test.go @@ -1,6 +1,7 @@ package services_test import ( + "fmt" "net" "syscall" "testing" @@ -10,12 +11,42 @@ import ( "github.com/jwhited/corebgp" "github.com/malbeclabs/doublezero/client/doublezerod/internal/api" "github.com/malbeclabs/doublezero/client/doublezerod/internal/bgp" - "github.com/malbeclabs/doublezero/client/doublezerod/internal/manager" + "github.com/malbeclabs/doublezero/client/doublezerod/internal/multicast" "github.com/malbeclabs/doublezero/client/doublezerod/internal/pim" "github.com/malbeclabs/doublezero/client/doublezerod/internal/routing" + "github.com/malbeclabs/doublezero/client/doublezerod/internal/services" "golang.org/x/sys/unix" ) +// Provisioner mirrors the manager.Provisioner interface so we can test +// services without importing the manager package (which calls +// NewMulticastService with the old arity while Task 5 is pending). +type Provisioner interface { + Setup(*api.ProvisionRequest) error + Teardown() error + Status() (*api.StatusResponse, error) + ProvisionRequest() *api.ProvisionRequest +} + +// createTestService constructs the appropriate service for the given user type, +// replacing manager.CreateService so that the test package does not import +// manager (which currently fails to compile because it calls NewMulticastService +// with the old arity — that is fixed in Task 5). +func createTestService(u api.UserType, bgp services.BGPReaderWriter, nl routing.Netlinker, p services.PIMWriter, hb services.HeartbeatWriter) (Provisioner, error) { + switch u { + case api.UserTypeIBRL: + return services.NewIBRLService(bgp, nl), nil + case api.UserTypeIBRLWithAllocatedIP: + return services.NewIBRLServiceWithAllocatedAddress(bgp, nl), nil + case api.UserTypeEdgeFiltering: + return services.NewEdgeFilteringService(bgp, nl), nil + case api.UserTypeMulticast: + return services.NewMulticastService(bgp, nl, p, hb, &mockRegister{}), nil + default: + return nil, fmt.Errorf("unsupported user type: %s", u) + } +} + type MockBgpServer struct { deletedPeer net.IP addPeer *bgp.PeerConfig @@ -153,6 +184,35 @@ func (m *MockHeartbeatSender) Close() error { return nil } +type mockRegister struct { + started bool + iface string + rp net.IP + groups []net.IP + dport int + closed bool + updated [][]net.IP +} + +func (m *mockRegister) Start(iface string, srcOverlay, innerSrc net.IP, groups []net.IP, rp net.IP, dport int, payload []byte, interval time.Duration) error { + m.started = true + m.iface = iface + m.rp = rp + m.groups = groups + m.dport = dport + return nil +} + +func (m *mockRegister) UpdateGroups(groups []net.IP) error { + m.updated = append(m.updated, groups) + return nil +} + +func (m *mockRegister) Close() error { + m.closed = true + return nil +} + func TestServices(t *testing.T) { tests := []struct { name string @@ -617,7 +677,7 @@ func TestServices(t *testing.T) { mockPim := &MockPIMServer{} mockHeartbeat := &MockHeartbeatSender{} - svc, err := manager.CreateService(tt.userType, mockBgp, mockNetlink, mockPim, mockHeartbeat) + svc, err := createTestService(tt.userType, mockBgp, mockNetlink, mockPim, mockHeartbeat) if err != nil { t.Fatalf("failed to create service: %v", err) } @@ -699,10 +759,7 @@ func TestMulticastService_UpdateGroups_AddPubGroup(t *testing.T) { mockPim := &MockPIMServer{} mockHeartbeat := &MockHeartbeatSender{} - svc, err := manager.CreateService(api.UserTypeMulticast, mockBgp, mockNetlink, mockPim, mockHeartbeat) - if err != nil { - t.Fatalf("failed to create service: %v", err) - } + svc := services.NewMulticastService(mockBgp, mockNetlink, mockPim, mockHeartbeat, &mockRegister{}) pr := &api.ProvisionRequest{ UserType: api.UserTypeMulticast, @@ -743,13 +800,7 @@ func TestMulticastService_UpdateGroups_AddPubGroup(t *testing.T) { BgpRemoteAsn: 65001, } - gu, ok := svc.(interface { - UpdateGroups(*api.ProvisionRequest) error - }) - if !ok { - t.Fatal("service does not implement UpdateGroups") - } - if err := gu.UpdateGroups(newPR); err != nil { + if err := svc.UpdateGroups(newPR); err != nil { t.Fatalf("UpdateGroups failed: %v", err) } @@ -789,10 +840,7 @@ func TestMulticastService_UpdateGroups_RemovePubGroup(t *testing.T) { mockPim := &MockPIMServer{} mockHeartbeat := &MockHeartbeatSender{} - svc, err := manager.CreateService(api.UserTypeMulticast, mockBgp, mockNetlink, mockPim, mockHeartbeat) - if err != nil { - t.Fatalf("failed to create service: %v", err) - } + svc := services.NewMulticastService(mockBgp, mockNetlink, mockPim, mockHeartbeat, &mockRegister{}) pr := &api.ProvisionRequest{ UserType: api.UserTypeMulticast, @@ -830,13 +878,7 @@ func TestMulticastService_UpdateGroups_RemovePubGroup(t *testing.T) { BgpRemoteAsn: 65001, } - gu, ok := svc.(interface { - UpdateGroups(*api.ProvisionRequest) error - }) - if !ok { - t.Fatal("service does not implement UpdateGroups") - } - if err := gu.UpdateGroups(newPR); err != nil { + if err := svc.UpdateGroups(newPR); err != nil { t.Fatalf("UpdateGroups failed: %v", err) } @@ -855,10 +897,7 @@ func TestMulticastService_UpdateGroups_PublisherRoleTransition(t *testing.T) { mockPim := &MockPIMServer{} mockHeartbeat := &MockHeartbeatSender{} - svc, err := manager.CreateService(api.UserTypeMulticast, mockBgp, mockNetlink, mockPim, mockHeartbeat) - if err != nil { - t.Fatalf("failed to create service: %v", err) - } + svc := services.NewMulticastService(mockBgp, mockNetlink, mockPim, mockHeartbeat, &mockRegister{}) // Start as subscriber only pr := &api.ProvisionRequest{ @@ -896,13 +935,7 @@ func TestMulticastService_UpdateGroups_PublisherRoleTransition(t *testing.T) { BgpRemoteAsn: 65001, } - gu, ok := svc.(interface { - UpdateGroups(*api.ProvisionRequest) error - }) - if !ok { - t.Fatal("service does not implement UpdateGroups") - } - err = gu.UpdateGroups(newPR) + err := svc.UpdateGroups(newPR) if err == nil { t.Fatal("expected error for publisher role transition, got nil") } @@ -914,10 +947,7 @@ func TestMulticastService_DoubleTeardown(t *testing.T) { mockPim := &MockPIMServer{} mockHeartbeat := &MockHeartbeatSender{} - svc, err := manager.CreateService(api.UserTypeMulticast, mockBgp, mockNetlink, mockPim, mockHeartbeat) - if err != nil { - t.Fatalf("failed to create service: %v", err) - } + svc := services.NewMulticastService(mockBgp, mockNetlink, mockPim, mockHeartbeat, &mockRegister{}) pr := &api.ProvisionRequest{ UserType: api.UserTypeMulticast, @@ -948,3 +978,37 @@ func TestMulticastService_DoubleTeardown(t *testing.T) { t.Fatalf("second Teardown() returned error: %v", err) } } + +func TestMulticastSetupStartsRegisterForPublisher(t *testing.T) { + reg := &mockRegister{} + svc := services.NewMulticastService(&MockBgpServer{}, &MockNetlink{}, &MockPIMServer{}, &MockHeartbeatSender{}, reg) + + p := &api.ProvisionRequest{ + UserType: api.UserTypeMulticast, + TunnelSrc: net.IPv4(1, 1, 1, 1), + TunnelDst: net.IPv4(2, 2, 2, 2), + MulticastPubGroups: []net.IP{net.IPv4(233, 84, 178, 5)}, + MulticastRpAddress: net.IPv4(10, 0, 0, 0), + TunnelNet: &net.IPNet{ + IP: net.IPv4(169, 254, 0, 0), + Mask: net.IPMask{255, 255, 255, 254}, + }, + DoubleZeroIP: net.IPv4(7, 7, 7, 7), + DoubleZeroPrefixes: []*net.IPNet{}, + BgpLocalAsn: 65000, + BgpRemoteAsn: 65001, + } + _ = p.Validate() + if err := svc.Setup(p); err != nil { + t.Fatalf("Setup: %v", err) + } + if !reg.started { + t.Fatal("register beacon was not started for a publisher") + } + if !reg.rp.Equal(net.IPv4(10, 0, 0, 0)) { + t.Fatalf("register rp = %v, want 10.0.0.0", reg.rp) + } + if reg.dport != multicast.HeartbeatPort { + t.Fatalf("register dport = %d, want %d", reg.dport, multicast.HeartbeatPort) + } +} From 932a8cb47ee7d8805b94c9e41fd20f3f4c6baaa7 Mon Sep 17 00:00:00 2001 From: Ben Cairns Date: Mon, 29 Jun 2026 23:45:23 -0500 Subject: [PATCH 06/13] feat(manager): wire RegisterWriter through CreateService and NetlinkManager Thread services.RegisterWriter through CreateService (new final param), NetlinkManager struct + NewNetlinkManager (after heartbeat), and the internal provisionLocked call site. Construct pim.NewRegisterSender() in runtime/run.go and pass it through NewNetlinkManager. Update all test call sites (reconciler_test.go, http_test.go) with mock implementations. Co-Authored-By: Claude Sonnet 4.6 --- .../doublezerod/internal/manager/http_test.go | 26 +++++++++++++++---- .../doublezerod/internal/manager/manager.go | 10 ++++--- .../internal/manager/reconciler_test.go | 14 ++++++++-- client/doublezerod/internal/runtime/run.go | 3 ++- 4 files changed, 41 insertions(+), 12 deletions(-) diff --git a/client/doublezerod/internal/manager/http_test.go b/client/doublezerod/internal/manager/http_test.go index 4c44e330e3..4ee8620deb 100644 --- a/client/doublezerod/internal/manager/http_test.go +++ b/client/doublezerod/internal/manager/http_test.go @@ -109,7 +109,8 @@ func TestHttpStatus(t *testing.T) { b := &MockBgpServer{} pim := &MockPIMServer{} heartbeat := &MockHeartbeatSender{} - mgr := manager.NewNetlinkManager(m, b, pim, heartbeat) + register := &MockRegisterSender{} + mgr := manager.NewNetlinkManager(m, b, pim, heartbeat, register) f, err := os.CreateTemp("/tmp", "doublezero.sock") if err != nil { @@ -213,7 +214,8 @@ func TestNetlinkManager_HttpEndpoints(t *testing.T) { b := &MockBgpServer{} pim := &MockPIMServer{} heartbeat := &MockHeartbeatSender{} - mgr := manager.NewNetlinkManager(m, b, pim, heartbeat) + register := &MockRegisterSender{} + mgr := manager.NewNetlinkManager(m, b, pim, heartbeat, register) f, err := os.CreateTemp("/tmp", "doublezero.sock") if err != nil { @@ -389,7 +391,7 @@ func TestNetlinkManager_HttpEndpoints(t *testing.T) { func TestNetlinkManager_GetProvisionedServices(t *testing.T) { t.Run("no_services", func(t *testing.T) { - mgr := manager.NewNetlinkManager(&MockNetlink{}, &MockBgpServer{}, &MockPIMServer{}, &MockHeartbeatSender{}) + mgr := manager.NewNetlinkManager(&MockNetlink{}, &MockBgpServer{}, &MockPIMServer{}, &MockHeartbeatSender{}, &MockRegisterSender{}) reqs := mgr.GetProvisionedServices() if len(reqs) != 0 { t.Fatalf("expected 0 provisioned services, got %d", len(reqs)) @@ -397,7 +399,7 @@ func TestNetlinkManager_GetProvisionedServices(t *testing.T) { }) t.Run("unicast_only", func(t *testing.T) { - mgr := manager.NewNetlinkManager(&MockNetlink{}, &MockBgpServer{}, &MockPIMServer{}, &MockHeartbeatSender{}) + mgr := manager.NewNetlinkManager(&MockNetlink{}, &MockBgpServer{}, &MockPIMServer{}, &MockHeartbeatSender{}, &MockRegisterSender{}) pr := api.ProvisionRequest{ UserType: api.UserTypeIBRL, TunnelSrc: net.IPv4(1, 1, 1, 1), @@ -420,7 +422,7 @@ func TestNetlinkManager_GetProvisionedServices(t *testing.T) { }) t.Run("after_remove", func(t *testing.T) { - mgr := manager.NewNetlinkManager(&MockNetlink{}, &MockBgpServer{}, &MockPIMServer{}, &MockHeartbeatSender{}) + mgr := manager.NewNetlinkManager(&MockNetlink{}, &MockBgpServer{}, &MockPIMServer{}, &MockHeartbeatSender{}, &MockRegisterSender{}) pr := api.ProvisionRequest{ UserType: api.UserTypeIBRL, TunnelSrc: net.IPv4(1, 1, 1, 1), @@ -553,3 +555,17 @@ func (m *MockNetlink) RuleDel(n *routing.IPRule) error { func (m *MockNetlink) RouteByProtocol(protocol int) ([]*routing.Route, error) { return m.routes, nil } + +type MockRegisterSender struct{} + +func (m *MockRegisterSender) Start(iface string, srcOverlay, innerSrc net.IP, groups []net.IP, rp net.IP, dport int, payload []byte, interval time.Duration) error { + return nil +} + +func (m *MockRegisterSender) UpdateGroups(groups []net.IP) error { + return nil +} + +func (m *MockRegisterSender) Close() error { + return nil +} diff --git a/client/doublezerod/internal/manager/manager.go b/client/doublezerod/internal/manager/manager.go index 7b83198ce4..08811485ec 100644 --- a/client/doublezerod/internal/manager/manager.go +++ b/client/doublezerod/internal/manager/manager.go @@ -127,6 +127,7 @@ type NetlinkManager struct { bgp BGPServer pim services.PIMWriter heartbeat services.HeartbeatWriter + register services.RegisterWriter mu sync.Mutex // Reconciler fields @@ -146,7 +147,7 @@ type NetlinkManager struct { // CreateService creates the appropriate service based on the provisioned // user type. -func CreateService(u api.UserType, bgp services.BGPReaderWriter, nl routing.Netlinker, pim services.PIMWriter, heartbeat services.HeartbeatWriter) (Provisioner, error) { +func CreateService(u api.UserType, bgp services.BGPReaderWriter, nl routing.Netlinker, pim services.PIMWriter, heartbeat services.HeartbeatWriter, register services.RegisterWriter) (Provisioner, error) { switch u { case api.UserTypeIBRL: return services.NewIBRLService(bgp, nl), nil @@ -155,18 +156,19 @@ func CreateService(u api.UserType, bgp services.BGPReaderWriter, nl routing.Netl case api.UserTypeEdgeFiltering: return services.NewEdgeFilteringService(bgp, nl), nil case api.UserTypeMulticast: - return services.NewMulticastService(bgp, nl, pim, heartbeat), nil + return services.NewMulticastService(bgp, nl, pim, heartbeat, register), nil default: return nil, fmt.Errorf("unsupported user type: %s", u) } } -func NewNetlinkManager(netlink routing.Netlinker, bgp BGPServer, pim services.PIMWriter, heartbeat services.HeartbeatWriter, opts ...Option) *NetlinkManager { +func NewNetlinkManager(netlink routing.Netlinker, bgp BGPServer, pim services.PIMWriter, heartbeat services.HeartbeatWriter, register services.RegisterWriter, opts ...Option) *NetlinkManager { n := &NetlinkManager{ netlink: netlink, bgp: bgp, pim: pim, heartbeat: heartbeat, + register: register, pollInterval: defaultPollInterval, fetchTimeout: defaultFetchTimeout, enableCh: make(chan bool, 1), @@ -189,7 +191,7 @@ func (n *NetlinkManager) Provision(pr api.ProvisionRequest) error { // provisionLocked creates and sets up a service for the given provision request. // Caller must hold n.mu. func (n *NetlinkManager) provisionLocked(pr api.ProvisionRequest) error { - svc, err := CreateService(pr.UserType, n.bgp, n.netlink, n.pim, n.heartbeat) + svc, err := CreateService(pr.UserType, n.bgp, n.netlink, n.pim, n.heartbeat, n.register) if err != nil { return fmt.Errorf("error creating service: %v", err) } diff --git a/client/doublezerod/internal/manager/reconciler_test.go b/client/doublezerod/internal/manager/reconciler_test.go index 3d5f370e10..4627ff5882 100644 --- a/client/doublezerod/internal/manager/reconciler_test.go +++ b/client/doublezerod/internal/manager/reconciler_test.go @@ -84,6 +84,14 @@ func (m *mockHeartbeatSender) Start(string, net.IP, []net.IP, int, time.Duration func (m *mockHeartbeatSender) UpdateGroups([]net.IP) error { return nil } func (m *mockHeartbeatSender) Close() error { return nil } +type mockRegisterSender struct{} + +func (m *mockRegisterSender) Start(string, net.IP, net.IP, []net.IP, net.IP, int, []byte, time.Duration) error { + return nil +} +func (m *mockRegisterSender) UpdateGroups([]net.IP) error { return nil } +func (m *mockRegisterSender) Close() error { return nil } + type mockLatencyProvider struct { results []latency.LatencyResult } @@ -99,14 +107,16 @@ func newTestNLM(fetcher Fetcher, opts ...Option) *NetlinkManager { bgpSrv := &mockBgpServer{} pimSrv := &mockPIMServer{} hb := &mockHeartbeatSender{} - return NewNetlinkManager(nl, bgpSrv, pimSrv, hb, append([]Option{WithFetcher(fetcher)}, opts...)...) + reg := &mockRegisterSender{} + return NewNetlinkManager(nl, bgpSrv, pimSrv, hb, reg, append([]Option{WithFetcher(fetcher)}, opts...)...) } func newTestNLMWithNetlink(nl routing.Netlinker, fetcher Fetcher, opts ...Option) *NetlinkManager { bgpSrv := &mockBgpServer{} pimSrv := &mockPIMServer{} hb := &mockHeartbeatSender{} - return NewNetlinkManager(nl, bgpSrv, pimSrv, hb, append([]Option{WithFetcher(fetcher)}, opts...)...) + reg := &mockRegisterSender{} + return NewNetlinkManager(nl, bgpSrv, pimSrv, hb, reg, append([]Option{WithFetcher(fetcher)}, opts...)...) } func testDevice(pk [32]byte, ip [4]uint8, prefixes [][5]uint8) serviceability.Device { diff --git a/client/doublezerod/internal/runtime/run.go b/client/doublezerod/internal/runtime/run.go index 5e89709edc..1bbfaeb858 100644 --- a/client/doublezerod/internal/runtime/run.go +++ b/client/doublezerod/internal/runtime/run.go @@ -67,6 +67,7 @@ func Run(ctx context.Context, sockFile string, routeConfigPath string, enableLat return fmt.Errorf("error creating bgp server: %v", err) } + register := pim.NewRegisterSender() pim := pim.NewPIMServer() heartbeat := multicast.NewHeartbeatSender() @@ -126,7 +127,7 @@ func Run(ctx context.Context, sockFile string, routeConfigPath string, enableLat if latencyManager != nil { nlmOpts = append(nlmOpts, manager.WithLatencyProvider(latencyManager)) } - nlm := manager.NewNetlinkManager(nlr, bgp, pim, heartbeat, nlmOpts...) + nlm := manager.NewNetlinkManager(nlr, bgp, pim, heartbeat, register, nlmOpts...) errCh := make(chan error) From 76285700f8f9625650ed1d5c422212433340deee Mon Sep 17 00:00:00 2001 From: Ben Cairns Date: Mon, 29 Jun 2026 23:50:59 -0500 Subject: [PATCH 07/13] feat(controller): permit PIM Register to the RP on publisher tunnels Add `permit pim any host 10.0.0.0` to SEC-USER-PUB-MCAST-IN before the final deny, so unicast PIM Register packets sent by the client to the RP (10.0.0.0) are not dropped at the inbound tunnel ACL. The belt-and- suspenders `pim ipv4 border-router` and SEC-USER-SUB-MCAST-IN are unchanged. --- .../internal/controller/fixtures/base.config.drained.txt | 1 + .../controller/fixtures/base.config.flex-algo-disabled.txt | 1 + .../internal/controller/fixtures/base.config.flex-algo.txt | 1 + .../controller/internal/controller/fixtures/base.config.txt | 1 + .../internal/controller/fixtures/base.config.with.mgmt.vrf.txt | 1 + .../controller/fixtures/base.config.without.interfaces.peers.txt | 1 + .../controller/internal/controller/fixtures/e2e.last.user.tmpl | 1 + .../controller/internal/controller/fixtures/e2e.multi.vrf.tmpl | 1 + .../internal/controller/fixtures/e2e.peer.removal.tmpl | 1 + controlplane/controller/internal/controller/fixtures/e2e.tmpl | 1 + .../controller/fixtures/e2e.without.interfaces.peers.tmpl | 1 + .../controller/internal/controller/fixtures/interfaces.txt | 1 + .../controller/fixtures/metro.routing.disabled.tunnel.tmpl | 1 + .../controller/internal/controller/fixtures/mixed.tunnel.tmpl | 1 + .../fixtures/multi.vrf.mixed.metro.routing.tunnel.tmpl | 1 + .../internal/controller/fixtures/multi.vrf.tunnel.tmpl | 1 + .../internal/controller/fixtures/multicast.tunnel.tmpl | 1 + .../internal/controller/fixtures/nohardware.tunnel.tmpl | 1 + .../controller/internal/controller/fixtures/unicast.tunnel.tmpl | 1 + .../internal/controller/fixtures/unknown.peer.removal.tmpl | 1 + .../controller/internal/controller/templates/tunnel.tmpl | 1 + 21 files changed, 21 insertions(+) diff --git a/controlplane/controller/internal/controller/fixtures/base.config.drained.txt b/controlplane/controller/internal/controller/fixtures/base.config.drained.txt index 51b0736d8a..6b26dcdc9f 100644 --- a/controlplane/controller/internal/controller/fixtures/base.config.drained.txt +++ b/controlplane/controller/internal/controller/fixtures/base.config.drained.txt @@ -153,6 +153,7 @@ ip access-list SEC-USER-PUB-MCAST-IN permit tcp any any eq bgp permit ip any 224.0.0.13/32 permit ip any 239.0.0.0/24 + permit pim any host 10.0.0.0 deny ip any any ! no ip access-list SEC-USER-SUB-MCAST-IN diff --git a/controlplane/controller/internal/controller/fixtures/base.config.flex-algo-disabled.txt b/controlplane/controller/internal/controller/fixtures/base.config.flex-algo-disabled.txt index d1275a7f26..470bc0258a 100644 --- a/controlplane/controller/internal/controller/fixtures/base.config.flex-algo-disabled.txt +++ b/controlplane/controller/internal/controller/fixtures/base.config.flex-algo-disabled.txt @@ -192,6 +192,7 @@ ip access-list SEC-USER-PUB-MCAST-IN permit tcp any any eq bgp permit ip any 224.0.0.13/32 permit ip any 239.0.0.0/24 + permit pim any host 10.0.0.0 deny ip any any ! no ip access-list SEC-USER-SUB-MCAST-IN diff --git a/controlplane/controller/internal/controller/fixtures/base.config.flex-algo.txt b/controlplane/controller/internal/controller/fixtures/base.config.flex-algo.txt index 84685b34ca..f1d73fca72 100644 --- a/controlplane/controller/internal/controller/fixtures/base.config.flex-algo.txt +++ b/controlplane/controller/internal/controller/fixtures/base.config.flex-algo.txt @@ -205,6 +205,7 @@ ip access-list SEC-USER-PUB-MCAST-IN permit tcp any any eq bgp permit ip any 224.0.0.13/32 permit ip any 239.0.0.0/24 + permit pim any host 10.0.0.0 deny ip any any ! no ip access-list SEC-USER-SUB-MCAST-IN diff --git a/controlplane/controller/internal/controller/fixtures/base.config.txt b/controlplane/controller/internal/controller/fixtures/base.config.txt index 6bedfda9f5..d579189158 100644 --- a/controlplane/controller/internal/controller/fixtures/base.config.txt +++ b/controlplane/controller/internal/controller/fixtures/base.config.txt @@ -153,6 +153,7 @@ ip access-list SEC-USER-PUB-MCAST-IN permit tcp any any eq bgp permit ip any 224.0.0.13/32 permit ip any 239.0.0.0/24 + permit pim any host 10.0.0.0 deny ip any any ! no ip access-list SEC-USER-SUB-MCAST-IN diff --git a/controlplane/controller/internal/controller/fixtures/base.config.with.mgmt.vrf.txt b/controlplane/controller/internal/controller/fixtures/base.config.with.mgmt.vrf.txt index 29bf2297d5..54b0c5d96b 100644 --- a/controlplane/controller/internal/controller/fixtures/base.config.with.mgmt.vrf.txt +++ b/controlplane/controller/internal/controller/fixtures/base.config.with.mgmt.vrf.txt @@ -129,6 +129,7 @@ ip access-list SEC-USER-PUB-MCAST-IN permit tcp any any eq bgp permit ip any 224.0.0.13/32 permit ip any 239.0.0.0/24 + permit pim any host 10.0.0.0 deny ip any any ! no ip access-list SEC-USER-SUB-MCAST-IN diff --git a/controlplane/controller/internal/controller/fixtures/base.config.without.interfaces.peers.txt b/controlplane/controller/internal/controller/fixtures/base.config.without.interfaces.peers.txt index 5dd5996172..27549e9674 100644 --- a/controlplane/controller/internal/controller/fixtures/base.config.without.interfaces.peers.txt +++ b/controlplane/controller/internal/controller/fixtures/base.config.without.interfaces.peers.txt @@ -98,6 +98,7 @@ ip access-list SEC-USER-PUB-MCAST-IN permit tcp any any eq bgp permit ip any 224.0.0.13/32 permit ip any 239.0.0.0/24 + permit pim any host 10.0.0.0 deny ip any any ! no ip access-list SEC-USER-SUB-MCAST-IN diff --git a/controlplane/controller/internal/controller/fixtures/e2e.last.user.tmpl b/controlplane/controller/internal/controller/fixtures/e2e.last.user.tmpl index 876fbf032f..60c6f5028a 100644 --- a/controlplane/controller/internal/controller/fixtures/e2e.last.user.tmpl +++ b/controlplane/controller/internal/controller/fixtures/e2e.last.user.tmpl @@ -166,6 +166,7 @@ ip access-list SEC-USER-PUB-MCAST-IN permit tcp any any eq bgp permit ip any 224.0.0.13/32 permit ip any 239.0.0.0/24 + permit pim any host 10.0.0.0 deny ip any any ! no ip access-list SEC-USER-SUB-MCAST-IN diff --git a/controlplane/controller/internal/controller/fixtures/e2e.multi.vrf.tmpl b/controlplane/controller/internal/controller/fixtures/e2e.multi.vrf.tmpl index 0384f04b30..99e7d7a796 100644 --- a/controlplane/controller/internal/controller/fixtures/e2e.multi.vrf.tmpl +++ b/controlplane/controller/internal/controller/fixtures/e2e.multi.vrf.tmpl @@ -257,6 +257,7 @@ ip access-list SEC-USER-PUB-MCAST-IN permit tcp any any eq bgp permit ip any 224.0.0.13/32 permit ip any 239.0.0.0/24 + permit pim any host 10.0.0.0 deny ip any any ! no ip access-list SEC-USER-SUB-MCAST-IN diff --git a/controlplane/controller/internal/controller/fixtures/e2e.peer.removal.tmpl b/controlplane/controller/internal/controller/fixtures/e2e.peer.removal.tmpl index 7b38bd4753..79672b55b4 100644 --- a/controlplane/controller/internal/controller/fixtures/e2e.peer.removal.tmpl +++ b/controlplane/controller/internal/controller/fixtures/e2e.peer.removal.tmpl @@ -265,6 +265,7 @@ ip access-list SEC-USER-PUB-MCAST-IN permit tcp any any eq bgp permit ip any 224.0.0.13/32 permit ip any 239.0.0.0/24 + permit pim any host 10.0.0.0 deny ip any any ! no ip access-list SEC-USER-SUB-MCAST-IN diff --git a/controlplane/controller/internal/controller/fixtures/e2e.tmpl b/controlplane/controller/internal/controller/fixtures/e2e.tmpl index e97841b5fb..d86ea7097b 100644 --- a/controlplane/controller/internal/controller/fixtures/e2e.tmpl +++ b/controlplane/controller/internal/controller/fixtures/e2e.tmpl @@ -286,6 +286,7 @@ ip access-list SEC-USER-PUB-MCAST-IN permit tcp any any eq bgp permit ip any 224.0.0.13/32 permit ip any 239.0.0.0/24 + permit pim any host 10.0.0.0 deny ip any any ! no ip access-list SEC-USER-SUB-MCAST-IN diff --git a/controlplane/controller/internal/controller/fixtures/e2e.without.interfaces.peers.tmpl b/controlplane/controller/internal/controller/fixtures/e2e.without.interfaces.peers.tmpl index 6280252608..5f049df6c8 100644 --- a/controlplane/controller/internal/controller/fixtures/e2e.without.interfaces.peers.tmpl +++ b/controlplane/controller/internal/controller/fixtures/e2e.without.interfaces.peers.tmpl @@ -116,6 +116,7 @@ ip access-list SEC-USER-PUB-MCAST-IN permit tcp any any eq bgp permit ip any 224.0.0.13/32 permit ip any 239.0.0.0/24 + permit pim any host 10.0.0.0 deny ip any any ! no ip access-list SEC-USER-SUB-MCAST-IN diff --git a/controlplane/controller/internal/controller/fixtures/interfaces.txt b/controlplane/controller/internal/controller/fixtures/interfaces.txt index 2de2fccfa1..ec9ea4eae9 100644 --- a/controlplane/controller/internal/controller/fixtures/interfaces.txt +++ b/controlplane/controller/internal/controller/fixtures/interfaces.txt @@ -211,6 +211,7 @@ ip access-list SEC-USER-PUB-MCAST-IN permit tcp any any eq bgp permit ip any 224.0.0.13/32 permit ip any 239.0.0.0/24 + permit pim any host 10.0.0.0 deny ip any any ! no ip access-list SEC-USER-SUB-MCAST-IN diff --git a/controlplane/controller/internal/controller/fixtures/metro.routing.disabled.tunnel.tmpl b/controlplane/controller/internal/controller/fixtures/metro.routing.disabled.tunnel.tmpl index e80d6ad7cc..3406051feb 100644 --- a/controlplane/controller/internal/controller/fixtures/metro.routing.disabled.tunnel.tmpl +++ b/controlplane/controller/internal/controller/fixtures/metro.routing.disabled.tunnel.tmpl @@ -216,6 +216,7 @@ ip access-list SEC-USER-PUB-MCAST-IN permit tcp any any eq bgp permit ip any 224.0.0.13/32 permit ip any 239.0.0.0/24 + permit pim any host 10.0.0.0 deny ip any any ! no ip access-list SEC-USER-SUB-MCAST-IN diff --git a/controlplane/controller/internal/controller/fixtures/mixed.tunnel.tmpl b/controlplane/controller/internal/controller/fixtures/mixed.tunnel.tmpl index bba01079d4..6607a16ff5 100644 --- a/controlplane/controller/internal/controller/fixtures/mixed.tunnel.tmpl +++ b/controlplane/controller/internal/controller/fixtures/mixed.tunnel.tmpl @@ -317,6 +317,7 @@ ip access-list SEC-USER-PUB-MCAST-IN permit tcp any any eq bgp permit ip any 224.0.0.13/32 permit ip any 239.0.0.0/24 + permit pim any host 10.0.0.0 deny ip any any ! no ip access-list SEC-USER-SUB-MCAST-IN diff --git a/controlplane/controller/internal/controller/fixtures/multi.vrf.mixed.metro.routing.tunnel.tmpl b/controlplane/controller/internal/controller/fixtures/multi.vrf.mixed.metro.routing.tunnel.tmpl index 1f8717f413..f8c1e77882 100644 --- a/controlplane/controller/internal/controller/fixtures/multi.vrf.mixed.metro.routing.tunnel.tmpl +++ b/controlplane/controller/internal/controller/fixtures/multi.vrf.mixed.metro.routing.tunnel.tmpl @@ -225,6 +225,7 @@ ip access-list SEC-USER-PUB-MCAST-IN permit tcp any any eq bgp permit ip any 224.0.0.13/32 permit ip any 239.0.0.0/24 + permit pim any host 10.0.0.0 deny ip any any ! no ip access-list SEC-USER-SUB-MCAST-IN diff --git a/controlplane/controller/internal/controller/fixtures/multi.vrf.tunnel.tmpl b/controlplane/controller/internal/controller/fixtures/multi.vrf.tunnel.tmpl index d1fd6c51c1..f4e93d1577 100644 --- a/controlplane/controller/internal/controller/fixtures/multi.vrf.tunnel.tmpl +++ b/controlplane/controller/internal/controller/fixtures/multi.vrf.tunnel.tmpl @@ -227,6 +227,7 @@ ip access-list SEC-USER-PUB-MCAST-IN permit tcp any any eq bgp permit ip any 224.0.0.13/32 permit ip any 239.0.0.0/24 + permit pim any host 10.0.0.0 deny ip any any ! no ip access-list SEC-USER-SUB-MCAST-IN diff --git a/controlplane/controller/internal/controller/fixtures/multicast.tunnel.tmpl b/controlplane/controller/internal/controller/fixtures/multicast.tunnel.tmpl index 1774cb95d2..f123c8741c 100644 --- a/controlplane/controller/internal/controller/fixtures/multicast.tunnel.tmpl +++ b/controlplane/controller/internal/controller/fixtures/multicast.tunnel.tmpl @@ -261,6 +261,7 @@ ip access-list SEC-USER-PUB-MCAST-IN permit tcp any any eq bgp permit ip any 224.0.0.13/32 permit ip any 239.0.0.0/24 + permit pim any host 10.0.0.0 deny ip any any ! no ip access-list SEC-USER-SUB-MCAST-IN diff --git a/controlplane/controller/internal/controller/fixtures/nohardware.tunnel.tmpl b/controlplane/controller/internal/controller/fixtures/nohardware.tunnel.tmpl index 330285a124..c5a29c0733 100644 --- a/controlplane/controller/internal/controller/fixtures/nohardware.tunnel.tmpl +++ b/controlplane/controller/internal/controller/fixtures/nohardware.tunnel.tmpl @@ -306,6 +306,7 @@ ip access-list SEC-USER-PUB-MCAST-IN permit tcp any any eq bgp permit ip any 224.0.0.13/32 permit ip any 239.0.0.0/24 + permit pim any host 10.0.0.0 deny ip any any ! no ip access-list SEC-USER-SUB-MCAST-IN diff --git a/controlplane/controller/internal/controller/fixtures/unicast.tunnel.tmpl b/controlplane/controller/internal/controller/fixtures/unicast.tunnel.tmpl index 4d44429561..35240a0dd7 100644 --- a/controlplane/controller/internal/controller/fixtures/unicast.tunnel.tmpl +++ b/controlplane/controller/internal/controller/fixtures/unicast.tunnel.tmpl @@ -274,6 +274,7 @@ ip access-list SEC-USER-PUB-MCAST-IN permit tcp any any eq bgp permit ip any 224.0.0.13/32 permit ip any 239.0.0.0/24 + permit pim any host 10.0.0.0 deny ip any any ! no ip access-list SEC-USER-SUB-MCAST-IN diff --git a/controlplane/controller/internal/controller/fixtures/unknown.peer.removal.tmpl b/controlplane/controller/internal/controller/fixtures/unknown.peer.removal.tmpl index 2b25b42e1c..21a58de4f9 100644 --- a/controlplane/controller/internal/controller/fixtures/unknown.peer.removal.tmpl +++ b/controlplane/controller/internal/controller/fixtures/unknown.peer.removal.tmpl @@ -285,6 +285,7 @@ ip access-list SEC-USER-PUB-MCAST-IN permit tcp any any eq bgp permit ip any 224.0.0.13/32 permit ip any 239.0.0.0/24 + permit pim any host 10.0.0.0 deny ip any any ! no ip access-list SEC-USER-SUB-MCAST-IN diff --git a/controlplane/controller/internal/controller/templates/tunnel.tmpl b/controlplane/controller/internal/controller/templates/tunnel.tmpl index 49f7fc6f41..2147629845 100644 --- a/controlplane/controller/internal/controller/templates/tunnel.tmpl +++ b/controlplane/controller/internal/controller/templates/tunnel.tmpl @@ -450,6 +450,7 @@ ip access-list SEC-USER-PUB-MCAST-IN permit tcp any any eq bgp permit ip any 224.0.0.13/32 permit ip any {{ .MulticastGroupBlock }} + permit pim any host 10.0.0.0 deny ip any any ! no ip access-list SEC-USER-SUB-MCAST-IN From 4c7d519d3de7b5f7a8a39beef6ddf662f609bf9a Mon Sep 17 00:00:00 2001 From: Ben Cairns Date: Mon, 29 Jun 2026 23:57:44 -0500 Subject: [PATCH 08/13] docs/test(pim,services): clarify Register buffer/port; assert beacon pub groups Co-Authored-By: Claude Sonnet 4.6 --- client/doublezerod/internal/pim/pim.go | 6 ++++++ client/doublezerod/internal/services/services_test.go | 8 ++++++++ 2 files changed, 14 insertions(+) diff --git a/client/doublezerod/internal/pim/pim.go b/client/doublezerod/internal/pim/pim.go index 99549d6e12..62017b41d1 100644 --- a/client/doublezerod/internal/pim/pim.go +++ b/client/doublezerod/internal/pim/pim.go @@ -758,6 +758,10 @@ type RegisterMessage struct { func (p *RegisterMessage) LayerType() gopacket.LayerType { return gopacket.LayerTypePayload } +// SerializeTo writes the 4-byte Register flags word followed by the +// encapsulated datagram (p.Data). b must be a freshly allocated +// SerializeBuffer: it prepends the flags and appends the data, so any +// pre-existing buffer contents would corrupt the Register layout. func (p *RegisterMessage) SerializeTo(b gopacket.SerializeBuffer, opts gopacket.SerializeOptions) error { flags, err := b.PrependBytes(4) if err != nil { @@ -793,6 +797,8 @@ func constructRegisterMessage(innerSrc, group net.IP, dport int, payload []byte) SrcIP: innerSrc.To4(), DstIP: group.To4(), } + // SrcPort mirrors DstPort: the RP reads only the inner source/group IPs to + // create (S,G) state, so the encapsulated UDP ports are immaterial. udp := &layers.UDP{ SrcPort: layers.UDPPort(dport), DstPort: layers.UDPPort(dport), diff --git a/client/doublezerod/internal/services/services_test.go b/client/doublezerod/internal/services/services_test.go index 550c04d17b..8d102bae7f 100644 --- a/client/doublezerod/internal/services/services_test.go +++ b/client/doublezerod/internal/services/services_test.go @@ -1011,4 +1011,12 @@ func TestMulticastSetupStartsRegisterForPublisher(t *testing.T) { if reg.dport != multicast.HeartbeatPort { t.Fatalf("register dport = %d, want %d", reg.dport, multicast.HeartbeatPort) } + if len(reg.groups) != len(p.MulticastPubGroups) { + t.Fatalf("register groups len = %d, want %d", len(reg.groups), len(p.MulticastPubGroups)) + } + for i := range reg.groups { + if !reg.groups[i].Equal(p.MulticastPubGroups[i]) { + t.Fatalf("register group[%d] = %v, want %v", i, reg.groups[i], p.MulticastPubGroups[i]) + } + } } From 94c644e2156f80e1eca3ef952eebaa776eabc77b Mon Sep 17 00:00:00 2001 From: Ben Cairns Date: Tue, 30 Jun 2026 00:26:14 -0500 Subject: [PATCH 09/13] docs(changelog): note RFC-22 client PIM Register beacon + controller permit --- CHANGELOG.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index b21bcccc2d..9b11c57630 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,9 @@ All notable changes to this project will be documented in this file. - Client - Add a `-route-liveness-backoff-max` daemon flag to cap the Down-state liveness probe interval. Defaults to 60s (production behavior unchanged); the e2e harness pins a small value to avoid a probe gap that flaked the multi-client IBRL tests. (#3949) + - Originate a PIM Register beacon for multicast publishers: `doublezerod` periodically sends a PIM Register (encapsulating the publisher heartbeat) unicast to the RP over the tunnel, so the device originates the MSDP SA for the published source even on a dual-role publisher/subscriber tunnel, where `pim ipv4 border-router` source injection is suppressed by the subscriber-side PIM neighbor. (RFC-22) +- Controller + - Permit the unicast PIM Register to the RP (`permit pim any host 10.0.0.0`) on publisher multicast tunnels so the client-originated Register reaches the device; `pim ipv4 border-router` is retained as a backstop. (RFC-22) - E2E - Pin the e2e ledger `solana-test-validator` to the deploy floor (agave 2.2.16, testnet) so a green e2e proves a change actually deploys and runs on the production cluster runtime. Previously the runtime validator rode the SBF build toolchain version (2.3.13); it is now decoupled and pinned independently. The build toolchain is unchanged. (#3957) From 7495bfd8858ba58111f59e9b0c803b44ecc25aa8 Mon Sep 17 00:00:00 2001 From: Ben Cairns Date: Tue, 30 Jun 2026 01:11:20 -0500 Subject: [PATCH 10/13] fix(manager): validate reconciler-built ProvisionRequest so MulticastRpAddress defaults The reconciler path called buildProvisionRequest which constructed an api.ProvisionRequest without calling Validate(). This left MulticastRpAddress as nil. When the multicast service then called register.Start with that nil rp, sendRegister built an ipv4.Header with a nil Dst and WriteTo failed with "missing address". The HTTP provisioning path was unaffected because ServeProvision calls Validate() explicitly (internal/manager/http.go:66). Fix: build the request into a local var in buildProvisionRequest and call Validate() before returning, mirroring what the HTTP path does. Validate() defaults nil MulticastRpAddress to 10.0.0.0. Regression test TestReconcile_ProvisionMulticast_DefaultsRpAddress exercises a publisher user through the reconciler and asserts that both ProvisionRequest.MulticastRpAddress and the rp arg passed to mockRegisterSender.Start equal net.IPv4(10,0,0,0). Co-Authored-By: Claude Opus 4.8 (1M context) --- .../doublezerod/internal/manager/manager.go | 8 +- .../internal/manager/reconciler_test.go | 76 ++++++++++++++++++- 2 files changed, 80 insertions(+), 4 deletions(-) diff --git a/client/doublezerod/internal/manager/manager.go b/client/doublezerod/internal/manager/manager.go index 08811485ec..e964e39c8d 100644 --- a/client/doublezerod/internal/manager/manager.go +++ b/client/doublezerod/internal/manager/manager.go @@ -705,7 +705,7 @@ func (n *NetlinkManager) buildProvisionRequest( } } - return api.ProvisionRequest{ + pr := api.ProvisionRequest{ UserType: mapUserType(u.UserType), TunnelSrc: tunnelSrc, TunnelDst: tunnelDst, @@ -716,7 +716,11 @@ func (n *NetlinkManager) buildProvisionRequest( BgpRemoteAsn: cfg.RemoteASN, MulticastPubGroups: pubGroups, MulticastSubGroups: subGroups, - }, nil + } + if err := pr.Validate(); err != nil { + return api.ProvisionRequest{}, fmt.Errorf("invalid provision request: %w", err) + } + return pr, nil } // mapUserType maps onchain UserUserType to daemon api.UserType. diff --git a/client/doublezerod/internal/manager/reconciler_test.go b/client/doublezerod/internal/manager/reconciler_test.go index 4627ff5882..53f281bc35 100644 --- a/client/doublezerod/internal/manager/reconciler_test.go +++ b/client/doublezerod/internal/manager/reconciler_test.go @@ -84,9 +84,15 @@ func (m *mockHeartbeatSender) Start(string, net.IP, []net.IP, int, time.Duration func (m *mockHeartbeatSender) UpdateGroups([]net.IP) error { return nil } func (m *mockHeartbeatSender) Close() error { return nil } -type mockRegisterSender struct{} +type mockRegisterSender struct { + mu sync.Mutex + capturedRPs []net.IP +} -func (m *mockRegisterSender) Start(string, net.IP, net.IP, []net.IP, net.IP, int, []byte, time.Duration) error { +func (m *mockRegisterSender) Start(iface string, srcOverlay, innerSrc net.IP, groups []net.IP, rp net.IP, dport int, payload []byte, interval time.Duration) error { + m.mu.Lock() + defer m.mu.Unlock() + m.capturedRPs = append(m.capturedRPs, rp) return nil } func (m *mockRegisterSender) UpdateGroups([]net.IP) error { return nil } @@ -119,6 +125,18 @@ func newTestNLMWithNetlink(nl routing.Netlinker, fetcher Fetcher, opts ...Option return NewNetlinkManager(nl, bgpSrv, pimSrv, hb, reg, append([]Option{WithFetcher(fetcher)}, opts...)...) } +// newTestNLMWithRegisterSender is like newTestNLM but returns the register +// mock so callers can inspect captured arguments. +func newTestNLMWithRegisterSender(fetcher Fetcher, opts ...Option) (*NetlinkManager, *mockRegisterSender) { + nl := &mockNetlink{} + bgpSrv := &mockBgpServer{} + pimSrv := &mockPIMServer{} + hb := &mockHeartbeatSender{} + reg := &mockRegisterSender{} + mgr := NewNetlinkManager(nl, bgpSrv, pimSrv, hb, reg, append([]Option{WithFetcher(fetcher)}, opts...)...) + return mgr, reg +} + func testDevice(pk [32]byte, ip [4]uint8, prefixes [][5]uint8) serviceability.Device { return serviceability.Device{ PubKey: pk, @@ -343,6 +361,60 @@ func TestReconcile_ProvisionMulticast(t *testing.T) { } } +// TestReconcile_ProvisionMulticast_DefaultsRpAddress verifies that the +// reconciler path calls Validate() on the built ProvisionRequest so that a +// nil MulticastRpAddress is defaulted to 10.0.0.0 before the register sender +// is started. Without the fix buildProvisionRequest returns a nil +// MulticastRpAddress, which makes RegisterSender.Start fail with +// "missing address". +func TestReconcile_ProvisionMulticast_DefaultsRpAddress(t *testing.T) { + devicePK := [32]byte{1} + mcastGroupPK := [32]byte{2} + clientIP := net.IPv4(1, 2, 3, 4).To4() + + // Publisher user — has publish groups, which triggers register.Start. + user := testUser([4]uint8{1, 2, 3, 4}, devicePK, serviceability.UserTypeMulticast, serviceability.UserStatusActivated) + user.Publishers = [][32]uint8{mcastGroupPK} + + fetcher := &mockFetcher{ + data: &serviceability.ProgramData{ + GlobalConfig: testGlobalConfig(), + Devices: []serviceability.Device{testDevice(devicePK, [4]uint8{5, 6, 7, 8}, nil)}, + Users: []serviceability.User{user}, + MulticastGroups: []serviceability.MulticastGroup{{PubKey: mcastGroupPK, MulticastIp: [4]uint8{239, 0, 0, 1}}}, + }, + } + + n, reg := newTestNLMWithRegisterSender(fetcher, WithClientIP(clientIP), WithPollInterval(time.Second)) + n.reconcile(context.Background()) + + if n.MulticastService == nil { + t.Fatal("expected multicast service to be provisioned") + } + + // The ProvisionRequest stored in the service must have a defaulted RP address. + pr := n.MulticastService.ProvisionRequest() + if pr.MulticastRpAddress == nil { + t.Fatal("MulticastRpAddress is nil on reconciler path; Validate() was not called") + } + wantRP := net.IPv4(10, 0, 0, 0) + if !pr.MulticastRpAddress.Equal(wantRP) { + t.Fatalf("expected MulticastRpAddress %v, got %v", wantRP, pr.MulticastRpAddress) + } + + // The rp passed to register.Start must also be the defaulted address. + reg.mu.Lock() + capturedRPs := append([]net.IP(nil), reg.capturedRPs...) + reg.mu.Unlock() + + if len(capturedRPs) == 0 { + t.Fatal("register.Start was never called for a publisher user") + } + if !capturedRPs[0].Equal(wantRP) { + t.Fatalf("register.Start received rp=%v, want %v", capturedRPs[0], wantRP) + } +} + func TestReconcile_RemoveMulticast(t *testing.T) { clientIP := net.IPv4(1, 2, 3, 4).To4() From 0689751171fcd38230aa427354d144c37e7e8e79 Mon Sep 17 00:00:00 2001 From: Ben Cairns Date: Tue, 30 Jun 2026 22:25:20 -0500 Subject: [PATCH 11/13] fix(multicast): update heartbeat_test to exported HeartbeatPayload The Task 4 rename heartbeatPayload -> HeartbeatPayload missed the test file references, tripping go-lint typecheck on CI. Co-Authored-By: Claude Opus 4.8 (1M context) --- client/doublezerod/internal/multicast/heartbeat_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/client/doublezerod/internal/multicast/heartbeat_test.go b/client/doublezerod/internal/multicast/heartbeat_test.go index 7df031d235..d196fa12b3 100644 --- a/client/doublezerod/internal/multicast/heartbeat_test.go +++ b/client/doublezerod/internal/multicast/heartbeat_test.go @@ -100,12 +100,12 @@ func TestHeartbeatSender_SendsImmediately(t *testing.T) { if udpAddr.Port != HeartbeatPort { t.Errorf("expected port %d, got %d", HeartbeatPort, udpAddr.Port) } - if len(w.payload) != len(heartbeatPayload) { - t.Errorf("expected payload len %d, got %d", len(heartbeatPayload), len(w.payload)) + if len(w.payload) != len(HeartbeatPayload) { + t.Errorf("expected payload len %d, got %d", len(HeartbeatPayload), len(w.payload)) } for i, b := range w.payload { - if b != heartbeatPayload[i] { - t.Errorf("payload[%d] = 0x%02x, want 0x%02x", i, b, heartbeatPayload[i]) + if b != HeartbeatPayload[i] { + t.Errorf("payload[%d] = 0x%02x, want 0x%02x", i, b, HeartbeatPayload[i]) } } case <-time.After(2 * time.Second): From c98a4ea6b6621849f6ffa06605f1539a3936310d Mon Sep 17 00:00:00 2001 From: Ben Cairns Date: Tue, 30 Jun 2026 22:34:13 -0500 Subject: [PATCH 12/13] style(pim): gofmt register_test.go Fix mockRawConn method alignment flagged by go-lint (gofmt). Co-Authored-By: Claude Opus 4.8 (1M context) --- client/doublezerod/internal/pim/register_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/client/doublezerod/internal/pim/register_test.go b/client/doublezerod/internal/pim/register_test.go index 718d313b61..1f18c2f2dc 100644 --- a/client/doublezerod/internal/pim/register_test.go +++ b/client/doublezerod/internal/pim/register_test.go @@ -70,8 +70,8 @@ func (m *mockRawConn) WriteTo(h *ipv4.Header, b []byte, cm *ipv4.ControlMessage) m.calls = append(m.calls, writeCall{h: h, b: cp, cm: cm}) return nil } -func (m *mockRawConn) Close() error { return nil } -func (m *mockRawConn) SetMulticastInterface(*net.Interface) error { return nil } +func (m *mockRawConn) Close() error { return nil } +func (m *mockRawConn) SetMulticastInterface(*net.Interface) error { return nil } func (m *mockRawConn) SetControlMessage(ipv4.ControlFlags, bool) error { return nil } func TestRegisterSenderSendsRegisterToRP(t *testing.T) { From b07b5dc58cbec697bafdd7e1862d1e33c3a19e9d Mon Sep 17 00:00:00 2001 From: Ben Cairns Date: Tue, 30 Jun 2026 23:34:06 -0500 Subject: [PATCH 13/13] test(e2e): add PIM Register permit to agent config fixtures The controller now renders `permit pim any host 10.0.0.0` in the device-global SEC-USER-PUB-MCAST-IN ACL (RFC-22), so every e2e agent config fixture that includes that ACL needs the line. Updates the multicast, ibrl, and ibrl_with_allocated_addr fixtures to match. Co-Authored-By: Claude Opus 4.8 (1M context) --- e2e/fixtures/ibrl/doublezero_agent_config_drained.tmpl | 1 + e2e/fixtures/ibrl/doublezero_agent_config_peer_removed.tmpl | 1 + e2e/fixtures/ibrl/doublezero_agent_config_user_added.tmpl | 1 + e2e/fixtures/ibrl/doublezero_agent_config_user_removed.tmpl | 1 + .../doublezero_agent_config_user_added.tmpl | 1 + .../doublezero_agent_config_user_removed.tmpl | 1 + .../multicast/doublezero_agent_config_both_users_added.tmpl | 1 + .../multicast/doublezero_agent_config_both_users_removed.tmpl | 1 + 8 files changed, 8 insertions(+) diff --git a/e2e/fixtures/ibrl/doublezero_agent_config_drained.tmpl b/e2e/fixtures/ibrl/doublezero_agent_config_drained.tmpl index 9a1493aec3..8da31dbe68 100644 --- a/e2e/fixtures/ibrl/doublezero_agent_config_drained.tmpl +++ b/e2e/fixtures/ibrl/doublezero_agent_config_drained.tmpl @@ -301,6 +301,7 @@ ip access-list SEC-USER-PUB-MCAST-IN permit tcp any any eq bgp permit ip any 224.0.0.13/32 permit ip any 233.84.178.0/24 + permit pim any host 10.0.0.0 deny ip any any ! no ip access-list SEC-USER-SUB-MCAST-IN diff --git a/e2e/fixtures/ibrl/doublezero_agent_config_peer_removed.tmpl b/e2e/fixtures/ibrl/doublezero_agent_config_peer_removed.tmpl index b5968a91ca..8e5fa1add8 100644 --- a/e2e/fixtures/ibrl/doublezero_agent_config_peer_removed.tmpl +++ b/e2e/fixtures/ibrl/doublezero_agent_config_peer_removed.tmpl @@ -301,6 +301,7 @@ ip access-list SEC-USER-PUB-MCAST-IN permit tcp any any eq bgp permit ip any 224.0.0.13/32 permit ip any 233.84.178.0/24 + permit pim any host 10.0.0.0 deny ip any any ! no ip access-list SEC-USER-SUB-MCAST-IN diff --git a/e2e/fixtures/ibrl/doublezero_agent_config_user_added.tmpl b/e2e/fixtures/ibrl/doublezero_agent_config_user_added.tmpl index d4bcb82e22..a66906b0a2 100644 --- a/e2e/fixtures/ibrl/doublezero_agent_config_user_added.tmpl +++ b/e2e/fixtures/ibrl/doublezero_agent_config_user_added.tmpl @@ -369,6 +369,7 @@ ip access-list SEC-USER-PUB-MCAST-IN permit tcp any any eq bgp permit ip any 224.0.0.13/32 permit ip any 233.84.178.0/24 + permit pim any host 10.0.0.0 deny ip any any ! no ip access-list SEC-USER-SUB-MCAST-IN diff --git a/e2e/fixtures/ibrl/doublezero_agent_config_user_removed.tmpl b/e2e/fixtures/ibrl/doublezero_agent_config_user_removed.tmpl index be0664ff84..33786c3db3 100644 --- a/e2e/fixtures/ibrl/doublezero_agent_config_user_removed.tmpl +++ b/e2e/fixtures/ibrl/doublezero_agent_config_user_removed.tmpl @@ -318,6 +318,7 @@ ip access-list SEC-USER-PUB-MCAST-IN permit tcp any any eq bgp permit ip any 224.0.0.13/32 permit ip any 233.84.178.0/24 + permit pim any host 10.0.0.0 deny ip any any ! no ip access-list SEC-USER-SUB-MCAST-IN diff --git a/e2e/fixtures/ibrl_with_allocated_addr/doublezero_agent_config_user_added.tmpl b/e2e/fixtures/ibrl_with_allocated_addr/doublezero_agent_config_user_added.tmpl index 1d29f171fc..b6393a4803 100644 --- a/e2e/fixtures/ibrl_with_allocated_addr/doublezero_agent_config_user_added.tmpl +++ b/e2e/fixtures/ibrl_with_allocated_addr/doublezero_agent_config_user_added.tmpl @@ -369,6 +369,7 @@ ip access-list SEC-USER-PUB-MCAST-IN permit tcp any any eq bgp permit ip any 224.0.0.13/32 permit ip any 233.84.178.0/24 + permit pim any host 10.0.0.0 deny ip any any ! no ip access-list SEC-USER-SUB-MCAST-IN diff --git a/e2e/fixtures/ibrl_with_allocated_addr/doublezero_agent_config_user_removed.tmpl b/e2e/fixtures/ibrl_with_allocated_addr/doublezero_agent_config_user_removed.tmpl index be0664ff84..33786c3db3 100644 --- a/e2e/fixtures/ibrl_with_allocated_addr/doublezero_agent_config_user_removed.tmpl +++ b/e2e/fixtures/ibrl_with_allocated_addr/doublezero_agent_config_user_removed.tmpl @@ -318,6 +318,7 @@ ip access-list SEC-USER-PUB-MCAST-IN permit tcp any any eq bgp permit ip any 224.0.0.13/32 permit ip any 233.84.178.0/24 + permit pim any host 10.0.0.0 deny ip any any ! no ip access-list SEC-USER-SUB-MCAST-IN diff --git a/e2e/fixtures/multicast/doublezero_agent_config_both_users_added.tmpl b/e2e/fixtures/multicast/doublezero_agent_config_both_users_added.tmpl index 7c97c26cff..03382e79a5 100644 --- a/e2e/fixtures/multicast/doublezero_agent_config_both_users_added.tmpl +++ b/e2e/fixtures/multicast/doublezero_agent_config_both_users_added.tmpl @@ -441,6 +441,7 @@ ip access-list SEC-USER-PUB-MCAST-IN permit tcp any any eq bgp permit ip any 224.0.0.13/32 permit ip any 233.84.178.0/24 + permit pim any host 10.0.0.0 deny ip any any ! no ip access-list SEC-USER-SUB-MCAST-IN diff --git a/e2e/fixtures/multicast/doublezero_agent_config_both_users_removed.tmpl b/e2e/fixtures/multicast/doublezero_agent_config_both_users_removed.tmpl index be0664ff84..33786c3db3 100644 --- a/e2e/fixtures/multicast/doublezero_agent_config_both_users_removed.tmpl +++ b/e2e/fixtures/multicast/doublezero_agent_config_both_users_removed.tmpl @@ -318,6 +318,7 @@ ip access-list SEC-USER-PUB-MCAST-IN permit tcp any any eq bgp permit ip any 224.0.0.13/32 permit ip any 233.84.178.0/24 + permit pim any host 10.0.0.0 deny ip any any ! no ip access-list SEC-USER-SUB-MCAST-IN