Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion cmd/etracker/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,10 @@ func startService(cmd *cobra.Command, args []string) error {
interval := time.Duration(cfg.ConsolidationInterval) * time.Second
batchSize := cfg.ConsolidationBatchSize

cons := consolidator.New(id, egressTable, consolidatedTable, interval, batchSize)
cons, err := consolidator.New(id, egressTable, consolidatedTable, interval, batchSize)
if err != nil {
return fmt.Errorf("creating consolidator: %w", err)
}

// Start consolidator in a goroutine
go cons.Start(ctx)
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ require (
github.com/ipld/go-ipld-prime v0.21.1-0.20240917223228-6148356a4c2e
github.com/spf13/cobra v1.2.1
github.com/spf13/viper v1.8.1
github.com/storacha/go-libstoracha v0.2.7
github.com/storacha/go-ucanto v0.6.4
github.com/storacha/go-libstoracha v0.2.9
github.com/storacha/go-ucanto v0.6.5
)

require (
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -578,10 +578,10 @@ github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/spf13/viper v1.8.1 h1:Kq1fyeebqsBfbjZj4EL7gj2IO0mMaiyjYUWcUsl2O44=
github.com/spf13/viper v1.8.1/go.mod h1:o0Pch8wJ9BVSWGQMbra6iw0oQ5oktSIBaujf1rJH9Ns=
github.com/storacha/go-libstoracha v0.2.7 h1:IlsffQq3mr8hDMqXIFyRLKVTHhq7qelyq8if/eivNC0=
github.com/storacha/go-libstoracha v0.2.7/go.mod h1:zzeqIZhBBuWR2dkGygYqv4Bhg3JsvHuuvDCcdXCwGhg=
github.com/storacha/go-ucanto v0.6.4 h1:2a0IKtdIVqq1Y36LYFKtUVQXo7t5fxxvZ17B3AhICWM=
github.com/storacha/go-ucanto v0.6.4/go.mod h1:O35Ze4x18EWtz3ftRXXd/mTZ+b8OQVjYYrnadJ/xNjg=
github.com/storacha/go-libstoracha v0.2.9 h1:1EMhYNpT72dsNBDrbgnXUN/A2wpyEcUWL63wCiSOZwA=
github.com/storacha/go-libstoracha v0.2.9/go.mod h1:nkVcVfEVeeGH1dA7SYpvYC6ip1hmoL1k6z/x+QTynXQ=
github.com/storacha/go-ucanto v0.6.5 h1:mxy1UkJDqszAGe6SkoT0N2SG9YJ62YX7fzU1Pg9lxnA=
github.com/storacha/go-ucanto v0.6.5/go.mod h1:O35Ze4x18EWtz3ftRXXd/mTZ+b8OQVjYYrnadJ/xNjg=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
Expand Down
254 changes: 199 additions & 55 deletions internal/consolidator/consolidator.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package consolidator
import (
"context"
"fmt"
"iter"
"net/http"
"net/url"
"strings"
Expand All @@ -11,13 +12,19 @@ import (
logging "github.com/ipfs/go-log/v2"
"github.com/storacha/go-libstoracha/capabilities/space/content"
capegress "github.com/storacha/go-libstoracha/capabilities/space/egress"
"github.com/storacha/go-ucanto/client"
"github.com/storacha/go-ucanto/core/car"
"github.com/storacha/go-ucanto/core/dag/blockstore"
"github.com/storacha/go-ucanto/core/delegation"
"github.com/storacha/go-ucanto/core/invocation"
"github.com/storacha/go-ucanto/core/receipt"
"github.com/storacha/go-ucanto/core/receipt/fx"
"github.com/storacha/go-ucanto/core/receipt/ran"
"github.com/storacha/go-ucanto/core/result"
fdm "github.com/storacha/go-ucanto/core/result/failure/datamodel"
"github.com/storacha/go-ucanto/principal"
ucanto "github.com/storacha/go-ucanto/server"
"github.com/storacha/go-ucanto/ucan"

"github.com/storacha/etracker/internal/db/consolidated"
"github.com/storacha/etracker/internal/db/egress"
Expand All @@ -29,14 +36,15 @@ type Consolidator struct {
id principal.Signer
egressTable egress.EgressTable
consolidatedTable consolidated.ConsolidatedTable
ucantoSrv ucanto.ServerView[ucanto.Service]
httpClient *http.Client
interval time.Duration
batchSize int
stopCh chan struct{}
}

func New(id principal.Signer, egressTable egress.EgressTable, consolidatedTable consolidated.ConsolidatedTable, interval time.Duration, batchSize int) *Consolidator {
return &Consolidator{
func New(id principal.Signer, egressTable egress.EgressTable, consolidatedTable consolidated.ConsolidatedTable, interval time.Duration, batchSize int) (*Consolidator, error) {
c := &Consolidator{
id: id,
egressTable: egressTable,
consolidatedTable: consolidatedTable,
Expand All @@ -45,6 +53,18 @@ func New(id principal.Signer, egressTable egress.EgressTable, consolidatedTable
batchSize: batchSize,
stopCh: make(chan struct{}),
}

ucantoSrv, err := ucanto.NewServer(
id,
ucanto.WithServiceMethod(capegress.ConsolidateAbility, ucanto.Provide(capegress.Consolidate, c.ucanConsolidateHandler)),
)
if err != nil {
return nil, err
}

c.ucantoSrv = ucantoSrv

return c, nil
}

func (c *Consolidator) Start(ctx context.Context) {
Expand Down Expand Up @@ -82,76 +102,76 @@ func (c *Consolidator) Consolidate(ctx context.Context) error {
}

if len(records) == 0 {
log.Debug("No unprocessed records found")
log.Info("No unprocessed records found")
return nil
}

log.Infof("Processing %d unprocessed records", len(records))

// Process each record (each record represents a batch of receipts for a single node)
for _, record := range records {
var rcpt capegress.ConsolidateReceipt
totalBytes := uint64(0)

bLog := log.With("nodeID", record.NodeID, "batchCID", record.Receipts.String())

// According to the spec, consolidation happens as a result of a `space/egress/consolidate` invocation.
// Since the service is invoking it on itself, we will generate it here.
// Is this acceptable or should we create and register a handler and follow the full go-ucanto flow instead?
inv, err := capegress.Consolidate.Invoke(
// We use the consolidator's own ucanto server to invoke the consolidate capability on itself.
consolidateInv, err := capegress.Consolidate.Invoke(
c.id,
c.id,
c.id.DID().String(),
capegress.ConsolidateCaveats{
Cause: record.Cause,
Cause: record.Cause.Link(),
},
delegation.WithNoExpiration(),
)
if err != nil {
log.Errorf("generating consolidation invocation: %w", err)
continue
}

// Fetch receipts from the endpoint
receipts, err := c.fetchReceipts(ctx, record)
if err != nil {
log.Errorf("Failed to fetch receipts for record (nodeID=%s): %v", record.NodeID, err)
bLog.Errorf("generating consolidation invocation: %v", err)
continue
}

// Process each receipt in the batch
totalBytes := uint64(0)
for _, rcpt := range receipts {
retrievalRcpt, err := receipt.Rebind[content.RetrieveOk, fdm.FailureModel](rcpt, content.RetrieveOkType(), fdm.FailureType())
var attachErr error
for blk, err := range record.Cause.Blocks() {
if err != nil {
log.Warnf("receipt doesn't seem a retrieval receipt: %w", err)
continue
attachErr = err
break
}

if err := c.validateReceipt(retrievalRcpt); err != nil {
log.Warnf("Invalid receipt: %v", err)
continue
if err := consolidateInv.Attach(blk); err != nil {
attachErr = err
break
}
}
if attachErr != nil {
bLog.Errorf("attaching blocks to consolidation invocation: %v", attachErr)
continue
}

size, err := c.extractSize(retrievalRcpt)
rcpt, err = c.execConsolidateInvocation(ctx, consolidateInv)
if err != nil {
rcpt, err = c.issueErrorReceipt(consolidateInv, capegress.NewConsolidateError(err.Error()))
if err != nil {
log.Warnf("Failed to extract size from receipt: %v", err)
bLog.Errorf("issuing error receipt: %v", err)
continue
}
}

totalBytes += size
o, x := result.Unwrap(rcpt.Out())
var emptyErr capegress.ConsolidateError
if x != emptyErr {
bLog.Errorf("invocation failed: %s", x.Message)
} else {
totalBytes = o.TotalEgress
}

// Store consolidated record (one per batch)
if err := c.consolidatedTable.Add(ctx, record.NodeID, record.Receipts, totalBytes); err != nil {
log.Errorf("Failed to add consolidated record for node %s, batch %s: %v", record.NodeID, record.Receipts, err)
bLog.Errorf("Failed to add consolidated record: %v", err)
continue
}

// Issue the receipt for the consolidation operation
// TODO: store in the DB
_, err = receipt.Issue(c.id, result.Ok[capegress.ConsolidateOk, capegress.ConsolidateError](capegress.ConsolidateOk{}), ran.FromInvocation(inv))
if err != nil {
log.Errorf("Failed to issue consolidation receipt: %v", err)
continue
}

log.Infof("Consolidated %d bytes for node %s (batch %s)", totalBytes, record.NodeID, record.Receipts)
bLog.Infof("Consolidated %d bytes for node %s (batch %s)", totalBytes, record.NodeID, record.Receipts)
}

// Mark records as processed
Expand All @@ -164,14 +184,130 @@ func (c *Consolidator) Consolidate(ctx context.Context) error {
return nil
}

func (c *Consolidator) fetchReceipts(ctx context.Context, record egress.EgressRecord) ([]receipt.AnyReceipt, error) {
func (c *Consolidator) execConsolidateInvocation(ctx context.Context, inv invocation.Invocation) (capegress.ConsolidateReceipt, error) {
conn, err := client.NewConnection(c.id, c.ucantoSrv)
if err != nil {
return nil, fmt.Errorf("creating connection: %w", err)
}

resp, err := client.Execute(ctx, []invocation.Invocation{inv}, conn)
if err != nil {
return nil, fmt.Errorf("executing invocation: %w", err)
}

rcptLnk, ok := resp.Get(inv.Link())
if !ok {
return nil, fmt.Errorf("missing receipt for invocation: %s", inv.Link().String())
}

blocks, err := blockstore.NewBlockReader(blockstore.WithBlocksIterator(resp.Blocks()))
if err != nil {
return nil, fmt.Errorf("importing response blocks into blockstore: %w", err)
}

rcptReader, err := capegress.NewConsolidateReceiptReader()
if err != nil {
return nil, fmt.Errorf("constructing receipt reader: %w", err)
}

rcpt, err := rcptReader.Read(rcptLnk, blocks.Iterator())
if err != nil {
return nil, fmt.Errorf("reading receipt: %w", err)
}

return rcpt, nil
}

func (c *Consolidator) issueErrorReceipt(ranInv invocation.Invocation, failure capegress.ConsolidateError) (capegress.ConsolidateReceipt, error) {
anyRcpt, err := receipt.Issue(
c.id,
result.Error[capegress.ConsolidateOk, capegress.ConsolidateError](failure),
ran.FromInvocation(ranInv),
)
if err != nil {
return nil, err
}

reader, err := capegress.NewConsolidateReceiptReader()
if err != nil {
return nil, err
}

return reader.Read(anyRcpt.Root().Link(), anyRcpt.Blocks())
}

func (c *Consolidator) ucanConsolidateHandler(
ctx context.Context,
cap ucan.Capability[capegress.ConsolidateCaveats],
inv invocation.Invocation,
ictx ucanto.InvocationContext,
) (result.Result[capegress.ConsolidateOk, capegress.ConsolidateError], fx.Effects, error) {
// Fetch the original egress/track invocation from the egress/consolidate invocation
trackInvLink := cap.Nb().Cause
blocks, err := blockstore.NewBlockReader(blockstore.WithBlocksIterator(inv.Blocks()))
if err != nil {
return nil, nil, fmt.Errorf("importing invocation blocks: %w", err)
}

trackInv, err := invocation.NewInvocationView(trackInvLink, blocks)
if err != nil {
return nil, nil, fmt.Errorf("fetching attached track invocation: %w", err)
}

trackCaveats, err := capegress.TrackCaveatsReader.Read(trackInv.Capabilities()[0].Nb())
if err != nil {
return nil, nil, fmt.Errorf("reading track caveats: %w", err)
}

// Fetch receipts from the endpoint
receipts, err := c.fetchReceipts(ctx, trackCaveats.Endpoint, trackCaveats.Receipts)
if err != nil {
return nil, nil, fmt.Errorf("fetching receipts: %w", err)
}

// Process each receipt in the batch
totalBytes := uint64(0)
for rcpt, err := range receipts {
if err != nil {
log.Errorf("Failed to fetch receipt from batch: %v", err)
continue
}

retrievalRcpt, err := receipt.Rebind[content.RetrieveOk, fdm.FailureModel](rcpt, content.RetrieveOkType(), fdm.FailureType())
if err != nil {
log.Warnf("Receipt doesn't seem to be a retrieval receipt: %v", err)
continue
}

if err := c.validateReceipt(retrievalRcpt); err != nil {
log.Warnf("Invalid receipt: %v", err)
continue
}

size, err := c.extractSize(retrievalRcpt)
if err != nil {
log.Warnf("Failed to extract size from receipt: %v", err)
continue
}

totalBytes += size
}

return result.Ok[capegress.ConsolidateOk, capegress.ConsolidateError](capegress.ConsolidateOk{TotalEgress: totalBytes}), nil, nil
}

func (c *Consolidator) fetchReceipts(ctx context.Context, endpoint *url.URL, batchCID ucan.Link) (iter.Seq2[receipt.AnyReceipt, error], error) {
// Substitute {cid} in the endpoint URL with the receipts CID
batchURLStr := record.Endpoint
batchCID := record.Receipts.String()
batchURLStr, err := url.PathUnescape(endpoint.String())
if err != nil {
return nil, fmt.Errorf("unescaping endpoint URL: %w", err)
}

batchCIDStr := batchCID.String()

// Handle both {cid} and :cid patterns
batchURLStr = strings.ReplaceAll(batchURLStr, "{cid}", batchCID)
batchURLStr = strings.ReplaceAll(batchURLStr, ":cid", batchCID)
batchURLStr = strings.ReplaceAll(batchURLStr, "{cid}", batchCIDStr)
batchURLStr = strings.ReplaceAll(batchURLStr, ":cid", batchCIDStr)

batchURL, err := url.Parse(batchURLStr)
if err != nil {
Expand Down Expand Up @@ -200,23 +336,31 @@ func (c *Consolidator) fetchReceipts(ctx context.Context, record egress.EgressRe
if err != nil {
return nil, fmt.Errorf("decoding receipt batch: %w", err)
}
defer resp.Body.Close()

var rcpts []receipt.AnyReceipt
for blk, err := range blks {
if err != nil {
return nil, fmt.Errorf("iterating over receipt blocks: %w", err)
}
return func(yield func(receipt.AnyReceipt, error) bool) {
for blk, err := range blks {
if err != nil {
if !yield(nil, fmt.Errorf("iterating over batch blocks: %w", err)) {
return
}

rcpt, err := receipt.Extract(blk.Bytes())
if err != nil {
return nil, fmt.Errorf("extracting receipt: %w", err)
}
continue
}

rcpts = append(rcpts, rcpt)
}
rcpt, err := receipt.Extract(blk.Bytes())
if err != nil {
if !yield(nil, fmt.Errorf("extracting receipt: %w", err)) {
return
}

continue
}

return rcpts, nil
if !yield(rcpt, nil) {
return
}
}
}, nil
}

func (c *Consolidator) validateReceipt(retrievalRcpt receipt.Receipt[content.RetrieveOk, fdm.FailureModel]) error {
Expand Down
Loading
Loading