Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 140 additions & 0 deletions internal/fastsync/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
package fastsync

import (
"context"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"strings"

"github.com/btcsuite/btcd/chaincfg/chainhash"
)

// maxAnchorsResponseBytes bounds the /fastsync/anchors response a client will read,
// so an untrusted peer can't OOM the client with an endless body.
const maxAnchorsResponseBytes = 64 << 20 // 64 MiB

// RootSource yields the active block's merkle root at a height, from the CALLER'S OWN
// PoW-verified header chain. ok=false when the caller's chain does not reach that
// height yet — the bundle then can't be verified and is skipped. This is the trust
// anchor: a fast-sync peer is untrusted, so every bundle is checked against roots the
// caller derived itself.
type RootSource func(height int32) (chainhash.Hash, bool)

// Client pulls verifiable anchor bundles (and content blobs) from an ion-node's HTTP
// fast-sync gateway (#115). It is the consumer half of the trust model: it verifies
// every bundle against the caller's own headers (RootSource), so the peer can only
// omit, never forge.
type Client struct {
http *http.Client
base string // gateway base URL, e.g. http://peer:8081
}

// NewClient builds a fast-sync client against baseURL. A nil httpClient uses a
// default with no timeout (callers should pass one with a deadline / use ctx).
func NewClient(httpClient *http.Client, baseURL string) *Client {
if httpClient == nil {
httpClient = http.DefaultClient
}
return &Client{http: httpClient, base: strings.TrimRight(baseURL, "/")}
}

type anchorJSON struct {
Height int32 `json:"height"`
TxIndex int `json:"txIndex"`
RawTx string `json:"rawTx"`
Branch []string `json:"branch"`
}

// FetchAnchors GETs /fastsync/anchors for [from,to], verifies each returned bundle's
// merkle proof against src, and returns the bundles that VERIFIED. Bundles that fail
// the proof — or whose height the caller cannot yet verify — are dropped and counted
// in rejected; the caller should treat a high rejected count as a misbehaving peer.
func (c *Client) FetchAnchors(ctx context.Context, from, to int32, src RootSource) (verified []Bundle, rejected int, err error) {
u := fmt.Sprintf("%s/fastsync/anchors?from=%d&to=%d", c.base, from, to)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, u, nil)
if err != nil {
return nil, 0, err
}
resp, err := c.http.Do(req)
if err != nil {
return nil, 0, fmt.Errorf("fastsync: get anchors: %w", err)
}
defer drainClose(resp.Body)
if resp.StatusCode != http.StatusOK {
return nil, 0, fmt.Errorf("fastsync: get anchors: status %d", resp.StatusCode)
}
var body struct {
Anchors []anchorJSON `json:"anchors"`
}
if err := json.NewDecoder(io.LimitReader(resp.Body, maxAnchorsResponseBytes)).Decode(&body); err != nil {
return nil, 0, fmt.Errorf("fastsync: decode anchors: %w", err)
}

for _, a := range body.Anchors {
b, ok := decodeAnchorJSON(a)
if !ok {
rejected++
continue
}
root, ok := src(b.Height)
if !ok { // our header chain doesn't reach this height yet — can't verify
rejected++
continue
}
if _, ok := b.Verify(root); !ok { // bad proof: the peer cannot forge this
rejected++
continue
}
verified = append(verified, b)
}
return verified, rejected, nil
}

// FetchCAS GETs /fastsync/cas/<cid>, returning the raw content blob. The caller MUST
// verify multihash(blob) == cid (the gateway is untrusted; content is self-verifying).
func (c *Client) FetchCAS(ctx context.Context, cid string) ([]byte, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.base+"/fastsync/cas/"+url.PathEscape(cid), nil)
if err != nil {
return nil, err
}
resp, err := c.http.Do(req)
if err != nil {
return nil, fmt.Errorf("fastsync: get cas %s: %w", cid, err)
}
defer drainClose(resp.Body)
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("fastsync: get cas %s: status %d", cid, resp.StatusCode)
}
return io.ReadAll(io.LimitReader(resp.Body, maxGatewayBlobBytes))
}

func decodeAnchorJSON(a anchorJSON) (Bundle, bool) {
if a.Height < 0 || a.TxIndex < 0 {
return Bundle{}, false
}
raw, err := hex.DecodeString(a.RawTx)
if err != nil {
return Bundle{}, false
}
branch := make([]chainhash.Hash, len(a.Branch))
for i, hx := range a.Branch {
hb, err := hex.DecodeString(hx)
if err != nil || len(hb) != chainhash.HashSize {
return Bundle{}, false
}
copy(branch[i][:], hb)
}
return Bundle{Height: a.Height, TxIndex: a.TxIndex, RawTx: raw, Branch: branch}, true
}

// maxGatewayBlobBytes mirrors the server-side content cap (api).
const maxGatewayBlobBytes = 1 << 22 // 4 MiB

