diff --git a/cmd/ion-node/main.go b/cmd/ion-node/main.go
index 3cda0be..12035a4 100644
--- a/cmd/ion-node/main.go
+++ b/cmd/ion-node/main.go
@@ -552,12 +552,22 @@ func runStatus(cCtx *cli.Context) error {
 	if err != nil {
 		return err
 	}
+	dids, err := st.CountDIDs()
+	if err != nil {
+		return err
+	}
+	pending, err := st.RetryablePendingCount()
+	if err != nil {
+		return err
+	}
 	committed := "none"
 	if ok {
 		committed = fmt.Sprintf("%d", w)
 	}
-	fmt.Printf("network=%s committedHeight=%s indexedAnchors=%d datadir=%s\n",
-		cfg.Network, committed, len(anchors), cfg.DataDir)
+	// dids is the count this node has projected; it can undercount while
+	// pendingRetryable > 0 (anchored content not yet fetched from IPFS).
+	fmt.Printf("network=%s committedHeight=%s indexedAnchors=%d dids=%d pendingRetryable=%d datadir=%s\n",
+		cfg.Network, committed, len(anchors), dids, pending, cfg.DataDir)
 	return nil
 }
 
@@ -690,7 +700,7 @@ func runServe(cCtx *cli.Context) error {
 	defer casClient.Close()
 
 	resolver := resolve.New(st, casClient, cfg.Method)
-	handler := api.NewHandler(resolver, logger)
+	handler := api.NewHandler(resolver, logger).WithStats(st) // GET /stats: dids, pendingRetryable, heights
 	srv := api.NewServer(cCtx.String("http-addr"), handler.Mux())
 
 	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
@@ -722,6 +732,21 @@ func printStatus(n *node) {
 		n.logger.Error("status", "err", err)
 		return
 	}
+	// Surface the two most common silent degradations alongside progress: a peer
+	// count drifting toward zero (sync stall) and a growing retryable-pending set
+	// (IPFS unavailable). The counts are best-effort: a failed count is logged and
+	// reported as -1 (unknown) rather than 0, which would read as "healthy".
+	dids, err := n.store.CountDIDs()
+	if err != nil {
+		n.logger.Error("status: count dids", "err", err)
+		dids = -1
+	}
+	pending, err := n.store.RetryablePendingCount()
+	if err != nil {
+		n.logger.Error("status: pending count", "err", err)
+		pending = -1
+	}
 	n.logger.Info("indexer status",
-		"tipHeight", s.TipHeight, "stableFrontier", s.StableFrontier, "committedHeight", s.CommittedHeight)
+		"tipHeight", s.TipHeight, "stableFrontier", s.StableFrontier, "committedHeight", s.CommittedHeight,
+		"peers", n.client.PeerCount(), "dids", dids, "pendingRetryable", pending)
 }
diff --git a/docs/hardening-backlog.md b/docs/hardening-backlog.md
index 8ff26e9..eb8a117 100644
--- a/docs/hardening-backlog.md
+++ b/docs/hardening-backlog.md
@@ -41,7 +41,7 @@ All items below survived an independent skeptic re-reading the cited code.
 | 11 | 🟡 medium | small | reliability | `merkle-verify-failure-permanently-stalls-indexer` | open |
 | 12 | 🟡 medium | medium | data-integrity | `retry-double-anchor-double-pay` | open |
 | 13 | 🟡 medium | medium | data-integrity | `writer-check-failopen-permanent` | open |
