diff --git a/go.sum b/go.sum index c10e7464..2f92226d 100644 --- a/go.sum +++ b/go.sum @@ -111,7 +111,7 @@ github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/metric v0.50 github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.50.0 h1:ig/FpDD2JofP/NExKQUbn7uOSZzJAQqogfqluZK4ed4= github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.50.0/go.mod h1:otE2jQekW/PqXk1Awf5lmfokJx4uwuqcj1ab5SpGeW0= github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0= -github.com/LumeraProtocol/lumera v1.12.0 h1:prh3k8yJrCli0qFLTQmmzTg2w4KyNzpHq6YaWPDWLNM= +github.com/LumeraProtocol/lumera v1.12.0 h1:BHkPF/vCKyGFKtl2MMxtRpUyzraJ96rWY9FniTbG6cU= github.com/LumeraProtocol/lumera v1.12.0/go.mod h1:/G9LTPZB+261tHoWoj7q+1fn+O/VV0zzagwLdsThSNo= github.com/LumeraProtocol/rq-go v0.2.1 h1:8B3UzRChLsGMmvZ+UVbJsJj6JZzL9P9iYxbdUwGsQI4= github.com/LumeraProtocol/rq-go v0.2.1/go.mod h1:APnKCZRh1Es2Vtrd2w4kCLgAyaL5Bqrkz/BURoRJ+O8= diff --git a/pkg/lumera/modules/audit/impl.go b/pkg/lumera/modules/audit/impl.go index 390c1fa2..eed2f4c7 100644 --- a/pkg/lumera/modules/audit/impl.go +++ b/pkg/lumera/modules/audit/impl.go @@ -75,6 +75,18 @@ func (m *module) GetEpochReport(ctx context.Context, epochID uint64, supernodeAc return resp, nil } +func (m *module) GetEpochReportsByReporter(ctx context.Context, reporterAccount string, epochID uint64) (*types.QueryEpochReportsByReporterResponse, error) { + resp, err := m.client.EpochReportsByReporter(ctx, &types.QueryEpochReportsByReporterRequest{ + SupernodeAccount: reporterAccount, + EpochId: epochID, + FilterByEpochId: true, + }) + if err != nil { + return nil, fmt.Errorf("failed to get epoch reports by reporter: %w", err) + } + return resp, nil +} + func (m *module) GetNodeSuspicionState(ctx context.Context, supernodeAccount string) (*types.QueryNodeSuspicionStateResponse, error) { resp, err := 
m.client.NodeSuspicionState(ctx, &types.QueryNodeSuspicionStateRequest{ SupernodeAccount: supernodeAccount, diff --git a/pkg/lumera/modules/audit/interface.go b/pkg/lumera/modules/audit/interface.go index 957488e5..074512d3 100644 --- a/pkg/lumera/modules/audit/interface.go +++ b/pkg/lumera/modules/audit/interface.go @@ -16,6 +16,7 @@ type Module interface { GetCurrentEpoch(ctx context.Context) (*types.QueryCurrentEpochResponse, error) GetAssignedTargets(ctx context.Context, supernodeAccount string, epochID uint64) (*types.QueryAssignedTargetsResponse, error) GetEpochReport(ctx context.Context, epochID uint64, supernodeAccount string) (*types.QueryEpochReportResponse, error) + GetEpochReportsByReporter(ctx context.Context, reporterAccount string, epochID uint64) (*types.QueryEpochReportsByReporterResponse, error) // LEP-6 storage-truth state queries. GetNodeSuspicionState(ctx context.Context, supernodeAccount string) (*types.QueryNodeSuspicionStateResponse, error) diff --git a/pkg/storage/queries/local.go b/pkg/storage/queries/local.go index f7fa5275..b4c92b33 100644 --- a/pkg/storage/queries/local.go +++ b/pkg/storage/queries/local.go @@ -14,4 +14,5 @@ type LocalStoreInterface interface { PingHistoryQueries HealthCheckChallengeQueries LEP6HealQueries + RecheckQueries } diff --git a/pkg/storage/queries/recheck.go b/pkg/storage/queries/recheck.go new file mode 100644 index 00000000..98b03b35 --- /dev/null +++ b/pkg/storage/queries/recheck.go @@ -0,0 +1,54 @@ +package queries + +import ( + "context" + "database/sql" + "fmt" + "time" + + audittypes "github.com/LumeraProtocol/lumera/x/audit/v1/types" +) + +type RecheckSubmissionRecord struct { + EpochID uint64 + TicketID string + TargetAccount string + ChallengedTranscriptHash string + RecheckTranscriptHash string + ResultClass audittypes.StorageProofResultClass + SubmittedAt int64 +} + +const createStorageRecheckSubmissions = ` +CREATE TABLE IF NOT EXISTS storage_recheck_submissions ( + epoch_id INTEGER NOT NULL, + 
ticket_id TEXT NOT NULL, + target_account TEXT NOT NULL, + challenged_transcript_hash TEXT NOT NULL, + recheck_transcript_hash TEXT NOT NULL, + result_class INTEGER NOT NULL, + submitted_at INTEGER NOT NULL, + PRIMARY KEY (epoch_id, ticket_id) +);` + +func (s *SQLiteStore) HasRecheckSubmission(ctx context.Context, epochID uint64, ticketID string) (bool, error) { + const stmt = `SELECT 1 FROM storage_recheck_submissions WHERE epoch_id = ? AND ticket_id = ? LIMIT 1` + var one int + err := s.db.QueryRowContext(ctx, stmt, epochID, ticketID).Scan(&one) + if err != nil { + if err == sql.ErrNoRows { + return false, nil + } + return false, err + } + return true, nil +} + +func (s *SQLiteStore) RecordRecheckSubmission(ctx context.Context, epochID uint64, ticketID, targetAccount, challengedTranscriptHash, recheckTranscriptHash string, resultClass audittypes.StorageProofResultClass) error { + const stmt = `INSERT OR IGNORE INTO storage_recheck_submissions (epoch_id, ticket_id, target_account, challenged_transcript_hash, recheck_transcript_hash, result_class, submitted_at) VALUES (?, ?, ?, ?, ?, ?, ?)` + if epochID == 0 || ticketID == "" { + return fmt.Errorf("epoch_id and ticket_id are required") + } + _, err := s.db.ExecContext(ctx, stmt, epochID, ticketID, targetAccount, challengedTranscriptHash, recheckTranscriptHash, int32(resultClass), time.Now().Unix()) + return err +} diff --git a/pkg/storage/queries/recheck_interface.go b/pkg/storage/queries/recheck_interface.go new file mode 100644 index 00000000..8cab83c8 --- /dev/null +++ b/pkg/storage/queries/recheck_interface.go @@ -0,0 +1,12 @@ +package queries + +import ( + "context" + + audittypes "github.com/LumeraProtocol/lumera/x/audit/v1/types" +) + +type RecheckQueries interface { + HasRecheckSubmission(ctx context.Context, epochID uint64, ticketID string) (bool, error) + RecordRecheckSubmission(ctx context.Context, epochID uint64, ticketID, targetAccount, challengedTranscriptHash, recheckTranscriptHash string, 
resultClass audittypes.StorageProofResultClass) error +} diff --git a/pkg/storage/queries/recheck_test.go b/pkg/storage/queries/recheck_test.go new file mode 100644 index 00000000..2319cff1 --- /dev/null +++ b/pkg/storage/queries/recheck_test.go @@ -0,0 +1,41 @@ +package queries + +import ( + "context" + "testing" + + audittypes "github.com/LumeraProtocol/lumera/x/audit/v1/types" + "github.com/jmoiron/sqlx" + _ "github.com/mattn/go-sqlite3" + "github.com/stretchr/testify/require" +) + +func TestRecheckSubmissionDedupKeyEpochTicket(t *testing.T) { + db := sqlx.MustConnect("sqlite3", ":memory:") + defer db.Close() + _, err := db.Exec(createStorageRecheckSubmissions) + require.NoError(t, err) + store := &SQLiteStore{db: db} + ctx := context.Background() + + exists, err := store.HasRecheckSubmission(ctx, 7, "ticket-1") + require.NoError(t, err) + require.False(t, exists) + + require.NoError(t, store.RecordRecheckSubmission(ctx, 7, "ticket-1", "target-a", "orig", "rh1", audittypes.StorageProofResultClass_STORAGE_PROOF_RESULT_CLASS_PASS)) + exists, err = store.HasRecheckSubmission(ctx, 7, "ticket-1") + require.NoError(t, err) + require.True(t, exists) + + // Same ticket in a different epoch is intentionally a different replay key. + exists, err = store.HasRecheckSubmission(ctx, 8, "ticket-1") + require.NoError(t, err) + require.False(t, exists) + + // INSERT OR IGNORE makes local retry recording idempotent and preserves the + // first successful on-chain submission record. + require.NoError(t, store.RecordRecheckSubmission(ctx, 7, "ticket-1", "target-b", "orig2", "rh2", audittypes.StorageProofResultClass_STORAGE_PROOF_RESULT_CLASS_RECHECK_CONFIRMED_FAIL)) + var target string + require.NoError(t, db.QueryRowContext(ctx, `SELECT target_account FROM storage_recheck_submissions WHERE epoch_id=? 
AND ticket_id=?`, 7, "ticket-1").Scan(&target)) + require.Equal(t, "target-a", target) +} diff --git a/pkg/storage/queries/sqlite.go b/pkg/storage/queries/sqlite.go index 35b5ef3d..dea02e90 100644 --- a/pkg/storage/queries/sqlite.go +++ b/pkg/storage/queries/sqlite.go @@ -396,6 +396,10 @@ func OpenHistoryDB() (LocalStoreInterface, error) { return nil, fmt.Errorf("cannot create heal_verifications_submitted: %w", err) } + if _, err := db.Exec(createStorageRecheckSubmissions); err != nil { + return nil, fmt.Errorf("cannot create storage_recheck_submissions: %w", err) + } + _, _ = db.Exec(alterTaskHistory) _, _ = db.Exec(alterTablePingHistory) diff --git a/pkg/testutil/lumera.go b/pkg/testutil/lumera.go index 1b35e0f1..9b20d5af 100644 --- a/pkg/testutil/lumera.go +++ b/pkg/testutil/lumera.go @@ -222,6 +222,10 @@ func (m *MockAuditModule) GetEpochReport(ctx context.Context, epochID uint64, su return &audittypes.QueryEpochReportResponse{}, nil } +func (m *MockAuditModule) GetEpochReportsByReporter(ctx context.Context, reporterAccount string, epochID uint64) (*audittypes.QueryEpochReportsByReporterResponse, error) { + return &audittypes.QueryEpochReportsByReporterResponse{}, nil +} + func (m *MockAuditModule) GetNodeSuspicionState(ctx context.Context, supernodeAccount string) (*audittypes.QueryNodeSuspicionStateResponse, error) { return &audittypes.QueryNodeSuspicionStateResponse{}, nil } diff --git a/supernode/cmd/start.go b/supernode/cmd/start.go index b0ac611f..353e8b97 100644 --- a/supernode/cmd/start.go +++ b/supernode/cmd/start.go @@ -24,6 +24,7 @@ import ( cascadeService "github.com/LumeraProtocol/supernode/v2/supernode/cascade" "github.com/LumeraProtocol/supernode/v2/supernode/config" hostReporterService "github.com/LumeraProtocol/supernode/v2/supernode/host_reporter" + recheckService "github.com/LumeraProtocol/supernode/v2/supernode/recheck" selfHealingService "github.com/LumeraProtocol/supernode/v2/supernode/self_healing" statusService 
"github.com/LumeraProtocol/supernode/v2/supernode/status" storageChallengeService "github.com/LumeraProtocol/supernode/v2/supernode/storage_challenge" @@ -219,6 +220,7 @@ The supernode will connect to the Lumera network and begin participating in the WithArtifactReader(newP2PArtifactReader(p2pService)). WithRecipientSigner(kr, appConfig.SupernodeConfig.KeyName) var storageChallengeRunner *storageChallengeService.Service + var recheckRunner *recheckService.Service if appConfig.StorageChallengeConfig.Enabled { storageChallengeRunner, err = storageChallengeService.NewService( appConfig.SupernodeConfig.Identity, @@ -254,6 +256,18 @@ The supernode will connect to the Lumera network and begin participating in the logtrace.Fatal(ctx, "Failed to initialize LEP-6 dispatcher", logtrace.Fields{"error": derr.Error()}) } storageChallengeRunner.SetLEP6Dispatcher(dispatcher) + + if appConfig.StorageChallengeConfig.LEP6.Recheck.Enabled { + rc := appConfig.StorageChallengeConfig.LEP6.Recheck + tickInterval := time.Duration(rc.TickIntervalMs) * time.Millisecond + recheckCfg := recheckService.Config{Enabled: true, LookbackEpochs: rc.LookbackEpochs, MaxPerTick: rc.MaxPerTick, TickInterval: tickInterval} + attestor := recheckService.NewAttestor(appConfig.SupernodeConfig.Identity, lumeraClient.AuditMsg(), historyStore) + reporterSource := recheckService.NewSupernodeReporterSource(lumeraClient.SuperNode(), appConfig.SupernodeConfig.Identity) + recheckRunner, err = recheckService.NewServiceWithReporters(recheckCfg, lumeraClient.Audit(), historyStore, dispatcher, attestor, appConfig.SupernodeConfig.Identity, reporterSource) + if err != nil { + logtrace.Fatal(ctx, "Failed to initialize LEP-6 recheck runner", logtrace.Fields{"error": err.Error()}) + } + } } } @@ -359,6 +373,9 @@ The supernode will connect to the Lumera network and begin participating in the if selfHealingRunner != nil { services = append(services, selfHealingRunner) } + if recheckRunner != nil { + services = append(services, 
recheckRunner) + } servicesErr <- RunServices(ctx, services...) }() diff --git a/supernode/config/config.go b/supernode/config/config.go index 619bdfed..1d1327ea 100644 --- a/supernode/config/config.go +++ b/supernode/config/config.go @@ -90,6 +90,15 @@ type StorageChallengeLEP6Config struct { // RecipientReadTimeout caps a single GetCompoundProof RPC. Default // 30s. RecipientReadTimeout time.Duration `yaml:"recipient_read_timeout,omitempty"` + // Recheck owns the PR-5 storage-truth recheck evidence submitter. + Recheck StorageRecheckConfig `yaml:"recheck,omitempty"` +} + +type StorageRecheckConfig struct { + Enabled bool `yaml:"enabled"` + LookbackEpochs uint64 `yaml:"lookback_epochs,omitempty"` + MaxPerTick int `yaml:"max_per_tick,omitempty"` + TickIntervalMs int `yaml:"tick_interval_ms,omitempty"` } // SelfHealingConfig configures the LEP-6 chain-driven self-healing runtime diff --git a/supernode/host_reporter/tick_behavior_test.go b/supernode/host_reporter/tick_behavior_test.go index 3096538c..572fd38c 100644 --- a/supernode/host_reporter/tick_behavior_test.go +++ b/supernode/host_reporter/tick_behavior_test.go @@ -27,6 +27,7 @@ import ( type stubAuditModule struct { currentEpoch *audittypes.QueryCurrentEpochResponse anchor *audittypes.QueryEpochAnchorResponse + epochReport *audittypes.QueryEpochReportResponse epochReportErr error assigned *audittypes.QueryAssignedTargetsResponse } @@ -50,7 +51,10 @@ func (s *stubAuditModule) GetEpochReport(ctx context.Context, epochID uint64, su if s.epochReportErr != nil { return nil, s.epochReportErr } - return &audittypes.QueryEpochReportResponse{}, nil + return s.epochReport, nil +} +func (s *stubAuditModule) GetEpochReportsByReporter(ctx context.Context, reporterAccount string, epochID uint64) (*audittypes.QueryEpochReportsByReporterResponse, error) { + return &audittypes.QueryEpochReportsByReporterResponse{}, nil } func (s *stubAuditModule) GetNodeSuspicionState(ctx context.Context, supernodeAccount string) 
(*audittypes.QueryNodeSuspicionStateResponse, error) { return &audittypes.QueryNodeSuspicionStateResponse{}, nil diff --git a/supernode/recheck/attestor.go b/supernode/recheck/attestor.go new file mode 100644 index 00000000..069809c9 --- /dev/null +++ b/supernode/recheck/attestor.go @@ -0,0 +1,65 @@ +package recheck + +import ( + "context" + "fmt" + "strings" + + audittypes "github.com/LumeraProtocol/lumera/x/audit/v1/types" + sdktx "github.com/cosmos/cosmos-sdk/types/tx" +) + +type TxSubmitter interface { + SubmitStorageRecheckEvidence(ctx context.Context, epochID uint64, challengedSupernodeAccount, ticketID, challengedResultTranscriptHash, recheckTranscriptHash string, recheckResultClass audittypes.StorageProofResultClass, details string) (*sdktx.BroadcastTxResponse, error) +} + +type Attestor struct { + self string + msg TxSubmitter + store Store +} + +func NewAttestor(self string, msg TxSubmitter, store Store) *Attestor { + return &Attestor{self: strings.TrimSpace(self), msg: msg, store: store} +} + +func (a *Attestor) Submit(ctx context.Context, c Candidate, r RecheckResult) error { + if a == nil || a.msg == nil || a.store == nil { + return fmt.Errorf("recheck attestor missing deps") + } + if !c.Valid() || c.TargetAccount == a.self || c.OriginalReporter == a.self { + return fmt.Errorf("invalid recheck candidate") + } + if strings.TrimSpace(r.TranscriptHash) == "" || !validRecheckResultClass(r.ResultClass) { + return fmt.Errorf("invalid recheck result") + } + _, err := a.msg.SubmitStorageRecheckEvidence(ctx, c.EpochID, c.TargetAccount, c.TicketID, c.ChallengedTranscriptHash, r.TranscriptHash, r.ResultClass, r.Details) + if err != nil { + if isAlreadySubmittedError(err) { + return a.store.RecordRecheckSubmission(ctx, c.EpochID, c.TicketID, c.TargetAccount, c.ChallengedTranscriptHash, r.TranscriptHash, r.ResultClass) + } + return err + } + return a.store.RecordRecheckSubmission(ctx, c.EpochID, c.TicketID, c.TargetAccount, c.ChallengedTranscriptHash, 
r.TranscriptHash, r.ResultClass) +} + +func validRecheckResultClass(cls audittypes.StorageProofResultClass) bool { + switch cls { + case audittypes.StorageProofResultClass_STORAGE_PROOF_RESULT_CLASS_PASS, + audittypes.StorageProofResultClass_STORAGE_PROOF_RESULT_CLASS_RECHECK_CONFIRMED_FAIL, + audittypes.StorageProofResultClass_STORAGE_PROOF_RESULT_CLASS_TIMEOUT_OR_NO_RESPONSE, + audittypes.StorageProofResultClass_STORAGE_PROOF_RESULT_CLASS_OBSERVER_QUORUM_FAIL, + audittypes.StorageProofResultClass_STORAGE_PROOF_RESULT_CLASS_INVALID_TRANSCRIPT: + return true + default: + return false + } +} + +func isAlreadySubmittedError(err error) bool { + if err == nil { + return false + } + s := strings.ToLower(err.Error()) + return strings.Contains(s, "recheck evidence already submitted") +} diff --git a/supernode/recheck/attestor_test.go b/supernode/recheck/attestor_test.go new file mode 100644 index 00000000..ba7a9729 --- /dev/null +++ b/supernode/recheck/attestor_test.go @@ -0,0 +1,105 @@ +package recheck + +import ( + "context" + "errors" + "testing" + + audittypes "github.com/LumeraProtocol/lumera/x/audit/v1/types" + "github.com/stretchr/testify/require" +) + +func TestAttestor_SubmitsThenPersists(t *testing.T) { + callSeq = 0 + ctx := context.Background() + store := newMemoryStore() + msg := &recordingAuditMsg{} + a := NewAttestor("self", msg, store) + + candidate := Candidate{EpochID: 7, TargetAccount: "target", TicketID: "ticket-1", ChallengedTranscriptHash: "orig-hash", OriginalReporter: "reporter", OriginalResultClass: audittypes.StorageProofResultClass_STORAGE_PROOF_RESULT_CLASS_HASH_MISMATCH} + result := RecheckResult{TranscriptHash: "recheck-hash", ResultClass: audittypes.StorageProofResultClass_STORAGE_PROOF_RESULT_CLASS_PASS, Details: "ok"} + + require.NoError(t, a.Submit(ctx, candidate, result)) + require.Len(t, msg.calls, 1) + require.Equal(t, 1, msg.calls[0].callIndex) + require.Greater(t, store.recordCallIndex, msg.calls[0].callIndex) + exists, err := 
store.HasRecheckSubmission(ctx, 7, "ticket-1") + require.NoError(t, err) + require.True(t, exists) +} + +func TestAttestor_DoesNotPersistOnTxFailure(t *testing.T) { + ctx := context.Background() + store := newMemoryStore() + msg := &recordingAuditMsg{err: errBoom} + a := NewAttestor("self", msg, store) + + candidate := Candidate{EpochID: 7, TargetAccount: "target", TicketID: "ticket-1", ChallengedTranscriptHash: "orig-hash", OriginalReporter: "reporter", OriginalResultClass: audittypes.StorageProofResultClass_STORAGE_PROOF_RESULT_CLASS_HASH_MISMATCH} + result := RecheckResult{TranscriptHash: "recheck-hash", ResultClass: audittypes.StorageProofResultClass_STORAGE_PROOF_RESULT_CLASS_PASS} + + require.Error(t, a.Submit(ctx, candidate, result)) + exists, err := store.HasRecheckSubmission(ctx, 7, "ticket-1") + require.NoError(t, err) + require.False(t, exists) +} + +func TestAttestor_AcceptsExistingChainRecheckAsIdempotent(t *testing.T) { + ctx := context.Background() + store := newMemoryStore() + msg := &recordingAuditMsg{err: errAlreadyOnChain} + a := NewAttestor("self", msg, store) + + candidate := Candidate{EpochID: 7, TargetAccount: "target", TicketID: "ticket-1", ChallengedTranscriptHash: "orig-hash", OriginalReporter: "reporter", OriginalResultClass: audittypes.StorageProofResultClass_STORAGE_PROOF_RESULT_CLASS_HASH_MISMATCH} + result := RecheckResult{TranscriptHash: "recheck-hash", ResultClass: audittypes.StorageProofResultClass_STORAGE_PROOF_RESULT_CLASS_RECHECK_CONFIRMED_FAIL} + + require.NoError(t, a.Submit(ctx, candidate, result)) + exists, err := store.HasRecheckSubmission(ctx, 7, "ticket-1") + require.NoError(t, err) + require.True(t, exists) +} + +func TestAttestor_DoesNotTreatGenericDuplicateWordsAsIdempotent(t *testing.T) { + ctx := context.Background() + store := newMemoryStore() + msg := &recordingAuditMsg{err: errors.New("connection already closed before duplicate retry could be replayed")} + a := NewAttestor("self", msg, store) + + candidate := 
Candidate{EpochID: 7, TargetAccount: "target", TicketID: "ticket-1", ChallengedTranscriptHash: "orig-hash", OriginalReporter: "reporter", OriginalResultClass: audittypes.StorageProofResultClass_STORAGE_PROOF_RESULT_CLASS_HASH_MISMATCH} + result := RecheckResult{TranscriptHash: "recheck-hash", ResultClass: audittypes.StorageProofResultClass_STORAGE_PROOF_RESULT_CLASS_RECHECK_CONFIRMED_FAIL} + + require.Error(t, a.Submit(ctx, candidate, result)) + exists, err := store.HasRecheckSubmission(ctx, 7, "ticket-1") + require.NoError(t, err) + require.False(t, exists) +} + +func TestAttestor_RejectsSelfReportedOrSelfTargetCandidate(t *testing.T) { + ctx := context.Background() + store := newMemoryStore() + msg := &recordingAuditMsg{} + attestor := NewAttestor("self", msg, store) + base := Candidate{EpochID: 7, TargetAccount: "target", TicketID: "ticket-1", ChallengedTranscriptHash: "orig-hash", OriginalReporter: "reporter", OriginalResultClass: audittypes.StorageProofResultClass_STORAGE_PROOF_RESULT_CLASS_HASH_MISMATCH} + result := RecheckResult{TranscriptHash: "recheck-hash", ResultClass: audittypes.StorageProofResultClass_STORAGE_PROOF_RESULT_CLASS_PASS} + + selfReporter := base + selfReporter.OriginalReporter = "self" + require.Error(t, attestor.Submit(ctx, selfReporter, result)) + + selfTarget := base + selfTarget.TargetAccount = "self" + require.Error(t, attestor.Submit(ctx, selfTarget, result)) + require.Empty(t, msg.calls) +} + +func TestAttestor_RejectsEmptyRequiredFieldsBeforeTx(t *testing.T) { + ctx := context.Background() + store := newMemoryStore() + msg := &recordingAuditMsg{} + a := NewAttestor("self", msg, store) + + candidate := Candidate{EpochID: 7, TargetAccount: "target", TicketID: "ticket-1", ChallengedTranscriptHash: "orig-hash", OriginalReporter: "reporter", OriginalResultClass: audittypes.StorageProofResultClass_STORAGE_PROOF_RESULT_CLASS_HASH_MISMATCH} + result := RecheckResult{TranscriptHash: "", ResultClass: 
audittypes.StorageProofResultClass_STORAGE_PROOF_RESULT_CLASS_PASS} + + require.Error(t, a.Submit(ctx, candidate, result)) + require.Empty(t, msg.calls) +} diff --git a/supernode/recheck/eligibility_test.go b/supernode/recheck/eligibility_test.go new file mode 100644 index 00000000..1e58cb0a --- /dev/null +++ b/supernode/recheck/eligibility_test.go @@ -0,0 +1,38 @@ +package recheck + +import ( + "testing" + + audittypes "github.com/LumeraProtocol/lumera/x/audit/v1/types" + "github.com/stretchr/testify/require" +) + +func TestRecheckEligible_AcceptsChainEligibleFailureClasses(t *testing.T) { + for _, cls := range []audittypes.StorageProofResultClass{ + audittypes.StorageProofResultClass_STORAGE_PROOF_RESULT_CLASS_HASH_MISMATCH, + audittypes.StorageProofResultClass_STORAGE_PROOF_RESULT_CLASS_TIMEOUT_OR_NO_RESPONSE, + audittypes.StorageProofResultClass_STORAGE_PROOF_RESULT_CLASS_OBSERVER_QUORUM_FAIL, + audittypes.StorageProofResultClass_STORAGE_PROOF_RESULT_CLASS_INVALID_TRANSCRIPT, + } { + require.True(t, IsRecheckEligibleResultClass(cls), cls.String()) + } +} + +func TestRecheckEligible_RejectsPassAndRecheckConfirmedFail(t *testing.T) { + for _, cls := range []audittypes.StorageProofResultClass{ + audittypes.StorageProofResultClass_STORAGE_PROOF_RESULT_CLASS_PASS, + audittypes.StorageProofResultClass_STORAGE_PROOF_RESULT_CLASS_RECHECK_CONFIRMED_FAIL, + audittypes.StorageProofResultClass_STORAGE_PROOF_RESULT_CLASS_NO_ELIGIBLE_TICKET, + audittypes.StorageProofResultClass_STORAGE_PROOF_RESULT_CLASS_UNSPECIFIED, + } { + require.False(t, IsRecheckEligibleResultClass(cls), cls.String()) + } +} + +func TestMapRecheckOutcome_PreservesSpecFidelity(t *testing.T) { + require.Equal(t, audittypes.StorageProofResultClass_STORAGE_PROOF_RESULT_CLASS_PASS, MapRecheckOutcome(OutcomePass)) + require.Equal(t, audittypes.StorageProofResultClass_STORAGE_PROOF_RESULT_CLASS_RECHECK_CONFIRMED_FAIL, MapRecheckOutcome(OutcomeConfirmedHashMismatch)) + require.Equal(t, 
audittypes.StorageProofResultClass_STORAGE_PROOF_RESULT_CLASS_TIMEOUT_OR_NO_RESPONSE, MapRecheckOutcome(OutcomeTimeout)) + require.Equal(t, audittypes.StorageProofResultClass_STORAGE_PROOF_RESULT_CLASS_OBSERVER_QUORUM_FAIL, MapRecheckOutcome(OutcomeObserverQuorumFail)) + require.Equal(t, audittypes.StorageProofResultClass_STORAGE_PROOF_RESULT_CLASS_INVALID_TRANSCRIPT, MapRecheckOutcome(OutcomeInvalidTranscript)) +} diff --git a/supernode/recheck/finder.go b/supernode/recheck/finder.go new file mode 100644 index 00000000..28b7c518 --- /dev/null +++ b/supernode/recheck/finder.go @@ -0,0 +1,138 @@ +package recheck + +import ( + "context" + "fmt" + "sort" + "strings" + + audittypes "github.com/LumeraProtocol/lumera/x/audit/v1/types" +) + +type FinderConfig struct { + LookbackEpochs uint64 + MaxPerTick int +} + +func (c FinderConfig) withDefaults() FinderConfig { + if c.LookbackEpochs == 0 { + c.LookbackEpochs = DefaultLookbackEpochs + } + if c.MaxPerTick <= 0 { + c.MaxPerTick = DefaultMaxPerTick + } + return c +} + +type Finder struct { + audit AuditReader + store Store + reporters ReporterSource + self string + cfg FinderConfig +} + +func NewFinder(audit AuditReader, store Store, self string, cfg FinderConfig) *Finder { + return NewFinderWithReporters(audit, store, self, cfg, NewStaticReporterSource(self)) +} + +func NewFinderWithReporters(audit AuditReader, store Store, self string, cfg FinderConfig, reporters ReporterSource) *Finder { + self = strings.TrimSpace(self) + if reporters == nil { + reporters = NewStaticReporterSource(self) + } + return &Finder{audit: audit, store: store, reporters: reporters, self: self, cfg: cfg.withDefaults()} +} + +func (f *Finder) Find(ctx context.Context) ([]Candidate, error) { + if f.audit == nil || f.store == nil { + return nil, fmt.Errorf("recheck finder missing deps") + } + cur, err := f.audit.GetCurrentEpoch(ctx) + if err != nil { + return nil, fmt.Errorf("current epoch: %w", err) + } + if cur == nil || cur.EpochId == 0 { + 
return nil, nil + } + start := uint64(1) + if cur.EpochId > f.cfg.LookbackEpochs { + start = cur.EpochId - f.cfg.LookbackEpochs + } + reporters, err := f.reporters.ReporterAccounts(ctx) + if err != nil { + return nil, err + } + out := make([]Candidate, 0, f.cfg.MaxPerTick) + seen := map[string]struct{}{} + for epoch := cur.EpochId; epoch >= start; epoch-- { + results := make([]Candidate, 0) + for _, reporter := range reporters { + rep, err := f.audit.GetEpochReportsByReporter(ctx, reporter, epoch) + if err != nil { + return nil, fmt.Errorf("epoch reports reporter %s epoch %d: %w", reporter, epoch, err) + } + if rep == nil { + continue + } + for _, report := range rep.Reports { + results = append(results, candidatesFromReport(epoch, report)...) + } + } + if len(results) == 0 { + if epoch == start { + break + } + continue + } + sort.SliceStable(results, func(i, j int) bool { + if results[i].TicketID == results[j].TicketID { + return results[i].TargetAccount < results[j].TargetAccount + } + return results[i].TicketID < results[j].TicketID + }) + for _, c := range results { + if !c.Valid() || c.TargetAccount == f.self || c.OriginalReporter == f.self { + continue + } + key := fmt.Sprintf("%d/%s", c.EpochID, c.TicketID) + if _, ok := seen[key]; ok { + continue + } + seen[key] = struct{}{} + done, err := f.store.HasRecheckSubmission(ctx, c.EpochID, c.TicketID) + if err != nil { + return nil, err + } + if done { + continue + } + out = append(out, c) + if len(out) >= f.cfg.MaxPerTick { + return out, nil + } + } + if epoch == 0 || epoch == start { + break + } + } + return out, nil +} + +func candidatesFromReport(epochID uint64, report audittypes.EpochReport) []Candidate { + out := make([]Candidate, 0, len(report.StorageProofResults)) + for _, r := range report.StorageProofResults { + if r == nil { + continue + } + out = append(out, Candidate{ + EpochID: epochID, + TargetAccount: strings.TrimSpace(r.TargetSupernodeAccount), + TicketID: strings.TrimSpace(r.TicketId), + 
ChallengedTranscriptHash: strings.TrimSpace(r.TranscriptHash), + OriginalReporter: strings.TrimSpace(r.ChallengerSupernodeAccount), + OriginalResultClass: r.ResultClass, + }) + } + return out +} diff --git a/supernode/recheck/finder_service_test.go b/supernode/recheck/finder_service_test.go new file mode 100644 index 00000000..e1316308 --- /dev/null +++ b/supernode/recheck/finder_service_test.go @@ -0,0 +1,125 @@ +package recheck + +import ( + "context" + "testing" + "time" + + audittypes "github.com/LumeraProtocol/lumera/x/audit/v1/types" + "github.com/stretchr/testify/require" +) + +func TestFinder_LookbackLimitDedupAndOrder(t *testing.T) { + store := newMemoryStore() + require.NoError(t, store.RecordRecheckSubmission(context.Background(), 9, "done", "target", "h", "rh", audittypes.StorageProofResultClass_STORAGE_PROOF_RESULT_CLASS_PASS)) + a := &stubAudit{current: 10, reports: map[uint64]audittypes.EpochReport{ + 10: {StorageProofResults: []*audittypes.StorageProofResult{ + resFrom("peer", "z", "target-z", "hz", audittypes.StorageProofResultClass_STORAGE_PROOF_RESULT_CLASS_HASH_MISMATCH), + resFrom("peer", "pass", "target-pass", "hp", audittypes.StorageProofResultClass_STORAGE_PROOF_RESULT_CLASS_PASS), + }}, + 9: {StorageProofResults: []*audittypes.StorageProofResult{ + resFrom("peer", "done", "target-done", "hd", audittypes.StorageProofResultClass_STORAGE_PROOF_RESULT_CLASS_HASH_MISMATCH), + resFrom("peer", "a", "target-a", "ha", audittypes.StorageProofResultClass_STORAGE_PROOF_RESULT_CLASS_TIMEOUT_OR_NO_RESPONSE), + }}, + 2: {StorageProofResults: []*audittypes.StorageProofResult{ + resFrom("peer", "too-old", "target-old", "ho", audittypes.StorageProofResultClass_STORAGE_PROOF_RESULT_CLASS_HASH_MISMATCH), + }}, + }} + f := NewFinder(a, store, "self", FinderConfig{LookbackEpochs: 2, MaxPerTick: 2}) + got, err := f.Find(context.Background()) + require.NoError(t, err) + require.Len(t, got, 2) + require.Equal(t, uint64(10), got[0].EpochID) + require.Equal(t, "z", 
got[0].TicketID) + require.Equal(t, uint64(9), got[1].EpochID) + require.Equal(t, "a", got[1].TicketID) +} + +func TestFinder_ScansNetworkReporterSetNotSelfOnly(t *testing.T) { + store := newMemoryStore() + a := &stubAudit{current: 5, reportsBySource: map[string]map[uint64]audittypes.EpochReport{ + "self": { + 5: {StorageProofResults: []*audittypes.StorageProofResult{ + resFrom("other", "self-ticket", "target-self", "h-self", audittypes.StorageProofResultClass_STORAGE_PROOF_RESULT_CLASS_HASH_MISMATCH), + }}, + }, + "peer-reporter": { + 5: {StorageProofResults: []*audittypes.StorageProofResult{ + {TargetSupernodeAccount: "target-peer", ChallengerSupernodeAccount: "peer-reporter", TicketId: "peer-ticket", TranscriptHash: "h-peer", ResultClass: audittypes.StorageProofResultClass_STORAGE_PROOF_RESULT_CLASS_TIMEOUT_OR_NO_RESPONSE}, + }}, + }, + }} + f := NewFinderWithReporters(a, store, "self", FinderConfig{LookbackEpochs: 1, MaxPerTick: 10}, NewStaticReporterSource("self", "peer-reporter")) + got, err := f.Find(context.Background()) + require.NoError(t, err) + require.Len(t, got, 2) + require.Equal(t, "peer-ticket", got[0].TicketID) + require.Equal(t, "peer-reporter", got[0].OriginalReporter) + require.Equal(t, "self-ticket", got[1].TicketID) +} + +func TestFinder_SkipsSelfTargetCandidate(t *testing.T) { + store := newMemoryStore() + a := &stubAudit{current: 5, reportsBySource: map[string]map[uint64]audittypes.EpochReport{ + "peer-reporter": { + 5: {StorageProofResults: []*audittypes.StorageProofResult{ + resFrom("peer-reporter", "against-self", "self", "h-self", audittypes.StorageProofResultClass_STORAGE_PROOF_RESULT_CLASS_HASH_MISMATCH), + resFrom("peer-reporter", "against-peer", "target-peer", "h-peer", audittypes.StorageProofResultClass_STORAGE_PROOF_RESULT_CLASS_HASH_MISMATCH), + }}, + }, + }} + f := NewFinderWithReporters(a, store, "self", FinderConfig{LookbackEpochs: 1, MaxPerTick: 10}, NewStaticReporterSource("peer-reporter")) + got, err := 
f.Find(context.Background()) + require.NoError(t, err) + require.Len(t, got, 1) + require.Equal(t, "against-peer", got[0].TicketID) +} + +func TestFinder_SkipsSelfReportedCandidate(t *testing.T) { + store := newMemoryStore() + a := &stubAudit{current: 5, reportsBySource: map[string]map[uint64]audittypes.EpochReport{ + "self": { + 5: {StorageProofResults: []*audittypes.StorageProofResult{ + resFrom("self", "own-report", "target-peer", "h-own", audittypes.StorageProofResultClass_STORAGE_PROOF_RESULT_CLASS_HASH_MISMATCH), + resFrom("peer-reporter", "peer-report", "target-peer", "h-peer", audittypes.StorageProofResultClass_STORAGE_PROOF_RESULT_CLASS_HASH_MISMATCH), + }}, + }, + }} + f := NewFinderWithReporters(a, store, "self", FinderConfig{LookbackEpochs: 1, MaxPerTick: 10}, NewStaticReporterSource("self")) + got, err := f.Find(context.Background()) + require.NoError(t, err) + require.Len(t, got, 1) + require.Equal(t, "peer-report", got[0].TicketID) +} + +func TestService_TickModeGateAndSubmit(t *testing.T) { + ctx := context.Background() + store := newMemoryStore() + msg := &recordingAuditMsg{} + a := &stubAudit{current: 10, mode: audittypes.StorageTruthEnforcementMode_STORAGE_TRUTH_ENFORCEMENT_MODE_UNSPECIFIED, reports: map[uint64]audittypes.EpochReport{10: {StorageProofResults: []*audittypes.StorageProofResult{resFrom("peer", "t", "target", "h", audittypes.StorageProofResultClass_STORAGE_PROOF_RESULT_CLASS_HASH_MISMATCH)}}}} + r := &stubRechecker{result: RecheckResult{TranscriptHash: "rh", ResultClass: audittypes.StorageProofResultClass_STORAGE_PROOF_RESULT_CLASS_PASS}} + svc, err := NewService(Config{Enabled: true, TickInterval: time.Millisecond}, a, store, r, NewAttestor("self", msg, store), "self") + require.NoError(t, err) + require.NoError(t, svc.Tick(ctx)) + require.Empty(t, msg.calls) + + a.mode = audittypes.StorageTruthEnforcementMode_STORAGE_TRUTH_ENFORCEMENT_MODE_FULL + require.NoError(t, svc.Tick(ctx)) + require.Len(t, msg.calls, 1) + require.Equal(t, 
"target", msg.calls[0].target) +} + +func TestConfigDefaults(t *testing.T) { + got := (Config{}).WithDefaults() + require.Equal(t, DefaultLookbackEpochs, got.LookbackEpochs) + require.Equal(t, DefaultMaxPerTick, got.MaxPerTick) + require.Equal(t, DefaultTickInterval, got.TickInterval) +} + +func res(ticket, target, transcript string, class audittypes.StorageProofResultClass) *audittypes.StorageProofResult { + return resFrom("self", ticket, target, transcript, class) +} + +func resFrom(reporter, ticket, target, transcript string, class audittypes.StorageProofResultClass) *audittypes.StorageProofResult { + return &audittypes.StorageProofResult{TargetSupernodeAccount: target, ChallengerSupernodeAccount: reporter, TicketId: ticket, TranscriptHash: transcript, ResultClass: class} +} diff --git a/supernode/recheck/reporters.go b/supernode/recheck/reporters.go new file mode 100644 index 00000000..980c5af4 --- /dev/null +++ b/supernode/recheck/reporters.go @@ -0,0 +1,68 @@ +package recheck + +import ( + "context" + "fmt" + "sort" + "strings" + + supernodemodule "github.com/LumeraProtocol/supernode/v2/pkg/lumera/modules/supernode" +) + +type staticReporterSource struct { + accounts []string +} + +func NewStaticReporterSource(accounts ...string) ReporterSource { + return staticReporterSource{accounts: accounts} +} + +func (s staticReporterSource) ReporterAccounts(ctx context.Context) ([]string, error) { + return normalizeAccounts(s.accounts), nil +} + +type SupernodeReporterSource struct { + module supernodemodule.Module + self string +} + +func NewSupernodeReporterSource(module supernodemodule.Module, self string) *SupernodeReporterSource { + return &SupernodeReporterSource{module: module, self: strings.TrimSpace(self)} +} + +func (s *SupernodeReporterSource) ReporterAccounts(ctx context.Context) ([]string, error) { + if s == nil || s.module == nil { + return nil, fmt.Errorf("recheck reporter source missing supernode module") + } + resp, err := s.module.ListSuperNodes(ctx) 
+ if err != nil { + return nil, fmt.Errorf("list supernodes: %w", err) + } + accounts := []string{s.self} + if resp != nil { + for _, sn := range resp.Supernodes { + if sn != nil { + accounts = append(accounts, sn.SupernodeAccount) + } + } + } + return normalizeAccounts(accounts), nil +} + +func normalizeAccounts(accounts []string) []string { + seen := map[string]struct{}{} + out := make([]string, 0, len(accounts)) + for _, account := range accounts { + account = strings.TrimSpace(account) + if account == "" { + continue + } + if _, ok := seen[account]; ok { + continue + } + seen[account] = struct{}{} + out = append(out, account) + } + sort.Strings(out) + return out +} diff --git a/supernode/recheck/service.go b/supernode/recheck/service.go new file mode 100644 index 00000000..6b0deefa --- /dev/null +++ b/supernode/recheck/service.go @@ -0,0 +1,115 @@ +package recheck + +import ( + "context" + "fmt" + "time" + + audittypes "github.com/LumeraProtocol/lumera/x/audit/v1/types" + "github.com/LumeraProtocol/supernode/v2/pkg/logtrace" +) + +type Config struct { + Enabled bool + LookbackEpochs uint64 + MaxPerTick int + TickInterval time.Duration + Jitter time.Duration +} + +func (c Config) WithDefaults() Config { + if c.LookbackEpochs == 0 { + c.LookbackEpochs = DefaultLookbackEpochs + } + if c.MaxPerTick <= 0 { + c.MaxPerTick = DefaultMaxPerTick + } + if c.TickInterval <= 0 { + c.TickInterval = DefaultTickInterval + } + if c.Jitter < 0 { + c.Jitter = 0 + } + return c +} + +type Service struct { + cfg Config + audit AuditReader + finder *Finder + rechecker Rechecker + attestor *Attestor +} + +func NewService(cfg Config, audit AuditReader, store Store, rechecker Rechecker, attestor *Attestor, self string) (*Service, error) { + return NewServiceWithReporters(cfg, audit, store, rechecker, attestor, self, NewStaticReporterSource(self)) +} + +func NewServiceWithReporters(cfg Config, audit AuditReader, store Store, rechecker Rechecker, attestor *Attestor, self string, reporters 
ReporterSource) (*Service, error) { + cfg = cfg.WithDefaults() + if audit == nil || store == nil || attestor == nil || rechecker == nil || reporters == nil { + return nil, fmt.Errorf("recheck service missing deps") + } + finder := NewFinderWithReporters(audit, store, self, FinderConfig{LookbackEpochs: cfg.LookbackEpochs, MaxPerTick: cfg.MaxPerTick}, reporters) + return &Service{cfg: cfg, audit: audit, finder: finder, rechecker: rechecker, attestor: attestor}, nil +} + +func (s *Service) Run(ctx context.Context) error { + if !s.cfg.Enabled { + <-ctx.Done() + return nil + } + if s.cfg.Jitter > 0 { + select { + case <-time.After(s.cfg.Jitter): + case <-ctx.Done(): + return nil + } + } + if err := s.Tick(ctx); err != nil { + logtrace.Warn(ctx, "lep6 recheck: tick failed", logtrace.Fields{"error": err.Error()}) + } + t := time.NewTicker(s.cfg.TickInterval) + defer t.Stop() + for { + select { + case <-ctx.Done(): + return nil + case <-t.C: + if err := s.Tick(ctx); err != nil { + logtrace.Warn(ctx, "lep6 recheck: tick failed", logtrace.Fields{"error": err.Error()}) + } + } + } +} + +func (s *Service) Tick(ctx context.Context) error { + if !s.cfg.Enabled { + return nil + } + params, err := s.audit.GetParams(ctx) + if err != nil { + return fmt.Errorf("params: %w", err) + } + if params == nil || params.Params.StorageTruthEnforcementMode == audittypes.StorageTruthEnforcementMode_STORAGE_TRUTH_ENFORCEMENT_MODE_UNSPECIFIED { + return nil + } + candidates, err := s.finder.Find(ctx) + if err != nil { + return err + } + for _, c := range candidates { + if err := ctx.Err(); err != nil { + return nil + } + result, err := s.rechecker.Recheck(ctx, c) + if err != nil { + logtrace.Warn(ctx, "lep6 recheck: execution failed", logtrace.Fields{"epoch_id": c.EpochID, "ticket_id": c.TicketID, "error": err.Error()}) + continue + } + if err := s.attestor.Submit(ctx, c, result); err != nil { + logtrace.Warn(ctx, "lep6 recheck: submit failed", logtrace.Fields{"epoch_id": c.EpochID, "ticket_id": 
c.TicketID, "error": err.Error()}) + } + } + return nil +} diff --git a/supernode/recheck/test_helpers_test.go b/supernode/recheck/test_helpers_test.go new file mode 100644 index 00000000..2df95676 --- /dev/null +++ b/supernode/recheck/test_helpers_test.go @@ -0,0 +1,96 @@ +package recheck + +import ( + "context" + "errors" + "fmt" + + audittypes "github.com/LumeraProtocol/lumera/x/audit/v1/types" + sdktx "github.com/cosmos/cosmos-sdk/types/tx" +) + +var ( + errBoom = errors.New("boom") + errAlreadyOnChain = errors.New("invalid recheck evidence: recheck evidence already submitted for epoch 7 ticket \"ticket-1\" by \"self\"") +) + +var callSeq int + +type memoryStore struct { + seen map[string]bool + recordCallIndex int +} + +func newMemoryStore() *memoryStore { return &memoryStore{seen: map[string]bool{}} } +func (m *memoryStore) HasRecheckSubmission(_ context.Context, epochID uint64, ticketID string) (bool, error) { + return m.seen[key(epochID, ticketID)], nil +} +func (m *memoryStore) RecordRecheckSubmission(_ context.Context, epochID uint64, ticketID, targetAccount, challengedTranscriptHash, recheckTranscriptHash string, resultClass audittypes.StorageProofResultClass) error { + callSeq++ + m.recordCallIndex = callSeq + m.seen[key(epochID, ticketID)] = true + return nil +} +func key(epochID uint64, ticketID string) string { return fmt.Sprintf("%d/%s", epochID, ticketID) } + +type recordingAuditMsg struct { + calls []submitCall + err error +} +type submitCall struct { + callIndex int + epochID uint64 + target, ticket, challenged, recheck string + class audittypes.StorageProofResultClass + details string +} + +func (m *recordingAuditMsg) SubmitStorageRecheckEvidence(ctx context.Context, epochID uint64, challengedSupernodeAccount, ticketID, challengedResultTranscriptHash, recheckTranscriptHash string, recheckResultClass audittypes.StorageProofResultClass, details string) (*sdktx.BroadcastTxResponse, error) { + callSeq++ + m.calls = append(m.calls, 
submitCall{callIndex: callSeq, epochID: epochID, target: challengedSupernodeAccount, ticket: ticketID, challenged: challengedResultTranscriptHash, recheck: recheckTranscriptHash, class: recheckResultClass, details: details}) + if m.err != nil { + return nil, m.err + } + return &sdktx.BroadcastTxResponse{}, nil +} + +type stubAudit struct { + current uint64 + reports map[uint64]audittypes.EpochReport + reportsBySource map[string]map[uint64]audittypes.EpochReport + mode audittypes.StorageTruthEnforcementMode +} + +func (s *stubAudit) GetCurrentEpoch(ctx context.Context) (*audittypes.QueryCurrentEpochResponse, error) { + return &audittypes.QueryCurrentEpochResponse{EpochId: s.current}, nil +} +func (s *stubAudit) GetEpochReport(ctx context.Context, epochID uint64, supernodeAccount string) (*audittypes.QueryEpochReportResponse, error) { + return &audittypes.QueryEpochReportResponse{Report: s.reports[epochID]}, nil +} +func (s *stubAudit) GetEpochReportsByReporter(ctx context.Context, reporterAccount string, epochID uint64) (*audittypes.QueryEpochReportsByReporterResponse, error) { + if s.reportsBySource != nil { + if byEpoch, ok := s.reportsBySource[reporterAccount]; ok { + if report, ok := byEpoch[epochID]; ok { + return &audittypes.QueryEpochReportsByReporterResponse{Reports: []audittypes.EpochReport{report}}, nil + } + } + } + if reporterAccount == "self" { + return &audittypes.QueryEpochReportsByReporterResponse{Reports: []audittypes.EpochReport{s.reports[epochID]}}, nil + } + return &audittypes.QueryEpochReportsByReporterResponse{}, nil +} +func (s *stubAudit) GetParams(ctx context.Context) (*audittypes.QueryParamsResponse, error) { + return &audittypes.QueryParamsResponse{Params: audittypes.Params{StorageTruthEnforcementMode: s.mode}}, nil +} + +type stubRechecker struct { + result RecheckResult + calls []Candidate + err error +} + +func (s *stubRechecker) Recheck(ctx context.Context, c Candidate) (RecheckResult, error) { + s.calls = append(s.calls, c) + return 
s.result, s.err +} diff --git a/supernode/recheck/types.go b/supernode/recheck/types.go new file mode 100644 index 00000000..b0d6888e --- /dev/null +++ b/supernode/recheck/types.go @@ -0,0 +1,93 @@ +package recheck + +import ( + "context" + "strings" + "time" + + audittypes "github.com/LumeraProtocol/lumera/x/audit/v1/types" +) + +const ( + DefaultLookbackEpochs = uint64(7) + DefaultMaxPerTick = 5 + DefaultTickInterval = time.Minute +) + +type Outcome int + +const ( + OutcomePass Outcome = iota + OutcomeConfirmedHashMismatch + OutcomeTimeout + OutcomeObserverQuorumFail + OutcomeInvalidTranscript +) + +type Candidate struct { + EpochID uint64 + TargetAccount string + TicketID string + ChallengedTranscriptHash string + OriginalReporter string + OriginalResultClass audittypes.StorageProofResultClass +} + +type RecheckResult struct { + TranscriptHash string + ResultClass audittypes.StorageProofResultClass + Details string +} + +type Store interface { + HasRecheckSubmission(ctx context.Context, epochID uint64, ticketID string) (bool, error) + RecordRecheckSubmission(ctx context.Context, epochID uint64, ticketID, targetAccount, challengedTranscriptHash, recheckTranscriptHash string, resultClass audittypes.StorageProofResultClass) error +} + +type AuditReader interface { + GetCurrentEpoch(ctx context.Context) (*audittypes.QueryCurrentEpochResponse, error) + GetEpochReport(ctx context.Context, epochID uint64, supernodeAccount string) (*audittypes.QueryEpochReportResponse, error) + GetEpochReportsByReporter(ctx context.Context, reporterAccount string, epochID uint64) (*audittypes.QueryEpochReportsByReporterResponse, error) + GetParams(ctx context.Context) (*audittypes.QueryParamsResponse, error) +} + +type ReporterSource interface { + ReporterAccounts(ctx context.Context) ([]string, error) +} + +type Rechecker interface { + Recheck(ctx context.Context, candidate Candidate) (RecheckResult, error) +} + +func IsRecheckEligibleResultClass(cls audittypes.StorageProofResultClass) 
bool { + switch cls { + case audittypes.StorageProofResultClass_STORAGE_PROOF_RESULT_CLASS_HASH_MISMATCH, + audittypes.StorageProofResultClass_STORAGE_PROOF_RESULT_CLASS_TIMEOUT_OR_NO_RESPONSE, + audittypes.StorageProofResultClass_STORAGE_PROOF_RESULT_CLASS_OBSERVER_QUORUM_FAIL, + audittypes.StorageProofResultClass_STORAGE_PROOF_RESULT_CLASS_INVALID_TRANSCRIPT: + return true + default: + return false + } +} + +func MapRecheckOutcome(outcome Outcome) audittypes.StorageProofResultClass { + switch outcome { + case OutcomePass: + return audittypes.StorageProofResultClass_STORAGE_PROOF_RESULT_CLASS_PASS + case OutcomeConfirmedHashMismatch: + return audittypes.StorageProofResultClass_STORAGE_PROOF_RESULT_CLASS_RECHECK_CONFIRMED_FAIL + case OutcomeTimeout: + return audittypes.StorageProofResultClass_STORAGE_PROOF_RESULT_CLASS_TIMEOUT_OR_NO_RESPONSE + case OutcomeObserverQuorumFail: + return audittypes.StorageProofResultClass_STORAGE_PROOF_RESULT_CLASS_OBSERVER_QUORUM_FAIL + case OutcomeInvalidTranscript: + return audittypes.StorageProofResultClass_STORAGE_PROOF_RESULT_CLASS_INVALID_TRANSCRIPT + default: + return audittypes.StorageProofResultClass_STORAGE_PROOF_RESULT_CLASS_INVALID_TRANSCRIPT + } +} + +func (c Candidate) Valid() bool { + return c.EpochID > 0 && strings.TrimSpace(c.TargetAccount) != "" && strings.TrimSpace(c.TicketID) != "" && strings.TrimSpace(c.ChallengedTranscriptHash) != "" && strings.TrimSpace(c.OriginalReporter) != "" && IsRecheckEligibleResultClass(c.OriginalResultClass) +} diff --git a/supernode/self_healing/mocks_test.go b/supernode/self_healing/mocks_test.go index 25814b70..ec0f5473 100644 --- a/supernode/self_healing/mocks_test.go +++ b/supernode/self_healing/mocks_test.go @@ -91,6 +91,9 @@ func (p *programmableAudit) GetAssignedTargets(ctx context.Context, supernodeAcc func (p *programmableAudit) GetEpochReport(ctx context.Context, epochID uint64, supernodeAccount string) (*audittypes.QueryEpochReportResponse, error) { return 
&audittypes.QueryEpochReportResponse{}, nil } +func (p *programmableAudit) GetEpochReportsByReporter(ctx context.Context, reporterAccount string, epochID uint64) (*audittypes.QueryEpochReportsByReporterResponse, error) { + return &audittypes.QueryEpochReportsByReporterResponse{}, nil +} func (p *programmableAudit) GetNodeSuspicionState(ctx context.Context, supernodeAccount string) (*audittypes.QueryNodeSuspicionStateResponse, error) { return &audittypes.QueryNodeSuspicionStateResponse{}, nil } diff --git a/supernode/storage_challenge/lep6_dispatch.go b/supernode/storage_challenge/lep6_dispatch.go index 2613fa19..69f7a800 100644 --- a/supernode/storage_challenge/lep6_dispatch.go +++ b/supernode/storage_challenge/lep6_dispatch.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "strings" + "sync" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -106,6 +107,7 @@ type LEP6Dispatcher struct { tickets TicketProvider meta CascadeMetaProvider buffer *Buffer + mu sync.Mutex } // NewLEP6Dispatcher constructs a dispatcher. supernodeClient, tickets, @@ -167,9 +169,12 @@ func NewLEP6Dispatcher( // rather than returning an error. 
func (d *LEP6Dispatcher) DispatchEpoch(ctx context.Context, epochID uint64) error { paramsResp, err := d.client.Audit().GetParams(ctx) - if err != nil || paramsResp == nil { + if err != nil { return fmt.Errorf("lep6 dispatch: get params: %w", err) } + if paramsResp == nil { + return fmt.Errorf("lep6 dispatch: get params returned nil response") + } params := paramsResp.Params mode := params.StorageTruthEnforcementMode @@ -218,6 +223,9 @@ func (d *LEP6Dispatcher) DispatchEpoch(ctx context.Context, epochID uint64) erro "targets": len(targets), }) + d.mu.Lock() + defer d.mu.Unlock() + for _, target := range targets { target = strings.TrimSpace(target) if target == "" || target == d.self { diff --git a/supernode/storage_challenge/lep6_dispatch_test.go b/supernode/storage_challenge/lep6_dispatch_test.go index 20285335..5c255a8d 100644 --- a/supernode/storage_challenge/lep6_dispatch_test.go +++ b/supernode/storage_challenge/lep6_dispatch_test.go @@ -14,6 +14,7 @@ import ( auditmod "github.com/LumeraProtocol/supernode/v2/pkg/lumera/modules/audit" nodemod "github.com/LumeraProtocol/supernode/v2/pkg/lumera/modules/node" "github.com/LumeraProtocol/supernode/v2/pkg/storagechallenge/deterministic" + "github.com/LumeraProtocol/supernode/v2/supernode/recheck" "github.com/cosmos/cosmos-sdk/codec" codectypes "github.com/cosmos/cosmos-sdk/codec/types" cryptocodec "github.com/cosmos/cosmos-sdk/crypto/codec" @@ -31,14 +32,18 @@ import ( // dispatchAuditModule is an in-test stub of audit.Module used to drive // LEP6Dispatcher per-test; mirrors the host_reporter test pattern. 
type dispatchAuditModule struct { - params *audittypes.QueryParamsResponse - anchor *audittypes.QueryEpochAnchorResponse - assigned *audittypes.QueryAssignedTargetsResponse + params *audittypes.QueryParamsResponse + anchor *audittypes.QueryEpochAnchorResponse + assigned *audittypes.QueryAssignedTargetsResponse + getParamsHook func() } var _ auditmod.Module = (*dispatchAuditModule)(nil) func (s *dispatchAuditModule) GetParams(ctx context.Context) (*audittypes.QueryParamsResponse, error) { + if s.getParamsHook != nil { + s.getParamsHook() + } return s.params, nil } func (s *dispatchAuditModule) GetEpochAnchor(ctx context.Context, epochID uint64) (*audittypes.QueryEpochAnchorResponse, error) { @@ -56,6 +61,9 @@ func (s *dispatchAuditModule) GetAssignedTargets(ctx context.Context, supernodeA func (s *dispatchAuditModule) GetEpochReport(ctx context.Context, epochID uint64, supernodeAccount string) (*audittypes.QueryEpochReportResponse, error) { return &audittypes.QueryEpochReportResponse{}, nil } +func (s *dispatchAuditModule) GetEpochReportsByReporter(ctx context.Context, reporterAccount string, epochID uint64) (*audittypes.QueryEpochReportsByReporterResponse, error) { + return &audittypes.QueryEpochReportsByReporterResponse{}, nil +} func (s *dispatchAuditModule) GetNodeSuspicionState(ctx context.Context, supernodeAccount string) (*audittypes.QueryNodeSuspicionStateResponse, error) { return &audittypes.QueryNodeSuspicionStateResponse{}, nil } @@ -212,6 +220,36 @@ func TestDispatchEpoch_ModeUnspecified_NoOp(t *testing.T) { require.Empty(t, buf.CollectResults(7), "buffer must be empty under UNSPECIFIED mode") } +func TestDispatchEpoch_GetParamsDoesNotHoldDispatcherLock(t *testing.T) { + var dispatcher *LEP6Dispatcher + var sawUnlocked bool + audit := &dispatchAuditModule{ + params: &audittypes.QueryParamsResponse{ + Params: defaultParams(audittypes.StorageTruthEnforcementMode_STORAGE_TRUTH_ENFORCEMENT_MODE_UNSPECIFIED), + }, + } + audit.getParamsHook = func() { + if 
dispatcher.mu.TryLock() { + sawUnlocked = true + dispatcher.mu.Unlock() + } + } + dispatcher, _ = newDispatcher(t, audit, &stubFactory{}, NoTicketProvider{}, stubMetaProvider{}) + + require.NoError(t, dispatcher.DispatchEpoch(context.Background(), 7)) + require.True(t, sawUnlocked, "DispatchEpoch must not hold dispatcher mutex while querying params") +} + +func TestDispatchEpoch_GetParamsNilResponseIsClear(t *testing.T) { + audit := &dispatchAuditModule{} + dispatcher, _ := newDispatcher(t, audit, &stubFactory{}, NoTicketProvider{}, stubMetaProvider{}) + + err := dispatcher.DispatchEpoch(context.Background(), 7) + require.Error(t, err) + require.Contains(t, err.Error(), "lep6 dispatch: get params returned nil response") + require.NotContains(t, err.Error(), "<nil>") +} + func TestDispatchEpoch_ModeShadow_AppendsResults(t *testing.T) { const epochID uint64 = 11 anchor := makeAnchor(epochID, 500, "sn-target") @@ -427,3 +465,31 @@ func TestDispatchEpoch_HappyPath_EmitsPassResult(t *testing.T) { } require.True(t, sawPass, "expected a PASS-class result on happy path") } + +func TestRecheck_GetParamsNilResponseIsClearAndDoesNotHoldDispatcherLock(t *testing.T) { + var dispatcher *LEP6Dispatcher + var sawUnlocked bool + audit := &dispatchAuditModule{} + audit.getParamsHook = func() { + if dispatcher.mu.TryLock() { + sawUnlocked = true + dispatcher.mu.Unlock() + } + } + dispatcher, _ = newDispatcher(t, audit, &stubFactory{}, NoTicketProvider{}, stubMetaProvider{}) + + candidate := recheck.Candidate{ + EpochID: 7, + TargetAccount: "sn-target", + TicketID: "ticket-1", + ChallengedTranscriptHash: "original-transcript", + OriginalReporter: "sn-reporter", + OriginalResultClass: audittypes.StorageProofResultClass_STORAGE_PROOF_RESULT_CLASS_HASH_MISMATCH, + } + + _, err := dispatcher.Recheck(context.Background(), candidate) + require.Error(t, err) + require.Contains(t, err.Error(), "lep6 recheck: get params returned nil response") + require.NotContains(t, err.Error(), "<nil>") +
require.True(t, sawUnlocked, "Recheck must not hold dispatcher mutex while querying params") +} diff --git a/supernode/storage_challenge/lep6_recheck.go b/supernode/storage_challenge/lep6_recheck.go new file mode 100644 index 00000000..4c188537 --- /dev/null +++ b/supernode/storage_challenge/lep6_recheck.go @@ -0,0 +1,65 @@ +package storage_challenge + +import ( + "context" + "fmt" + + audittypes "github.com/LumeraProtocol/lumera/x/audit/v1/types" + "github.com/LumeraProtocol/supernode/v2/supernode/recheck" +) + +// Recheck executes a LEP-6 RECHECK-bucket proof for the candidate and returns +// the result shape expected by MsgSubmitStorageRecheckEvidence. It reuses the +// same deterministic compound-proof machinery as the epoch dispatcher, but +// writes into a temporary buffer so recheck results are never mixed into the +// host_reporter epoch-report buffer. +func (d *LEP6Dispatcher) Recheck(ctx context.Context, c recheck.Candidate) (recheck.RecheckResult, error) { + if !c.Valid() { + return recheck.RecheckResult{}, fmt.Errorf("invalid recheck candidate") + } + paramsResp, err := d.client.Audit().GetParams(ctx) + if err != nil { + return recheck.RecheckResult{}, fmt.Errorf("lep6 recheck: get params: %w", err) + } + if paramsResp == nil { + return recheck.RecheckResult{}, fmt.Errorf("lep6 recheck: get params returned nil response") + } + params := paramsResp.Params + if params.StorageTruthEnforcementMode == audittypes.StorageTruthEnforcementMode_STORAGE_TRUTH_ENFORCEMENT_MODE_UNSPECIFIED { + return recheck.RecheckResult{}, fmt.Errorf("lep6 recheck: enforcement mode unspecified") + } + anchorResp, err := d.client.Audit().GetEpochAnchor(ctx, c.EpochID) + if err != nil { + return recheck.RecheckResult{}, fmt.Errorf("lep6 recheck: get epoch anchor %d: %w", c.EpochID, err) + } + if anchorResp == nil { + return recheck.RecheckResult{}, fmt.Errorf("lep6 recheck: epoch anchor not yet available for epoch %d", c.EpochID) + } + if anchorResp.Anchor.EpochId != c.EpochID { + 
return recheck.RecheckResult{}, fmt.Errorf("lep6 recheck: epoch anchor not yet available for epoch %d", c.EpochID) + } + + d.mu.Lock() + defer d.mu.Unlock() + + orig := d.buffer + tmp := NewBuffer() + d.buffer = tmp + defer func() { d.buffer = orig }() + + if err := d.dispatchTicket(ctx, c.EpochID, anchorResp.Anchor, params, c.TargetAccount, audittypes.StorageProofBucketType_STORAGE_PROOF_BUCKET_TYPE_RECHECK, c.TicketID); err != nil { + return recheck.RecheckResult{}, err + } + results := tmp.CollectResults(c.EpochID) + for _, r := range results { + if r == nil || r.TicketId != c.TicketID || r.TargetSupernodeAccount != c.TargetAccount { + continue + } + cls := r.ResultClass + if cls == audittypes.StorageProofResultClass_STORAGE_PROOF_RESULT_CLASS_HASH_MISMATCH { + cls = audittypes.StorageProofResultClass_STORAGE_PROOF_RESULT_CLASS_RECHECK_CONFIRMED_FAIL + } + return recheck.RecheckResult{TranscriptHash: r.TranscriptHash, ResultClass: cls, Details: r.Details}, nil + } + return recheck.RecheckResult{}, fmt.Errorf("lep6 recheck: no result emitted for epoch=%d ticket=%s target=%s", c.EpochID, c.TicketID, c.TargetAccount) +} diff --git a/supernode/transport/grpc/self_healing/handler_test.go b/supernode/transport/grpc/self_healing/handler_test.go index 12adf91d..26375b5a 100644 --- a/supernode/transport/grpc/self_healing/handler_test.go +++ b/supernode/transport/grpc/self_healing/handler_test.go @@ -254,6 +254,9 @@ func (h *handlerStubAudit) GetAssignedTargets(ctx context.Context, supernodeAcco func (h *handlerStubAudit) GetEpochReport(ctx context.Context, epochID uint64, supernodeAccount string) (*audittypes.QueryEpochReportResponse, error) { return &audittypes.QueryEpochReportResponse{}, nil } +func (h *handlerStubAudit) GetEpochReportsByReporter(ctx context.Context, reporterAccount string, epochID uint64) (*audittypes.QueryEpochReportsByReporterResponse, error) { + return &audittypes.QueryEpochReportsByReporterResponse{}, nil +} func (h *handlerStubAudit) 
GetNodeSuspicionState(ctx context.Context, supernodeAccount string) (*audittypes.QueryNodeSuspicionStateResponse, error) { return &audittypes.QueryNodeSuspicionStateResponse{}, nil }