func drainClose(rc io.ReadCloser) {
_, _ = io.Copy(io.Discard, io.LimitReader(rc, 4<<10))
_ = rc.Close()
}
113 changes: 113 additions & 0 deletions internal/fastsync/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package fastsync

import (
"bytes"
"context"
"encoding/hex"
"encoding/json"
"net/http"
"net/http/httptest"
"testing"

"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/txscript"
"github.com/btcsuite/btcd/wire"
)

// oneTxBundle builds a single-tx "block" bundle: an empty branch whose merkle root is
// the txid itself, so verification is just txid == root.
func oneTxBundle(t *testing.T, height int32) (Bundle, chainhash.Hash) {
t.Helper()
mt := wire.NewMsgTx(1)
mt.AddTxIn(&wire.TxIn{PreviousOutPoint: wire.OutPoint{Index: uint32(height)}, SignatureScript: []byte{0x51}})
mt.AddTxOut(wire.NewTxOut(int64(height), []byte{txscript.OP_TRUE}))
var raw bytes.Buffer
if err := mt.SerializeNoWitness(&raw); err != nil {
t.Fatal(err)
}
return Bundle{Height: height, TxIndex: 0, RawTx: raw.Bytes(), Branch: nil}, mt.TxHash()
}

// anchorsServer serves a canned /fastsync/anchors response (and a CAS blob).
func anchorsServer(t *testing.T, bundles []Bundle, blobs map[string][]byte) *httptest.Server {
t.Helper()
mux := http.NewServeMux()
mux.HandleFunc("/fastsync/anchors", func(w http.ResponseWriter, r *http.Request) {
out := make([]map[string]any, 0, len(bundles))
for _, b := range bundles {
branch := make([]string, len(b.Branch))
for i, h := range b.Branch {
branch[i] = hex.EncodeToString(h[:])
}
out = append(out, map[string]any{
"height": b.Height, "txIndex": b.TxIndex,
"rawTx": hex.EncodeToString(b.RawTx), "branch": branch,
})
}
_ = json.NewEncoder(w).Encode(map[string]any{"anchors": out})
})
mux.HandleFunc("/fastsync/cas/", func(w http.ResponseWriter, r *http.Request) {
cid := r.URL.Path[len("/fastsync/cas/"):]
if b, ok := blobs[cid]; ok {
_, _ = w.Write(b)
return
}
http.NotFound(w, r)
})
srv := httptest.NewServer(mux)
t.Cleanup(srv.Close)
return srv
}

func TestClientFetchAnchorsVerifies(t *testing.T) {
b1, root1 := oneTxBundle(t, 700001)
b2, root2 := oneTxBundle(t, 700002)
srv := anchorsServer(t, []Bundle{b1, b2}, nil)
c := NewClient(srv.Client(), srv.URL)

roots := map[int32]chainhash.Hash{700001: root1, 700002: root2}
src := func(h int32) (chainhash.Hash, bool) { r, ok := roots[h]; return r, ok }

verified, rejected, err := c.FetchAnchors(context.Background(), 700001, 700002, src)
if err != nil {
t.Fatalf("FetchAnchors: %v", err)
}
if len(verified) != 2 || rejected != 0 {
t.Fatalf("verified=%d rejected=%d, want 2/0", len(verified), rejected)
}

// A wrong local root → the proof fails → the bundle is rejected (the peer can't
// forge against our own headers).
bad := func(int32) (chainhash.Hash, bool) {
var wrong chainhash.Hash
wrong[0] = 0xff
return wrong, true
}
v2, rej2, err := c.FetchAnchors(context.Background(), 700001, 700002, bad)
if err != nil {
t.Fatal(err)
}
if len(v2) != 0 || rej2 != 2 {
t.Errorf("with wrong roots: verified=%d rejected=%d, want 0/2", len(v2), rej2)
}

// A header chain that doesn't reach those heights → can't verify → rejected.
none := func(int32) (chainhash.Hash, bool) { return chainhash.Hash{}, false }
v3, rej3, _ := c.FetchAnchors(context.Background(), 700001, 700002, none)
if len(v3) != 0 || rej3 != 2 {
t.Errorf("with no local headers: verified=%d rejected=%d, want 0/2", len(v3), rej3)
}
}

func TestClientFetchCAS(t *testing.T) {
srv := anchorsServer(t, nil, map[string][]byte{"QmGood": []byte("blob-bytes")})
c := NewClient(srv.Client(), srv.URL)

got, err := c.FetchCAS(context.Background(), "QmGood")
if err != nil || string(got) != "blob-bytes" {
t.Fatalf("FetchCAS = %q, %v; want blob-bytes", got, err)
}
if _, err := c.FetchCAS(context.Background(), "QmMissing"); err == nil {
t.Error("FetchCAS of a missing cid should error")
}
}
Loading