-| 14 | 🟡 medium | small | operational | `no-metrics-no-pending-growth-visibility` | open |
+| 14 | 🟡 medium | small | operational | `no-metrics-no-pending-growth-visibility` | ✅ done (#103) |
 | 15 | 🟡 medium | trivial | correctness | `scan-operation-count-non-canonical-accepted` | ✅ done (#99) |
 | 16 | 🟡 medium | medium | operational | `concurrency-flag-dead-block-download-sequential` | open |
 | 17 | 🟡 medium | small | test-gap | `test-gap-cas-error-classification` | ✅ done (#87) |
diff --git a/internal/api/resolution.go b/internal/api/resolution.go
index 40362a8..8c163e2 100644
--- a/internal/api/resolution.go
+++ b/internal/api/resolution.go
@@ -57,9 +57,19 @@ type DIDResolver interface {
 	Resolve(did string) resolve.Result
 }
 
+// StatsProvider exposes node counts for the /stats endpoint (the projection store
+// satisfies it). CountDIDs can walk the whole DID index, so poll it sparingly.
+type StatsProvider interface {
+	CountDIDs() (int, error)
+	RetryablePendingCount() (int, error)
+	Watermark() (height int32, ok bool, err error)
+	ProcessedWatermark() (height int32, ok bool, err error)
+}
+
 // Handler serves the DID resolution API.
 type Handler struct {
 	resolver DIDResolver
+	stats    StatsProvider // optional; when set, enables GET /stats
 	log      *slog.Logger
 }
 
@@ -71,14 +81,79 @@ func NewHandler(r DIDResolver, log *slog.Logger) *Handler {
 	return &Handler{resolver: r, log: log}
 }
 
-// Mux returns the routed HTTP handler: the resolution endpoint plus /health.
+// WithStats enables the GET /stats endpoint backed by s. Returns the handler for
+// chaining. This keeps agent-native parity: the DID/pending counts an operator reads
+// from the `status` CLI are also machine-readable over HTTP.
+func (h *Handler) WithStats(s StatsProvider) *Handler {
+	h.stats = s
+	return h
+}
+
+// Mux returns the routed HTTP handler: the resolution endpoint, /health, and (when a
+// stats provider is configured) /stats.
 func (h *Handler) Mux() *http.ServeMux {
 	mux := http.NewServeMux()
 	mux.HandleFunc(identifiersPrefix, h.handleResolve)
 	mux.HandleFunc("/health", h.handleHealth)
+	if h.stats != nil {
+		mux.HandleFunc("/stats", h.handleStats)
+	}
 	return mux
 }
 
+// handleStats serves GET /stats: the DID and retryable-pending counts plus the
+// committed and processed heights. A height is JSON null until the store has
+// recorded one, so "nothing yet" is never confused with height 0. Any store
+// error yields 500 rather than a partially-zeroed body.
+func (h *Handler) handleStats(w http.ResponseWriter, r *http.Request) {
+	if r.Method != http.MethodGet {
+		writeJSON(w, http.StatusMethodNotAllowed, map[string]any{"error": "methodNotAllowed"})
+		return
+	}
+	fail := func(what string, err error) {
+		h.log.Error("stats: "+what, "err", err)
+		writeJSON(w, http.StatusInternalServerError, map[string]any{"error": "internalError"})
+	}
+	dids, err := h.stats.CountDIDs()
+	if err != nil {
+		fail("count dids", err)
+		return
+	}
+	pending, err := h.stats.RetryablePendingCount()
+	if err != nil {
+		fail("pending count", err)
+		return
+	}
+	committed, err := heightOrNull(h.stats.Watermark())
+	if err != nil {
+		fail("committed watermark", err)
+		return
+	}
+	processed, err := heightOrNull(h.stats.ProcessedWatermark())
+	if err != nil {
+		fail("processed watermark", err)
+		return
+	}
+	writeJSON(w, http.StatusOK, map[string]any{
+		"dids":             dids,
+		"pendingRetryable": pending,
+		"committedHeight":  committed,
+		"processedHeight":  processed,
+		// dids reflects locally-resolved content; it can undercount while
+		// pendingRetryable > 0 (anchored content not yet fetched).
+		"fullyResolved": pending == 0,
+	})
+}
+
+// heightOrNull adapts a watermark result for JSON: the height when one is
+// recorded, nil (encoded as null) when none is, and the error passed through.
+func heightOrNull(height int32, ok bool, err error) (any, error) {
+	if err != nil || !ok {
+		return nil, err
+	}
+	return height, nil
+}
+
 func (h *Handler) handleResolve(w http.ResponseWriter, r *http.Request) {
 	if r.Method != http.MethodGet {
 		writeJSON(w, http.StatusMethodNotAllowed, map[string]any{
diff --git a/internal/api/resolution_test.go b/internal/api/resolution_test.go
index 05df73d..9c09d43 100644
--- a/internal/api/resolution_test.go
+++ b/internal/api/resolution_test.go
@@ -239,3 +239,57 @@ func TestHTTPHealth(t *testing.T) {
 		t.Errorf("health = %d %v, want 200 ok", status, body)
 	}
 }
+
+// fakeStats is a canned StatsProvider for the /stats endpoint test.
+type fakeStats struct {
+	dids, pending int
+	committed     int32
+	processed     int32
+}
+
+func (f fakeStats) CountDIDs() (int, error) { return f.dids, nil }
+func (f fakeStats) RetryablePendingCount() (int, error) { return f.pending, nil }
+func (f fakeStats) Watermark() (int32, bool, error) { return f.committed, true, nil }
+func (f fakeStats) ProcessedWatermark() (int32, bool, error) { return f.processed, true, nil }
+
+// TestStatsEndpoint verifies GET /stats reports the node counts (agent-native parity
+// with the `status` CLI), and is only registered when a StatsProvider is configured.
+func TestStatsEndpoint(t *testing.T) {
+	h := NewHandler(fakeResolver{}, nil).WithStats(fakeStats{dids: 42, pending: 3, committed: 100, processed: 98})
+	srv := httptest.NewServer(h.Mux())
+	defer srv.Close()
+
+	resp, err := http.Get(srv.URL + "/stats")
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer resp.Body.Close()
+	if resp.StatusCode != http.StatusOK {
+		t.Fatalf("GET /stats status = %d, want 200", resp.StatusCode)
+	}
+	var body map[string]any
+	if err := json.NewDecoder(resp.Body).Decode(&body); err != nil {
+		t.Fatal(err)
+	}
+	// JSON numbers decode as float64. Pin every field, heights included.
+	want := map[string]any{"dids": 42.0, "pendingRetryable": 3.0,
+		"committedHeight": 100.0, "processedHeight": 98.0,
+		"fullyResolved": false} // false because pending > 0
+	for k, v := range want {
+		if got := body[k]; got != v {
+			t.Errorf("%s = %v, want %v", k, got, v)
+		}
+	}
+
+	// Without a StatsProvider, /stats is not registered (404).
+	plain := httptest.NewServer(NewHandler(fakeResolver{}, nil).Mux())
+	defer plain.Close()
+	r2, err := http.Get(plain.URL + "/stats")
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer r2.Body.Close()
+	if r2.StatusCode != http.StatusNotFound {
+		t.Errorf("GET /stats without a provider = %d, want 404", r2.StatusCode)
+	}
+}
diff --git a/internal/p2p/peer.go b/internal/p2p/peer.go
index a64c423..3f4e79f 100644
--- a/internal/p2p/peer.go
+++ b/internal/p2p/peer.go
@@ -134,6 +134,14 @@ func (c *Client) pickPeer() *peer.Peer {
 	return nil
 }
 
+// PeerCount returns the number of currently-connected peers — operator visibility
+// for the most common silent degradation (drifting toward zero peers).
+func (c *Client) PeerCount() int {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	return len(c.peers)
+}
+
 // connectedGroups returns the net-group histogram of current peers (for diversity).
 func (c *Client) connectedGroups() map[string]int {
 	c.mu.Lock()
diff --git a/internal/store/bbolt.go b/internal/store/bbolt.go
index 9105b76..0942dd3 100644
--- a/internal/store/bbolt.go
+++ b/internal/store/bbolt.go
@@ -220,6 +220,33 @@ func (s *boltStore) RetryablePendingCount() (int, error) {
 	return n, err
 }
 
+// CountDIDs counts distinct DID suffixes by walking every key of bucketDIDFwd
+// inside one read transaction; the cost grows with the number of index entries.
+func (s *boltStore) CountDIDs() (int, error) {
+	n := 0
+	err := s.db.View(func(tx *bolt.Tx) error {
+		// bucketDIDFwd keys are suffix||0x1f||txnum, sorted, so all entries for one
+		// suffix are contiguous — count a distinct suffix each time the prefix changes.
+		c := tx.Bucket(bucketDIDFwd).Cursor()
+		var last []byte
+		have := false
+		for k, _ := c.First(); k != nil; k, _ = c.Next() {
+			sep := bytes.IndexByte(k, didSep[0])
+			if sep < 0 {
+				continue // malformed key; skip defensively
+			}
+			suffix := k[:sep]
+			if !have || !bytes.Equal(suffix, last) {
+				n++
+				last = append(last[:0], suffix...)
+				have = true
+			}
+		}
+		return nil
+	})
+	return n, err
+}
+
 func (s *boltStore) ResolvePending(txnum uint64, entries []DIDIndexEntry) error {
 	return s.db.Update(func(tx *bolt.Tx) error {
 		if err := tx.Bucket(bucketPending).Delete(u64be(txnum)); err != nil {
diff --git a/internal/store/sqlite.go b/internal/store/sqlite.go
index 9e4be44..3af7f8d 100644
--- a/internal/store/sqlite.go
+++ b/internal/store/sqlite.go
@@ -284,6 +284,14 @@ func (s *sqliteStore) RetryablePendingCount() (int, error) {
 	return n, nil
 }
 
+func (s *sqliteStore) CountDIDs() (int, error) {
+	var n int
+	if err := s.db.QueryRow(`SELECT COUNT(DISTINCT suffix) FROM did_anchors`).Scan(&n); err != nil {
+		return 0, fmt.Errorf("store: count dids: %w", err)
+	}
+	return n, nil
+}
+
 func (s *sqliteStore) ResolvePending(txnum uint64, entries []DIDIndexEntry) error {
 	tx, err := s.db.Begin()
 	if err != nil {
diff --git a/internal/store/store.go b/internal/store/store.go
index a02524c..8b0b106 100644
--- a/internal/store/store.go
+++ b/internal/store/store.go
@@ -141,6 +141,15 @@ type Store interface {
 	// caveat; it is a cheap COUNT, not a full PendingAnchors scan.
 	RetryablePendingCount() (int, error)
 
+	// CountDIDs returns the number of distinct DID suffixes in the projection — the
+	// count of DIDs this node has indexed and projected up to the processed
+	// watermark. It reflects only locally-resolved content, so it can undercount
+	// while RetryablePendingCount() > 0 (anchored content not yet fetched). Used for
+	// operator visibility (status / the /stats endpoint). Unlike
+	// RetryablePendingCount it may scan the whole DID index (the bbolt engine walks
+	// every key), so keep it off hot paths.
+	CountDIDs() (int, error)
+
 	// ResolvePending splices a now-available anchor's DID entries into the
 	// projection at txnum and removes the pending entry, in one transaction. It
 	// does NOT move the processed watermark (the height was already accounted for
diff --git a/internal/store/store_did_test.go b/internal/store/store_did_test.go
index 183ff71..acc8f98 100644
--- a/internal/store/store_did_test.go
+++ b/internal/store/store_did_test.go
@@ -231,3 +231,35 @@ func TestDIDProjectionParity(t *testing.T) {
 		}
 	}
 }
+
+// TestCountDIDs verifies CountDIDs returns the number of DISTINCT suffixes (not the
+// number of did_anchors rows — a DID with several operations counts once), across
+// both engines, and that it stays reorg-coupled with the projection.
+func TestCountDIDs(t *testing.T) {
+	eachEngine(t, func(t *testing.T, s Store) {
+		if n, err := s.CountDIDs(); err != nil || n != 0 {
+			t.Fatalf("CountDIDs on empty = %d, %v; want 0", n, err)
+		}
+		// didA has two ops, didB and didC one each → 3 distinct DIDs, 4 rows.
+		entries := []DIDIndexEntry{
+			mkEntry("didA", 5, 0),
+			mkEntry("didB", 5, 1),
+			mkEntry("didA", 6, 0),
+			mkEntry("didC", 7, 0),
+		}
+		if err := s.CommitProcessed(5, 7, entries, nil); err != nil {
+			t.Fatalf("CommitProcessed: %v", err)
+		}
+		if n, err := s.CountDIDs(); err != nil || n != 3 {
+			t.Fatalf("CountDIDs = %d, %v; want 3 distinct suffixes", n, err)
+		}
+		// A reorg that rolls back height 6+ drops didA's height-6 op (didA still has
+		// its height-5 op) and didC entirely → 2 distinct DIDs remain.
+		if err := s.Rollback(6); err != nil {
+			t.Fatalf("Rollback: %v", err)
+		}
+		if n, err := s.CountDIDs(); err != nil || n != 2 {
+			t.Errorf("CountDIDs after Rollback(6) = %d, %v; want 2 (didA, didB)", n, err)
+		}
+	})
+}