diff --git a/CHANGELOG.md b/CHANGELOG.md index 3d77743a1c..c0e6572390 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ All notable changes to this project will be documented in this file. - Client - Add a `--no-wait` flag to `doublezero disconnect` that skips waiting for the daemon to tear down the tunnel(s), exiting once the onchain user deletion is confirmed. (#3911) + - Add periodic kernel route reconciliation to `doublezerod` that detects and reinstalls missing routes, with a metric tracking install failures ([#3669](https://github.com/malbeclabs/doublezero/issues/3669)) - CLI - Add hidden `migrate flex-algo` (RFC-18 link-topology and Vpnv4 loopback FlexAlgoNodeSegment backfill); the prior `migrate` command is now `migrate user-pda`. Moved from `doublezero-admin`. - Add hidden `device migrate-multicast-counts` and `device migrate-unicast-counts` to reconcile stale per-device subscriber, publisher, and unicast-user counts. Moved from `doublezero-admin`. diff --git a/client/doublezerod/cmd/doublezerod/main.go b/client/doublezerod/cmd/doublezerod/main.go index c0453969cb..f2d4b8dde3 100644 --- a/client/doublezerod/cmd/doublezerod/main.go +++ b/client/doublezerod/cmd/doublezerod/main.go @@ -45,13 +45,14 @@ var ( stateDir = flag.String("state-dir", "/var/lib/doublezerod", "directory for persistent state files") // Route liveness configuration flags. - routeLivenessTxMin = flag.Duration("route-liveness-tx-min", defaultRouteLivenessTxMin, "route liveness tx min") - routeLivenessRxMin = flag.Duration("route-liveness-rx-min", defaultRouteLivenessRxMin, "route liveness rx min") - routeLivenessDetectMult = flag.Uint("route-liveness-detect-mult", defaultRouteLivenessDetectMult, "route liveness detect mult") - routeLivenessMinTxFloor = flag.Duration("route-liveness-min-tx-floor", defaultRouteLivenessMinTxFloor, "route liveness min tx floor") - routeLivenessMaxTxCeil = flag.Duration("route-liveness-max-tx-ceil", defaultRouteLivenessMaxTxCeil, "route liveness max tx ceil") - routeLivenessPeerMetrics = flag.Bool("route-liveness-peer-metrics", false, "enables per peer metrics for route liveness (high cardinality)") - routeLivenessDebug = flag.Bool("route-liveness-debug", false, "enables debug logging for route liveness") + routeLivenessTxMin = flag.Duration("route-liveness-tx-min", defaultRouteLivenessTxMin, "route liveness tx min") + routeLivenessRxMin = flag.Duration("route-liveness-rx-min", defaultRouteLivenessRxMin, "route liveness rx min") + routeLivenessDetectMult = flag.Uint("route-liveness-detect-mult", defaultRouteLivenessDetectMult, "route liveness detect mult") + routeLivenessMinTxFloor = flag.Duration("route-liveness-min-tx-floor", defaultRouteLivenessMinTxFloor, "route liveness min tx floor") + routeLivenessMaxTxCeil = flag.Duration("route-liveness-max-tx-ceil", defaultRouteLivenessMaxTxCeil, "route liveness max tx ceil") + routeLivenessReconcileInterval = flag.Duration("route-liveness-reconcile-interval", defaultRouteLivenessReconcileInterval, "interval for periodic kernel route reconciliation; 0 disables") + routeLivenessPeerMetrics = flag.Bool("route-liveness-peer-metrics", false, "enables per peer metrics for route liveness (high cardinality)") + routeLivenessDebug = flag.Bool("route-liveness-debug", false, "enables debug logging for route liveness") // TODO(snormore): These flags are temporary for initial rollout testing. // They will be superceded by a single `route-liveness-enable` flag, where false means @@ -66,12 +67,13 @@ var ( ) const ( - defaultOnchainRPCTimeout = 30 * time.Second - defaultRouteLivenessTxMin = 1 * time.Second - defaultRouteLivenessRxMin = 1 * time.Second - defaultRouteLivenessDetectMult = 3 - defaultRouteLivenessMinTxFloor = 50 * time.Millisecond - defaultRouteLivenessMaxTxCeil = 3 * time.Second + defaultOnchainRPCTimeout = 30 * time.Second + defaultRouteLivenessTxMin = 1 * time.Second + defaultRouteLivenessRxMin = 1 * time.Second + defaultRouteLivenessDetectMult = 3 + defaultRouteLivenessMinTxFloor = 50 * time.Millisecond + defaultRouteLivenessMaxTxCeil = 3 * time.Second + defaultRouteLivenessReconcileInterval = 30 * time.Second defaultRouteLivenessBindIP = "0.0.0.0" ) @@ -179,6 +181,8 @@ func main() { EnablePeerMetrics: *routeLivenessPeerMetrics, + RouteReconcileInterval: *routeLivenessReconcileInterval, + // Default to treating peers that advertise passive mode as passive. That is, we will // install their routes immediately and never uninstall them on down events. HonorPeerAdvertisedPassive: true, diff --git a/client/doublezerod/internal/liveness/main_test.go b/client/doublezerod/internal/liveness/main_test.go index 4736b6fe9f..1f2ee80c1f 100644 --- a/client/doublezerod/internal/liveness/main_test.go +++ b/client/doublezerod/internal/liveness/main_test.go @@ -70,7 +70,11 @@ func wait[T any](t *testing.T, ch <-chan T, d time.Duration, name string) T { func newTestRoute(mutate func(*Route)) *Route { r := &Route{Route: routing.Route{ - Table: 100, + // Use the main routing table: liveness only attaches in IBRL mode, whose + // routes live in RT_TABLE_MAIN, and RouteByProtocol only returns + // main-table routes. A non-main table would never come back from the real + // backend, so reconcile tests must mirror production here. + Table: unix.RT_TABLE_MAIN, Src: net.IPv4(10, 4, 0, 1), Dst: &net.IPNet{IP: net.IPv4(10, 4, 0, 11), Mask: net.CIDRMask(32, 32)}, NextHop: net.IPv4(10, 5, 0, 1), diff --git a/client/doublezerod/internal/liveness/manager.go b/client/doublezerod/internal/liveness/manager.go index 32556c282b..e0c0fea128 100644 --- a/client/doublezerod/internal/liveness/manager.go +++ b/client/doublezerod/internal/liveness/manager.go @@ -13,6 +13,7 @@ import ( "github.com/malbeclabs/doublezero/client/doublezerod/internal/routing" "github.com/prometheus/client_golang/prometheus" + "golang.org/x/sys/unix" ) const ( @@ -92,6 +93,10 @@ type ManagerConfig struct { // Client version to advertise to peers in control packets. ClientVersion string + + // RouteReconcileInterval controls how often the manager scans the kernel + // routing table for missing routes and reinstalls them. Zero disables. + RouteReconcileInterval time.Duration } // Validate fills defaults and enforces constraints for ManagerConfig. @@ -151,6 +156,12 @@ func (c *ManagerConfig) Validate() error { if c.ClientVersion == "" { return errors.New("clientVersion is required") } + if c.RouteReconcileInterval < 0 { + return errors.New("routeReconcileInterval must be non-negative") + } + // Note: RouteReconcileInterval == 0 is left as-is to disable reconciliation + // (see the `> 0` guard in NewManager). The operational default comes from + // the flag default in main.go, not from here. return nil } @@ -291,6 +302,26 @@ func NewManager(ctx context.Context, cfg *ManagerConfig, cr *routing.ConfiguredR } }() + // Route reconciliation goroutine: periodically scans the kernel routing + // table for missing routes and reinstalls them. + if cfg.RouteReconcileInterval > 0 { + log.Info("liveness: route reconciliation enabled", "interval", cfg.RouteReconcileInterval.String()) + m.wg.Add(1) + go func() { + defer m.wg.Done() + ticker := time.NewTicker(cfg.RouteReconcileInterval) + defer ticker.Stop() + for { + select { + case <-m.ctx.Done(): + return + case <-ticker.C: + m.reconcileRoutes() + } + } + }() + } + // If any routes are configured to be excluded, mark then as AdminDown immediately. if m.cr != nil { for ip := range m.cr.GetExcluded() { @@ -415,15 +446,14 @@ func (m *manager) WithdrawRoute(r *Route, iface string) error { m.log.Info("liveness: withdrawing route", "route", r.String(), "iface", iface) - if m.cfg.PassiveMode && !r.NoUninstall { - // Passive-mode: caller wants immediate kernel update independent of liveness. - if err := m.cfg.Netlinker.RouteDelete(&r.Route); err != nil { - m.metrics.RouteUninstallFailures.WithLabelValues(iface, srcIP).Inc() - return fmt.Errorf("error withdrawing route: %v", err) - } - m.metrics.routeWithdraw(iface, srcIP) - } - + // Clear desired/installed under the lock *before* issuing any kernel + // RouteDelete, mirroring onSessionDown. reconcileRoutes re-checks + // installed[rk] under m.mu before its RouteAdd, so clearing the flag first + // closes the resurrection race: reconcile either observes the withdrawal + // and skips, or its add lands before our delete and the end state is the + // route gone. (The previous passive-mode ordering deleted the kernel route + // first, leaving a window where reconcile could resurrect a route the + // manager believed was withdrawn.) rk := routeKeyFor(iface, r) m.mu.Lock() delete(m.desired, rk) @@ -448,12 +478,13 @@ func (m *manager) WithdrawRoute(r *Route, iface string) error { m.metrics.SessionsMapSize.Set(float64(len(m.sessions))) m.mu.Unlock() - // If we previously installed the route (and not in PassiveMode), remove it now. - if wasInstalled && !m.cfg.PassiveMode && !r.NoUninstall { - err := m.cfg.Netlinker.RouteDelete(&r.Route) - if err != nil { + // Remove the kernel route. In passive mode the caller wants an immediate + // kernel update independent of liveness, so we always delete; otherwise we + // only delete a route we previously installed. + if !r.NoUninstall && (m.cfg.PassiveMode || wasInstalled) { + if err := m.cfg.Netlinker.RouteDelete(&r.Route); err != nil { m.metrics.RouteUninstallFailures.WithLabelValues(iface, srcIP).Inc() - return err + return fmt.Errorf("error withdrawing route: %v", err) } m.metrics.routeWithdraw(iface, srcIP) } @@ -834,7 +865,7 @@ func (m *manager) onSessionDown(sess *Session) { } if m.cfg.PassiveMode { - m.log.Debug("liveness: session down (global passive; keeping route)", + m.log.Info("liveness: session down (global passive; keeping route)", "peer", peer.String(), "route", snap.Route.String(), "downSince", snap.DownSince.UTC().String(), @@ -845,7 +876,7 @@ func (m *manager) onSessionDown(sess *Session) { } if effectivelyPassive { - m.log.Debug("liveness: session down (peer passive; keeping route)", + m.log.Info("liveness: session down (peer passive; keeping route)", "peer", peer.String(), "route", snap.Route.String(), "downSince", snap.DownSince.UTC().String(), @@ -875,6 +906,110 @@ func (m *manager) onSessionDown(sess *Session) { ) } +// reconcileRoutes scans the kernel routing table for routes that should be +// installed but are missing, and reinstalls them. This mitigates routes being +// removed by external processes. +func (m *manager) reconcileRoutes() { + // Snapshot installed and desired under lock. + type installedRoute struct { + rk RouteKey + route *Route + } + m.mu.Lock() + toCheck := make([]installedRoute, 0, len(m.installed)) + for rk, ok := range m.installed { + if !ok { + continue + } + r, exists := m.desired[rk] + if !exists { + continue + } + // Skip excluded destinations: the manager's Netlinker is a + // ConfiguredRouteReaderWriter whose RouteAdd is a silent no-op for + // these, so they are never actually present in the kernel. Without + // this guard every excluded route would be flagged "missing" and + // "reinstalled" (a no-op) on every tick, inflating the reinstall + // counter and spamming logs forever. + if m.cr != nil && r.Dst != nil && r.Dst.IP != nil && m.cr.IsExcluded(r.Dst.IP.String()) { + continue + } + toCheck = append(toCheck, installedRoute{rk: rk, route: r}) + } + m.mu.Unlock() + + if len(toCheck) == 0 { + return + } + + kernelRoutes, err := m.cfg.Netlinker.RouteByProtocol(unix.RTPROT_BGP) + if err != nil { + m.log.Error("liveness: error fetching kernel routes for reconciliation", "error", err) + return + } + + // Build a lookup set keyed by (table, dst, nexthop, src) for fast matching. + // Dst uses the full prefix (IP + mask) via *net.IPNet.String() so a kernel + // route with a different mask does not satisfy a desired route at the same + // IP (e.g. 10.0.0.0/16 must not match a desired 10.0.0.0/24). + type kernelKey struct { + Table int + Dst string + NextHop string + SrcIP string + } + dstString := func(ipnet *net.IPNet) string { + if ipnet == nil || ipnet.IP == nil { + return "" + } + return ipnet.String() + } + kernelSet := make(map[kernelKey]struct{}, len(kernelRoutes)) + for _, kr := range kernelRoutes { + var nhIP, srcIP string + if kr.NextHop != nil && kr.NextHop.To4() != nil { + nhIP = kr.NextHop.To4().String() + } + if kr.Src != nil && kr.Src.To4() != nil { + srcIP = kr.Src.To4().String() + } + kernelSet[kernelKey{Table: kr.Table, Dst: dstString(kr.Dst), NextHop: nhIP, SrcIP: srcIP}] = struct{}{} + } + + for _, ir := range toCheck { + kk := kernelKey{Table: ir.route.Table, Dst: dstString(ir.route.Dst), NextHop: ir.rk.NextHop, SrcIP: ir.rk.SrcIP} + if _, present := kernelSet[kk]; present { + continue + } + // Re-check and reinstall under the lock. onSessionDown flips + // installed[rk] to false under m.mu *before* issuing RouteDelete, so + // holding the lock across the re-check and RouteAdd closes the race: + // either we observe the withdrawal and skip, or our add completes + // before the delete lands and the end state stays consistent. The + // netlink call under the lock only happens for genuinely-missing + // routes, which are rare by definition. + m.mu.Lock() + if !m.installed[ir.rk] { + m.mu.Unlock() + continue + } + err := m.cfg.Netlinker.RouteAdd(&ir.route.Route) + m.mu.Unlock() + + if err != nil { + m.log.Error("liveness: error reinstalling route", + "error", err, "route", ir.route.String()) + m.metrics.RouteInstallFailures.WithLabelValues(ir.rk.Interface, ir.rk.SrcIP).Inc() + continue + } + m.log.Warn("liveness: reinstalled missing route", + "route", ir.route.String(), + "iface", ir.rk.Interface, + ) + m.metrics.routeReinstall(ir.rk.Interface, ir.rk.SrcIP) + } +} + // isPeerEffectivelyPassive returns true when this session should not have its // dataplane (kernel route) managed due to peer-advertised passive mode. // diff --git a/client/doublezerod/internal/liveness/manager_test.go b/client/doublezerod/internal/liveness/manager_test.go index c015ef4a1d..091a35e736 100644 --- a/client/doublezerod/internal/liveness/manager_test.go +++ b/client/doublezerod/internal/liveness/manager_test.go @@ -13,6 +13,7 @@ import ( "github.com/prometheus/client_golang/prometheus" prom "github.com/prometheus/client_model/go" "github.com/stretchr/testify/require" + "golang.org/x/sys/unix" ) func TestClient_Liveness_Manager_ConfigValidate(t *testing.T) { @@ -68,6 +69,12 @@ func TestClient_Liveness_Manager_ConfigValidate(t *testing.T) { require.NotZero(t, cfg.BackoffMax) require.GreaterOrEqual(t, int64(cfg.MaxTxCeil), int64(cfg.MinTxFloor)) require.GreaterOrEqual(t, int64(cfg.BackoffMax), int64(cfg.MinTxFloor)) + + // RouteReconcileInterval == 0 must survive Validate() unchanged so that 0 + // disables reconciliation (the kill switch). A negative value is rejected. + require.Zero(t, cfg.RouteReconcileInterval, "Validate must not rewrite a zero RouteReconcileInterval") + cfg.RouteReconcileInterval = -1 + require.Error(t, cfg.Validate()) } func TestClient_Liveness_Manager_NewManager_BindsAndLocalAddr(t *testing.T) { @@ -1564,6 +1571,10 @@ func newTestManager(t *testing.T, mutate func(*ManagerConfig)) (*manager, error) } func newTestManagerWithMetrics(t *testing.T, mutate func(*ManagerConfig)) (*manager, *prometheus.Registry, error) { + return newTestManagerWithRoutesAndMetrics(t, nil, mutate) +} + +func newTestManagerWithRoutesAndMetrics(t *testing.T, cr *routing.ConfiguredRoutes, mutate func(*ManagerConfig)) (*manager, *prometheus.Registry, error) { reg := prometheus.NewRegistry() cfg := &ManagerConfig{ Logger: newTestLogger(t), @@ -1582,7 +1593,7 @@ func newTestManagerWithMetrics(t *testing.T, mutate func(*ManagerConfig)) (*mana if mutate != nil { mutate(cfg) } - m, err := NewManager(t.Context(), cfg, nil) + m, err := NewManager(t.Context(), cfg, cr) return m, reg, err } @@ -1650,6 +1661,284 @@ func metricHasLabels(m *prom.Metric, labels prometheus.Labels) bool { return true } +func TestClient_Liveness_Manager_ReconcileRoutes_ReinstallsMissing(t *testing.T) { + t.Parallel() + + addCalls := 0 + mock := &MockRouteReaderWriter{ + RouteAddFunc: func(r *routing.Route) error { + addCalls++ + return nil + }, + RouteByProtocolFunc: func(int) ([]*routing.Route, error) { + // Return empty — no routes in kernel. + return nil, nil + }, + } + + m, reg, err := newTestManagerWithMetrics(t, func(cfg *ManagerConfig) { + cfg.Netlinker = mock + cfg.PassiveMode = true + cfg.RouteReconcileInterval = time.Hour // disable ticker; we call manually + }) + require.NoError(t, err) + t.Cleanup(func() { _ = m.Close() }) + + r := newTestRoute(nil) + err = m.RegisterRoute(r, "lo", m.LocalAddr().Port) + require.NoError(t, err) + + // RegisterRoute in PassiveMode calls RouteAdd once. + mock.mu.Lock() + addCalls = 0 + mock.mu.Unlock() + + m.reconcileRoutes() + + mock.mu.Lock() + require.Equal(t, 1, addCalls, "expected one RouteAdd call to reinstall the missing route") + mock.mu.Unlock() + + reinstalls := getCounterValue(t, reg, "doublezero_liveness_route_reinstalls_total", + prometheus.Labels{LabelIface: "lo", LabelLocalIP: r.Src.To4().String()}) + require.Equal(t, float64(1), reinstalls) +} + +func TestClient_Liveness_Manager_ReconcileRoutes_SkipsPresent(t *testing.T) { + t.Parallel() + + r := newTestRoute(nil) + addCalls := 0 + mock := &MockRouteReaderWriter{ + RouteAddFunc: func(rr *routing.Route) error { + addCalls++ + return nil + }, + RouteByProtocolFunc: func(int) ([]*routing.Route, error) { + // Return a freshly-constructed route with the same field values rather + // than the identical &r.Route pointer the manager installed. This is + // how the real netlink backend behaves (4-byte vs 16-byte net.IP, + // prefsrc echo, mask normalization), so it genuinely exercises + // kernelKey construction on both the kernel and desired sides instead + // of trivially matching by pointer identity. + return []*routing.Route{{ + Table: r.Table, + Src: net.IPv4(10, 4, 0, 1).To4(), + Dst: &net.IPNet{IP: net.IPv4(10, 4, 0, 11).To4(), Mask: net.CIDRMask(32, 32)}, + NextHop: net.IPv4(10, 5, 0, 1).To4(), + Protocol: unix.RTPROT_BGP, + }}, nil + }, + } + + m, _, err := newTestManagerWithMetrics(t, func(cfg *ManagerConfig) { + cfg.Netlinker = mock + cfg.PassiveMode = true + cfg.RouteReconcileInterval = time.Hour + }) + require.NoError(t, err) + t.Cleanup(func() { _ = m.Close() }) + + err = m.RegisterRoute(r, "lo", m.LocalAddr().Port) + require.NoError(t, err) + + // Reset after RegisterRoute's install. + mock.mu.Lock() + addCalls = 0 + mock.mu.Unlock() + + m.reconcileRoutes() + + mock.mu.Lock() + require.Equal(t, 0, addCalls, "should not reinstall a route that is present in the kernel") + mock.mu.Unlock() +} + +func TestClient_Liveness_Manager_ReconcileRoutes_SkipsUninstalled(t *testing.T) { + t.Parallel() + + addCalls := 0 + mock := &MockRouteReaderWriter{ + RouteAddFunc: func(r *routing.Route) error { + addCalls++ + return nil + }, + RouteByProtocolFunc: func(int) ([]*routing.Route, error) { + return nil, nil + }, + } + + // Active mode: route is registered but not installed until session goes Up. + m, _, err := newTestManagerWithMetrics(t, func(cfg *ManagerConfig) { + cfg.Netlinker = mock + cfg.PassiveMode = false + cfg.RouteReconcileInterval = time.Hour + }) + require.NoError(t, err) + t.Cleanup(func() { _ = m.Close() }) + + r := newTestRoute(func(r *Route) { + r.Src = net.IPv4(127, 0, 0, 1) + r.Dst = &net.IPNet{IP: net.IPv4(127, 0, 0, 2), Mask: net.CIDRMask(32, 32)} + }) + err = m.RegisterRoute(r, "lo", m.LocalAddr().Port) + require.NoError(t, err) + + // In active mode, installed[rk] is false until session goes Up. + mock.mu.Lock() + addCalls = 0 + mock.mu.Unlock() + + m.reconcileRoutes() + + mock.mu.Lock() + require.Equal(t, 0, addCalls, "should not reinstall a route that was never installed") + mock.mu.Unlock() +} + +func TestClient_Liveness_Manager_ReconcileRoutes_SkipsExcluded(t *testing.T) { + t.Parallel() + + r := newTestRoute(nil) + + // Configured routes that exclude this route's destination. In production the + // manager's Netlinker is a ConfiguredRouteReaderWriter whose RouteAdd is a + // silent no-op for excluded destinations, so they are never present in the + // kernel even though installed[rk] is marked true. Reconciliation must not + // treat them as missing. + dir := t.TempDir() + cfgPath := filepath.Join(dir, "routes.json") + require.NoError(t, os.WriteFile(cfgPath, []byte(`{"exclude":["`+r.Dst.IP.String()+`"]}`), 0o600)) + cfg, err := routing.NewConfiguredRoutes(cfgPath) + require.NoError(t, err) + + addCalls := 0 + mock := &MockRouteReaderWriter{ + RouteAddFunc: func(rr *routing.Route) error { + addCalls++ + return nil + }, + RouteByProtocolFunc: func(int) ([]*routing.Route, error) { + // Empty — no routes in kernel (the excluded route was never added). + return nil, nil + }, + } + + m, reg, err := newTestManagerWithRoutesAndMetrics(t, cfg, func(cfg *ManagerConfig) { + cfg.Netlinker = mock + cfg.PassiveMode = true + cfg.RouteReconcileInterval = time.Hour + }) + require.NoError(t, err) + t.Cleanup(func() { _ = m.Close() }) + + err = m.RegisterRoute(r, "lo", m.LocalAddr().Port) + require.NoError(t, err) + + // Reset after RegisterRoute's install. + mock.mu.Lock() + addCalls = 0 + mock.mu.Unlock() + + m.reconcileRoutes() + + mock.mu.Lock() + require.Equal(t, 0, addCalls, "should not reinstall an excluded route") + mock.mu.Unlock() + + reinstalls := getCounterValue(t, reg, "doublezero_liveness_route_reinstalls_total", + prometheus.Labels{LabelIface: "lo", LabelLocalIP: r.Src.To4().String()}) + require.Equal(t, float64(0), reinstalls, "excluded route must not increment the reinstall counter") +} + +func TestClient_Liveness_Manager_ReconcileRoutes_IncrementsInstallFailureMetric(t *testing.T) { + t.Parallel() + + addShouldFail := false + mock := &MockRouteReaderWriter{ + RouteAddFunc: func(r *routing.Route) error { + if addShouldFail { + return errors.New("boom") + } + return nil + }, + RouteByProtocolFunc: func(int) ([]*routing.Route, error) { + // Empty — route is missing from the kernel. + return nil, nil + }, + } + + m, reg, err := newTestManagerWithMetrics(t, func(cfg *ManagerConfig) { + cfg.Netlinker = mock + cfg.PassiveMode = true + cfg.RouteReconcileInterval = time.Hour + }) + require.NoError(t, err) + t.Cleanup(func() { _ = m.Close() }) + + r := newTestRoute(nil) + err = m.RegisterRoute(r, "lo", m.LocalAddr().Port) + require.NoError(t, err) + + // Make the reconcile-time RouteAdd fail. + mock.mu.Lock() + addShouldFail = true + mock.mu.Unlock() + + m.reconcileRoutes() + + failures := getCounterValue(t, reg, "doublezero_liveness_route_install_failures_total", + prometheus.Labels{LabelIface: "lo", LabelLocalIP: r.Src.To4().String()}) + require.Equal(t, float64(1), failures, "a failed reinstall must increment the install failure metric") + + reinstalls := getCounterValue(t, reg, "doublezero_liveness_route_reinstalls_total", + prometheus.Labels{LabelIface: "lo", LabelLocalIP: r.Src.To4().String()}) + require.Equal(t, float64(0), reinstalls, "a failed reinstall must not increment the reinstall counter") +} + +// TestClient_Liveness_Manager_WithdrawRoute_PassiveClearsInstalledBeforeDelete +// guards the ordering the reconcile resurrection-race fix depends on: in passive +// mode WithdrawRoute must clear installed[rk] under the lock *before* issuing the +// kernel RouteDelete. If it deleted first (the old ordering), reconcile could +// observe the route missing from the kernel while installed[rk] was still true +// and resurrect a route the manager believed was withdrawn. +func TestClient_Liveness_Manager_WithdrawRoute_PassiveClearsInstalledBeforeDelete(t *testing.T) { + t.Parallel() + + r := newTestRoute(nil) + rk := routeKeyFor("lo", r) + + var installedAtDelete bool + var deleteCalled bool + var mgr *manager + mock := &MockRouteReaderWriter{ + RouteDeleteFunc: func(*routing.Route) error { + deleteCalled = true + installedAtDelete = mgr.IsInstalled(rk) + return nil + }, + } + + m, err := newTestManager(t, func(cfg *ManagerConfig) { + cfg.Netlinker = mock + cfg.PassiveMode = true + }) + require.NoError(t, err) + t.Cleanup(func() { _ = m.Close() }) + mgr = m + + err = m.RegisterRoute(r, "lo", m.LocalAddr().Port) + require.NoError(t, err) + require.True(t, mgr.IsInstalled(rk), "route should be installed after RegisterRoute in passive mode") + + err = m.WithdrawRoute(r, "lo") + require.NoError(t, err) + + require.True(t, deleteCalled, "passive WithdrawRoute must issue a kernel delete") + require.False(t, installedAtDelete, "installed[rk] must be cleared before the kernel RouteDelete") + require.False(t, mgr.IsInstalled(rk), "route should not be installed after WithdrawRoute") +} + func getHistogramCount(t *testing.T, reg *prometheus.Registry, name string, labels prometheus.Labels) float64 { t.Helper() diff --git a/client/doublezerod/internal/liveness/metrics.go b/client/doublezerod/internal/liveness/metrics.go index d1e9e31842..4aff9a6a1e 100644 --- a/client/doublezerod/internal/liveness/metrics.go +++ b/client/doublezerod/internal/liveness/metrics.go @@ -42,6 +42,7 @@ type Metrics struct { DesiredMapSize prometheus.Gauge PeerSessions *prometheus.GaugeVec PeerDetectTime *prometheus.GaugeVec + RouteReinstalls *prometheus.CounterVec } var ( @@ -212,6 +213,13 @@ func newMetrics() *Metrics { Help: "Size of the desired map", }, ), + RouteReinstalls: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "doublezero_liveness_route_reinstalls_total", + Help: "Count of routes reinstalled after being removed from the kernel by an external process", + }, + serviceLabels, + ), } } @@ -241,6 +249,7 @@ func (m *Metrics) Register(r prometheus.Registerer) { m.DesiredMapSize, m.PeerSessions, m.PeerDetectTime, + m.RouteReinstalls, ) } @@ -296,6 +305,10 @@ func (m *Metrics) convergenceToUp(peer Peer, convergence time.Duration) { m.ConvergenceToUp.WithLabelValues(peer.Interface, peer.LocalIP).Observe(convergence.Seconds()) } +func (m *Metrics) routeReinstall(iface, localIP string) { + m.RouteReinstalls.WithLabelValues(iface, localIP).Inc() +} + func (m *Metrics) convergenceToDown(peer Peer, convergence time.Duration) { m.ConvergenceToDown.WithLabelValues(peer.Interface, peer.LocalIP).Observe(convergence.Seconds()) } diff --git a/client/doublezerod/internal/routing/netlink.go b/client/doublezerod/internal/routing/netlink.go index 5e774eabf7..197b1c8a39 100644 --- a/client/doublezerod/internal/routing/netlink.go +++ b/client/doublezerod/internal/routing/netlink.go @@ -142,6 +142,15 @@ func (n Netlink) RouteGet(ip net.IP) ([]*Route, error) { return routes, nil } +// RouteByProtocol returns IPv4 routes matching the given routing protocol. +// +// NOTE: only RT_FILTER_PROTOCOL is set, so vishvananda/netlink restricts the +// listing to the main routing table (RT_TABLE_MAIN); routes in other tables +// are skipped unless RT_FILTER_TABLE is also supplied. This is fine for the +// current callers (liveness route reconciliation runs only in IBRL mode, whose +// routes live in the main table). If this is ever used for edge-filtering +// routes in tables 100/101, add RT_FILTER_TABLE and list per desired table, +// otherwise those routes will never be returned. func (n Netlink) RouteByProtocol(protocol int) ([]*Route, error) { routeFilter := &nl.Route{ Protocol: nl.RouteProtocol(protocol), diff --git a/telemetry/migrations/isis_latest_views_test.go b/telemetry/migrations/isis_latest_views_test.go index 979ddd6028..31296e5777 100644 --- a/telemetry/migrations/isis_latest_views_test.go +++ b/telemetry/migrations/isis_latest_views_test.go @@ -38,19 +38,23 @@ func TestIsisLatestViews_LastSeenPerNetworkInstance(t *testing.T) { scrapeA := time.Now().UTC().Add(-time.Hour).Truncate(time.Second) scrapeB := scrapeA.Add(time.Minute) - mustExec(t, db, ` - INSERT INTO isis_global_state (timestamp, device_pubkey, network_instance, instance, net, level_capability) VALUES - (?, ?, 'default', 'default', '49.0001.0000.0000.0001.00', 'LEVEL_2'), - (?, ?, 'vrf1', 'vrf1', '49.0002.0000.0000.0001.00', 'LEVEL_2'), - (?, ?, 'default', 'default', '49.0001.0000.0000.0001.00', 'LEVEL_2') - `, scrapeA, device, scrapeA, device, scrapeB, device) - - mustExec(t, db, ` - INSERT INTO isis_overload_bit (timestamp, device_pubkey, network_instance, overload_bit) VALUES - (?, ?, 'default', false), - (?, ?, 'vrf1', true), - (?, ?, 'default', false) - `, scrapeA, device, scrapeA, device, scrapeB, device) + // Insert one row per statement. The clickhouse-go database/sql driver only + // reliably binds placeholders for single-row INSERTs; a multi-row VALUES + // list with placeholders can silently drop rows, leaving the latest views + // empty. + const insertGlobalState = ` + INSERT INTO isis_global_state (timestamp, device_pubkey, network_instance, instance, net, level_capability) + VALUES (?, ?, ?, ?, ?, ?)` + mustExec(t, db, insertGlobalState, scrapeA, device, "default", "default", "49.0001.0000.0000.0001.00", "LEVEL_2") + mustExec(t, db, insertGlobalState, scrapeA, device, "vrf1", "vrf1", "49.0002.0000.0000.0001.00", "LEVEL_2") + mustExec(t, db, insertGlobalState, scrapeB, device, "default", "default", "49.0001.0000.0000.0001.00", "LEVEL_2") + + const insertOverloadBit = ` + INSERT INTO isis_overload_bit (timestamp, device_pubkey, network_instance, overload_bit) + VALUES (?, ?, ?, ?)` + mustExec(t, db, insertOverloadBit, scrapeA, device, "default", false) + mustExec(t, db, insertOverloadBit, scrapeA, device, "vrf1", true) + mustExec(t, db, insertOverloadBit, scrapeB, device, "default", false) t.Run("isis_global_state_latest", func(t *testing.T) { type row struct {