From 1bea375ea2f002222f0ea556ff775522c420511a Mon Sep 17 00:00:00 2001 From: Liran Cohen Date: Tue, 9 Jun 2026 14:19:35 +0000 Subject: [PATCH] feat(fastsync): /fastsync/anchors feed + concurrent (SQLite/WAL) bundle store (#115) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Completes Option B of the fast-sync overlay (epic #111): the verifiable anchor-bundle HTTP feed, plus the concurrency fix that unblocks it. - internal/fastsync/store.go: convert the bundle store from bbolt (single-process exclusive lock) to SQLite/WAL, so the `start` writer and the `serve` reader can open it concurrently — the same property anchors.db relies on. Same Put/Get/Range API, so #113's indexer wiring and tests are unchanged. Still fail-closed / no reorg rollback (a stale bundle fails the client's merkle check; re-index overwrites). - internal/fastsync/service.go: Service — the transport-agnostic serving core (shared with Option C #122). AnchorBundles(from,to) reads the store, clamped to a max height span + max bundle count. - internal/api: GET /fastsync/anchors?from=&to= → {"anchors":[{height,txIndex,rawTx hex,branch hex[]}]}, CORS-open. The client reconstructs each bundle and verifies its merkle proof against its OWN PoW headers — the source can only omit, never forge. - cmd/ion-node: --serve-fast-sync now both (a) makes `start` capture bundles (WithBundleWriter over the SQLite store) and (b) makes `serve` expose /fastsync/cas + /fastsync/anchors. node.close() closes the bundle store. To populate bundles in production, run `start --serve-fast-sync` (captures going forward; a re-index backfills history). Tests: Service range/order/bounds; two concurrent Store handles (the start/serve processes) read+write the same file (a bbolt store would fail the 2nd Open); and the /fastsync/anchors endpoint serialized a bundle that a client reconstructs and VERIFIES after the JSON round-trip (+ CORS, bad-range 400, route-absent 404). go test -race ./... green. Co-authored-by: Liran Cohen Co-authored-by: Claude Opus 4.8 (1M context) --- cmd/ion-node/main.go | 51 ++++++++++--- internal/api/resolution.go | 70 +++++++++++++++++- internal/api/resolution_test.go | 93 +++++++++++++++++++++++ internal/fastsync/service.go | 51 +++++++++++++ internal/fastsync/service_test.go | 89 ++++++++++++++++++++++ internal/fastsync/store.go | 118 +++++++++++++++++------------- 6 files changed, 409 insertions(+), 63 deletions(-) create mode 100644 internal/fastsync/service.go create mode 100644 internal/fastsync/service_test.go diff --git a/cmd/ion-node/main.go b/cmd/ion-node/main.go index 21fe851..eba29c6 100644 --- a/cmd/ion-node/main.go +++ b/cmd/ion-node/main.go @@ -27,6 +27,7 @@ import ( "github.com/13x-tech/ion-node/internal/cas" "github.com/13x-tech/ion-node/internal/chain" "github.com/13x-tech/ion-node/internal/config" + "github.com/13x-tech/ion-node/internal/fastsync" "github.com/13x-tech/ion-node/internal/fee" "github.com/13x-tech/ion-node/internal/index" "github.com/13x-tech/ion-node/internal/lock" @@ -296,15 +297,16 @@ func buildCAS(cfg *config.Config, logger *slog.Logger) (sidetree.CAS, error) { // node bundles the wired-up components. type node struct { - cfg *config.Config - logger *slog.Logger - hc *chain.HeaderChain - client *p2p.Client - store store.Store - idx *index.Indexer - txIdx *txindex.Index // resolver cache for writer/lock verification; nil when disabled - cas sidetree.CAS // cached external IPFS CAS; nil when --observe=false - obs *process.Observer // nil when --observe=false + cfg *config.Config + logger *slog.Logger + hc *chain.HeaderChain + client *p2p.Client + store store.Store + idx *index.Indexer + txIdx *txindex.Index // resolver cache for writer/lock verification; nil when disabled + bundles *fastsync.Store // fast-sync bundle capture; nil unless --serve-fast-sync + cas sidetree.CAS // cached external IPFS CAS; nil when --observe=false + obs *process.Observer // nil when --observe=false } // writerVerification bundles the bitcoind-free resolver wiring shared by the indexer @@ -383,8 +385,26 @@ func buildNode(cCtx *cli.Context) (*node, error) { logger.Info("writer verification enabled", "esplora", endpoint) } + // Fast-sync bundle capture (#115): when serving fast-sync, record a verifiable + // inclusion bundle for each anchor as it is indexed. SQLite/WAL so `serve` can + // read the bundles concurrently with this writer. + var bundleStore *fastsync.Store + if cfg.ServeFastSync { + bs, err := fastsync.Open(filepath.Join(cfg.DataDir, "bundles.db")) + if err != nil { + _ = st.Close() + if txIdx != nil { + _ = txIdx.Close() + } + return nil, fmt.Errorf("bundle store: %w", err) + } + bundleStore = bs + idxOpts = append(idxOpts, index.WithBundleWriter(bs)) + logger.Info("fast-sync bundle capture enabled", "db", "bundles.db") + } + idx := index.New(cfg, client, st, logger, idxOpts...) - n := &node{cfg: cfg, logger: logger, hc: hc, client: client, store: st, idx: idx, txIdx: txIdx} + n := &node{cfg: cfg, logger: logger, hc: hc, client: client, store: st, idx: idx, txIdx: txIdx, bundles: bundleStore} if cCtx.Bool("observe") { casClient, err := buildCAS(cfg, logger) @@ -428,6 +448,9 @@ func (n *node) close() { if n.txIdx != nil { _ = n.txIdx.Close() } + if n.bundles != nil { + _ = n.bundles.Close() + } _ = n.store.Close() } @@ -735,7 +758,13 @@ func runServe(cCtx *cli.Context) error { handler := api.NewHandler(resolver, logger).WithStats(st) // GET /stats: dids, pendingRetryable, heights if cfg.ServeFastSync { handler = handler.WithCASGateway(casClient) // GET /fastsync/cas/ - logger.Info("fast-sync content gateway enabled", "endpoint", "/fastsync/cas/") + bs, err := fastsync.Open(filepath.Join(cfg.DataDir, "bundles.db")) + if err != nil { + return fmt.Errorf("bundle store: %w", err) + } + defer bs.Close() + handler = handler.WithAnchorBundles(fastsync.NewService(bs)) // GET /fastsync/anchors + logger.Info("fast-sync gateway enabled", "endpoints", "/fastsync/cas/, /fastsync/anchors?from=&to=") } srv := api.NewServer(cCtx.String("http-addr"), handler.Mux()) diff --git a/internal/api/resolution.go b/internal/api/resolution.go index 149af65..563f7a3 100644 --- a/internal/api/resolution.go +++ b/internal/api/resolution.go @@ -7,15 +7,18 @@ package api import ( + "encoding/hex" "encoding/json" "errors" "log/slog" "net/http" "net/url" + "strconv" "strings" "time" "github.com/13x-tech/ion-node/internal/cas" + "github.com/13x-tech/ion-node/internal/fastsync" "github.com/13x-tech/ion-node/internal/resolve" ) @@ -75,6 +78,16 @@ type CASGateway interface { Get(cid string, maxSizeInBytes int) ([]byte, error) } +// AnchorBundleProvider returns verifiable anchor bundles for a height range +// (*fastsync.Service satisfies it). It backs GET /fastsync/anchors: the client +// verifies each bundle's merkle proof against its OWN PoW headers, so this source is +// untrusted — it can only omit, never forge. +type AnchorBundleProvider interface { + AnchorBundles(from, to int32) ([]fastsync.Bundle, error) +} + +const fastSyncAnchorsPath = "/fastsync/anchors" + // maxGatewayBlobBytes bounds a single content-gateway response. Sidetree's largest // file types are well under this; it guards against an unexpectedly huge blob. const maxGatewayBlobBytes = 1 << 22 // 4 MiB @@ -84,8 +97,9 @@ const fastSyncCASPrefix = "/fastsync/cas/" // Handler serves the DID resolution API. type Handler struct { resolver DIDResolver - stats StatsProvider // optional; when set, enables GET /stats - cas CASGateway // optional; when set, enables GET /fastsync/cas/ + stats StatsProvider // optional; when set, enables GET /stats + cas CASGateway // optional; when set, enables GET /fastsync/cas/ + anchors AnchorBundleProvider // optional; when set, enables GET /fastsync/anchors log *slog.Logger } @@ -113,6 +127,14 @@ func (h *Handler) WithCASGateway(g CASGateway) *Handler { return h } +// WithAnchorBundles enables GET /fastsync/anchors?from=&to=, the verifiable +// anchor-bundle feed for fast-sync bootstrap / browser light-clients. CORS-open; the +// bundles are self-proving against the client's own Bitcoin headers. +func (h *Handler) WithAnchorBundles(p AnchorBundleProvider) *Handler { + h.anchors = p + return h +} + // Mux returns the routed HTTP handler: the resolution endpoint, /health, and (when // configured) /stats and the /fastsync/cas/ content gateway. func (h *Handler) Mux() *http.ServeMux { @@ -125,9 +147,53 @@ func (h *Handler) Mux() *http.ServeMux { if h.cas != nil { mux.HandleFunc(fastSyncCASPrefix, h.handleFastSyncCAS) } + if h.anchors != nil { + mux.HandleFunc(fastSyncAnchorsPath, h.handleFastSyncAnchors) + } return mux } +// handleFastSyncAnchors serves verifiable anchor bundles for a height range as JSON. +// Each bundle carries the raw tx and a merkle branch; the client recomputes the txid +// and checks the branch against the block header's merkle root from its OWN verified +// header chain (the rawTx and branch hashes are hex of the raw wire bytes). +func (h *Handler) handleFastSyncAnchors(w http.ResponseWriter, r *http.Request) { + setGatewayCORS(w) + if r.Method == http.MethodOptions { + w.WriteHeader(http.StatusNoContent) + return + } + if r.Method != http.MethodGet { + writeJSON(w, http.StatusMethodNotAllowed, map[string]any{"error": "methodNotAllowed"}) + return + } + from, err1 := strconv.Atoi(r.URL.Query().Get("from")) + to, err2 := strconv.Atoi(r.URL.Query().Get("to")) + if err1 != nil || err2 != nil { + writeJSON(w, http.StatusBadRequest, map[string]any{"error": "invalidRange", "message": "from and to must be integer heights"}) + return + } + bundles, err := h.anchors.AnchorBundles(int32(from), int32(to)) + if err != nil { + writeJSON(w, http.StatusBadRequest, map[string]any{"error": "invalidRange", "message": err.Error()}) + return + } + out := make([]map[string]any, 0, len(bundles)) + for _, b := range bundles { + branch := make([]string, len(b.Branch)) + for i, hsh := range b.Branch { + branch[i] = hex.EncodeToString(hsh[:]) + } + out = append(out, map[string]any{ + "height": b.Height, + "txIndex": b.TxIndex, + "rawTx": hex.EncodeToString(b.RawTx), + "branch": branch, + }) + } + writeJSON(w, http.StatusOK, map[string]any{"anchors": out}) +} + // setGatewayCORS opens the fast-sync gateway to browser clients. The served data is // public and self-verifying (anchors prove inclusion against Bitcoin; CAS blobs are // content-addressed), so there is nothing to protect with a same-origin policy. diff --git a/internal/api/resolution_test.go b/internal/api/resolution_test.go index 0b7890a..c74de93 100644 --- a/internal/api/resolution_test.go +++ b/internal/api/resolution_test.go @@ -1,6 +1,8 @@ package api import ( + "bytes" + "encoding/hex" "encoding/json" "fmt" "io" @@ -9,7 +11,12 @@ import ( "testing" "time" + "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/btcsuite/btcd/txscript" + "github.com/btcsuite/btcd/wire" + "github.com/13x-tech/ion-node/internal/cas" + "github.com/13x-tech/ion-node/internal/fastsync" "github.com/13x-tech/ion-node/internal/resolve" "github.com/13x-tech/ion-sdk-go/pkg/did" ) @@ -359,3 +366,89 @@ func TestFastSyncCASGateway(t *testing.T) { t.Errorf("no-gateway route = %d, want 404", none.StatusCode) } } + +// fakeAnchorProvider returns its bundles for any valid range; errors on a bad one. +type fakeAnchorProvider []fastsync.Bundle + +func (f fakeAnchorProvider) AnchorBundles(from, to int32) ([]fastsync.Bundle, error) { + if from < 0 || to < from { + return nil, fmt.Errorf("bad range [%d,%d]", from, to) + } + return f, nil +} + +// TestFastSyncAnchorsEndpoint verifies GET /fastsync/anchors serializes bundles such +// that a client can reconstruct and VERIFY them after the JSON round-trip (the whole +// point of the feed), plus CORS, bad-range 400, and route-absent-without-provider. +func TestFastSyncAnchorsEndpoint(t *testing.T) { + // A single-tx "block": the branch is empty and the merkle root is the txid, so a + // client verifies the reconstructed bundle against that root. + mt := wire.NewMsgTx(1) + mt.AddTxIn(&wire.TxIn{PreviousOutPoint: wire.OutPoint{Index: 0}, SignatureScript: []byte{0x51}}) + mt.AddTxOut(wire.NewTxOut(1, []byte{txscript.OP_TRUE})) + var raw bytes.Buffer + if err := mt.SerializeNoWitness(&raw); err != nil { + t.Fatal(err) + } + txid := mt.TxHash() + prov := fakeAnchorProvider{{Height: 700001, TxIndex: 0, RawTx: raw.Bytes(), Branch: nil}} + srv := httptest.NewServer(NewHandler(fakeResolver{}, nil).WithAnchorBundles(prov).Mux()) + defer srv.Close() + + resp, err := http.Get(srv.URL + "/fastsync/anchors?from=700001&to=700001") + if err != nil { + t.Fatal(err) + } + if resp.StatusCode != http.StatusOK { + t.Fatalf("GET anchors = %d, want 200", resp.StatusCode) + } + if resp.Header.Get("Access-Control-Allow-Origin") != "*" { + t.Error("missing CORS header") + } + var body struct { + Anchors []struct { + Height int32 `json:"height"` + TxIndex int `json:"txIndex"` + RawTx string `json:"rawTx"` + Branch []string `json:"branch"` + } `json:"anchors"` + } + if err := json.NewDecoder(resp.Body).Decode(&body); err != nil { + t.Fatal(err) + } + resp.Body.Close() + if len(body.Anchors) != 1 { + t.Fatalf("got %d anchors, want 1", len(body.Anchors)) + } + a := body.Anchors[0] + + // Reconstruct the bundle from the JSON and verify it — exactly what a client does. + rawTx, err := hex.DecodeString(a.RawTx) + if err != nil { + t.Fatalf("rawTx not hex: %v", err) + } + branch := make([]chainhash.Hash, len(a.Branch)) + for i, hx := range a.Branch { + b, _ := hex.DecodeString(hx) + copy(branch[i][:], b) + } + got := fastsync.Bundle{Height: a.Height, TxIndex: a.TxIndex, RawTx: rawTx, Branch: branch} + if vtxid, ok := got.Verify(txid); !ok || vtxid != txid { + t.Error("reconstructed bundle failed verification after the JSON round-trip") + } + + // Bad range → 400. + bad, _ := http.Get(srv.URL + "/fastsync/anchors?from=abc&to=10") + bad.Body.Close() + if bad.StatusCode != http.StatusBadRequest { + t.Errorf("bad range = %d, want 400", bad.StatusCode) + } + // No provider → route absent (404). + plain := httptest.NewServer(NewHandler(fakeResolver{}, nil).Mux()) + defer plain.Close() + none, _ := http.Get(plain.URL + "/fastsync/anchors?from=1&to=2") + none.Body.Close() + if none.StatusCode != http.StatusNotFound { + t.Errorf("no-provider route = %d, want 404", none.StatusCode) + } +} diff --git a/internal/fastsync/service.go b/internal/fastsync/service.go new file mode 100644 index 0000000..1dbf732 --- /dev/null +++ b/internal/fastsync/service.go @@ -0,0 +1,51 @@ +package fastsync + +import "fmt" + +const ( + // defaultMaxAnchorHeights bounds a single AnchorBundles request's height span. + defaultMaxAnchorHeights = 5000 + // maxBundlesPerResponse bounds a single response's bundle count. + maxBundlesPerResponse = 5000 +) + +// Service is the transport-agnostic fast-sync serving core (epic #111). Both the +// HTTP gateway (#115) and the native P2P transport (#122) call into it, so the +// bounds and trust posture live in one place. It reads verifiable anchor bundles +// from the store; the caller (client) verifies each against its OWN PoW header chain. +type Service struct { + store *Store + maxHeights int32 +} + +// NewService builds a fast-sync service over the given bundle store. +func NewService(store *Store) *Service { + return &Service{store: store, maxHeights: defaultMaxAnchorHeights} +} + +// AnchorBundles returns the verifiable bundles whose confirming height is in +// [from, to], clamped to the configured max height span and a max bundle count so a +// single request can't pull an unbounded amount. The bundles are ordered by +// transaction number (height then tx index). +func (s *Service) AnchorBundles(from, to int32) ([]Bundle, error) { + if from < 0 { + return nil, fmt.Errorf("fastsync: negative from height %d", from) + } + if to < from { + return nil, fmt.Errorf("fastsync: to height %d below from height %d", to, from) + } + if to-from > s.maxHeights { + to = from + s.maxHeights + } + fromTxnum := uint64(uint32(from)) << 32 + toTxnum := uint64(uint32(to))<<32 | 0xffffffff + out := make([]Bundle, 0, 64) + err := s.store.Range(fromTxnum, toTxnum, func(_ uint64, b Bundle) bool { + out = append(out, b) + return len(out) < maxBundlesPerResponse + }) + if err != nil { + return nil, err + } + return out, nil +} diff --git a/internal/fastsync/service_test.go b/internal/fastsync/service_test.go new file mode 100644 index 0000000..f450139 --- /dev/null +++ b/internal/fastsync/service_test.go @@ -0,0 +1,89 @@ +package fastsync + +import ( + "path/filepath" + "testing" + + "github.com/btcsuite/btcd/chaincfg/chainhash" +) + +func putBundle(t *testing.T, s *Store, h int32, ti int) { + t.Helper() + txnum := uint64(uint32(h))<<32 | uint64(uint32(ti)) + b := Bundle{Height: h, TxIndex: ti, RawTx: []byte{byte(h), byte(ti)}, Branch: []chainhash.Hash{{byte(h)}}} + if err := s.Put(txnum, b); err != nil { + t.Fatalf("Put(%d,%d): %v", h, ti, err) + } +} + +func TestServiceAnchorBundles(t *testing.T) { + s, err := Open(filepath.Join(t.TempDir(), "b.db")) + if err != nil { + t.Fatal(err) + } + t.Cleanup(func() { _ = s.Close() }) + putBundle(t, s, 100, 0) + putBundle(t, s, 100, 3) + putBundle(t, s, 101, 0) + putBundle(t, s, 200, 0) + + svc := NewService(s) + + // [100,101] → the three bundles at 100/100/101, in txnum order; 200 excluded. + got, err := svc.AnchorBundles(100, 101) + if err != nil { + t.Fatalf("AnchorBundles: %v", err) + } + if len(got) != 3 { + t.Fatalf("AnchorBundles(100,101) = %d bundles, want 3", len(got)) + } + if got[0].Height != 100 || got[0].TxIndex != 0 || got[1].TxIndex != 3 || got[2].Height != 101 { + t.Errorf("bundles out of order: %+v", got) + } + + // Single height. + if got, _ := svc.AnchorBundles(200, 200); len(got) != 1 || got[0].Height != 200 { + t.Errorf("AnchorBundles(200,200) = %v, want one bundle at 200", got) + } + // Empty range below any data. + if got, _ := svc.AnchorBundles(50, 60); len(got) != 0 { + t.Errorf("AnchorBundles(50,60) = %d, want 0", len(got)) + } + // Bad ranges error. + if _, err := svc.AnchorBundles(-1, 10); err == nil { + t.Error("negative from should error") + } + if _, err := svc.AnchorBundles(100, 50); err == nil { + t.Error("to < from should error") + } +} + +// TestStoreConcurrentReadWrite proves the reason for the SQLite/WAL store: two +// independent handles to the same file (modeling the `start` writer process and the +// `serve` reader process) can operate concurrently — a bbolt store would have failed +// the second Open on its exclusive lock. +func TestStoreConcurrentReadWrite(t *testing.T) { + path := filepath.Join(t.TempDir(), "bundles.db") + writer, err := Open(path) + if err != nil { + t.Fatalf("open writer: %v", err) + } + t.Cleanup(func() { _ = writer.Close() }) + reader, err := Open(path) // a second process would do exactly this + if err != nil { + t.Fatalf("open reader concurrently: %v", err) + } + t.Cleanup(func() { _ = reader.Close() }) + + txnum := uint64(100) << 32 + if err := writer.Put(txnum, Bundle{Height: 100, TxIndex: 0, RawTx: []byte{0xab}, Branch: nil}); err != nil { + t.Fatalf("writer Put: %v", err) + } + b, ok, err := reader.Get(txnum) + if err != nil || !ok { + t.Fatalf("reader.Get after concurrent writer Put: ok=%v err=%v", ok, err) + } + if b.Height != 100 { + t.Errorf("reader saw height %d, want 100", b.Height) + } +} diff --git a/internal/fastsync/store.go b/internal/fastsync/store.go index 15521c3..8ad0fe8 100644 --- a/internal/fastsync/store.go +++ b/internal/fastsync/store.go @@ -1,92 +1,110 @@ package fastsync import ( - "encoding/binary" + "database/sql" "fmt" - bolt "go.etcd.io/bbolt" + _ "modernc.org/sqlite" // pure-Go SQLite driver, registered as "sqlite" ) -var bucketBundles = []byte("bundles") - // Store is a persistent map from an anchor's transaction number (height<<32|txIndex) // to its verifiable Bundle. A node that opts into serving fast-sync writes bundles // here at scan time (when it still has the full block to build the merkle branch), // so it can later serve PoW-verifiable proofs without retaining or re-fetching full // blocks. It is auxiliary serving data, kept in its own file (like the txid index), // so a node that does not serve fast-sync incurs no cost. +// +// It uses SQLite in WAL mode so the writing process (the indexer, `start`) and a +// reading process (the resolver/gateway, `serve`) can open it concurrently — the +// same property anchors.db relies on. Like the txid index it is fail-closed and +// needs no reorg rollback: a stale bundle left by a reorg fails the client's merkle +// check against its own header chain (the orphaned block's root no longer matches), +// and re-indexing the new branch overwrites the txnum. type Store struct { - db *bolt.DB + db *sql.DB } // Open opens (creating if needed) the bundle store at path. func Open(path string) (*Store, error) { - db, err := bolt.Open(path, 0o600, nil) + db, err := sql.Open("sqlite", path) if err != nil { return nil, fmt.Errorf("fastsync: open %s: %w", path, err) } - if err := db.Update(func(tx *bolt.Tx) error { - _, e := tx.CreateBucketIfNotExists(bucketBundles) - return e - }); err != nil { + db.SetMaxOpenConns(1) // serialize within a process; WAL handles cross-process + for _, pragma := range []string{ + "PRAGMA journal_mode=WAL", + "PRAGMA synchronous=NORMAL", + "PRAGMA busy_timeout=5000", + } { + if _, err := db.Exec(pragma); err != nil { + _ = db.Close() + return nil, fmt.Errorf("fastsync: %s: %w", pragma, err) + } + } + if _, err := db.Exec(`CREATE TABLE IF NOT EXISTS bundles ( + txnum INTEGER PRIMARY KEY, + data BLOB NOT NULL + )`); err != nil { _ = db.Close() - return nil, fmt.Errorf("fastsync: init bucket: %w", err) + return nil, fmt.Errorf("fastsync: create table: %w", err) } return &Store{db: db}, nil } -func txnumKey(txnum uint64) []byte { - k := make([]byte, 8) - binary.BigEndian.PutUint64(k, txnum) - return k -} - // Put stores b under txnum (idempotent — re-indexing a height overwrites, matching // the txid index's self-healing-after-reorg behavior). func (s *Store) Put(txnum uint64, b Bundle) error { - return s.db.Update(func(tx *bolt.Tx) error { - return tx.Bucket(bucketBundles).Put(txnumKey(txnum), b.encode()) - }) + _, err := s.db.Exec(`INSERT INTO bundles(txnum, data) VALUES(?, ?) + ON CONFLICT(txnum) DO UPDATE SET data=excluded.data`, int64(txnum), b.encode()) + if err != nil { + return fmt.Errorf("fastsync: put bundle %d: %w", txnum, err) + } + return nil } // Get returns the bundle for txnum, ok=false if absent. func (s *Store) Get(txnum uint64) (Bundle, bool, error) { - var ( - b Bundle - found bool - ) - err := s.db.View(func(tx *bolt.Tx) error { - v := tx.Bucket(bucketBundles).Get(txnumKey(txnum)) - if v == nil { - return nil - } - decoded, derr := decodeBundle(v) - if derr != nil { - return derr - } - b, found = decoded, true - return nil - }) - return b, found, err + var data []byte + err := s.db.QueryRow(`SELECT data FROM bundles WHERE txnum=?`, int64(txnum)).Scan(&data) + if err == sql.ErrNoRows { + return Bundle{}, false, nil + } + if err != nil { + return Bundle{}, false, fmt.Errorf("fastsync: get bundle %d: %w", txnum, err) + } + b, derr := decodeBundle(data) + if derr != nil { + return Bundle{}, false, derr + } + return b, true, nil } // Range invokes fn for every bundle with txnum in [fromTxnum, toTxnum], in ascending // txnum order, stopping early if fn returns false. Used to serve a height range. func (s *Store) Range(fromTxnum, toTxnum uint64, fn func(txnum uint64, b Bundle) bool) error { - return s.db.View(func(tx *bolt.Tx) error { - c := tx.Bucket(bucketBundles).Cursor() - lo, hi := txnumKey(fromTxnum), txnumKey(toTxnum) - for k, v := c.Seek(lo); k != nil && string(k) <= string(hi); k, v = c.Next() { - b, derr := decodeBundle(v) - if derr != nil { - return derr - } - if !fn(binary.BigEndian.Uint64(k), b) { - return nil - } + rows, err := s.db.Query(`SELECT txnum, data FROM bundles WHERE txnum >= ? AND txnum <= ? ORDER BY txnum`, + int64(fromTxnum), int64(toTxnum)) + if err != nil { + return fmt.Errorf("fastsync: range bundles: %w", err) + } + defer rows.Close() + for rows.Next() { + var ( + txnum int64 + data []byte + ) + if err := rows.Scan(&txnum, &data); err != nil { + return err } - return nil - }) + b, derr := decodeBundle(data) + if derr != nil { + return derr + } + if !fn(uint64(txnum), b) { + return nil + } + } + return rows.Err() } // Close releases the underlying database.