Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ All notable changes to this project will be documented in this file.

- Client
- Add a `-route-liveness-backoff-max` daemon flag to cap the Down-state liveness probe interval. Defaults to 60s (production behavior unchanged); the e2e harness pins a small value to avoid a probe gap that flaked the multi-client IBRL tests. (#3949)
- Originate a PIM Register beacon for multicast publishers: `doublezerod` periodically sends a PIM Register (encapsulating the publisher heartbeat) unicast to the RP over the tunnel, so the device originates the MSDP SA for the published source even on a dual-role publisher/subscriber tunnel, where `pim ipv4 border-router` source injection is suppressed by the subscriber-side PIM neighbor. (RFC-22)
- Controller
- Permit the unicast PIM Register to the RP (`permit pim any host 10.0.0.0`) on publisher multicast tunnels so the client-originated Register reaches the device; `pim ipv4 border-router` is retained as a backstop. (RFC-22)
- E2E
- Pin the e2e ledger `solana-test-validator` to the deploy floor (agave 2.2.16, testnet) so a green e2e proves a change actually deploys and runs on the production cluster runtime. Previously the runtime validator rode the SBF build toolchain version (2.3.13); it is now decoupled and pinned independently. The build toolchain is unchanged. (#3957)

Expand Down
7 changes: 7 additions & 0 deletions client/doublezerod/internal/api/requests.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ type ProvisionRequest struct {
DoubleZeroPrefixes []*net.IPNet `json:"doublezero_prefixes"`
MulticastSubGroups []net.IP `json:"mcast_sub_groups"`
MulticastPubGroups []net.IP `json:"mcast_pub_groups"`
MulticastRpAddress net.IP `json:"mcast_rp_address"`
BgpLocalAsn uint32 `json:"bgp_local_asn"`
BgpRemoteAsn uint32 `json:"bgp_remote_asn"`
}
Expand Down Expand Up @@ -191,6 +192,9 @@ func (p *ProvisionRequest) InfraEqual(other *ProvisionRequest) bool {
if p.BgpLocalAsn != other.BgpLocalAsn || p.BgpRemoteAsn != other.BgpRemoteAsn {
return false
}
if !p.MulticastRpAddress.Equal(other.MulticastRpAddress) {
return false
}
return ipNetSlicesEqual(p.DoubleZeroPrefixes, other.DoubleZeroPrefixes)
}

Expand Down Expand Up @@ -270,6 +274,9 @@ func (p *ProvisionRequest) Validate() error {
if p.BgpRemoteAsn == 0 {
p.BgpRemoteAsn = 65001
}
if p.MulticastRpAddress == nil {
p.MulticastRpAddress = net.IPv4(10, 0, 0, 0)
}
return nil
}

Expand Down
11 changes: 11 additions & 0 deletions client/doublezerod/internal/api/requests_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ func testFullProvisionRequest() ProvisionRequest {
BgpRemoteAsn: 65001,
MulticastPubGroups: []net.IP{net.IPv4(239, 0, 0, 1)},
MulticastSubGroups: []net.IP{net.IPv4(239, 0, 0, 2)},
MulticastRpAddress: net.IPv4(10, 0, 0, 0),
}
}

Expand Down Expand Up @@ -198,6 +199,16 @@ func fieldIndex(typ reflect.Type, name string) int {
return -1
}

func TestProvisionRequestDefaultsRpAddress(t *testing.T) {
p := &ProvisionRequest{}
if err := p.Validate(); err != nil {
t.Fatalf("Validate: %v", err)
}
if !p.MulticastRpAddress.Equal(net.IPv4(10, 0, 0, 0)) {
t.Fatalf("MulticastRpAddress = %v, want 10.0.0.0", p.MulticastRpAddress)
}
}

func TestIPSetDiff(t *testing.T) {
tests := []struct {
name string
Expand Down
26 changes: 21 additions & 5 deletions client/doublezerod/internal/manager/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,8 @@ func TestHttpStatus(t *testing.T) {
b := &MockBgpServer{}
pim := &MockPIMServer{}
heartbeat := &MockHeartbeatSender{}
mgr := manager.NewNetlinkManager(m, b, pim, heartbeat)
register := &MockRegisterSender{}
mgr := manager.NewNetlinkManager(m, b, pim, heartbeat, register)

f, err := os.CreateTemp("/tmp", "doublezero.sock")
if err != nil {
Expand Down Expand Up @@ -213,7 +214,8 @@ func TestNetlinkManager_HttpEndpoints(t *testing.T) {
b := &MockBgpServer{}
pim := &MockPIMServer{}
heartbeat := &MockHeartbeatSender{}
mgr := manager.NewNetlinkManager(m, b, pim, heartbeat)
register := &MockRegisterSender{}
mgr := manager.NewNetlinkManager(m, b, pim, heartbeat, register)

f, err := os.CreateTemp("/tmp", "doublezero.sock")
if err != nil {
Expand Down Expand Up @@ -389,15 +391,15 @@ func TestNetlinkManager_HttpEndpoints(t *testing.T) {

func TestNetlinkManager_GetProvisionedServices(t *testing.T) {
t.Run("no_services", func(t *testing.T) {
mgr := manager.NewNetlinkManager(&MockNetlink{}, &MockBgpServer{}, &MockPIMServer{}, &MockHeartbeatSender{})
mgr := manager.NewNetlinkManager(&MockNetlink{}, &MockBgpServer{}, &MockPIMServer{}, &MockHeartbeatSender{}, &MockRegisterSender{})
reqs := mgr.GetProvisionedServices()
if len(reqs) != 0 {
t.Fatalf("expected 0 provisioned services, got %d", len(reqs))
}
})

t.Run("unicast_only", func(t *testing.T) {
mgr := manager.NewNetlinkManager(&MockNetlink{}, &MockBgpServer{}, &MockPIMServer{}, &MockHeartbeatSender{})
mgr := manager.NewNetlinkManager(&MockNetlink{}, &MockBgpServer{}, &MockPIMServer{}, &MockHeartbeatSender{}, &MockRegisterSender{})
pr := api.ProvisionRequest{
UserType: api.UserTypeIBRL,
TunnelSrc: net.IPv4(1, 1, 1, 1),
Expand All @@ -420,7 +422,7 @@ func TestNetlinkManager_GetProvisionedServices(t *testing.T) {
})

t.Run("after_remove", func(t *testing.T) {
mgr := manager.NewNetlinkManager(&MockNetlink{}, &MockBgpServer{}, &MockPIMServer{}, &MockHeartbeatSender{})
mgr := manager.NewNetlinkManager(&MockNetlink{}, &MockBgpServer{}, &MockPIMServer{}, &MockHeartbeatSender{}, &MockRegisterSender{})
pr := api.ProvisionRequest{
UserType: api.UserTypeIBRL,
TunnelSrc: net.IPv4(1, 1, 1, 1),
Expand Down Expand Up @@ -553,3 +555,17 @@ func (m *MockNetlink) RuleDel(n *routing.IPRule) error {
func (m *MockNetlink) RouteByProtocol(protocol int) ([]*routing.Route, error) {
return m.routes, nil
}

type MockRegisterSender struct{}

func (m *MockRegisterSender) Start(iface string, srcOverlay, innerSrc net.IP, groups []net.IP, rp net.IP, dport int, payload []byte, interval time.Duration) error {
return nil
}

func (m *MockRegisterSender) UpdateGroups(groups []net.IP) error {
return nil
}

func (m *MockRegisterSender) Close() error {
return nil
}
18 changes: 12 additions & 6 deletions client/doublezerod/internal/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ type NetlinkManager struct {
bgp BGPServer
pim services.PIMWriter
heartbeat services.HeartbeatWriter
register services.RegisterWriter
mu sync.Mutex

// Reconciler fields
Expand All @@ -146,7 +147,7 @@ type NetlinkManager struct {

// CreateService creates the appropriate service based on the provisioned
// user type.
func CreateService(u api.UserType, bgp services.BGPReaderWriter, nl routing.Netlinker, pim services.PIMWriter, heartbeat services.HeartbeatWriter) (Provisioner, error) {
func CreateService(u api.UserType, bgp services.BGPReaderWriter, nl routing.Netlinker, pim services.PIMWriter, heartbeat services.HeartbeatWriter, register services.RegisterWriter) (Provisioner, error) {
switch u {
case api.UserTypeIBRL:
return services.NewIBRLService(bgp, nl), nil
Expand All @@ -155,18 +156,19 @@ func CreateService(u api.UserType, bgp services.BGPReaderWriter, nl routing.Netl
case api.UserTypeEdgeFiltering:
return services.NewEdgeFilteringService(bgp, nl), nil
case api.UserTypeMulticast:
return services.NewMulticastService(bgp, nl, pim, heartbeat), nil
return services.NewMulticastService(bgp, nl, pim, heartbeat, register), nil
default:
return nil, fmt.Errorf("unsupported user type: %s", u)
}
}

func NewNetlinkManager(netlink routing.Netlinker, bgp BGPServer, pim services.PIMWriter, heartbeat services.HeartbeatWriter, opts ...Option) *NetlinkManager {
func NewNetlinkManager(netlink routing.Netlinker, bgp BGPServer, pim services.PIMWriter, heartbeat services.HeartbeatWriter, register services.RegisterWriter, opts ...Option) *NetlinkManager {
n := &NetlinkManager{
netlink: netlink,
bgp: bgp,
pim: pim,
heartbeat: heartbeat,
register: register,
pollInterval: defaultPollInterval,
fetchTimeout: defaultFetchTimeout,
enableCh: make(chan bool, 1),
Expand All @@ -189,7 +191,7 @@ func (n *NetlinkManager) Provision(pr api.ProvisionRequest) error {
// provisionLocked creates and sets up a service for the given provision request.
// Caller must hold n.mu.
func (n *NetlinkManager) provisionLocked(pr api.ProvisionRequest) error {
svc, err := CreateService(pr.UserType, n.bgp, n.netlink, n.pim, n.heartbeat)
svc, err := CreateService(pr.UserType, n.bgp, n.netlink, n.pim, n.heartbeat, n.register)
if err != nil {
return fmt.Errorf("error creating service: %v", err)
}
Expand Down Expand Up @@ -703,7 +705,7 @@ func (n *NetlinkManager) buildProvisionRequest(
}
}

return api.ProvisionRequest{
pr := api.ProvisionRequest{
UserType: mapUserType(u.UserType),
TunnelSrc: tunnelSrc,
TunnelDst: tunnelDst,
Expand All @@ -714,7 +716,11 @@ func (n *NetlinkManager) buildProvisionRequest(
BgpRemoteAsn: cfg.RemoteASN,
MulticastPubGroups: pubGroups,
MulticastSubGroups: subGroups,
}, nil
}
if err := pr.Validate(); err != nil {
return api.ProvisionRequest{}, fmt.Errorf("invalid provision request: %w", err)
}
return pr, nil
}

// mapUserType maps onchain UserUserType to daemon api.UserType.
Expand Down
86 changes: 84 additions & 2 deletions client/doublezerod/internal/manager/reconciler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,20 @@ func (m *mockHeartbeatSender) Start(string, net.IP, []net.IP, int, time.Duration
func (m *mockHeartbeatSender) UpdateGroups([]net.IP) error { return nil }
func (m *mockHeartbeatSender) Close() error { return nil }

type mockRegisterSender struct {
mu sync.Mutex
capturedRPs []net.IP
}

func (m *mockRegisterSender) Start(iface string, srcOverlay, innerSrc net.IP, groups []net.IP, rp net.IP, dport int, payload []byte, interval time.Duration) error {
m.mu.Lock()
defer m.mu.Unlock()
m.capturedRPs = append(m.capturedRPs, rp)
return nil
}
func (m *mockRegisterSender) UpdateGroups([]net.IP) error { return nil }
func (m *mockRegisterSender) Close() error { return nil }

type mockLatencyProvider struct {
results []latency.LatencyResult
}
Expand All @@ -99,14 +113,28 @@ func newTestNLM(fetcher Fetcher, opts ...Option) *NetlinkManager {
bgpSrv := &mockBgpServer{}
pimSrv := &mockPIMServer{}
hb := &mockHeartbeatSender{}
return NewNetlinkManager(nl, bgpSrv, pimSrv, hb, append([]Option{WithFetcher(fetcher)}, opts...)...)
reg := &mockRegisterSender{}
return NewNetlinkManager(nl, bgpSrv, pimSrv, hb, reg, append([]Option{WithFetcher(fetcher)}, opts...)...)
}

func newTestNLMWithNetlink(nl routing.Netlinker, fetcher Fetcher, opts ...Option) *NetlinkManager {
bgpSrv := &mockBgpServer{}
pimSrv := &mockPIMServer{}
hb := &mockHeartbeatSender{}
return NewNetlinkManager(nl, bgpSrv, pimSrv, hb, append([]Option{WithFetcher(fetcher)}, opts...)...)
reg := &mockRegisterSender{}
return NewNetlinkManager(nl, bgpSrv, pimSrv, hb, reg, append([]Option{WithFetcher(fetcher)}, opts...)...)
}

// newTestNLMWithRegisterSender is like newTestNLM but returns the register
// mock so callers can inspect captured arguments.
func newTestNLMWithRegisterSender(fetcher Fetcher, opts ...Option) (*NetlinkManager, *mockRegisterSender) {
nl := &mockNetlink{}
bgpSrv := &mockBgpServer{}
pimSrv := &mockPIMServer{}
hb := &mockHeartbeatSender{}
reg := &mockRegisterSender{}
mgr := NewNetlinkManager(nl, bgpSrv, pimSrv, hb, reg, append([]Option{WithFetcher(fetcher)}, opts...)...)
return mgr, reg
}

func testDevice(pk [32]byte, ip [4]uint8, prefixes [][5]uint8) serviceability.Device {
Expand Down Expand Up @@ -333,6 +361,60 @@ func TestReconcile_ProvisionMulticast(t *testing.T) {
}
}

// TestReconcile_ProvisionMulticast_DefaultsRpAddress verifies that the
// reconciler path calls Validate() on the built ProvisionRequest so that a
// nil MulticastRpAddress is defaulted to 10.0.0.0 before the register sender
// is started. Without the fix buildProvisionRequest returns a nil
// MulticastRpAddress, which makes RegisterSender.Start fail with
// "missing address".
func TestReconcile_ProvisionMulticast_DefaultsRpAddress(t *testing.T) {
devicePK := [32]byte{1}
mcastGroupPK := [32]byte{2}
clientIP := net.IPv4(1, 2, 3, 4).To4()

// Publisher user — has publish groups, which triggers register.Start.
user := testUser([4]uint8{1, 2, 3, 4}, devicePK, serviceability.UserTypeMulticast, serviceability.UserStatusActivated)
user.Publishers = [][32]uint8{mcastGroupPK}

fetcher := &mockFetcher{
data: &serviceability.ProgramData{
GlobalConfig: testGlobalConfig(),
Devices: []serviceability.Device{testDevice(devicePK, [4]uint8{5, 6, 7, 8}, nil)},
Users: []serviceability.User{user},
MulticastGroups: []serviceability.MulticastGroup{{PubKey: mcastGroupPK, MulticastIp: [4]uint8{239, 0, 0, 1}}},
},
}

n, reg := newTestNLMWithRegisterSender(fetcher, WithClientIP(clientIP), WithPollInterval(time.Second))
n.reconcile(context.Background())

if n.MulticastService == nil {
t.Fatal("expected multicast service to be provisioned")
}

// The ProvisionRequest stored in the service must have a defaulted RP address.
pr := n.MulticastService.ProvisionRequest()
if pr.MulticastRpAddress == nil {
t.Fatal("MulticastRpAddress is nil on reconciler path; Validate() was not called")
}
wantRP := net.IPv4(10, 0, 0, 0)
if !pr.MulticastRpAddress.Equal(wantRP) {
t.Fatalf("expected MulticastRpAddress %v, got %v", wantRP, pr.MulticastRpAddress)
}

// The rp passed to register.Start must also be the defaulted address.
reg.mu.Lock()
capturedRPs := append([]net.IP(nil), reg.capturedRPs...)
reg.mu.Unlock()

if len(capturedRPs) == 0 {
t.Fatal("register.Start was never called for a publisher user")
}
if !capturedRPs[0].Equal(wantRP) {
t.Fatalf("register.Start received rp=%v, want %v", capturedRPs[0], wantRP)
}
}

func TestReconcile_RemoveMulticast(t *testing.T) {
clientIP := net.IPv4(1, 2, 3, 4).To4()

Expand Down
6 changes: 3 additions & 3 deletions client/doublezerod/internal/multicast/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ const (
DefaultHeartbeatInterval = 10 * time.Second
)

// heartbeatPayload is the fixed payload sent in each heartbeat packet.
// HeartbeatPayload is the fixed payload sent in each heartbeat packet.
// 0x44, 0x5A = "DZ", followed by a 2-byte version (0x00, 0x01).
var heartbeatPayload = []byte{0x44, 0x5A, 0x00, 0x01}
var HeartbeatPayload = []byte{0x44, 0x5A, 0x00, 0x01}

// PacketConner abstracts the UDP multicast connection for testing.
type PacketConner interface {
Expand Down Expand Up @@ -135,7 +135,7 @@ func (h *HeartbeatSender) startWithConn(p PacketConner, intf *net.Interface, gro

func sendHeartbeats(p PacketConner, dsts []*net.UDPAddr) {
for _, dst := range dsts {
if _, err := p.WriteTo(heartbeatPayload, nil, dst); err != nil {
if _, err := p.WriteTo(HeartbeatPayload, nil, dst); err != nil {
slog.Error("failed to send heartbeat", "dst", dst, "error", err)
}
}
Expand Down
8 changes: 4 additions & 4 deletions client/doublezerod/internal/multicast/heartbeat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,12 @@ func TestHeartbeatSender_SendsImmediately(t *testing.T) {
if udpAddr.Port != HeartbeatPort {
t.Errorf("expected port %d, got %d", HeartbeatPort, udpAddr.Port)
}
if len(w.payload) != len(heartbeatPayload) {
t.Errorf("expected payload len %d, got %d", len(heartbeatPayload), len(w.payload))
if len(w.payload) != len(HeartbeatPayload) {
t.Errorf("expected payload len %d, got %d", len(HeartbeatPayload), len(w.payload))
}
for i, b := range w.payload {
if b != heartbeatPayload[i] {
t.Errorf("payload[%d] = 0x%02x, want 0x%02x", i, b, heartbeatPayload[i])
if b != HeartbeatPayload[i] {
t.Errorf("payload[%d] = 0x%02x, want 0x%02x", i, b, HeartbeatPayload[i])
}
}
case <-time.After(2 * time.Second):
Expand Down
Loading
Loading