Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/metric v0.50
github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.50.0 h1:ig/FpDD2JofP/NExKQUbn7uOSZzJAQqogfqluZK4ed4=
github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.50.0/go.mod h1:otE2jQekW/PqXk1Awf5lmfokJx4uwuqcj1ab5SpGeW0=
github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0=
github.com/LumeraProtocol/lumera v1.12.0 h1:prh3k8yJrCli0qFLTQmmzTg2w4KyNzpHq6YaWPDWLNM=
github.com/LumeraProtocol/lumera v1.12.0 h1:BHkPF/vCKyGFKtl2MMxtRpUyzraJ96rWY9FniTbG6cU=
github.com/LumeraProtocol/lumera v1.12.0/go.mod h1:/G9LTPZB+261tHoWoj7q+1fn+O/VV0zzagwLdsThSNo=
github.com/LumeraProtocol/rq-go v0.2.1 h1:8B3UzRChLsGMmvZ+UVbJsJj6JZzL9P9iYxbdUwGsQI4=
github.com/LumeraProtocol/rq-go v0.2.1/go.mod h1:APnKCZRh1Es2Vtrd2w4kCLgAyaL5Bqrkz/BURoRJ+O8=
Expand Down
12 changes: 12 additions & 0 deletions pkg/lumera/modules/audit/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,18 @@ func (m *module) GetEpochReport(ctx context.Context, epochID uint64, supernodeAc
return resp, nil
}

func (m *module) GetEpochReportsByReporter(ctx context.Context, reporterAccount string, epochID uint64) (*types.QueryEpochReportsByReporterResponse, error) {
resp, err := m.client.EpochReportsByReporter(ctx, &types.QueryEpochReportsByReporterRequest{
SupernodeAccount: reporterAccount,
EpochId: epochID,
FilterByEpochId: true,
})
if err != nil {
return nil, fmt.Errorf("failed to get epoch reports by reporter: %w", err)
}
return resp, nil
}

