Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 20 additions & 4 deletions cmd/ion-node/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -552,12 +552,22 @@ func runStatus(cCtx *cli.Context) error {
if err != nil {
return err
}
dids, err := st.CountDIDs()
if err != nil {
return err
}
pending, err := st.RetryablePendingCount()
if err != nil {
return err
}
committed := "none"
if ok {
committed = fmt.Sprintf("%d", w)
}
fmt.Printf("network=%s committedHeight=%s indexedAnchors=%d datadir=%s\n",
cfg.Network, committed, len(anchors), cfg.DataDir)
// dids is the count this node has projected; it can undercount while
// pendingRetryable > 0 (anchored content not yet fetched from IPFS).
fmt.Printf("network=%s committedHeight=%s indexedAnchors=%d dids=%d pendingRetryable=%d datadir=%s\n",
cfg.Network, committed, len(anchors), dids, pending, cfg.DataDir)
return nil
}

Expand Down Expand Up @@ -690,7 +700,7 @@ func runServe(cCtx *cli.Context) error {
defer casClient.Close()

resolver := resolve.New(st, casClient, cfg.Method)
handler := api.NewHandler(resolver, logger)
handler := api.NewHandler(resolver, logger).WithStats(st) // GET /stats: dids, pendingRetryable, heights
srv := api.NewServer(cCtx.String("http-addr"), handler.Mux())

ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
Expand Down Expand Up @@ -722,6 +732,12 @@ func printStatus(n *node) {
n.logger.Error("status", "err", err)
return
}
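// Surface the two most common silent degradations alongside progress: a peer
// count drifting toward zero (sync stall) and a growing retryable-pending set
// (IPFS unavailable). dids and pendingRetryable are best-effort visibility: their
// errors are deliberately ignored so a failed count never suppresses the status
// line (a failed count is logged as 0). Note that on the bbolt engine CountDIDs
// scans the DID forward index, so it is not a constant-time count.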
dids, _ := n.store.CountDIDs()
pending, _ := n.store.RetryablePendingCount()
n.logger.Info("indexer status",
"tipHeight", s.TipHeight, "stableFrontier", s.StableFrontier, "committedHeight", s.CommittedHeight)
"tipHeight", s.TipHeight, "stableFrontier", s.StableFrontier, "committedHeight", s.CommittedHeight,
"peers", n.client.PeerCount(), "dids", dids, "pendingRetryable", pending)
}
2 changes: 1 addition & 1 deletion docs/hardening-backlog.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ All items below survived an independent skeptic re-reading the cited code.
| 11 | 🟡 medium | small | reliability | `merkle-verify-failure-permanently-stalls-indexer` | open |
| 12 | 🟡 medium | medium | data-integrity | `retry-double-anchor-double-pay` | open |
| 13 | 🟡 medium | medium | data-integrity | `writer-check-failopen-permanent` | open |
| 14 | 🟡 medium | small | operational | `no-metrics-no-pending-growth-visibility` | open |
| 14 | 🟡 medium | small | operational | `no-metrics-no-pending-growth-visibility` | ✅ done (#103) |
| 15 | 🟡 medium | trivial | correctness | `scan-operation-count-non-canonical-accepted` | ✅ done (#99) |
| 16 | 🟡 medium | medium | operational | `concurrency-flag-dead-block-download-sequential` | open |
| 17 | 🟡 medium | small | test-gap | `test-gap-cas-error-classification` | ✅ done (#87) |
Expand Down
54 changes: 53 additions & 1 deletion internal/api/resolution.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,19 @@ type DIDResolver interface {
Resolve(did string) resolve.Result
}

// StatsProvider exposes the node counters and watermarks reported by the /stats
// endpoint (the projection store satisfies it). Every method is called once per
// /stats request, so implementations should keep them inexpensive. Cost varies by
// engine: the bbolt store's CountDIDs walks its DID forward index rather than
// reading a stored count.
type StatsProvider interface {
// CountDIDs returns the number of distinct DID suffixes in the projection.
CountDIDs() (int, error)
// RetryablePendingCount returns the size of the retryable-pending set.
RetryablePendingCount() (int, error)
// Watermark returns the height reported as "committedHeight".
// NOTE(review): ok presumably reports whether a watermark has been recorded
// yet — confirm against the store implementation.
Watermark() (height int32, ok bool, err error)
// ProcessedWatermark returns the height reported as "processedHeight"; ok and
// err follow the same convention as Watermark.
ProcessedWatermark() (height int32, ok bool, err error)
}

// Handler serves the DID resolution API and, when a StatsProvider has been
// attached via WithStats, the GET /stats endpoint.
type Handler struct {
resolver DIDResolver // resolver supplied to NewHandler
stats StatsProvider // optional; when set, enables GET /stats
log *slog.Logger // receives error logs from the request handlers
}

Expand All @@ -71,14 +81,56 @@ func NewHandler(r DIDResolver, log *slog.Logger) *Handler {
return &Handler{resolver: r, log: log}
}

// Mux returns the routed HTTP handler: the resolution endpoint plus /health.
// WithStats enables the GET /stats endpoint backed by s. Returns the handler for
// chaining. This keeps agent-native parity: the DID/pending counts an operator reads
// from the `status` CLI are also machine-readable over HTTP.
//
// WithStats mutates and returns the receiver (it does not copy the Handler), and
// Mux decides whether to register /stats at the moment it is called, so WithStats
// must be applied before Mux. Passing a nil StatsProvider leaves /stats
// unregistered.
func (h *Handler) WithStats(s StatsProvider) *Handler {
h.stats = s
return h
}

// Mux builds a fresh *http.ServeMux for this handler. The resolution endpoint and
// /health are always routed; /stats is routed only if a stats provider was
// attached before Mux was called.
func (h *Handler) Mux() *http.ServeMux {
	type route struct {
		pattern string
		serve   func(http.ResponseWriter, *http.Request)
	}

	routes := []route{
		{identifiersPrefix, h.handleResolve},
		{"/health", h.handleHealth},
	}
	if h.stats != nil {
		routes = append(routes, route{"/stats", h.handleStats})
	}

	router := http.NewServeMux()
	for _, rt := range routes {
		router.HandleFunc(rt.pattern, rt.serve)
	}
	return router
}

// handleStats serves GET /stats: a JSON snapshot of the node's DID count,
// retryable-pending count, and committed/processed watermark heights.
//
// Any provider error — including one from either watermark lookup — is logged and
// answered with 500 internalError, so a failing store is never reported as a
// healthy height 0. A watermark whose ok flag is false is encoded as JSON null
// rather than 0 (the `status` CLI prints the same case as "none"). Non-GET
// requests receive 405 methodNotAllowed.
func (h *Handler) handleStats(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodGet {
		writeJSON(w, http.StatusMethodNotAllowed, map[string]any{"error": "methodNotAllowed"})
		return
	}
	// fail logs the provider error and sends the generic 500 body.
	fail := func(msg string, err error) {
		h.log.Error(msg, "err", err)
		writeJSON(w, http.StatusInternalServerError, map[string]any{"error": "internalError"})
	}

	dids, err := h.stats.CountDIDs()
	if err != nil {
		fail("stats: count dids", err)
		return
	}
	pending, err := h.stats.RetryablePendingCount()
	if err != nil {
		fail("stats: pending count", err)
		return
	}
	committed, err := statsHeight(h.stats.Watermark())
	if err != nil {
		fail("stats: committed watermark", err)
		return
	}
	processed, err := statsHeight(h.stats.ProcessedWatermark())
	if err != nil {
		fail("stats: processed watermark", err)
		return
	}

	writeJSON(w, http.StatusOK, map[string]any{
		"dids":             dids,
		"pendingRetryable": pending,
		"committedHeight":  committed,
		"processedHeight":  processed,
		// dids reflects locally-resolved content; it can undercount while
		// pendingRetryable > 0 (anchored content not yet fetched).
		"fullyResolved": pending == 0,
	})
}

// statsHeight converts a watermark lookup into its JSON value: the height when ok
// is true, nil (encoded as JSON null) when it is false. A lookup error is passed
// through unchanged with a nil value.
func statsHeight(height int32, ok bool, err error) (any, error) {
	if err != nil || !ok {
		return nil, err
	}
	return height, nil
}

func (h *Handler) handleResolve(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
writeJSON(w, http.StatusMethodNotAllowed, map[string]any{
Expand Down
54 changes: 54 additions & 0 deletions internal/api/resolution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,3 +239,57 @@ func TestHTTPHealth(t *testing.T) {
t.Errorf("health = %d %v, want 200 ok", status, body)
}
}

// fakeStats is a canned StatsProvider for the /stats endpoint test. Every method
// returns the corresponding field with a nil error; both watermark methods always
// report ok=true, so the error and "no watermark" paths are not exercised by it.
type fakeStats struct {
dids, pending int
committed int32
processed int32
}

// CountDIDs returns the canned DID count.
func (f fakeStats) CountDIDs() (int, error) { return f.dids, nil }
// RetryablePendingCount returns the canned pending count.
func (f fakeStats) RetryablePendingCount() (int, error) { return f.pending, nil }
// Watermark returns the canned committed height with ok=true.
func (f fakeStats) Watermark() (int32, bool, error) { return f.committed, true, nil }
// ProcessedWatermark returns the canned processed height with ok=true.
func (f fakeStats) ProcessedWatermark() (int32, bool, error) { return f.processed, true, nil }

// TestStatsEndpoint verifies GET /stats reports the node counts and both watermark
// heights (agent-native parity with the `status` CLI), and is only registered when
// a StatsProvider is configured.
func TestStatsEndpoint(t *testing.T) {
	h := NewHandler(fakeResolver{}, nil).WithStats(fakeStats{dids: 42, pending: 3, committed: 100, processed: 98})
	srv := httptest.NewServer(h.Mux())
	defer srv.Close()

	resp, err := http.Get(srv.URL + "/stats")
	if err != nil {
		t.Fatal(err)
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		t.Fatalf("GET /stats status = %d, want 200", resp.StatusCode)
	}
	var body map[string]any
	if err := json.NewDecoder(resp.Body).Decode(&body); err != nil {
		t.Fatal(err)
	}

	// JSON numbers decode into float64 when the target is map[string]any.
	// A slice (not a map) keeps the failure output in a stable order.
	checks := []struct {
		key  string
		want any
	}{
		{"dids", float64(42)},
		{"pendingRetryable", float64(3)},
		{"committedHeight", float64(100)},
		{"processedHeight", float64(98)},
		{"fullyResolved", false}, // pending > 0
	}
	for _, c := range checks {
		if got := body[c.key]; got != c.want {
			t.Errorf("%s = %v, want %v", c.key, got, c.want)
		}
	}

	// Without a StatsProvider, /stats is not registered (404).
	plain := httptest.NewServer(NewHandler(fakeResolver{}, nil).Mux())
	defer plain.Close()
	r2, err := http.Get(plain.URL + "/stats")
	if err != nil {
		t.Fatal(err)
	}
	defer r2.Body.Close()
	if r2.StatusCode != http.StatusNotFound {
		t.Errorf("GET /stats without a provider = %d, want 404", r2.StatusCode)
	}
}
8 changes: 8 additions & 0 deletions internal/p2p/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,14 @@ func (c *Client) pickPeer() *peer.Peer {
return nil
}

// PeerCount reports how many peers are connected right now. It exists for
// operator visibility: a count sliding toward zero is the most common silent
// degradation. The peer set is read while holding c.mu.
func (c *Client) PeerCount() int {
	c.mu.Lock()
	connected := len(c.peers)
	c.mu.Unlock()
	return connected
}

// connectedGroups returns the net-group histogram of current peers (for diversity).
func (c *Client) connectedGroups() map[string]int {
c.mu.Lock()
Expand Down
25 changes: 25 additions & 0 deletions internal/store/bbolt.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,31 @@ func (s *boltStore) RetryablePendingCount() (int, error) {
return n, err
}

// CountDIDs returns the number of distinct DID suffixes present in bucketDIDFwd.
//
// Keys in that bucket have the form suffix||0x1f||txnum and are visited in sorted
// order, so all keys for one suffix are adjacent: a DID is counted whenever the
// suffix differs from the one on the previous key. Keys lacking the separator are
// ignored. The whole bucket is walked inside a single read transaction.
func (s *boltStore) CountDIDs() (int, error) {
	var count int
	err := s.db.View(func(tx *bolt.Tx) error {
		var (
			prev []byte // suffix of the most recently counted DID (own copy)
			seen bool   // false until the first well-formed key is counted
		)
		cur := tx.Bucket(bucketDIDFwd).Cursor()
		for key, _ := cur.First(); key != nil; key, _ = cur.Next() {
			end := bytes.IndexByte(key, didSep[0])
			if end < 0 {
				// Malformed key: no separator, so there is no suffix to count.
				continue
			}
			if seen && bytes.Equal(key[:end], prev) {
				continue // another entry for the DID already counted
			}
			count++
			// Copy the suffix into our own buffer instead of keeping a
			// reference to the cursor's key bytes.
			prev = append(prev[:0], key[:end]...)
			seen = true
		}
		return nil
	})
	return count, err
}

func (s *boltStore) ResolvePending(txnum uint64, entries []DIDIndexEntry) error {
return s.db.Update(func(tx *bolt.Tx) error {
if err := tx.Bucket(bucketPending).Delete(u64be(txnum)); err != nil {
Expand Down
8 changes: 8 additions & 0 deletions internal/store/sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,14 @@ func (s *sqliteStore) RetryablePendingCount() (int, error) {
return n, nil
}

// CountDIDs returns the number of distinct suffix values in the did_anchors table.
// A query or scan failure is wrapped and returned with a zero count.
func (s *sqliteStore) CountDIDs() (int, error) {
	const query = `SELECT COUNT(DISTINCT suffix) FROM did_anchors`

	var count int
	err := s.db.QueryRow(query).Scan(&count)
	if err != nil {
		return 0, fmt.Errorf("store: count dids: %w", err)
	}
	return count, nil
}

func (s *sqliteStore) ResolvePending(txnum uint64, entries []DIDIndexEntry) error {
tx, err := s.db.Begin()
if err != nil {
Expand Down
7 changes: 7 additions & 0 deletions internal/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,13 @@ type Store interface {
// caveat; it is a cheap COUNT, not a full PendingAnchors scan.
RetryablePendingCount() (int, error)

// CountDIDs returns the number of distinct DID suffixes in the projection — the
// count of DIDs this node has indexed and projected up to the processed
// watermark. It reflects only locally-resolved content, so it can undercount
// while RetryablePendingCount() > 0 (anchored content not yet fetched). Used for
// operator visibility (status / the /stats endpoint).
CountDIDs() (int, error)

// ResolvePending splices a now-available anchor's DID entries into the
// projection at txnum and removes the pending entry, in one transaction. It
// does NOT move the processed watermark (the height was already accounted for
Expand Down
32 changes: 32 additions & 0 deletions internal/store/store_did_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,3 +231,35 @@ func TestDIDProjectionParity(t *testing.T) {
}
}
}

// TestCountDIDs checks, for every store engine, that CountDIDs counts DISTINCT
// suffixes rather than did_anchors rows (a DID with several operations counts
// once) and that the count follows the projection through a rollback.
func TestCountDIDs(t *testing.T) {
	eachEngine(t, func(t *testing.T, s Store) {
		// A fresh store holds no DIDs.
		got, err := s.CountDIDs()
		if err != nil || got != 0 {
			t.Fatalf("CountDIDs on empty = %d, %v; want 0", got, err)
		}

		// Four rows but only three DIDs: didA carries two operations.
		batch := []DIDIndexEntry{
			mkEntry("didA", 5, 0),
			mkEntry("didB", 5, 1),
			mkEntry("didA", 6, 0),
			mkEntry("didC", 7, 0),
		}
		if err := s.CommitProcessed(5, 7, batch, nil); err != nil {
			t.Fatalf("CommitProcessed: %v", err)
		}
		got, err = s.CountDIDs()
		if err != nil || got != 3 {
			t.Fatalf("CountDIDs = %d, %v; want 3 distinct suffixes", got, err)
		}

		// Rolling back height 6 and above removes didA's height-6 operation (its
		// height-5 operation survives) and all of didC, leaving didA and didB.
		if err := s.Rollback(6); err != nil {
			t.Fatalf("Rollback: %v", err)
		}
		got, err = s.CountDIDs()
		if err != nil || got != 2 {
			t.Errorf("CountDIDs after Rollback(6) = %d, %v; want 2 (didA, didB)", got, err)
		}
	})
}
Loading