diff --git a/cmd/etracker/start.go b/cmd/etracker/start.go index 25a7438..1a861eb 100644 --- a/cmd/etracker/start.go +++ b/cmd/etracker/start.go @@ -132,7 +132,10 @@ func startService(cmd *cobra.Command, args []string) error { interval := time.Duration(cfg.ConsolidationInterval) * time.Second batchSize := cfg.ConsolidationBatchSize - cons := consolidator.New(id, egressTable, consolidatedTable, interval, batchSize) + cons, err := consolidator.New(id, egressTable, consolidatedTable, interval, batchSize) + if err != nil { + return fmt.Errorf("creating consolidator: %w", err) + } // Start consolidator in a goroutine go cons.Start(ctx) diff --git a/go.mod b/go.mod index 0c23e13..ddc5f35 100644 --- a/go.mod +++ b/go.mod @@ -14,8 +14,8 @@ require ( github.com/ipld/go-ipld-prime v0.21.1-0.20240917223228-6148356a4c2e github.com/spf13/cobra v1.2.1 github.com/spf13/viper v1.8.1 - github.com/storacha/go-libstoracha v0.2.7 - github.com/storacha/go-ucanto v0.6.4 + github.com/storacha/go-libstoracha v0.2.9 + github.com/storacha/go-ucanto v0.6.5 ) require ( diff --git a/go.sum b/go.sum index 98de1d8..bb119a8 100644 --- a/go.sum +++ b/go.sum @@ -578,10 +578,10 @@ github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/spf13/viper v1.8.1 h1:Kq1fyeebqsBfbjZj4EL7gj2IO0mMaiyjYUWcUsl2O44= github.com/spf13/viper v1.8.1/go.mod h1:o0Pch8wJ9BVSWGQMbra6iw0oQ5oktSIBaujf1rJH9Ns= -github.com/storacha/go-libstoracha v0.2.7 h1:IlsffQq3mr8hDMqXIFyRLKVTHhq7qelyq8if/eivNC0= -github.com/storacha/go-libstoracha v0.2.7/go.mod h1:zzeqIZhBBuWR2dkGygYqv4Bhg3JsvHuuvDCcdXCwGhg= -github.com/storacha/go-ucanto v0.6.4 h1:2a0IKtdIVqq1Y36LYFKtUVQXo7t5fxxvZ17B3AhICWM= -github.com/storacha/go-ucanto v0.6.4/go.mod h1:O35Ze4x18EWtz3ftRXXd/mTZ+b8OQVjYYrnadJ/xNjg= +github.com/storacha/go-libstoracha v0.2.9 h1:1EMhYNpT72dsNBDrbgnXUN/A2wpyEcUWL63wCiSOZwA= +github.com/storacha/go-libstoracha v0.2.9/go.mod h1:nkVcVfEVeeGH1dA7SYpvYC6ip1hmoL1k6z/x+QTynXQ= +github.com/storacha/go-ucanto v0.6.5 h1:mxy1UkJDqszAGe6SkoT0N2SG9YJ62YX7fzU1Pg9lxnA= +github.com/storacha/go-ucanto v0.6.5/go.mod h1:O35Ze4x18EWtz3ftRXXd/mTZ+b8OQVjYYrnadJ/xNjg= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= diff --git a/internal/consolidator/consolidator.go b/internal/consolidator/consolidator.go index e50097f..e693f91 100644 --- a/internal/consolidator/consolidator.go +++ b/internal/consolidator/consolidator.go @@ -3,6 +3,7 @@ package consolidator import ( "context" "fmt" + "iter" "net/http" "net/url" "strings" @@ -11,13 +12,19 @@ import ( logging "github.com/ipfs/go-log/v2" "github.com/storacha/go-libstoracha/capabilities/space/content" capegress "github.com/storacha/go-libstoracha/capabilities/space/egress" + "github.com/storacha/go-ucanto/client" "github.com/storacha/go-ucanto/core/car" + "github.com/storacha/go-ucanto/core/dag/blockstore" "github.com/storacha/go-ucanto/core/delegation" + "github.com/storacha/go-ucanto/core/invocation" "github.com/storacha/go-ucanto/core/receipt" + "github.com/storacha/go-ucanto/core/receipt/fx" "github.com/storacha/go-ucanto/core/receipt/ran" "github.com/storacha/go-ucanto/core/result" fdm "github.com/storacha/go-ucanto/core/result/failure/datamodel" "github.com/storacha/go-ucanto/principal" + ucanto "github.com/storacha/go-ucanto/server" + "github.com/storacha/go-ucanto/ucan" "github.com/storacha/etracker/internal/db/consolidated" "github.com/storacha/etracker/internal/db/egress" @@ -29,14 +36,15 @@ type Consolidator struct { id principal.Signer egressTable egress.EgressTable consolidatedTable consolidated.ConsolidatedTable + ucantoSrv ucanto.ServerView[ucanto.Service] httpClient *http.Client interval time.Duration batchSize int stopCh chan struct{} } -func New(id principal.Signer, egressTable egress.EgressTable, consolidatedTable consolidated.ConsolidatedTable, interval time.Duration, batchSize int) *Consolidator { - return &Consolidator{ +func New(id principal.Signer, egressTable egress.EgressTable, consolidatedTable consolidated.ConsolidatedTable, interval time.Duration, batchSize int) (*Consolidator, error) { + c := &Consolidator{ id: id, egressTable: egressTable, consolidatedTable: consolidatedTable, @@ -45,6 +53,18 @@ func New(id principal.Signer, egressTable egress.EgressTable, consolidatedTable batchSize: batchSize, stopCh: make(chan struct{}), } + + ucantoSrv, err := ucanto.NewServer( + id, + ucanto.WithServiceMethod(capegress.ConsolidateAbility, ucanto.Provide(capegress.Consolidate, c.ucanConsolidateHandler)), + ) + if err != nil { + return nil, err + } + + c.ucantoSrv = ucantoSrv + + return c, nil } func (c *Consolidator) Start(ctx context.Context) { @@ -82,7 +102,7 @@ func (c *Consolidator) Consolidate(ctx context.Context) error { } if len(records) == 0 { - log.Debug("No unprocessed records found") + log.Info("No unprocessed records found") return nil } @@ -90,68 +110,68 @@ func (c *Consolidator) Consolidate(ctx context.Context) error { // Process each record (each record represents a batch of receipts for a single node) for _, record := range records { + var rcpt capegress.ConsolidateReceipt + totalBytes := uint64(0) + + bLog := log.With("nodeID", record.NodeID, "batchCID", record.Receipts.String()) + // According to the spec, consolidation happens as a result of a `space/egress/consolidate` invocation. - // Since the service is invoking it on itself, we will generate it here. - // Is this acceptable or should we create and register a handler and follow the full go-ucanto flow instead? - inv, err := capegress.Consolidate.Invoke( + // We use the consolidator's own ucanto server to invoke the consolidate capability on itself. + consolidateInv, err := capegress.Consolidate.Invoke( c.id, c.id, c.id.DID().String(), capegress.ConsolidateCaveats{ - Cause: record.Cause, + Cause: record.Cause.Link(), }, delegation.WithNoExpiration(), ) if err != nil { - log.Errorf("generating consolidation invocation: %w", err) - continue - } - - // Fetch receipts from the endpoint - receipts, err := c.fetchReceipts(ctx, record) - if err != nil { - log.Errorf("Failed to fetch receipts for record (nodeID=%s): %v", record.NodeID, err) + bLog.Errorf("generating consolidation invocation: %v", err) continue } - // Process each receipt in the batch - totalBytes := uint64(0) - for _, rcpt := range receipts { - retrievalRcpt, err := receipt.Rebind[content.RetrieveOk, fdm.FailureModel](rcpt, content.RetrieveOkType(), fdm.FailureType()) + var attachErr error + for blk, err := range record.Cause.Blocks() { if err != nil { - log.Warnf("receipt doesn't seem a retrieval receipt: %w", err) - continue + attachErr = err + break } - if err := c.validateReceipt(retrievalRcpt); err != nil { - log.Warnf("Invalid receipt: %v", err) - continue + if err := consolidateInv.Attach(blk); err != nil { + attachErr = err + break } + } + if attachErr != nil { + bLog.Errorf("attaching blocks to consolidation invocation: %v", attachErr) + continue + } - size, err := c.extractSize(retrievalRcpt) + rcpt, err = c.execConsolidateInvocation(ctx, consolidateInv) + if err != nil { + rcpt, err = c.issueErrorReceipt(consolidateInv, capegress.NewConsolidateError(err.Error())) if err != nil { - log.Warnf("Failed to extract size from receipt: %v", err) + bLog.Errorf("issuing error receipt: %v", err) continue } + } - totalBytes += size + o, x := result.Unwrap(rcpt.Out()) + var emptyErr capegress.ConsolidateError + if x != emptyErr { + bLog.Errorf("invocation failed: %s", x.Message) + } else { + totalBytes = o.TotalEgress } // Store consolidated record (one per batch) if err := c.consolidatedTable.Add(ctx, record.NodeID, record.Receipts, totalBytes); err != nil { - log.Errorf("Failed to add consolidated record for node %s, batch %s: %v", record.NodeID, record.Receipts, err) + bLog.Errorf("Failed to add consolidated record: %v", err) continue } - // Issue the receipt for the consolidation operation - // TODO: store in the DB - _, err = receipt.Issue(c.id, result.Ok[capegress.ConsolidateOk, capegress.ConsolidateError](capegress.ConsolidateOk{}), ran.FromInvocation(inv)) - if err != nil { - log.Errorf("Failed to issue consolidation receipt: %v", err) - continue - } - - log.Infof("Consolidated %d bytes for node %s (batch %s)", totalBytes, record.NodeID, record.Receipts) + bLog.Infof("Consolidated %d bytes for node %s (batch %s)", totalBytes, record.NodeID, record.Receipts) } // Mark records as processed @@ -164,14 +184,130 @@ func (c *Consolidator) Consolidate(ctx context.Context) error { return nil } -func (c *Consolidator) fetchReceipts(ctx context.Context, record egress.EgressRecord) ([]receipt.AnyReceipt, error) { +func (c *Consolidator) execConsolidateInvocation(ctx context.Context, inv invocation.Invocation) (capegress.ConsolidateReceipt, error) { + conn, err := client.NewConnection(c.id, c.ucantoSrv) + if err != nil { + return nil, fmt.Errorf("creating connection: %w", err) + } + + resp, err := client.Execute(ctx, []invocation.Invocation{inv}, conn) + if err != nil { + return nil, fmt.Errorf("executing invocation: %w", err) + } + + rcptLnk, ok := resp.Get(inv.Link()) + if !ok { + return nil, fmt.Errorf("missing receipt for invocation: %s", inv.Link().String()) + } + + blocks, err := blockstore.NewBlockReader(blockstore.WithBlocksIterator(resp.Blocks())) + if err != nil { + return nil, fmt.Errorf("importing response blocks into blockstore: %w", err) + } + + rcptReader, err := capegress.NewConsolidateReceiptReader() + if err != nil { + return nil, fmt.Errorf("constructing receipt reader: %w", err) + } + + rcpt, err := rcptReader.Read(rcptLnk, blocks.Iterator()) + if err != nil { + return nil, fmt.Errorf("reading receipt: %w", err) + } + + return rcpt, nil +} + +func (c *Consolidator) issueErrorReceipt(ranInv invocation.Invocation, failure capegress.ConsolidateError) (capegress.ConsolidateReceipt, error) { + anyRcpt, err := receipt.Issue( + c.id, + result.Error[capegress.ConsolidateOk, capegress.ConsolidateError](failure), + ran.FromInvocation(ranInv), + ) + if err != nil { + return nil, err + } + + reader, err := capegress.NewConsolidateReceiptReader() + if err != nil { + return nil, err + } + + return reader.Read(anyRcpt.Root().Link(), anyRcpt.Blocks()) +} + +func (c *Consolidator) ucanConsolidateHandler( + ctx context.Context, + cap ucan.Capability[capegress.ConsolidateCaveats], + inv invocation.Invocation, + ictx ucanto.InvocationContext, +) (result.Result[capegress.ConsolidateOk, capegress.ConsolidateError], fx.Effects, error) { + // Fetch the original egress/track invocation from the egress/consolidate invocation + trackInvLink := cap.Nb().Cause + blocks, err := blockstore.NewBlockReader(blockstore.WithBlocksIterator(inv.Blocks())) + if err != nil { + return nil, nil, fmt.Errorf("importing invocation blocks: %w", err) + } + + trackInv, err := invocation.NewInvocationView(trackInvLink, blocks) + if err != nil { + return nil, nil, fmt.Errorf("fetching attached track invocation: %w", err) + } + + trackCaveats, err := capegress.TrackCaveatsReader.Read(trackInv.Capabilities()[0].Nb()) + if err != nil { + return nil, nil, fmt.Errorf("reading track caveats: %w", err) + } + + // Fetch receipts from the endpoint + receipts, err := c.fetchReceipts(ctx, trackCaveats.Endpoint, trackCaveats.Receipts) + if err != nil { + return nil, nil, fmt.Errorf("fetching receipts: %w", err) + } + + // Process each receipt in the batch + totalBytes := uint64(0) + for rcpt, err := range receipts { + if err != nil { + log.Errorf("Failed to fetch receipt from batch: %v", err) + continue + } + + retrievalRcpt, err := receipt.Rebind[content.RetrieveOk, fdm.FailureModel](rcpt, content.RetrieveOkType(), fdm.FailureType()) + if err != nil { + log.Warnf("Receipt doesn't seem to be a retrieval receipt: %v", err) + continue + } + + if err := c.validateReceipt(retrievalRcpt); err != nil { + log.Warnf("Invalid receipt: %v", err) + continue + } + + size, err := c.extractSize(retrievalRcpt) + if err != nil { + log.Warnf("Failed to extract size from receipt: %v", err) + continue + } + + totalBytes += size + } + + return result.Ok[capegress.ConsolidateOk, capegress.ConsolidateError](capegress.ConsolidateOk{TotalEgress: totalBytes}), nil, nil +} + +func (c *Consolidator) fetchReceipts(ctx context.Context, endpoint *url.URL, batchCID ucan.Link) (iter.Seq2[receipt.AnyReceipt, error], error) { // Substitute {cid} in the endpoint URL with the receipts CID - batchURLStr := record.Endpoint - batchCID := record.Receipts.String() + batchURLStr, err := url.PathUnescape(endpoint.String()) + if err != nil { + return nil, fmt.Errorf("unescaping endpoint URL: %w", err) + } + + batchCIDStr := batchCID.String() // Handle both {cid} and :cid patterns - batchURLStr = strings.ReplaceAll(batchURLStr, "{cid}", batchCID) - batchURLStr = strings.ReplaceAll(batchURLStr, ":cid", batchCID) + batchURLStr = strings.ReplaceAll(batchURLStr, "{cid}", batchCIDStr) + batchURLStr = strings.ReplaceAll(batchURLStr, ":cid", batchCIDStr) batchURL, err := url.Parse(batchURLStr) if err != nil { @@ -200,23 +336,31 @@ func (c *Consolidator) fetchReceipts(ctx context.Context, record egress.EgressRe if err != nil { return nil, fmt.Errorf("decoding receipt batch: %w", err) } - defer resp.Body.Close() - var rcpts []receipt.AnyReceipt - for blk, err := range blks { - if err != nil { - return nil, fmt.Errorf("iterating over receipt blocks: %w", err) - } + return func(yield func(receipt.AnyReceipt, error) bool) { + for blk, err := range blks { + if err != nil { + if !yield(nil, fmt.Errorf("iterating over batch blocks: %w", err)) { + return + } - rcpt, err := receipt.Extract(blk.Bytes()) - if err != nil { - return nil, fmt.Errorf("extracting receipt: %w", err) - } + continue + } - rcpts = append(rcpts, rcpt) - } + rcpt, err := receipt.Extract(blk.Bytes()) + if err != nil { + if !yield(nil, fmt.Errorf("extracting receipt: %w", err)) { + return + } + + continue + } - return rcpts, nil + if !yield(rcpt, nil) { + return + } + } + }, nil } func (c *Consolidator) validateReceipt(retrievalRcpt receipt.Receipt[content.RetrieveOk, fdm.FailureModel]) error { diff --git a/internal/db/consolidated/consolidated.go b/internal/db/consolidated/consolidated.go index 6d68d68..f1dce66 100644 --- a/internal/db/consolidated/consolidated.go +++ b/internal/db/consolidated/consolidated.go @@ -2,6 +2,7 @@ package consolidated import ( "context" + "iter" "github.com/storacha/go-ucanto/did" "github.com/storacha/go-ucanto/ucan" @@ -16,6 +17,6 @@ type ConsolidatedRecord struct { type ConsolidatedTable interface { Add(ctx context.Context, nodeDID did.DID, receiptsBatchCID ucan.Link, bytes uint64) error - Get(ctx context.Context, nodeDID did.DID, receiptsBatchCID ucan.Link) (*ConsolidatedRecord, error) - GetByNode(ctx context.Context, nodeDID did.DID) ([]ConsolidatedRecord, error) + Get(ctx context.Context, nodeDID did.DID, receiptsBatchCID ucan.Link) (ConsolidatedRecord, error) + List(ctx context.Context, nodeDID did.DID) (iter.Seq2[ConsolidatedRecord, error], error) } diff --git a/internal/db/consolidated/dynamodb.go b/internal/db/consolidated/dynamodb.go index 605ab45..65b6d87 100644 --- a/internal/db/consolidated/dynamodb.go +++ b/internal/db/consolidated/dynamodb.go @@ -3,6 +3,7 @@ package consolidated import ( "context" "fmt" + "iter" "time" "github.com/aws/aws-sdk-go-v2/aws" @@ -57,7 +58,7 @@ func (d *DynamoConsolidatedTable) Add(ctx context.Context, nodeDID did.DID, rece return nil } -func (d *DynamoConsolidatedTable) Get(ctx context.Context, nodeDID did.DID, receiptsBatchCID ucan.Link) (*ConsolidatedRecord, error) { +func (d *DynamoConsolidatedTable) Get(ctx context.Context, nodeDID did.DID, receiptsBatchCID ucan.Link) (ConsolidatedRecord, error) { result, err := d.client.GetItem(ctx, &dynamodb.GetItemInput{ TableName: aws.String(d.tableName), Key: map[string]types.AttributeValue{ @@ -66,17 +67,17 @@ func (d *DynamoConsolidatedTable) Get(ctx context.Context, nodeDID did.DID, rece }, }) if err != nil { - return nil, fmt.Errorf("getting consolidated record: %w", err) + return ConsolidatedRecord{}, fmt.Errorf("getting consolidated record: %w", err) } if result.Item == nil { - return nil, fmt.Errorf("record not found") + return ConsolidatedRecord{}, fmt.Errorf("record not found") } return d.unmarshalRecord(result.Item) } -func (d *DynamoConsolidatedTable) GetByNode(ctx context.Context, nodeDID did.DID) ([]ConsolidatedRecord, error) { +func (d *DynamoConsolidatedTable) List(ctx context.Context, nodeDID did.DID) (iter.Seq2[ConsolidatedRecord, error], error) { result, err := d.client.Query(ctx, &dynamodb.QueryInput{ TableName: aws.String(d.tableName), KeyConditionExpression: aws.String("NodeDID = :nodeDID"), @@ -88,36 +89,36 @@ func (d *DynamoConsolidatedTable) GetByNode(ctx context.Context, nodeDID did.DID return nil, fmt.Errorf("querying consolidated records by node: %w", err) } - records := make([]ConsolidatedRecord, 0, len(result.Items)) - for _, item := range result.Items { - record, err := d.unmarshalRecord(item) - if err != nil { - return nil, err + return func(yield func(ConsolidatedRecord, error) bool) { + for _, item := range result.Items { + record, err := d.unmarshalRecord(item) + if err != nil { + if !yield(record, err) { + return + } + } } - records = append(records, *record) - } - - return records, nil + }, nil } -func (d *DynamoConsolidatedTable) unmarshalRecord(item map[string]types.AttributeValue) (*ConsolidatedRecord, error) { +func (d *DynamoConsolidatedTable) unmarshalRecord(item map[string]types.AttributeValue) (ConsolidatedRecord, error) { var record consolidatedRecord if err := attributevalue.UnmarshalMap(item, &record); err != nil { - return nil, fmt.Errorf("unmarshaling consolidated record: %w", err) + return ConsolidatedRecord{}, fmt.Errorf("unmarshaling consolidated record: %w", err) } parsedDID, err := did.Parse(record.NodeDID) if err != nil { - return nil, fmt.Errorf("parsing node DID: %w", err) + return ConsolidatedRecord{}, fmt.Errorf("parsing node DID: %w", err) } c, err := cid.Decode(record.ReceiptsBatchCID) if err != nil { - return nil, fmt.Errorf("parsing receipts batch CID: %w", err) + return ConsolidatedRecord{}, fmt.Errorf("parsing receipts batch CID: %w", err) } receiptsBatchCID := cidlink.Link{Cid: c} - return &ConsolidatedRecord{ + return ConsolidatedRecord{ NodeDID: parsedDID, ReceiptsBatchCID: receiptsBatchCID, TotalBytes: record.TotalBytes, diff --git a/internal/db/egress/dynamodb.go b/internal/db/egress/dynamodb.go index 9fa3e7a..4a29133 100644 --- a/internal/db/egress/dynamodb.go +++ b/internal/db/egress/dynamodb.go @@ -2,7 +2,9 @@ package egress import ( "context" + "encoding/base64" "fmt" + "io" "math/rand" "net/url" "time" @@ -14,6 +16,8 @@ import ( "github.com/google/uuid" "github.com/ipfs/go-cid" cidlink "github.com/ipld/go-ipld-prime/linking/cid" + "github.com/storacha/go-ucanto/core/delegation" + "github.com/storacha/go-ucanto/core/invocation" "github.com/storacha/go-ucanto/did" "github.com/storacha/go-ucanto/ucan" ) @@ -29,46 +33,13 @@ func NewDynamoEgressTable(client *dynamodb.Client, tableName string) *DynamoEgre return &DynamoEgressTable{client, tableName} } -type egressRecord struct { - // Partition key: "DATE#SHARD" (e.g., "2025-08-18#0") - // Where SHARD is a number 0-9 to distribute writes - PK string `dynamodbav:"PK"` - - // Sort key: "RECEIVED_AT#NODE_ID#UNIQUE_ID" - // This allows sorting by time within each date partition - SK string `dynamodbav:"SK"` - - NodeID string `dynamodbav:"nodeID"` - Receipts string `dynamodbav:"receipts"` - Endpoint string `dynamodbav:"endpoint"` - Cause string `dynamodbav:"cause"` - ReceivedAt string `dynamodbav:"receivedAt"` - Processed bool `dynamodbav:"proc"` -} - -func newRecord(nodeID did.DID, receipts ucan.Link, endpoint *url.URL, cause ucan.Link) egressRecord { - // TODO: review keys to improve performance and access patterns - receivedAt := time.Now().UTC() - dateStr := receivedAt.Format("2006-01-02") - shard := rand.Intn(10) - pk := fmt.Sprintf("%s#%d", dateStr, shard) - sk := fmt.Sprintf("%s#%s#%s", dateStr, nodeID, uuid.New()) - endpointStr, _ := url.PathUnescape(endpoint.String()) - - return egressRecord{ - PK: pk, - SK: sk, - NodeID: nodeID.String(), - Receipts: receipts.String(), - Endpoint: endpointStr, - Cause: cause.String(), - ReceivedAt: receivedAt.Format(time.RFC3339), - Processed: false, +func (d *DynamoEgressTable) Record(ctx context.Context, nodeID did.DID, receipts ucan.Link, endpoint *url.URL, cause invocation.Invocation) error { + record, err := newRecord(nodeID, receipts, endpoint, cause) + if err != nil { + return fmt.Errorf("creating egress record: %w", err) } -} -func (d *DynamoEgressTable) Record(ctx context.Context, nodeID did.DID, receipts ucan.Link, endpoint *url.URL, cause ucan.Link) error { - item, err := attributevalue.MarshalMap(newRecord(nodeID, receipts, endpoint, cause)) + item, err := attributevalue.MarshalMap(record) if err != nil { return fmt.Errorf("serializing egress record: %w", err) } @@ -79,6 +50,7 @@ func (d *DynamoEgressTable) Record(ctx context.Context, nodeID did.DID, receipts if err != nil { return fmt.Errorf("storing egress record: %w", err) } + return nil } @@ -105,43 +77,12 @@ func (d *DynamoEgressTable) GetUnprocessed(ctx context.Context, limit int) ([]Eg } for _, item := range result.Items { - var record egressRecord - if err := attributevalue.UnmarshalMap(item, &record); err != nil { - return nil, fmt.Errorf("unmarshaling egress record: %w", err) - } - - nodeID, err := did.Parse(record.NodeID) + record, err := d.unmarshalRecord(item) if err != nil { - return nil, fmt.Errorf("parsing node DID: %w", err) - } - - c, err := cid.Decode(record.Receipts) - if err != nil { - return nil, fmt.Errorf("parsing receipts CID: %w", err) - } - receipts := cidlink.Link{Cid: c} - - receivedAt, err := time.Parse(time.RFC3339, record.ReceivedAt) - if err != nil { - return nil, fmt.Errorf("parsing received at time: %w", err) + return nil, fmt.Errorf("unmarshaling egress record: %w", err) } - cause, err := cid.Decode(record.Cause) - if err != nil { - return nil, fmt.Errorf("parsing cause CID: %w", err) - } - causeLink := cidlink.Link{Cid: cause} - - allRecords = append(allRecords, EgressRecord{ - PK: record.PK, - SK: record.SK, - NodeID: nodeID, - Receipts: receipts, - Endpoint: record.Endpoint, - Cause: causeLink, - ReceivedAt: receivedAt, - Processed: record.Processed, - }) + allRecords = append(allRecords, *record) if len(allRecords) >= limit { return allRecords, nil @@ -152,6 +93,98 @@ func (d *DynamoEgressTable) GetUnprocessed(ctx context.Context, limit int) ([]Eg return allRecords, nil } +type egressRecord struct { + // Partition key: "DATE#SHARD" (e.g., "2025-08-18#0") + // Where SHARD is a number 0-9 to distribute writes + PK string `dynamodbav:"PK"` + + // Sort key: "RECEIVED_AT#NODE_ID#UNIQUE_ID" + // This allows sorting by time within each date partition + SK string `dynamodbav:"SK"` + + NodeID string `dynamodbav:"nodeID"` + Receipts string `dynamodbav:"receipts"` + Endpoint string `dynamodbav:"endpoint"` + Cause []byte `dynamodbav:"cause"` + ReceivedAt string `dynamodbav:"receivedAt"` + Processed bool `dynamodbav:"proc"` +} + +func newRecord(nodeID did.DID, receipts ucan.Link, endpoint *url.URL, cause invocation.Invocation) (*egressRecord, error) { + // TODO: review keys to improve performance and access patterns + receivedAt := time.Now().UTC() + dateStr := receivedAt.Format("2006-01-02") + shard := rand.Intn(10) + pk := fmt.Sprintf("%s#%d", dateStr, shard) + sk := fmt.Sprintf("%s#%s#%s", dateStr, nodeID, uuid.New()) + endpointStr, _ := url.PathUnescape(endpoint.String()) + + // binary values must be base64-encoded before sending them to DynamoDB + arch := cause.Archive() + archBytes, err := io.ReadAll(arch) + if err != nil { + return nil, fmt.Errorf("reading invocation archive: %w", err) + } + + causeBytes := make([]byte, base64.StdEncoding.EncodedLen(len(archBytes))) + base64.StdEncoding.Encode(causeBytes, archBytes) + + return &egressRecord{ + PK: pk, + SK: sk, + NodeID: nodeID.String(), + Receipts: receipts.String(), + Endpoint: endpointStr, + Cause: causeBytes, + ReceivedAt: receivedAt.Format(time.RFC3339), + Processed: false, + }, nil +} + +func (d *DynamoEgressTable) unmarshalRecord(item map[string]types.AttributeValue) (*EgressRecord, error) { + var record egressRecord + if err := attributevalue.UnmarshalMap(item, &record); err != nil { + return nil, fmt.Errorf("unmarshaling egress record: %w", err) + } + + nodeID, err := did.Parse(record.NodeID) + if err != nil { + return nil, fmt.Errorf("parsing node DID: %w", err) + } + + c, err := cid.Decode(record.Receipts) + if err != nil { + return nil, fmt.Errorf("parsing receipts CID: %w", err) + } + receipts := cidlink.Link{Cid: c} + + archBytes := make([]byte, base64.StdEncoding.DecodedLen(len(record.Cause))) + if _, err := base64.StdEncoding.Decode(archBytes, record.Cause); err != nil { + return nil, fmt.Errorf("decoding cause archive: %w", err) + } + + cause, err := delegation.Extract(archBytes) + if err != nil { + return nil, fmt.Errorf("extracting cause: %w", err) + } + + receivedAt, err := time.Parse(time.RFC3339, record.ReceivedAt) + if err != nil { + return nil, fmt.Errorf("parsing received at time: %w", err) + } + + return &EgressRecord{ + PK: record.PK, + SK: record.SK, + NodeID: nodeID, + Receipts: receipts, + Endpoint: record.Endpoint, + Cause: cause, + ReceivedAt: receivedAt, + Processed: record.Processed, + }, nil +} + func (d *DynamoEgressTable) MarkAsProcessed(ctx context.Context, records []EgressRecord) error { for _, record := range records { _, err := d.client.UpdateItem(ctx, &dynamodb.UpdateItemInput{ diff --git a/internal/db/egress/egress.go b/internal/db/egress/egress.go index caac0f1..ae85ddc 100644 --- a/internal/db/egress/egress.go +++ b/internal/db/egress/egress.go @@ -5,6 +5,7 @@ import ( "net/url" "time" + "github.com/storacha/go-ucanto/core/invocation" "github.com/storacha/go-ucanto/did" "github.com/storacha/go-ucanto/ucan" ) @@ -15,13 +16,13 @@ type EgressRecord struct { NodeID did.DID Receipts ucan.Link Endpoint string - Cause ucan.Link + Cause invocation.Invocation ReceivedAt time.Time Processed bool } type EgressTable interface { - Record(ctx context.Context, nodeID did.DID, receipt ucan.Link, endpoint *url.URL, cause ucan.Link) error + Record(ctx context.Context, nodeID did.DID, receipt ucan.Link, endpoint *url.URL, cause invocation.Invocation) error GetUnprocessed(ctx context.Context, limit int) ([]EgressRecord, error) MarkAsProcessed(ctx context.Context, records []EgressRecord) error } diff --git a/internal/server/handlers.go b/internal/server/handlers.go index 0336279..5a5e547 100644 --- a/internal/server/handlers.go +++ b/internal/server/handlers.go @@ -51,7 +51,7 @@ func (s *Server) ucanHandler() http.HandlerFunc { func (s *Server) getReceiptsHandler() http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { cidStr := r.PathValue("cid") - cid, err := cid.Decode(cidStr) + cid, err := cid.Parse(cidStr) if err != nil { w.WriteHeader(http.StatusBadRequest) return diff --git a/internal/server/methods.go b/internal/server/methods.go index 49548fe..a46da8b 100644 --- a/internal/server/methods.go +++ b/internal/server/methods.go @@ -39,7 +39,7 @@ func ucanTrackHandler(svc *service.Service) func( receipts := cap.Nb().Receipts endpoint := cap.Nb().Endpoint - err := svc.Record(ctx, nodeDID, receipts, endpoint, inv.Link()) + err := svc.Record(ctx, nodeDID, receipts, endpoint, inv) if err != nil { return result.Error[egress.TrackOk, egress.TrackError](egress.NewTrackError(err.Error())), nil, nil } diff --git a/internal/service/service.go b/internal/service/service.go index 0b47024..93de49d 100644 --- a/internal/service/service.go +++ b/internal/service/service.go @@ -4,6 +4,7 @@ import ( "context" "net/url" + "github.com/storacha/go-ucanto/core/invocation" "github.com/storacha/go-ucanto/did" "github.com/storacha/go-ucanto/principal" "github.com/storacha/go-ucanto/ucan" @@ -20,6 +21,6 @@ func New(id principal.Signer, egressTable egress.EgressTable) (*Service, error) return &Service{id: id, egressTable: egressTable}, nil } -func (s *Service) Record(ctx context.Context, nodeDID did.DID, receipts ucan.Link, endpoint *url.URL, cause ucan.Link) error { +func (s *Service) Record(ctx context.Context, nodeDID did.DID, receipts ucan.Link, endpoint *url.URL, cause invocation.Invocation) error { return s.egressTable.Record(ctx, nodeDID, receipts, endpoint, cause) }