func (m *module) GetNodeSuspicionState(ctx context.Context, supernodeAccount string) (*types.QueryNodeSuspicionStateResponse, error) {
resp, err := m.client.NodeSuspicionState(ctx, &types.QueryNodeSuspicionStateRequest{
SupernodeAccount: supernodeAccount,
Expand Down
1 change: 1 addition & 0 deletions pkg/lumera/modules/audit/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type Module interface {
GetCurrentEpoch(ctx context.Context) (*types.QueryCurrentEpochResponse, error)
GetAssignedTargets(ctx context.Context, supernodeAccount string, epochID uint64) (*types.QueryAssignedTargetsResponse, error)
GetEpochReport(ctx context.Context, epochID uint64, supernodeAccount string) (*types.QueryEpochReportResponse, error)
GetEpochReportsByReporter(ctx context.Context, reporterAccount string, epochID uint64) (*types.QueryEpochReportsByReporterResponse, error)

// LEP-6 storage-truth state queries.
GetNodeSuspicionState(ctx context.Context, supernodeAccount string) (*types.QueryNodeSuspicionStateResponse, error)
Expand Down
1 change: 1 addition & 0 deletions pkg/storage/queries/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,5 @@ type LocalStoreInterface interface {
PingHistoryQueries
HealthCheckChallengeQueries
LEP6HealQueries
RecheckQueries
}
54 changes: 54 additions & 0 deletions pkg/storage/queries/recheck.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package queries

import (
"context"
"database/sql"
"fmt"
"time"

audittypes "github.com/LumeraProtocol/lumera/x/audit/v1/types"
)

type RecheckSubmissionRecord struct {
EpochID uint64
TicketID string
TargetAccount string
ChallengedTranscriptHash string
RecheckTranscriptHash string
ResultClass audittypes.StorageProofResultClass
SubmittedAt int64
}

const createStorageRecheckSubmissions = `
CREATE TABLE IF NOT EXISTS storage_recheck_submissions (
epoch_id INTEGER NOT NULL,
ticket_id TEXT NOT NULL,
target_account TEXT NOT NULL,
challenged_transcript_hash TEXT NOT NULL,
recheck_transcript_hash TEXT NOT NULL,
result_class INTEGER NOT NULL,
submitted_at INTEGER NOT NULL,
PRIMARY KEY (epoch_id, ticket_id)
);`

func (s *SQLiteStore) HasRecheckSubmission(ctx context.Context, epochID uint64, ticketID string) (bool, error) {
const stmt = `SELECT 1 FROM storage_recheck_submissions WHERE epoch_id = ? AND ticket_id = ? LIMIT 1`
var one int
err := s.db.QueryRowContext(ctx, stmt, epochID, ticketID).Scan(&one)
if err != nil {
if err == sql.ErrNoRows {
return false, nil
}
return false, err
}
return true, nil
}

func (s *SQLiteStore) RecordRecheckSubmission(ctx context.Context, epochID uint64, ticketID, targetAccount, challengedTranscriptHash, recheckTranscriptHash string, resultClass audittypes.StorageProofResultClass) error {
const stmt = `INSERT OR IGNORE INTO storage_recheck_submissions (epoch_id, ticket_id, target_account, challenged_transcript_hash, recheck_transcript_hash, result_class, submitted_at) VALUES (?, ?, ?, ?, ?, ?, ?)`
if epochID == 0 || ticketID == "" {
return fmt.Errorf("epoch_id and ticket_id are required")
}
_, err := s.db.ExecContext(ctx, stmt, epochID, ticketID, targetAccount, challengedTranscriptHash, recheckTranscriptHash, int32(resultClass), time.Now().Unix())
return err
}
12 changes: 12 additions & 0 deletions pkg/storage/queries/recheck_interface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package queries

import (
"context"

audittypes "github.com/LumeraProtocol/lumera/x/audit/v1/types"
)

type RecheckQueries interface {
HasRecheckSubmission(ctx context.Context, epochID uint64, ticketID string) (bool, error)
RecordRecheckSubmission(ctx context.Context, epochID uint64, ticketID, targetAccount, challengedTranscriptHash, recheckTranscriptHash string, resultClass audittypes.StorageProofResultClass) error
}
41 changes: 41 additions & 0 deletions pkg/storage/queries/recheck_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package queries

import (
"context"
"testing"

audittypes "github.com/LumeraProtocol/lumera/x/audit/v1/types"
"github.com/jmoiron/sqlx"
_ "github.com/mattn/go-sqlite3"
"github.com/stretchr/testify/require"
)

func TestRecheckSubmissionDedupKeyEpochTicket(t *testing.T) {
db := sqlx.MustConnect("sqlite3", ":memory:")
defer db.Close()
_, err := db.Exec(createStorageRecheckSubmissions)
require.NoError(t, err)
store := &SQLiteStore{db: db}
ctx := context.Background()

exists, err := store.HasRecheckSubmission(ctx, 7, "ticket-1")
require.NoError(t, err)
require.False(t, exists)

require.NoError(t, store.RecordRecheckSubmission(ctx, 7, "ticket-1", "target-a", "orig", "rh1", audittypes.StorageProofResultClass_STORAGE_PROOF_RESULT_CLASS_PASS))
exists, err = store.HasRecheckSubmission(ctx, 7, "ticket-1")
require.NoError(t, err)
require.True(t, exists)

// Same ticket in a different epoch is intentionally a different replay key.
exists, err = store.HasRecheckSubmission(ctx, 8, "ticket-1")
require.NoError(t, err)
require.False(t, exists)

// INSERT OR IGNORE makes local retry recording idempotent and preserves the
// first successful on-chain submission record.
require.NoError(t, store.RecordRecheckSubmission(ctx, 7, "ticket-1", "target-b", "orig2", "rh2", audittypes.StorageProofResultClass_STORAGE_PROOF_RESULT_CLASS_RECHECK_CONFIRMED_FAIL))
var target string
require.NoError(t, db.QueryRowContext(ctx, `SELECT target_account FROM storage_recheck_submissions WHERE epoch_id=? AND ticket_id=?`, 7, "ticket-1").Scan(&target))
require.Equal(t, "target-a", target)
}
4 changes: 4 additions & 0 deletions pkg/storage/queries/sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,10 @@ func OpenHistoryDB() (LocalStoreInterface, error) {
return nil, fmt.Errorf("cannot create heal_verifications_submitted: %w", err)
}

if _, err := db.Exec(createStorageRecheckSubmissions); err != nil {
return nil, fmt.Errorf("cannot create storage_recheck_submissions: %w", err)
}

_, _ = db.Exec(alterTaskHistory)

_, _ = db.Exec(alterTablePingHistory)
Expand Down
4 changes: 4 additions & 0 deletions pkg/testutil/lumera.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,10 @@ func (m *MockAuditModule) GetEpochReport(ctx context.Context, epochID uint64, su
return &audittypes.QueryEpochReportResponse{}, nil
}

func (m *MockAuditModule) GetEpochReportsByReporter(ctx context.Context, reporterAccount string, epochID uint64) (*audittypes.QueryEpochReportsByReporterResponse, error) {
return &audittypes.QueryEpochReportsByReporterResponse{}, nil
}

func (m *MockAuditModule) GetNodeSuspicionState(ctx context.Context, supernodeAccount string) (*audittypes.QueryNodeSuspicionStateResponse, error) {
return &audittypes.QueryNodeSuspicionStateResponse{}, nil
}
Expand Down
17 changes: 17 additions & 0 deletions supernode/cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
cascadeService "github.com/LumeraProtocol/supernode/v2/supernode/cascade"
"github.com/LumeraProtocol/supernode/v2/supernode/config"
hostReporterService "github.com/LumeraProtocol/supernode/v2/supernode/host_reporter"
recheckService "github.com/LumeraProtocol/supernode/v2/supernode/recheck"
selfHealingService "github.com/LumeraProtocol/supernode/v2/supernode/self_healing"
statusService "github.com/LumeraProtocol/supernode/v2/supernode/status"
storageChallengeService "github.com/LumeraProtocol/supernode/v2/supernode/storage_challenge"
Expand Down Expand Up @@ -219,6 +220,7 @@ The supernode will connect to the Lumera network and begin participating in the
WithArtifactReader(newP2PArtifactReader(p2pService)).
WithRecipientSigner(kr, appConfig.SupernodeConfig.KeyName)
var storageChallengeRunner *storageChallengeService.Service
var recheckRunner *recheckService.Service
if appConfig.StorageChallengeConfig.Enabled {
storageChallengeRunner, err = storageChallengeService.NewService(
appConfig.SupernodeConfig.Identity,
Expand Down Expand Up @@ -254,6 +256,18 @@ The supernode will connect to the Lumera network and begin participating in the
logtrace.Fatal(ctx, "Failed to initialize LEP-6 dispatcher", logtrace.Fields{"error": derr.Error()})
}
storageChallengeRunner.SetLEP6Dispatcher(dispatcher)

if appConfig.StorageChallengeConfig.LEP6.Recheck.Enabled {
rc := appConfig.StorageChallengeConfig.LEP6.Recheck
tickInterval := time.Duration(rc.TickIntervalMs) * time.Millisecond
recheckCfg := recheckService.Config{Enabled: true, LookbackEpochs: rc.LookbackEpochs, MaxPerTick: rc.MaxPerTick, TickInterval: tickInterval}
attestor := recheckService.NewAttestor(appConfig.SupernodeConfig.Identity, lumeraClient.AuditMsg(), historyStore)
reporterSource := recheckService.NewSupernodeReporterSource(lumeraClient.SuperNode(), appConfig.SupernodeConfig.Identity)
recheckRunner, err = recheckService.NewServiceWithReporters(recheckCfg, lumeraClient.Audit(), historyStore, dispatcher, attestor, appConfig.SupernodeConfig.Identity, reporterSource)
if err != nil {
logtrace.Fatal(ctx, "Failed to initialize LEP-6 recheck runner", logtrace.Fields{"error": err.Error()})
}
}
}
}

Expand Down Expand Up @@ -359,6 +373,9 @@ The supernode will connect to the Lumera network and begin participating in the
if selfHealingRunner != nil {
services = append(services, selfHealingRunner)
}
if recheckRunner != nil {
services = append(services, recheckRunner)
}
servicesErr <- RunServices(ctx, services...)
}()

Expand Down
9 changes: 9 additions & 0 deletions supernode/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,15 @@ type StorageChallengeLEP6Config struct {
// RecipientReadTimeout caps a single GetCompoundProof RPC. Default
// 30s.
RecipientReadTimeout time.Duration `yaml:"recipient_read_timeout,omitempty"`
// Recheck owns the PR-5 storage-truth recheck evidence submitter.
Recheck StorageRecheckConfig `yaml:"recheck,omitempty"`
}

type StorageRecheckConfig struct {
Enabled bool `yaml:"enabled"`
LookbackEpochs uint64 `yaml:"lookback_epochs,omitempty"`
MaxPerTick int `yaml:"max_per_tick,omitempty"`
TickIntervalMs int `yaml:"tick_interval_ms,omitempty"`
}

// SelfHealingConfig configures the LEP-6 chain-driven self-healing runtime
Expand Down
6 changes: 5 additions & 1 deletion supernode/host_reporter/tick_behavior_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
type stubAuditModule struct {
currentEpoch *audittypes.QueryCurrentEpochResponse
anchor *audittypes.QueryEpochAnchorResponse
epochReport *audittypes.QueryEpochReportResponse
epochReportErr error
assigned *audittypes.QueryAssignedTargetsResponse
}
Expand All @@ -50,7 +51,10 @@ func (s *stubAuditModule) GetEpochReport(ctx context.Context, epochID uint64, su
if s.epochReportErr != nil {
return nil, s.epochReportErr
}
return &audittypes.QueryEpochReportResponse{}, nil
return s.epochReport, nil
}
func (s *stubAuditModule) GetEpochReportsByReporter(ctx context.Context, reporterAccount string, epochID uint64) (*audittypes.QueryEpochReportsByReporterResponse, error) {
return &audittypes.QueryEpochReportsByReporterResponse{}, nil
}
func (s *stubAuditModule) GetNodeSuspicionState(ctx context.Context, supernodeAccount string) (*audittypes.QueryNodeSuspicionStateResponse, error) {
return &audittypes.QueryNodeSuspicionStateResponse{}, nil
Expand Down
65 changes: 65 additions & 0 deletions supernode/recheck/attestor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package recheck

import (
"context"
"fmt"
"strings"

audittypes "github.com/LumeraProtocol/lumera/x/audit/v1/types"
sdktx "github.com/cosmos/cosmos-sdk/types/tx"
)

type TxSubmitter interface {
SubmitStorageRecheckEvidence(ctx context.Context, epochID uint64, challengedSupernodeAccount, ticketID, challengedResultTranscriptHash, recheckTranscriptHash string, recheckResultClass audittypes.StorageProofResultClass, details string) (*sdktx.BroadcastTxResponse, error)
}

type Attestor struct {
self string
msg TxSubmitter
store Store
}

func NewAttestor(self string, msg TxSubmitter, store Store) *Attestor {
return &Attestor{self: strings.TrimSpace(self), msg: msg, store: store}
}

func (a *Attestor) Submit(ctx context.Context, c Candidate, r RecheckResult) error {
if a == nil || a.msg == nil || a.store == nil {
return fmt.Errorf("recheck attestor missing deps")
}
if !c.Valid() || c.TargetAccount == a.self || c.OriginalReporter == a.self {
return fmt.Errorf("invalid recheck candidate")
}
if strings.TrimSpace(r.TranscriptHash) == "" || !validRecheckResultClass(r.ResultClass) {
return fmt.Errorf("invalid recheck result")
}
_, err := a.msg.SubmitStorageRecheckEvidence(ctx, c.EpochID, c.TargetAccount, c.TicketID, c.ChallengedTranscriptHash, r.TranscriptHash, r.ResultClass, r.Details)
if err != nil {
if isAlreadySubmittedError(err) {
return a.store.RecordRecheckSubmission(ctx, c.EpochID, c.TicketID, c.TargetAccount, c.ChallengedTranscriptHash, r.TranscriptHash, r.ResultClass)
}
return err
}
return a.store.RecordRecheckSubmission(ctx, c.EpochID, c.TicketID, c.TargetAccount, c.ChallengedTranscriptHash, r.TranscriptHash, r.ResultClass)
}

func validRecheckResultClass(cls audittypes.StorageProofResultClass) bool {
switch cls {
case audittypes.StorageProofResultClass_STORAGE_PROOF_RESULT_CLASS_PASS,
audittypes.StorageProofResultClass_STORAGE_PROOF_RESULT_CLASS_RECHECK_CONFIRMED_FAIL,
audittypes.StorageProofResultClass_STORAGE_PROOF_RESULT_CLASS_TIMEOUT_OR_NO_RESPONSE,
audittypes.StorageProofResultClass_STORAGE_PROOF_RESULT_CLASS_OBSERVER_QUORUM_FAIL,
audittypes.StorageProofResultClass_STORAGE_PROOF_RESULT_CLASS_INVALID_TRANSCRIPT:
return true
default:
return false
}
}

func isAlreadySubmittedError(err error) bool {
if err == nil {
return false
}
s := strings.ToLower(err.Error())
return strings.Contains(s, "recheck evidence already submitted")
}
Comment thread
j-rafique marked this conversation as resolved.
Loading