diff --git a/go.mod b/go.mod index 37fedbd0..9db3b77b 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,7 @@ replace ( ) require ( + cosmossdk.io/errors v1.0.2 cosmossdk.io/math v1.5.3 github.com/AlecAivazis/survey/v2 v2.3.7 github.com/DataDog/zstd v1.5.7 @@ -55,7 +56,6 @@ require ( cosmossdk.io/collections v1.3.1 // indirect cosmossdk.io/core v0.11.3 // indirect cosmossdk.io/depinject v1.2.1 // indirect - cosmossdk.io/errors v1.0.2 // indirect cosmossdk.io/log v1.6.1 // indirect cosmossdk.io/schema v1.1.0 // indirect cosmossdk.io/store v1.1.2 // indirect diff --git a/pkg/logtrace/log.go b/pkg/logtrace/log.go index 6e27b020..5263bf67 100644 --- a/pkg/logtrace/log.go +++ b/pkg/logtrace/log.go @@ -5,6 +5,7 @@ import ( "os" "runtime" "strings" + "sync/atomic" "go.uber.org/zap" "go.uber.org/zap/zapcore" @@ -20,10 +21,14 @@ const CorrelationIDKey ContextKey = "correlation_id" const OriginKey ContextKey = "origin" var ( - logger *zap.Logger - minLevel zapcore.Level = zapcore.InfoLevel // effective minimum log level + loggerPtr atomic.Pointer[zap.Logger] + minLevel atomic.Int32 // effective minimum log level as zapcore.Level ) +func init() { + minLevel.Store(int32(zapcore.InfoLevel)) +} + // Setup initializes the logger for readable output in all modes. func Setup(serviceName string) { var err error @@ -42,20 +47,24 @@ func Setup(serviceName string) { // Always respect the LOG_LEVEL environment variable. lvl := getLogLevel() config.Level = zap.NewAtomicLevelAt(lvl) - // Persist the effective minimum so non-core sinks (e.g., Datadog) can - // filter entries consistently with the console logger. - minLevel = lvl - // Build the logger from the customized config. + var built *zap.Logger if tracingEnabled { - logger, err = config.Build(zap.AddCallerSkip(1), zap.AddStacktrace(zapcore.ErrorLevel)) + built, err = config.Build(zap.AddCallerSkip(1), zap.AddStacktrace(zapcore.ErrorLevel)) } else { - logger, err = config.Build() + built, err = config.Build() } if err != nil { panic(err) } + // Publish atomically so concurrent Setup/log calls cannot race on package + // globals. The effective minimum is stored after the logger so a racing log + // call always sees either the old complete pair or a conservative new logger + // with the previous Datadog gate for one call. + loggerPtr.Store(built) + minLevel.Store(int32(lvl)) + // Initialize Datadog forwarding (minimal integration in separate file) SetupDatadog(serviceName) } @@ -120,12 +129,17 @@ func extractCorrelationID(ctx context.Context) string { // logWithLevel logs a message with structured fields. func logWithLevel(level zapcore.Level, ctx context.Context, message string, fields Fields) { - if logger == nil { + lg := loggerPtr.Load() + if lg == nil { Setup("unknown-service") // Fallback if Setup wasn't called + lg = loggerPtr.Load() + if lg == nil { + return + } } // Drop early if below the configured level (keeps Datadog in sync) - if !logger.Core().Enabled(level) { + if !lg.Core().Enabled(level) { return } @@ -149,7 +163,7 @@ func logWithLevel(level zapcore.Level, ctx context.Context, message string, fiel } // Log with the structured fields using a level check/write - if ce := logger.Check(level, message); ce != nil { + if ce := lg.Check(level, message); ce != nil { ce.Write(zapFields...) 
} else { // Should not happen due to early Enabled check, but guard anyway @@ -159,7 +173,7 @@ func logWithLevel(level zapcore.Level, ctx context.Context, message string, fiel // Forward to Datadog (non-blocking, best-effort) only if level is enabled // for the current configuration. This prevents forwarding debug entries // when the logger is configured for info and above. - if level >= minLevel { + if int32(level) >= minLevel.Load() { ForwardDatadog(level, ctx, message, fields) } } diff --git a/pkg/logtrace/race_test.go b/pkg/logtrace/race_test.go new file mode 100644 index 00000000..75f86775 --- /dev/null +++ b/pkg/logtrace/race_test.go @@ -0,0 +1,27 @@ +//go:build race + +package logtrace + +import ( + "context" + "sync" + "testing" +) + +func TestSetupConcurrentWithLoggingRaceFree(t *testing.T) { + ctx := context.Background() + var wg sync.WaitGroup + for i := 0; i < 100; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + if i%10 == 0 { + Setup("race-test") + } + Debug(ctx, "debug", Fields{"i": i}) + Info(ctx, "info", Fields{"i": i}) + Warn(ctx, "warn", Fields{"i": i}) + }(i) + } + wg.Wait() +} diff --git a/pkg/lumera/chainerrors/chainerrors.go b/pkg/lumera/chainerrors/chainerrors.go index 37e5fa23..3457130e 100644 --- a/pkg/lumera/chainerrors/chainerrors.go +++ b/pkg/lumera/chainerrors/chainerrors.go @@ -16,12 +16,12 @@ // // The predicates here: // -// 1. Prefer typed sentinel matching via errors.Is. -// 2. Fall through to gRPC status codes for query-side rejections. -// 3. Keep an English-substring fallback so we remain correct against any -// currently-deployed chain build whose error path doesn't preserve the -// typed sentinel through the wire (defense-in-depth, removable once -// every chain build in production guarantees end-to-end ABCIError). +// 1. Prefer typed sentinel matching via errors.Is. +// 2. Fall through to gRPC status codes for query-side rejections. +// 3. Keep an English-substring fallback so we remain correct against any +// currently-deployed chain build whose error path doesn't preserve the +// typed sentinel through the wire (defense-in-depth, removable once +// every chain build in production guarantees end-to-end ABCIError). // // IsTransientGrpc is the safety valve: any path that classifies an error as // "definitely a chain-side reject" (and would therefore destructively clean @@ -69,6 +69,23 @@ func IsHealOpInvalidState(err error) bool { // matched any error containing "not found" (gRPC "block N not found", codec // lookup miss, key-not-found inside Cosmos SDK), which led to destructive // cleanup on transient query failures. + +// IsHealOpPastDeadline reports whether err is the chain-side invalid-state +// rejection for a heal-op whose deadline has already passed. As of Lumera +// chain x/audit/v1/types/errors.go there is no dedicated past-deadline +// sentinel; the tx path uses ErrHealOpInvalidState for several heal-op +// rejections. Keep this predicate phrase-anchored so callers can short-circuit +// deadline rejects without treating every invalid-state error as expired. 
+func IsHealOpPastDeadline(err error) bool { + if err == nil { + return false + } + msg := strings.ToLower(err.Error()) + return errors.Is(err, audittypes.ErrHealOpInvalidState) && + strings.Contains(msg, "heal op") && + strings.Contains(msg, "deadline") +} + func IsHealOpNotFound(err error) bool { if err == nil { return false diff --git a/pkg/lumera/chainerrors/chainerrors_test.go b/pkg/lumera/chainerrors/chainerrors_test.go index 75e06559..c1d9ea16 100644 --- a/pkg/lumera/chainerrors/chainerrors_test.go +++ b/pkg/lumera/chainerrors/chainerrors_test.go @@ -6,8 +6,8 @@ import ( "fmt" "testing" - audittypes "github.com/LumeraProtocol/lumera/x/audit/v1/types" errorsmod "cosmossdk.io/errors" + audittypes "github.com/LumeraProtocol/lumera/x/audit/v1/types" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) @@ -167,3 +167,14 @@ func TestRegression_TransientNotFoundDoesNotMatchHealOpNotFound(t *testing.T) { } } } + +func TestIsHealOpPastDeadline(t *testing.T) { + deadlineErr := fmt.Errorf("submit claim: %w", errorsmod.Wrap(audittypes.ErrHealOpInvalidState, "heal op deadline has passed")) + if !IsHealOpPastDeadline(deadlineErr) { + t.Fatalf("expected deadline invalid-state error to match") + } + stateErr := fmt.Errorf("submit claim: %w", errorsmod.Wrap(audittypes.ErrHealOpInvalidState, "heal op status VERIFIED does not accept healer completion claim")) + if IsHealOpPastDeadline(stateErr) { + t.Fatalf("generic invalid-state error must not be treated as deadline") + } +} diff --git a/pkg/lumera/modules/audit/impl.go b/pkg/lumera/modules/audit/impl.go index eed2f4c7..9b54e603 100644 --- a/pkg/lumera/modules/audit/impl.go +++ b/pkg/lumera/modules/audit/impl.go @@ -87,36 +87,6 @@ func (m *module) GetEpochReportsByReporter(ctx context.Context, reporterAccount return resp, nil } -func (m *module) GetNodeSuspicionState(ctx context.Context, supernodeAccount string) (*types.QueryNodeSuspicionStateResponse, error) { - resp, err := m.client.NodeSuspicionState(ctx, &types.QueryNodeSuspicionStateRequest{ - SupernodeAccount: supernodeAccount, - }) - if err != nil { - return nil, fmt.Errorf("failed to get node suspicion state: %w", err) - } - return resp, nil -} - -func (m *module) GetReporterReliabilityState(ctx context.Context, reporterAccount string) (*types.QueryReporterReliabilityStateResponse, error) { - resp, err := m.client.ReporterReliabilityState(ctx, &types.QueryReporterReliabilityStateRequest{ - ReporterSupernodeAccount: reporterAccount, - }) - if err != nil { - return nil, fmt.Errorf("failed to get reporter reliability state: %w", err) - } - return resp, nil -} - -func (m *module) GetTicketDeteriorationState(ctx context.Context, ticketID string) (*types.QueryTicketDeteriorationStateResponse, error) { - resp, err := m.client.TicketDeteriorationState(ctx, &types.QueryTicketDeteriorationStateRequest{ - TicketId: ticketID, - }) - if err != nil { - return nil, fmt.Errorf("failed to get ticket deterioration state: %w", err) - } - return resp, nil -} - func (m *module) GetHealOp(ctx context.Context, healOpID uint64) (*types.QueryHealOpResponse, error) { resp, err := m.client.HealOp(ctx, &types.QueryHealOpRequest{ HealOpId: healOpID, diff --git a/pkg/lumera/modules/audit/interface.go b/pkg/lumera/modules/audit/interface.go index 074512d3..48297164 100644 --- a/pkg/lumera/modules/audit/interface.go +++ b/pkg/lumera/modules/audit/interface.go @@ -18,11 +18,6 @@ type Module interface { GetEpochReport(ctx context.Context, epochID uint64, supernodeAccount string) (*types.QueryEpochReportResponse, 
error) GetEpochReportsByReporter(ctx context.Context, reporterAccount string, epochID uint64) (*types.QueryEpochReportsByReporterResponse, error) - // LEP-6 storage-truth state queries. - GetNodeSuspicionState(ctx context.Context, supernodeAccount string) (*types.QueryNodeSuspicionStateResponse, error) - GetReporterReliabilityState(ctx context.Context, reporterAccount string) (*types.QueryReporterReliabilityStateResponse, error) - GetTicketDeteriorationState(ctx context.Context, ticketID string) (*types.QueryTicketDeteriorationStateResponse, error) - // LEP-6 heal-op queries. GetHealOp(ctx context.Context, healOpID uint64) (*types.QueryHealOpResponse, error) GetHealOpsByStatus(ctx context.Context, status types.HealOpStatus, pagination *query.PageRequest) (*types.QueryHealOpsByStatusResponse, error) diff --git a/pkg/lumera/modules/audit_msg/impl_test.go b/pkg/lumera/modules/audit_msg/impl_test.go index 07e9b020..0daab82c 100644 --- a/pkg/lumera/modules/audit_msg/impl_test.go +++ b/pkg/lumera/modules/audit_msg/impl_test.go @@ -44,3 +44,12 @@ func TestSubmitStorageRecheckEvidenceValidatesInputsBeforeTxExecution(t *testing _, err = m.SubmitStorageRecheckEvidence(context.Background(), 7, "target", "ticket", "challenged", strings.Repeat(" ", 3), audittypes.StorageProofResultClass_STORAGE_PROOF_RESULT_CLASS_RECHECK_CONFIRMED_FAIL, "") require.ErrorContains(t, err, "recheck transcript hash cannot be empty") } + +func TestSubmitEvidenceValidatesInputsBeforeTxExecution(t *testing.T) { + m := &module{} + _, err := m.SubmitEvidence(context.Background(), " ", audittypes.EvidenceType_EVIDENCE_TYPE_UNSPECIFIED, "action", "{}") + require.ErrorContains(t, err, "subject address cannot be empty") + + _, err = m.SubmitEvidence(context.Background(), "subject", audittypes.EvidenceType_EVIDENCE_TYPE_UNSPECIFIED, "action", " ") + require.ErrorContains(t, err, "metadata cannot be empty") +} diff --git a/pkg/metrics/lep6/metrics.go b/pkg/metrics/lep6/metrics.go index c505f799..3bf2b00c 100644 --- a/pkg/metrics/lep6/metrics.go +++ b/pkg/metrics/lep6/metrics.go @@ -36,6 +36,7 @@ type MetricsSnapshot struct { HealVerificationsAlreadyExistsTotal uint64 HealFinalizePublishesTotal uint64 HealFinalizeCleanupsTotal map[string]uint64 // status + HealOrphanedStagingCleanupsTotal uint64 SelfHealingPendingClaims int64 SelfHealingStagingBytes int64 @@ -135,6 +136,7 @@ var metrics = struct { healVerificationsAlreadyExist atomic.Uint64 healFinalizePublishes atomic.Uint64 healFinalizeCleanups counterMap + healOrphanedStagingCleanups atomic.Uint64 selfHealingPendingClaims atomic.Int64 selfHealingStagingBytes atomic.Int64 @@ -162,6 +164,7 @@ func Reset() { metrics.healVerificationsAlreadyExist.Store(0) metrics.healFinalizePublishes.Store(0) metrics.healFinalizeCleanups.reset() + metrics.healOrphanedStagingCleanups.Store(0) metrics.selfHealingPendingClaims.Store(0) metrics.selfHealingStagingBytes.Store(0) metrics.recheckCandidatesFound.Store(0) @@ -189,6 +192,7 @@ func Snapshot() MetricsSnapshot { HealVerificationsAlreadyExistsTotal: metrics.healVerificationsAlreadyExist.Load(), HealFinalizePublishesTotal: metrics.healFinalizePublishes.Load(), HealFinalizeCleanupsTotal: metrics.healFinalizeCleanups.snapshot(), + HealOrphanedStagingCleanupsTotal: metrics.healOrphanedStagingCleanups.Load(), SelfHealingPendingClaims: metrics.selfHealingPendingClaims.Load(), SelfHealingStagingBytes: metrics.selfHealingStagingBytes.Load(), RecheckCandidatesFoundTotal: metrics.recheckCandidatesFound.Load(), @@ -199,8 +203,8 @@ func Snapshot() 
MetricsSnapshot { } } -func IncDispatchResult(resultClass string) { metrics.dispatchResults.inc(resultClass, 1) } -func IncDispatchSignFailure(context string) { metrics.dispatchSignFailures.inc(context, 1) } +func IncDispatchResult(resultClass string) { metrics.dispatchResults.inc(resultClass, 1) } +func IncDispatchSignFailure(context string) { metrics.dispatchSignFailures.inc(context, 1) } func IncDispatchInternalFailure(stage string) { metrics.dispatchInternalFailures.inc(stage, 1) } func IncDispatchThrottled(policy string, dropped int) { if dropped > 0 { @@ -237,6 +241,7 @@ func IncHealVerification(outcome string, verified bool) { func IncHealVerificationAlreadyExists() { metrics.healVerificationsAlreadyExist.Add(1) } func IncHealFinalizePublish() { metrics.healFinalizePublishes.Add(1) } func IncHealFinalizeCleanup(status string) { metrics.healFinalizeCleanups.inc(status, 1) } +func IncHealOrphanedStagingCleanup() { metrics.healOrphanedStagingCleanups.Add(1) } func SetSelfHealingPendingClaims(count int) { metrics.selfHealingPendingClaims.Store(nonNegativeInt64(count)) } diff --git a/pkg/storage/queries/recheck.go b/pkg/storage/queries/recheck.go index 6cb12266..bfe4fafc 100644 --- a/pkg/storage/queries/recheck.go +++ b/pkg/storage/queries/recheck.go @@ -58,11 +58,82 @@ CREATE TABLE IF NOT EXISTS recheck_attempt_failures ( attempts INTEGER NOT NULL DEFAULT 1, last_error TEXT, expires_at INTEGER NOT NULL, - PRIMARY KEY (epoch_id, ticket_id) + PRIMARY KEY (epoch_id, ticket_id, target_account) );` const createRecheckAttemptFailuresExpiresIndex = `CREATE INDEX IF NOT EXISTS idx_recheck_attempt_failures_expires ON recheck_attempt_failures(expires_at);` +func migrateRecheckAttemptFailuresPK(ctx context.Context, db sqliteExecQuerier) error { + pkCols, err := primaryKeyColumns(ctx, db, "recheck_attempt_failures") + if err != nil { + return err + } + hasTarget := false + for _, c := range pkCols { + if c == "target_account" { + hasTarget = true + break + } + } + if hasTarget { + return nil + } + if len(pkCols) == 0 { + return fmt.Errorf("recheck_attempt_failures has no detectable primary key") + } + exec, ok := db.(interface { + BeginTx(ctx context.Context, opts *sql.TxOptions) (*sql.Tx, error) + }) + if !ok { + return fmt.Errorf("recheck_attempt_failures migration: db handle does not support BeginTx") + } + tx, err := exec.BeginTx(ctx, nil) + if err != nil { + return fmt.Errorf("begin recheck failure migration tx: %w", err) + } + committed := false + defer func() { + if !committed { + _ = tx.Rollback() + } + }() + if _, err := tx.ExecContext(ctx, `DROP TABLE IF EXISTS recheck_attempt_failures_new;`); err != nil { + return fmt.Errorf("drop stale recheck failure migration table: %w", err) + } + const createNew = ` +CREATE TABLE recheck_attempt_failures_new ( + epoch_id INTEGER NOT NULL, + ticket_id TEXT NOT NULL, + target_account TEXT NOT NULL, + attempts INTEGER NOT NULL DEFAULT 1, + last_error TEXT, + expires_at INTEGER NOT NULL, + PRIMARY KEY (epoch_id, ticket_id, target_account) +);` + if _, err := tx.ExecContext(ctx, createNew); err != nil { + return fmt.Errorf("create new recheck failure table: %w", err) + } + const copyData = ` +INSERT INTO recheck_attempt_failures_new + (epoch_id, ticket_id, target_account, attempts, last_error, expires_at) +SELECT epoch_id, ticket_id, target_account, attempts, last_error, expires_at +FROM recheck_attempt_failures;` + if _, err := tx.ExecContext(ctx, copyData); err != nil { + return fmt.Errorf("copy recheck failure rows: %w", err) + } + if _, err := 
tx.ExecContext(ctx, `DROP TABLE recheck_attempt_failures;`); err != nil { + return fmt.Errorf("drop old recheck failure table: %w", err) + } + if _, err := tx.ExecContext(ctx, `ALTER TABLE recheck_attempt_failures_new RENAME TO recheck_attempt_failures;`); err != nil { + return fmt.Errorf("rename new recheck failure table: %w", err) + } + if err := tx.Commit(); err != nil { + return fmt.Errorf("commit recheck failure migration: %w", err) + } + committed = true + return nil +} + // migrateStorageRecheckSubmissionsPK migrates an old DB whose // storage_recheck_submissions table has PK (epoch_id, ticket_id) up to the // Wave 1 schema with PK (epoch_id, ticket_id, target_account). @@ -234,19 +305,19 @@ func (s *SQLiteStore) RecordRecheckAttemptFailure(ctx context.Context, epochID u expiresAt := time.Now().Add(ttl).Unix() const stmt = `INSERT INTO recheck_attempt_failures (epoch_id, ticket_id, target_account, attempts, last_error, expires_at) VALUES (?, ?, ?, 1, ?, ?) -ON CONFLICT(epoch_id, ticket_id) DO UPDATE SET attempts = attempts + 1, last_error = excluded.last_error, expires_at = excluded.expires_at` +ON CONFLICT(epoch_id, ticket_id, target_account) DO UPDATE SET attempts = attempts + 1, last_error = excluded.last_error, expires_at = excluded.expires_at` _, execErr := s.db.ExecContext(ctx, stmt, epochID, ticketID, targetAccount, msg, expiresAt) return execErr } -func (s *SQLiteStore) HasRecheckAttemptFailureBudgetExceeded(ctx context.Context, epochID uint64, ticketID string, maxAttempts int) (bool, error) { +func (s *SQLiteStore) HasRecheckAttemptFailureBudgetExceeded(ctx context.Context, epochID uint64, ticketID, targetAccount string, maxAttempts int) (bool, error) { if maxAttempts <= 0 { return false, nil } - const stmt = `SELECT attempts, expires_at FROM recheck_attempt_failures WHERE epoch_id = ? AND ticket_id = ? LIMIT 1` + const stmt = `SELECT attempts, expires_at FROM recheck_attempt_failures WHERE epoch_id = ? AND ticket_id = ? AND target_account = ? LIMIT 1` var attempts int var expiresAt int64 - err := s.db.QueryRowContext(ctx, stmt, epochID, ticketID).Scan(&attempts, &expiresAt) + err := s.db.QueryRowContext(ctx, stmt, epochID, ticketID, targetAccount).Scan(&attempts, &expiresAt) if err == sql.ErrNoRows { return false, nil } @@ -254,7 +325,7 @@ func (s *SQLiteStore) HasRecheckAttemptFailureBudgetExceeded(ctx context.Context return false, err } if expiresAt <= time.Now().Unix() { - _, _ = s.db.ExecContext(ctx, `DELETE FROM recheck_attempt_failures WHERE epoch_id = ? AND ticket_id = ?`, epochID, ticketID) + _, _ = s.db.ExecContext(ctx, `DELETE FROM recheck_attempt_failures WHERE epoch_id = ? AND ticket_id = ? 
AND target_account = ?`, epochID, ticketID, targetAccount) return false, nil } return attempts >= maxAttempts, nil diff --git a/pkg/storage/queries/recheck_interface.go b/pkg/storage/queries/recheck_interface.go index bdd71336..48b2bc49 100644 --- a/pkg/storage/queries/recheck_interface.go +++ b/pkg/storage/queries/recheck_interface.go @@ -14,6 +14,6 @@ type RecheckQueries interface { DeletePendingRecheckSubmission(ctx context.Context, epochID uint64, ticketID, targetAccount string) error RecordRecheckSubmission(ctx context.Context, epochID uint64, ticketID, targetAccount, challengedTranscriptHash, recheckTranscriptHash string, resultClass audittypes.StorageProofResultClass) error RecordRecheckAttemptFailure(ctx context.Context, epochID uint64, ticketID, targetAccount string, err error, ttl time.Duration) error - HasRecheckAttemptFailureBudgetExceeded(ctx context.Context, epochID uint64, ticketID string, maxAttempts int) (bool, error) + HasRecheckAttemptFailureBudgetExceeded(ctx context.Context, epochID uint64, ticketID, targetAccount string, maxAttempts int) (bool, error) PurgeExpiredRecheckAttemptFailures(ctx context.Context) error } diff --git a/pkg/storage/queries/recheck_test.go b/pkg/storage/queries/recheck_test.go index 3d19a3a5..da6e8724 100644 --- a/pkg/storage/queries/recheck_test.go +++ b/pkg/storage/queries/recheck_test.go @@ -92,15 +92,15 @@ func TestRecheckPendingSubmittedAndFailureBudget(t *testing.T) { require.True(t, has) require.NoError(t, store.MarkRecheckSubmissionSubmitted(ctx, 7, "ticket-7", "target")) - blocked, err := store.HasRecheckAttemptFailureBudgetExceeded(ctx, 7, "ticket-7", 2) + blocked, err := store.HasRecheckAttemptFailureBudgetExceeded(ctx, 7, "ticket-7", "target", 2) require.NoError(t, err) require.False(t, blocked) require.NoError(t, store.RecordRecheckAttemptFailure(ctx, 7, "ticket-7", "target", assert.AnError, time.Hour)) - blocked, err = store.HasRecheckAttemptFailureBudgetExceeded(ctx, 7, "ticket-7", 2) + blocked, err = store.HasRecheckAttemptFailureBudgetExceeded(ctx, 7, "ticket-7", "target", 2) require.NoError(t, err) require.False(t, blocked) require.NoError(t, store.RecordRecheckAttemptFailure(ctx, 7, "ticket-7", "target", assert.AnError, time.Hour)) - blocked, err = store.HasRecheckAttemptFailureBudgetExceeded(ctx, 7, "ticket-7", 2) + blocked, err = store.HasRecheckAttemptFailureBudgetExceeded(ctx, 7, "ticket-7", "target", 2) require.NoError(t, err) require.True(t, blocked) } diff --git a/pkg/storage/queries/sqlite.go b/pkg/storage/queries/sqlite.go index 5a9e3d51..f60a78e9 100644 --- a/pkg/storage/queries/sqlite.go +++ b/pkg/storage/queries/sqlite.go @@ -433,6 +433,9 @@ func OpenHistoryDBAt(baseDir string) (LocalStoreInterface, error) { if _, err := db.Exec(createRecheckAttemptFailures); err != nil { return nil, fmt.Errorf("cannot create recheck_attempt_failures: %w", err) } + if err := migrateRecheckAttemptFailuresPK(context.Background(), db); err != nil { + return nil, fmt.Errorf("migrate recheck_attempt_failures PK: %w", err) + } if _, err := db.Exec(createRecheckAttemptFailuresExpiresIndex); err != nil { return nil, fmt.Errorf("cannot create recheck_attempt_failures expires index: %w", err) } diff --git a/pkg/storage/queries/wave1_schema_test.go b/pkg/storage/queries/wave1_schema_test.go index c9c83079..ccbcd40b 100644 --- a/pkg/storage/queries/wave1_schema_test.go +++ b/pkg/storage/queries/wave1_schema_test.go @@ -2,7 +2,9 @@ package queries import ( "context" + "fmt" "testing" + "time" audittypes 
"github.com/LumeraProtocol/lumera/x/audit/v1/types" "github.com/jmoiron/sqlx" @@ -117,3 +119,38 @@ func TestMigrateStorageRecheckSubmissionsPK_AlreadyMigratedNoOp(t *testing.T) { require.NoError(t, err) require.Equal(t, []string{"epoch_id", "ticket_id", "target_account"}, pk) } + +func TestMigrateRecheckAttemptFailuresPK(t *testing.T) { + ctx := context.Background() + db, err := sqlx.Open("sqlite3", ":memory:") + require.NoError(t, err) + defer db.Close() + + _, err = db.Exec(` +CREATE TABLE recheck_attempt_failures ( + epoch_id INTEGER NOT NULL, + ticket_id TEXT NOT NULL, + target_account TEXT NOT NULL, + attempts INTEGER NOT NULL DEFAULT 1, + last_error TEXT, + expires_at INTEGER NOT NULL, + PRIMARY KEY (epoch_id, ticket_id) +);`) + require.NoError(t, err) + _, err = db.Exec(`INSERT INTO recheck_attempt_failures VALUES (7, 'ticket-1', 'target-a', 1, 'boom', 999999);`) + require.NoError(t, err) + + require.NoError(t, migrateRecheckAttemptFailuresPK(ctx, db)) + pk, err := primaryKeyColumns(ctx, db, "recheck_attempt_failures") + require.NoError(t, err) + require.Equal(t, []string{"epoch_id", "ticket_id", "target_account"}, pk) + + store := &SQLiteStore{db: db} + require.NoError(t, store.RecordRecheckAttemptFailure(ctx, 7, "ticket-1", "target-b", fmt.Errorf("nope"), time.Hour)) + blockedA, err := store.HasRecheckAttemptFailureBudgetExceeded(ctx, 7, "ticket-1", "target-a", 2) + require.NoError(t, err) + require.False(t, blockedA) + blockedB, err := store.HasRecheckAttemptFailureBudgetExceeded(ctx, 7, "ticket-1", "target-b", 2) + require.NoError(t, err) + require.False(t, blockedB) +} diff --git a/pkg/storagechallenge/lep6_resolution.go b/pkg/storagechallenge/lep6_resolution.go index ea410707..604cf552 100644 --- a/pkg/storagechallenge/lep6_resolution.go +++ b/pkg/storagechallenge/lep6_resolution.go @@ -23,7 +23,7 @@ import ( // pinned chain commit. The supernode result buffer must self-throttle to this // cap before handing results to the host reporter — see // supernode/storage_challenge/result_buffer.go. -const MaxStorageProofResultsPerReport = 16 +const MaxStorageProofResultsPerReport = audittypes.MaxStorageProofResultsPerReport // ErrUnspecifiedArtifactClass is returned when a caller passes the zero/UNSPECIFIED // StorageProofArtifactClass to a resolver that requires a concrete class. 
diff --git a/pkg/storagechallenge/lep6_resolution_test.go b/pkg/storagechallenge/lep6_resolution_test.go index 19ee7f38..29d77908 100644 --- a/pkg/storagechallenge/lep6_resolution_test.go +++ b/pkg/storagechallenge/lep6_resolution_test.go @@ -8,6 +8,12 @@ import ( audittypes "github.com/LumeraProtocol/lumera/x/audit/v1/types" ) +func TestMaxStorageProofResultsPerReportTracksChainConstant(t *testing.T) { + if MaxStorageProofResultsPerReport != audittypes.MaxStorageProofResultsPerReport { + t.Fatalf("MaxStorageProofResultsPerReport drifted from chain constant: got %d want %d", MaxStorageProofResultsPerReport, audittypes.MaxStorageProofResultsPerReport) + } +} + func TestResolveArtifactCount_Index_Symbol_Unspecified(t *testing.T) { meta := &actiontypes.CascadeMetadata{ RqIdsIc: 7, diff --git a/supernode/cascade/reseed.go b/supernode/cascade/reseed.go index 1b849cdf..9db451b6 100644 --- a/supernode/cascade/reseed.go +++ b/supernode/cascade/reseed.go @@ -60,11 +60,11 @@ type RecoveryReseedResult struct { type stagedManifest struct { ActionID string `json:"action_id"` Layout codec.Layout `json:"layout"` - IDFiles []string `json:"id_files"` // base64 of idFile bytes - SymbolKeys []string `json:"symbol_keys"` // ordered, deduped - SymbolsDir string `json:"symbols_dir"` // absolute path inside StagingDir/symbols - ReconstructedRel string `json:"reconstructed_rel"`// staging-dir-relative path of the reconstructed file - ManifestHashB64 string `json:"manifest_hash_b64"`// = action.DataHash recipe; HealManifestHash + IDFiles []string `json:"id_files"` // base64 of idFile bytes + SymbolKeys []string `json:"symbol_keys"` // ordered, deduped + SymbolsDir string `json:"symbols_dir"` // absolute path inside StagingDir/symbols + ReconstructedRel string `json:"reconstructed_rel"` // staging-dir-relative path of the reconstructed file + ManifestHashB64 string `json:"manifest_hash_b64"` // = action.DataHash recipe; HealManifestHash } const stagedManifestFilename = "manifest.json" @@ -369,6 +369,10 @@ func streamCopyFile(srcPath, dstPath string) error { _ = dst.Close() return fmt.Errorf("copy %q → %q: %w", srcPath, dstPath, err) } + if err := dst.Sync(); err != nil { + _ = dst.Close() + return fmt.Errorf("sync dst %q: %w", dstPath, err) + } if err := dst.Close(); err != nil { return fmt.Errorf("close dst %q: %w", dstPath, err) } diff --git a/supernode/cascade/reseed_wave3_test.go b/supernode/cascade/reseed_wave3_test.go new file mode 100644 index 00000000..889fa7ab --- /dev/null +++ b/supernode/cascade/reseed_wave3_test.go @@ -0,0 +1,26 @@ +package cascade + +import ( + "os" + "strings" + "testing" +) + +func TestStreamCopyFileSyncsBeforeClose(t *testing.T) { + src, err := os.ReadFile("reseed.go") + if err != nil { + t.Fatal(err) + } + body := string(src) + syncIdx := strings.Index(body, "dst.Sync()") + closeIdx := strings.LastIndex(body, "dst.Close()") + if syncIdx < 0 { + t.Fatalf("streamCopyFile must fsync destination before close") + } + if closeIdx < 0 { + t.Fatalf("streamCopyFile close call not found") + } + if syncIdx > closeIdx { + t.Fatalf("streamCopyFile must call dst.Sync before final dst.Close") + } +} diff --git a/supernode/cmd/start.go b/supernode/cmd/start.go index 7e47cc18..799e9267 100644 --- a/supernode/cmd/start.go +++ b/supernode/cmd/start.go @@ -231,7 +231,8 @@ The supernode will connect to the Lumera network and begin participating in the storageChallengeServer := storageChallengeRPC.NewServer(appConfig.SupernodeConfig.Identity, p2pService, historyStore). 
WithArtifactReader(newP2PArtifactReader(p2pService)). - WithRecipientSigner(kr, appConfig.SupernodeConfig.KeyName) + WithRecipientSigner(kr, appConfig.SupernodeConfig.KeyName). + WithAuditParams(lumeraClient.Audit()) var storageChallengeRunner *storageChallengeService.Service var recheckRunner *recheckService.Service if appConfig.StorageChallengeConfig.Enabled { diff --git a/supernode/host_reporter/service.go b/supernode/host_reporter/service.go index a57817d0..73c96353 100644 --- a/supernode/host_reporter/service.go +++ b/supernode/host_reporter/service.go @@ -42,6 +42,14 @@ type ProofResultProvider interface { CollectResults(epochID uint64) []*audittypes.StorageProofResult } +// ProofResultRequeuer is implemented by providers that can put drained results +// back if the host reporter decides not to submit the epoch report. This keeps +// the FULL-mode coverage guard from losing late-arriving results when it aborts +// a would-be chain-rejected partial report. +type ProofResultRequeuer interface { + RequeueResults(epochID uint64, results []*audittypes.StorageProofResult) +} + // Service submits one MsgSubmitEpochReport per epoch for the local supernode. // All runtime behavior is driven by on-chain params/queries; there are no local config knobs. type Service struct { @@ -178,6 +186,21 @@ func (s *Service) tick(ctx context.Context) { var storageProofResults []*audittypes.StorageProofResult if proofResultProvider := s.getProofResultProvider(); proofResultProvider != nil { storageProofResults = proofResultProvider.CollectResults(epochID) + if s.fullModeStorageProofCoverageRequired(tickCtx) { + complete, reason := storageProofCoverageComplete(storageProofResults, assignResp.TargetSupernodeAccounts) + if !complete { + if requeuer, ok := proofResultProvider.(ProofResultRequeuer); ok { + requeuer.RequeueResults(epochID, storageProofResults) + } + logtrace.Warn(tickCtx, "epoch report skipped: incomplete FULL-mode storage proof coverage", logtrace.Fields{ + "epoch_id": epochID, + "assigned_targets": len(assignResp.TargetSupernodeAccounts), + "proof_results": len(storageProofResults), + "reason": reason, + }) + return + } + } } hostReport := audittypes.HostReport{ @@ -208,6 +231,50 @@ func (s *Service) tick(ctx context.Context) { }) } +func (s *Service) fullModeStorageProofCoverageRequired(ctx context.Context) bool { + paramsResp, err := s.lumera.Audit().GetParams(ctx) + if err != nil || paramsResp == nil { + return false + } + return paramsResp.Params.StorageTruthEnforcementMode == audittypes.StorageTruthEnforcementMode_STORAGE_TRUTH_ENFORCEMENT_MODE_FULL +} + +func storageProofCoverageComplete(results []*audittypes.StorageProofResult, targets []string) (bool, string) { + if len(targets) == 0 { + return true, "" + } + type coverage struct{ recent, old int } + byTarget := make(map[string]*coverage, len(targets)) + for _, target := range targets { + target = strings.TrimSpace(target) + if target == "" { + continue + } + byTarget[target] = &coverage{} + } + for _, result := range results { + if result == nil { + continue + } + cov := byTarget[result.TargetSupernodeAccount] + if cov == nil { + continue + } + switch result.BucketType { + case audittypes.StorageProofBucketType_STORAGE_PROOF_BUCKET_TYPE_RECENT: + cov.recent++ + case audittypes.StorageProofBucketType_STORAGE_PROOF_BUCKET_TYPE_OLD: + cov.old++ + } + } + for target, cov := range byTarget { + if cov.recent != 1 || cov.old != 1 { + return false, fmt.Sprintf("target %s has recent=%d old=%d; FULL requires exactly one each", target, cov.recent, 
cov.old) + } + } + return true, "" +} + func (s *Service) diskUsagePercent(ctx context.Context) (float64, bool) { if s.metrics == nil || len(s.storagePaths) == 0 { return 0, false diff --git a/supernode/host_reporter/tick_behavior_test.go b/supernode/host_reporter/tick_behavior_test.go index 572fd38c..b2e02937 100644 --- a/supernode/host_reporter/tick_behavior_test.go +++ b/supernode/host_reporter/tick_behavior_test.go @@ -30,10 +30,11 @@ type stubAuditModule struct { epochReport *audittypes.QueryEpochReportResponse epochReportErr error assigned *audittypes.QueryAssignedTargetsResponse + params audittypes.Params } func (s *stubAuditModule) GetParams(ctx context.Context) (*audittypes.QueryParamsResponse, error) { - return &audittypes.QueryParamsResponse{}, nil + return &audittypes.QueryParamsResponse{Params: s.params}, nil } func (s *stubAuditModule) GetEpochAnchor(ctx context.Context, epochID uint64) (*audittypes.QueryEpochAnchorResponse, error) { return s.anchor, nil @@ -254,8 +255,9 @@ func TestTick_SkipsOnEpochReportLookupError(t *testing.T) { // stubProofResultProvider records the epoch it was queried with and returns a // fixed slice of synthetic StorageProofResult records. type stubProofResultProvider struct { - queriedEpochs []uint64 - results []*audittypes.StorageProofResult + queriedEpochs []uint64 + requeuedEpochs []uint64 + results []*audittypes.StorageProofResult } func (s *stubProofResultProvider) CollectResults(epochID uint64) []*audittypes.StorageProofResult { @@ -263,6 +265,11 @@ func (s *stubProofResultProvider) CollectResults(epochID uint64) []*audittypes.S return s.results } +func (s *stubProofResultProvider) RequeueResults(epochID uint64, results []*audittypes.StorageProofResult) { + s.requeuedEpochs = append(s.requeuedEpochs, epochID) + s.results = append([]*audittypes.StorageProofResult(nil), results...) 
+} + func TestTick_AttachedProofResultProviderIsDrainedAndForwarded(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -313,3 +320,48 @@ func TestTick_AttachedProofResultProviderIsDrainedAndForwarded(t *testing.T) { t.Fatalf("expected provider queried once for epoch 11, got %v", provider.queriedEpochs) } } + +func TestTick_FULLModeIncompleteStorageProofCoverageSkipsSubmitAndRequeues(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + kr, keyName, identity := testKeyringAndIdentity(t) + auditMod := &stubAuditModule{ + currentEpoch: &audittypes.QueryCurrentEpochResponse{EpochId: 12}, + anchor: &audittypes.QueryEpochAnchorResponse{Anchor: audittypes.EpochAnchor{EpochId: 12}}, + epochReportErr: status.Error(codes.NotFound, "not found"), + assigned: &audittypes.QueryAssignedTargetsResponse{ + TargetSupernodeAccounts: []string{"snA"}, + RequiredOpenPorts: nil, + }, + params: audittypes.Params{StorageTruthEnforcementMode: audittypes.StorageTruthEnforcementMode_STORAGE_TRUTH_ENFORCEMENT_MODE_FULL}, + } + auditMsg := auditmsgmod.NewMockModule(ctrl) + node := nodemod.NewMockModule(ctrl) + sn := supernodemod.NewMockModule(ctrl) + client := lumeraMock.NewMockClient(ctrl) + client.EXPECT().Audit().AnyTimes().Return(auditMod) + client.EXPECT().AuditMsg().AnyTimes().Return(auditMsg) + client.EXPECT().SuperNode().AnyTimes().Return(sn) + client.EXPECT().Node().AnyTimes().Return(node) + sn.EXPECT().GetSupernodeWithLatestAddress(gomock.Any(), "snA").Return(&supernodemod.SuperNodeInfo{LatestAddress: "127.0.0.1:4444"}, nil) + auditMsg.EXPECT().SubmitEpochReport(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Times(0) + + provider := &stubProofResultProvider{results: []*audittypes.StorageProofResult{ + {TargetSupernodeAccount: "snA", BucketType: audittypes.StorageProofBucketType_STORAGE_PROOF_BUCKET_TYPE_RECENT, TicketId: "ticket-recent", TranscriptHash: "hash-recent"}, + }} + + svc, err := NewService(identity, client, kr, keyName, "", "") + if err != nil { + t.Fatalf("new service: %v", err) + } + svc.SetProofResultProvider(provider) + svc.tick(context.Background()) + + if len(provider.queriedEpochs) != 1 || provider.queriedEpochs[0] != 12 { + t.Fatalf("expected provider queried once for epoch 12, got %v", provider.queriedEpochs) + } + if len(provider.requeuedEpochs) != 1 || provider.requeuedEpochs[0] != 12 { + t.Fatalf("expected incomplete FULL proofs requeued for epoch 12, got %v", provider.requeuedEpochs) + } +} diff --git a/supernode/recheck/attestor.go b/supernode/recheck/attestor.go index e49babad..72717ff7 100644 --- a/supernode/recheck/attestor.go +++ b/supernode/recheck/attestor.go @@ -7,6 +7,7 @@ import ( "strings" audittypes "github.com/LumeraProtocol/lumera/x/audit/v1/types" + "github.com/LumeraProtocol/supernode/v2/pkg/logtrace" "github.com/LumeraProtocol/supernode/v2/pkg/lumera/chainerrors" lep6metrics "github.com/LumeraProtocol/supernode/v2/pkg/metrics/lep6" "github.com/LumeraProtocol/supernode/v2/pkg/storage/queries" @@ -35,6 +36,12 @@ func (a *Attestor) Submit(ctx context.Context, c Candidate, r RecheckResult) err return fmt.Errorf("invalid recheck candidate") } if strings.TrimSpace(r.TranscriptHash) == "" || !validRecheckResultClass(r.ResultClass) { + logtrace.Warn(ctx, "lep6 recheck: dropping invalid local recheck result", logtrace.Fields{ + "epoch_id": c.EpochID, + "ticket_id": c.TicketID, + "target": c.TargetAccount, + "result_class": r.ResultClass.String(), + }) return fmt.Errorf("invalid recheck result") } if err := 
a.store.RecordPendingRecheckSubmission(ctx, c.EpochID, c.TicketID, c.TargetAccount, c.ChallengedTranscriptHash, r.TranscriptHash, r.ResultClass); err != nil { @@ -75,6 +82,7 @@ func (a *Attestor) Submit(ctx context.Context, c Candidate, r RecheckResult) err func validRecheckResultClass(cls audittypes.StorageProofResultClass) bool { switch cls { case audittypes.StorageProofResultClass_STORAGE_PROOF_RESULT_CLASS_PASS, + audittypes.StorageProofResultClass_STORAGE_PROOF_RESULT_CLASS_HASH_MISMATCH, audittypes.StorageProofResultClass_STORAGE_PROOF_RESULT_CLASS_RECHECK_CONFIRMED_FAIL, audittypes.StorageProofResultClass_STORAGE_PROOF_RESULT_CLASS_TIMEOUT_OR_NO_RESPONSE, audittypes.StorageProofResultClass_STORAGE_PROOF_RESULT_CLASS_OBSERVER_QUORUM_FAIL, diff --git a/supernode/recheck/attestor_test.go b/supernode/recheck/attestor_test.go index 8972c530..841ad47a 100644 --- a/supernode/recheck/attestor_test.go +++ b/supernode/recheck/attestor_test.go @@ -103,3 +103,9 @@ func TestAttestor_RejectsEmptyRequiredFieldsBeforeTx(t *testing.T) { require.Error(t, a.Submit(ctx, candidate, result)) require.Empty(t, msg.calls) } + +func TestValidRecheckResultClassAcceptsHashMismatch(t *testing.T) { + if !validRecheckResultClass(audittypes.StorageProofResultClass_STORAGE_PROOF_RESULT_CLASS_HASH_MISMATCH) { + t.Fatal("HASH_MISMATCH must pass local validation because Lumera chain accepts it for recheck evidence") + } +} diff --git a/supernode/recheck/finder_service_test.go b/supernode/recheck/finder_service_test.go index ecb82dab..35c78140 100644 --- a/supernode/recheck/finder_service_test.go +++ b/supernode/recheck/finder_service_test.go @@ -112,7 +112,7 @@ func TestService_TickModeGateAndSubmit(t *testing.T) { func TestService_TickSkipsRecheckWhenFailureBudgetExhausted(t *testing.T) { ctx := context.Background() store := newMemoryStore() - store.failures[failureKey(10, "t")] = 2 + store.failures[failureKey(10, "t", "target")] = 2 msg := &recordingAuditMsg{} a := &stubAudit{current: 10, mode: audittypes.StorageTruthEnforcementMode_STORAGE_TRUTH_ENFORCEMENT_MODE_FULL, reports: map[uint64]audittypes.EpochReport{10: {StorageProofResults: []*audittypes.StorageProofResult{resFrom("peer", "t", "target", "h", audittypes.StorageProofResultClass_STORAGE_PROOF_RESULT_CLASS_HASH_MISMATCH)}}}} r := &stubRechecker{result: RecheckResult{TranscriptHash: "rh", ResultClass: audittypes.StorageProofResultClass_STORAGE_PROOF_RESULT_CLASS_PASS}} diff --git a/supernode/recheck/service.go b/supernode/recheck/service.go index 2aa86db1..0f71f551 100644 --- a/supernode/recheck/service.go +++ b/supernode/recheck/service.go @@ -115,7 +115,7 @@ func (s *Service) Tick(ctx context.Context) error { if err := ctx.Err(); err != nil { return nil } - blocked, err := s.store.HasRecheckAttemptFailureBudgetExceeded(ctx, c.EpochID, c.TicketID, s.cfg.MaxFailureAttemptsPerTicket) + blocked, err := s.store.HasRecheckAttemptFailureBudgetExceeded(ctx, c.EpochID, c.TicketID, c.TargetAccount, s.cfg.MaxFailureAttemptsPerTicket) if err != nil { logtrace.Warn(ctx, "lep6 recheck: failure budget lookup failed", logtrace.Fields{"epoch_id": c.EpochID, "ticket_id": c.TicketID, "error": err.Error()}) continue diff --git a/supernode/recheck/test_helpers_test.go b/supernode/recheck/test_helpers_test.go index 7b659e6e..970656f4 100644 --- a/supernode/recheck/test_helpers_test.go +++ b/supernode/recheck/test_helpers_test.go @@ -55,17 +55,19 @@ func (m *memoryStore) RecordRecheckSubmission(_ context.Context, epochID uint64, return nil } func (m *memoryStore) 
RecordRecheckAttemptFailure(_ context.Context, epochID uint64, ticketID, targetAccount string, err error, ttl time.Duration) error { - m.failures[failureKey(epochID, ticketID)]++ + m.failures[failureKey(epochID, ticketID, targetAccount)]++ return nil } -func (m *memoryStore) HasRecheckAttemptFailureBudgetExceeded(_ context.Context, epochID uint64, ticketID string, maxAttempts int) (bool, error) { - return maxAttempts > 0 && m.failures[failureKey(epochID, ticketID)] >= maxAttempts, nil +func (m *memoryStore) HasRecheckAttemptFailureBudgetExceeded(_ context.Context, epochID uint64, ticketID, targetAccount string, maxAttempts int) (bool, error) { + return maxAttempts > 0 && m.failures[failureKey(epochID, ticketID, targetAccount)] >= maxAttempts, nil } func (m *memoryStore) PurgeExpiredRecheckAttemptFailures(_ context.Context) error { return nil } func key(epochID uint64, ticketID, targetAccount string) string { return fmt.Sprintf("%d/%s/%s", epochID, ticketID, targetAccount) } -func failureKey(epochID uint64, ticketID string) string { return fmt.Sprintf("%d/%s", epochID, ticketID) } +func failureKey(epochID uint64, ticketID, targetAccount string) string { + return fmt.Sprintf("%d/%s/%s", epochID, ticketID, targetAccount) +} type recordingAuditMsg struct { calls []submitCall diff --git a/supernode/recheck/types.go b/supernode/recheck/types.go index edee1b93..9f916c61 100644 --- a/supernode/recheck/types.go +++ b/supernode/recheck/types.go @@ -48,7 +48,7 @@ type Store interface { DeletePendingRecheckSubmission(ctx context.Context, epochID uint64, ticketID, targetAccount string) error RecordRecheckSubmission(ctx context.Context, epochID uint64, ticketID, targetAccount, challengedTranscriptHash, recheckTranscriptHash string, resultClass audittypes.StorageProofResultClass) error RecordRecheckAttemptFailure(ctx context.Context, epochID uint64, ticketID, targetAccount string, err error, ttl time.Duration) error - HasRecheckAttemptFailureBudgetExceeded(ctx context.Context, epochID uint64, ticketID string, maxAttempts int) (bool, error) + HasRecheckAttemptFailureBudgetExceeded(ctx context.Context, epochID uint64, ticketID, targetAccount string, maxAttempts int) (bool, error) PurgeExpiredRecheckAttemptFailures(ctx context.Context) error } @@ -79,6 +79,15 @@ func IsRecheckEligibleResultClass(cls audittypes.StorageProofResultClass) bool { } } +// MapRecheckOutcome translates the local recheck verifier outcome into the +// chain result class submitted via MsgSubmitStorageRecheckEvidence. A locally +// confirmed hash mismatch intentionally maps to RECHECK_CONFIRMED_FAIL rather +// than re-emitting HASH_MISMATCH: Lumera chain accepts both classes in +// x/audit/v1/keeper/msg_storage_truth.go, and scoring deliberately gives the +// recheck-confirmed class its own impact bucket in +// x/audit/v1/keeper/storage_truth_scoring.go:492-541. Keeping the conversion +// here makes the supernode submission match the chain's second-stage evidence +// semantics without changing first-stage storage-proof report semantics. func MapRecheckOutcome(outcome Outcome) audittypes.StorageProofResultClass { switch outcome { case OutcomePass: diff --git a/supernode/self_healing/finalizer.go b/supernode/self_healing/finalizer.go index 255d6320..129d1abb 100644 --- a/supernode/self_healing/finalizer.go +++ b/supernode/self_healing/finalizer.go @@ -80,6 +80,9 @@ func (s *Service) publishStagingDir(ctx context.Context, claim queries.HealClaim // has already recorded VERIFIED so no on-chain work pending. 
return fmt.Errorf("publish staged artefacts: %w", err) } + if err := s.store.DeleteHealClaim(ctx, claim.HealOpID); err != nil { + return fmt.Errorf("delete heal claim row: %w", err) + } if err := os.RemoveAll(claim.StagingDir); err != nil { logtrace.Warn(ctx, "self_healing(LEP-6): staging cleanup after publish failed", logtrace.Fields{ logtrace.FieldError: err.Error(), @@ -87,9 +90,6 @@ func (s *Service) publishStagingDir(ctx context.Context, claim queries.HealClaim "staging_dir": claim.StagingDir, }) } - if err := s.store.DeleteHealClaim(ctx, claim.HealOpID); err != nil { - return fmt.Errorf("delete heal claim row: %w", err) - } lep6metrics.IncHealFinalizePublish() logtrace.Info(ctx, "self_healing(LEP-6): published staged artefacts to KAD", logtrace.Fields{ "heal_op_id": claim.HealOpID, diff --git a/supernode/self_healing/healer.go b/supernode/self_healing/healer.go index 3b770439..9258f09f 100644 --- a/supernode/self_healing/healer.go +++ b/supernode/self_healing/healer.go @@ -105,9 +105,9 @@ func (s *Service) reconstructAndClaim(ctx context.Context, op audittypes.HealOp) _ = os.RemoveAll(stagingDir) lep6metrics.IncHealClaim("deadline_skipped") logtrace.Warn(ctx, "self_healing(LEP-6): heal op deadline passed before submit; skipping", logtrace.Fields{ - "heal_op_id": op.HealOpId, - "deadline": op.DeadlineEpochId, - "staging_dir": stagingDir, + "heal_op_id": op.HealOpId, + "deadline": op.DeadlineEpochId, + "staging_dir": stagingDir, }) return nil } @@ -119,17 +119,28 @@ func (s *Service) reconstructAndClaim(ctx context.Context, op audittypes.HealOp) lep6metrics.IncHealClaim("submit_transient") return fmt.Errorf("submit claim (transient, will retry): %w", err) } - if chainerrors.IsHealOpInvalidState(err) { - if recErr := s.reconcileExistingClaim(ctx, op, manifestHash, stagingDir); recErr != nil { - _ = os.RemoveAll(stagingDir) - return fmt.Errorf("submit failed (%v) and reconcile failed: %w", err, recErr) - } + if chainerrors.IsHealOpPastDeadline(err) { + _ = s.store.DeletePendingHealClaim(ctx, op.HealOpId) + _ = os.RemoveAll(stagingDir) + lep6metrics.IncHealClaim("deadline_rejected") + logtrace.Warn(ctx, "self_healing(LEP-6): chain rejected heal claim after deadline; skipping reconcile", logtrace.Fields{ + "heal_op_id": op.HealOpId, + "deadline": op.DeadlineEpochId, + logtrace.FieldError: err.Error(), + }) return nil } - _ = s.store.DeletePendingHealClaim(ctx, op.HealOpId) - _ = os.RemoveAll(stagingDir) - lep6metrics.IncHealClaim("submit_error") - return fmt.Errorf("submit claim: %w", err) + if chainerrors.IsHealOpInvalidState(err) { + return s.reconcilePendingClaimSubmitError(ctx, op, err) + } + // Matee C3 follow-up: do not destructively drop staging on an + // unclassified submit error until we query chain state. A tx can be + // committed while the client receives a non-canonical transport / ABCI + // wrapper error that is neither IsTransientGrpc nor the typed invalid- + // state sentinel. resumePendingHealClaim promotes the row when chain + // shows our manifest, or deletes pending+staging only when chain still + // has no accepted claim / accepted a different manifest. 
+ return s.reconcilePendingClaimSubmitError(ctx, op, err) } if err := s.store.MarkHealClaimSubmitted(ctx, op.HealOpId); err != nil { @@ -146,6 +157,21 @@ func (s *Service) reconstructAndClaim(ctx context.Context, op audittypes.HealOp) return nil } +func (s *Service) reconcilePendingClaimSubmitError(ctx context.Context, op audittypes.HealOp, submitErr error) error { + if recErr := s.resumePendingHealClaim(ctx, op); recErr != nil { + return fmt.Errorf("submit failed (%v) and pending reconcile failed: %w", submitErr, recErr) + } + hasSubmitted, err := s.store.HasHealClaim(ctx, op.HealOpId) + if err != nil { + return fmt.Errorf("submit failed (%v) and post-reconcile submitted lookup failed: %w", submitErr, err) + } + if hasSubmitted { + return nil + } + lep6metrics.IncHealClaim("submit_error") + return fmt.Errorf("submit claim: %w", submitErr) +} + // reconcileExistingClaim handles the post-crash case where the chain has // advanced past SCHEDULED (i.e. our prior submit was accepted but we lost // the response or crashed before persisting). We re-fetch the op, confirm @@ -231,6 +257,7 @@ func (s *Service) healOpDeadlinePassed(ctx context.Context, op audittypes.HealOp } return resp.EpochId >= op.DeadlineEpochId, nil } + // resumePendingHealClaim is the C5 fix: a `pending` claim row from a // previous tick (crashed between RecordPendingHealClaim and chain ack) // exists locally. We must reconcile against the chain BEFORE either @@ -316,11 +343,11 @@ func (s *Service) resumePendingHealClaim(ctx context.Context, op audittypes.Heal } lep6metrics.IncHealClaim("resume_foreign") logtrace.Warn(ctx, "self_healing(LEP-6): resume foreign-hash (different healer's claim accepted)", logtrace.Fields{ - "heal_op_id": op.HealOpId, - "chain_hash": chainOp.ResultHash, - "pending_hash": row.ManifestHash, - "chain_status": chainOp.Status.String(), - "staging_dir": row.StagingDir, + "heal_op_id": op.HealOpId, + "chain_hash": chainOp.ResultHash, + "pending_hash": row.ManifestHash, + "chain_status": chainOp.Status.String(), + "staging_dir": row.StagingDir, }) return nil default: diff --git a/supernode/self_healing/mocks_test.go b/supernode/self_healing/mocks_test.go index 65736ace..2ecaa488 100644 --- a/supernode/self_healing/mocks_test.go +++ b/supernode/self_healing/mocks_test.go @@ -16,13 +16,15 @@ import ( // reads only GetParams, GetHealOp, and GetHealOpsByStatus, so other methods // are unused and may be left zero. 
type programmableAudit struct { - mu sync.Mutex - params audittypes.Params - opsByStatus map[audittypes.HealOpStatus][]audittypes.HealOp - opsByID map[uint64]audittypes.HealOp - getOpErr error - blockStatus map[audittypes.HealOpStatus]bool - currentEpoch uint64 // wired into GetCurrentEpoch (H1 deadline-pre-check tests) + mu sync.Mutex + params audittypes.Params + opsByStatus map[audittypes.HealOpStatus][]audittypes.HealOp + opsByID map[uint64]audittypes.HealOp + getOpErr error + blockStatus map[audittypes.HealOpStatus]bool + currentEpoch uint64 // wired into GetCurrentEpoch (H1 deadline-pre-check tests) + currentAnchor audittypes.EpochAnchor + epochAnchors map[uint64]audittypes.EpochAnchor } func newProgrammableAudit(mode audittypes.StorageTruthEnforcementMode) *programmableAudit { @@ -30,9 +32,10 @@ func newProgrammableAudit(mode audittypes.StorageTruthEnforcementMode) *programm params: audittypes.Params{ StorageTruthEnforcementMode: mode, }, - opsByStatus: map[audittypes.HealOpStatus][]audittypes.HealOp{}, - opsByID: map[uint64]audittypes.HealOp{}, - blockStatus: map[audittypes.HealOpStatus]bool{}, + opsByStatus: map[audittypes.HealOpStatus][]audittypes.HealOp{}, + opsByID: map[uint64]audittypes.HealOp{}, + blockStatus: map[audittypes.HealOpStatus]bool{}, + epochAnchors: map[uint64]audittypes.EpochAnchor{}, } } @@ -95,10 +98,14 @@ func (p *programmableAudit) GetHealOpsByTicket(ctx context.Context, ticketID str return &audittypes.QueryHealOpsByTicketResponse{}, nil } func (p *programmableAudit) GetEpochAnchor(ctx context.Context, epochID uint64) (*audittypes.QueryEpochAnchorResponse, error) { - return &audittypes.QueryEpochAnchorResponse{}, nil + p.mu.Lock() + defer p.mu.Unlock() + return &audittypes.QueryEpochAnchorResponse{Anchor: p.epochAnchors[epochID]}, nil } func (p *programmableAudit) GetCurrentEpochAnchor(ctx context.Context) (*audittypes.QueryCurrentEpochAnchorResponse, error) { - return &audittypes.QueryCurrentEpochAnchorResponse{}, nil + p.mu.Lock() + defer p.mu.Unlock() + return &audittypes.QueryCurrentEpochAnchorResponse{Anchor: p.currentAnchor}, nil } func (p *programmableAudit) GetCurrentEpoch(ctx context.Context) (*audittypes.QueryCurrentEpochResponse, error) { p.mu.Lock() diff --git a/supernode/self_healing/service.go b/supernode/self_healing/service.go index b16c313d..394c1c44 100644 --- a/supernode/self_healing/service.go +++ b/supernode/self_healing/service.go @@ -51,6 +51,7 @@ import ( "fmt" "os" "path/filepath" + "strconv" "strings" "sync" "time" @@ -82,6 +83,11 @@ const ( // or hung RaptorQ from holding its semaphore slot + inFlight key // forever. defaultDispatchOpTimeout = 15 * time.Minute + // defaultEstimatedChainBlockTime is used only to translate chain epoch-anchor + // block deltas into a wall-clock deadline for per-op contexts when the chain + // exposes heights but not timestamps. The hard DispatchOpTimeout remains the + // upper safety cap. + defaultEstimatedChainBlockTime = 6 * time.Second ) // Config captures supernode-binary-owned tunables for the LEP-6 heal runtime. 
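Note (editorial sketch, not part of the patch): healOpWallDeadline further down derives a wall-clock budget from epoch-anchor height deltas. The self-contained example below only shows the arithmetic, under an assumed delta of 600 blocks: 600 × 6s ≈ 60 minutes, which the 15-minute DispatchOpTimeout hard cap then clamps.

package main

import (
	"fmt"
	"time"
)

func main() {
	const estimatedBlockTime = 6 * time.Second // mirrors defaultEstimatedChainBlockTime
	const dispatchOpTimeout = 15 * time.Minute // mirrors defaultDispatchOpTimeout

	// Assumed for illustration: the deadline anchor's EpochEndHeight is 600
	// blocks past the current anchor's EpochEndHeight.
	remainingBlocks := int64(600)

	budget := time.Duration(remainingBlocks) * estimatedBlockTime
	if budget > dispatchOpTimeout {
		budget = dispatchOpTimeout // the hard safety cap always wins
	}
	fmt.Println(budget) // 15m0s: ~60m derived from heights, clamped to the cap
}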
@@ -245,6 +251,63 @@ func New( }, nil } +func (s *Service) cleanupOrphanedStagingDirs(ctx context.Context) error { + claims, err := s.store.ListHealClaims(ctx) + if err != nil { + return fmt.Errorf("list heal claims: %w", err) + } + known := make(map[uint64]struct{}, len(claims)) + for _, claim := range claims { + known[claim.HealOpID] = struct{}{} + } + entries, err := os.ReadDir(s.cfg.StagingRoot) + if err != nil { + if os.IsNotExist(err) { + return nil + } + return fmt.Errorf("read staging root: %w", err) + } + for _, entry := range entries { + if !entry.IsDir() { + continue + } + healOpID, ok := parseNumericHealOpDir(entry.Name()) + if !ok { + continue + } + if _, exists := known[healOpID]; exists { + continue + } + path := filepath.Join(s.cfg.StagingRoot, entry.Name()) + if err := os.RemoveAll(path); err != nil { + return fmt.Errorf("remove orphaned staging dir %q: %w", path, err) + } + lep6metrics.IncHealOrphanedStagingCleanup() + logtrace.Warn(ctx, "self_healing(LEP-6): removed orphaned staging dir", logtrace.Fields{ + "heal_op_id": healOpID, + "staging_dir": path, + "staging_root": s.cfg.StagingRoot, + }) + } + return nil +} + +func parseNumericHealOpDir(name string) (uint64, bool) { + if name == "" { + return 0, false + } + for _, r := range name { + if r < '0' || r > '9' { + return 0, false + } + } + id, err := strconv.ParseUint(name, 10, 64) + if err != nil { + return 0, false + } + return id, true +} + // Run blocks until ctx is cancelled, ticking every cfg.PollInterval. // Tick steps (single mechanism per LEP-6 plan §C.4 finalizer Opt-2b decision): // @@ -274,6 +337,9 @@ func (s *Service) Run(ctx context.Context) error { "max_concurrent_publishes": s.cfg.MaxConcurrentPublishes, "staging_root": s.cfg.StagingRoot, }) + if err := s.cleanupOrphanedStagingDirs(ctx); err != nil { + logtrace.Warn(ctx, "self_healing(LEP-6): cleanup orphaned staging dirs", logtrace.Fields{logtrace.FieldError: err.Error()}) + } t := time.NewTicker(s.cfg.PollInterval) defer t.Stop() for { @@ -393,7 +459,7 @@ func (s *Service) dispatchHealerOps(ctx context.Context) error { defer s.inFlight.Delete(key) // M2 fix: bound the per-op goroutine so a wedged // reconstruct or hung RaptorQ releases its semaphore slot. - opCtx, cancel := s.dispatchOpContext(ctx) + opCtx, cancel := s.dispatchOpContextForHealOp(ctx, op) defer cancel() if err := s.reconstructAndClaim(opCtx, op); err != nil { logtrace.Warn(ctx, "self_healing(LEP-6): reconstructAndClaim", logtrace.Fields{ @@ -467,7 +533,7 @@ func (s *Service) dispatchVerifierOps(ctx context.Context) error { go func(op audittypes.HealOp, key string) { defer s.inFlight.Delete(key) // M2 fix: bound per-op verifier goroutine. - opCtx, cancel := s.dispatchOpContext(ctx) + opCtx, cancel := s.dispatchOpContextForHealOp(ctx, op) defer cancel() logtrace.Info(opCtx, "self_healing(LEP-6): verifier dispatch start", logtrace.Fields{ "identity": s.identity, @@ -575,6 +641,66 @@ func (s *Service) dispatchOpContext(ctx context.Context) (context.Context, conte return context.WithTimeout(ctx, timeout) } +// dispatchOpContextForHealOp derives the per-op context from the earlier of: +// +// - the configured hard DispatchOpTimeout; and +// - op.DeadlineEpochId translated through chain epoch-anchor heights. +// +// Lumera chain epoch anchors carry block heights (not timestamps), so the +// translation uses a conservative estimated block time and still falls back to +// DispatchOpTimeout if anchor queries fail or produce unusable data. 
This keeps +// the M2 leak guard intact while respecting the chain heal-op deadline when it +// can be derived locally. +func (s *Service) dispatchOpContextForHealOp(ctx context.Context, op audittypes.HealOp) (context.Context, context.CancelFunc) { + deadline, ok := s.healOpWallDeadline(ctx, op) + if !ok { + return s.dispatchOpContext(ctx) + } + hardTimeout := s.cfg.DispatchOpTimeout + if hardTimeout <= 0 { + hardTimeout = defaultDispatchOpTimeout + } + hardDeadline := time.Now().Add(hardTimeout) + if deadline.After(hardDeadline) { + deadline = hardDeadline + } + return context.WithDeadline(ctx, deadline) +} + +func (s *Service) healOpWallDeadline(ctx context.Context, op audittypes.HealOp) (time.Time, bool) { + if op.DeadlineEpochId == 0 { + return time.Time{}, false + } + queryCtx, cancel := s.auditQueryContext(ctx) + currentResp, err := s.lumera.Audit().GetCurrentEpochAnchor(queryCtx) + cancel() + if err != nil || currentResp == nil { + return time.Time{}, false + } + current := currentResp.Anchor + if current.EpochId == 0 || current.EpochEndHeight <= 0 { + return time.Time{}, false + } + if current.EpochId >= op.DeadlineEpochId { + return time.Now(), true + } + queryCtx, cancel = s.auditQueryContext(ctx) + deadlineResp, err := s.lumera.Audit().GetEpochAnchor(queryCtx, op.DeadlineEpochId) + cancel() + if err != nil || deadlineResp == nil { + return time.Time{}, false + } + deadlineAnchor := deadlineResp.Anchor + if deadlineAnchor.EpochEndHeight <= current.EpochEndHeight { + return time.Now(), true + } + remainingBlocks := deadlineAnchor.EpochEndHeight - current.EpochEndHeight + if remainingBlocks <= 0 { + return time.Time{}, false + } + return time.Now().Add(time.Duration(remainingBlocks) * defaultEstimatedChainBlockTime), true +} + func totalStagingBytes(claims []queries.HealClaimRecord) int64 { var total int64 for _, claim := range claims { diff --git a/supernode/self_healing/service_test.go b/supernode/self_healing/service_test.go index 3924c669..97727da1 100644 --- a/supernode/self_healing/service_test.go +++ b/supernode/self_healing/service_test.go @@ -12,6 +12,7 @@ import ( audittypes "github.com/LumeraProtocol/lumera/x/audit/v1/types" "github.com/LumeraProtocol/supernode/v2/pkg/cascadekit" + lep6metrics "github.com/LumeraProtocol/supernode/v2/pkg/metrics/lep6" "github.com/LumeraProtocol/supernode/v2/pkg/storage/queries" cascadeService "github.com/LumeraProtocol/supernode/v2/supernode/cascade" "lukechampine.com/blake3" @@ -76,6 +77,50 @@ func newTestStore(t *testing.T) queries.LocalStoreInterface { return store } +func TestCleanupOrphanedStagingDirsRemovesOnlyNumericDirsWithoutClaims(t *testing.T) { + h := newHarness(t, "self", audittypes.StorageTruthEnforcementMode_STORAGE_TRUTH_ENFORCEMENT_MODE_SHADOW) + ctx := context.Background() + lep6metrics.Reset() + + pendingHash := hashOf(t, []byte("pending")) + pendingDir := makeStagingDir(t, h.stagingRoot, 101, pendingHash, []byte("pending")) + if err := h.store.RecordPendingHealClaim(ctx, 101, "ticket-101", pendingHash, pendingDir); err != nil { + t.Fatalf("seed pending claim: %v", err) + } + orphanDir := filepath.Join(h.stagingRoot, "202") + if err := os.MkdirAll(orphanDir, 0o700); err != nil { + t.Fatalf("mkdir orphan: %v", err) + } + nonNumericDir := filepath.Join(h.stagingRoot, "not-a-heal-op") + if err := os.MkdirAll(nonNumericDir, 0o700); err != nil { + t.Fatalf("mkdir nonnumeric: %v", err) + } + regularFile := filepath.Join(h.stagingRoot, "303") + if err := os.WriteFile(regularFile, []byte("not a dir"), 0o600); err != nil { + 
t.Fatalf("write regular file: %v", err) + } + + if err := h.svc.cleanupOrphanedStagingDirs(ctx); err != nil { + t.Fatalf("cleanup: %v", err) + } + + if _, err := os.Stat(pendingDir); err != nil { + t.Fatalf("pending dir must remain: %v", err) + } + if _, err := os.Stat(orphanDir); !os.IsNotExist(err) { + t.Fatalf("orphan dir must be removed, stat err=%v", err) + } + if _, err := os.Stat(nonNumericDir); err != nil { + t.Fatalf("non-numeric dir must remain: %v", err) + } + if _, err := os.Stat(regularFile); err != nil { + t.Fatalf("numeric regular file must remain: %v", err) + } + if got := lep6metrics.Snapshot().HealOrphanedStagingCleanupsTotal; got != 1 { + t.Fatalf("orphan cleanup metric: got %d want 1", got) + } +} + // fakeFetcher returns a configurable response. Configure per-test by // reassigning .body / .err. type fakeFetcher struct { @@ -402,6 +447,49 @@ func TestHealer_ReconcilesExistingChainClaimAfterCrash(t *testing.T) { } } +func TestHealer_UnclassifiedSubmitErrorQueriesChainBeforeCleanup(t *testing.T) { + h := newHarness(t, "sn-healer", audittypes.StorageTruthEnforcementMode_STORAGE_TRUTH_ENFORCEMENT_MODE_FULL) + body := []byte("recovered-payload-unclassified") + wantHash := hashOf(t, body) + h.cascade.reseedFn = func(ctx context.Context, req *cascadeService.RecoveryReseedRequest) (*cascadeService.RecoveryReseedResult, error) { + _ = makeStagingDir(t, h.stagingRoot, 24, wantHash, body) + return &cascadeService.RecoveryReseedResult{ + ActionID: req.ActionID, + DataHashVerified: true, + ReconstructedHashB64: wantHash, + StagingDir: req.StagingDir, + }, nil + } + // This is intentionally not a typed invalid-state error and not a + // transient gRPC code. It models the C3 lost/garbled-ack window where + // BroadcastTx may have committed, but the client receives an opaque wrapper. 
+ h.auditMsg.claimErr = errors.New("opaque broadcast failure after commit") + h.audit.put(audittypes.HealOp{ + HealOpId: 24, + TicketId: "ticket-unclassified", + Status: audittypes.HealOpStatus_HEAL_OP_STATUS_HEALER_REPORTED, + HealerSupernodeAccount: "sn-healer", + ResultHash: wantHash, + }) + op := audittypes.HealOp{ + HealOpId: 24, + TicketId: "ticket-unclassified", + Status: audittypes.HealOpStatus_HEAL_OP_STATUS_SCHEDULED, + HealerSupernodeAccount: "sn-healer", + } + if err := h.svc.reconstructAndClaim(context.Background(), op); err != nil { + t.Fatalf("reconstructAndClaim: %v", err) + } + has, _ := h.store.HasHealClaim(context.Background(), 24) + if !has { + t.Fatalf("unclassified submit error must reconcile accepted chain claim before cleanup") + } + stagingDir := filepath.Join(h.stagingRoot, "24") + if _, err := os.Stat(stagingDir); err != nil { + t.Fatalf("staging dir must remain for finalizer after accepted chain claim: %v", err) + } +} + func TestHealer_ReconcileHashMismatchCleansStagingWithoutPersisting(t *testing.T) { h := newHarness(t, "sn-healer", audittypes.StorageTruthEnforcementMode_STORAGE_TRUTH_ENFORCEMENT_MODE_FULL) body := []byte("recovered-payload-23") diff --git a/supernode/self_healing/wave3_regression_test.go b/supernode/self_healing/wave3_regression_test.go new file mode 100644 index 00000000..39986d76 --- /dev/null +++ b/supernode/self_healing/wave3_regression_test.go @@ -0,0 +1,61 @@ +package self_healing + +import ( + "context" + "os" + "strings" + "testing" + "time" + + audittypes "github.com/LumeraProtocol/lumera/x/audit/v1/types" +) + +func TestDispatchOpContextForHealOpUsesEpochAnchorDeadlineWhenEarlier(t *testing.T) { + h := newHarness(t, "sn-healer", audittypes.StorageTruthEnforcementMode_STORAGE_TRUTH_ENFORCEMENT_MODE_FULL) + h.svc.cfg.DispatchOpTimeout = time.Hour + h.audit.currentAnchor = audittypes.EpochAnchor{EpochId: 10, EpochEndHeight: 100} + h.audit.epochAnchors[11] = audittypes.EpochAnchor{EpochId: 11, EpochEndHeight: 101} + + ctx, cancel := h.svc.dispatchOpContextForHealOp(context.Background(), audittypes.HealOp{HealOpId: 1, DeadlineEpochId: 11}) + defer cancel() + deadline, ok := ctx.Deadline() + if !ok { + t.Fatalf("expected derived deadline") + } + remaining := time.Until(deadline) + if remaining <= 0 || remaining > 30*time.Second { + t.Fatalf("expected deadline derived from 1 remaining chain block, got %s", remaining) + } +} + +func TestDispatchOpContextForHealOpFallsBackToHardTimeoutWhenAnchorsMissing(t *testing.T) { + h := newHarness(t, "sn-healer", audittypes.StorageTruthEnforcementMode_STORAGE_TRUTH_ENFORCEMENT_MODE_FULL) + h.svc.cfg.DispatchOpTimeout = 50 * time.Millisecond + + ctx, cancel := h.svc.dispatchOpContextForHealOp(context.Background(), audittypes.HealOp{HealOpId: 1, DeadlineEpochId: 99}) + defer cancel() + deadline, ok := ctx.Deadline() + if !ok { + t.Fatalf("expected hard timeout deadline") + } + remaining := time.Until(deadline) + if remaining <= 0 || remaining > time.Second { + t.Fatalf("expected hard timeout fallback, got %s", remaining) + } +} + +func TestPublishStagingDirDeletesClaimBeforeRemovingStagingDir(t *testing.T) { + src, err := os.ReadFile("finalizer.go") + if err != nil { + t.Fatal(err) + } + body := string(src) + deleteIdx := strings.Index(body, "DeleteHealClaim(ctx, claim.HealOpID)") + removeIdx := strings.Index(body, "os.RemoveAll(claim.StagingDir)") + if deleteIdx < 0 || removeIdx < 0 { + t.Fatalf("expected publishStagingDir cleanup calls to exist") + } + if deleteIdx > removeIdx { + 
t.Fatalf("publishStagingDir must delete durable claim row before removing staging dir") + } +} diff --git a/supernode/storage_challenge/lep6_dispatch.go b/supernode/storage_challenge/lep6_dispatch.go index db3c2c6e..328e6872 100644 --- a/supernode/storage_challenge/lep6_dispatch.go +++ b/supernode/storage_challenge/lep6_dispatch.go @@ -323,6 +323,27 @@ func (d *LEP6Dispatcher) appendNoEligible( bucket audittypes.StorageProofBucketType, selectedTicketIDForLog string, ) { + // Wave 2 / F-PR286-02: Lumera chain validateNoEligibleTicketConsistency + // rejects NO_ELIGIBLE_TICKET when a recent eligible transcript exists for + // the same (target,bucket) within its consistency window. The current + // supernode audit.Module does not expose that chain history query, so this + // guard covers the safe local subset: never emit NO_ELIGIBLE when this + // process already buffered an eligible row for the same epoch/target/bucket. + // Skipping is safer than poisoning the whole report; in FULL mode Wave 1 + // coverage checks will abort submission. A selectedTicketIDForLog alone is + // not enough to suppress: H6 class-roll fallback intentionally emits + // NO_ELIGIBLE when the selected ticket has no rolled concrete class. + if buf != nil && buf.HasEligibleResult(epochID, target, bucket) { + lep6metrics.IncDispatchInternalFailure("no_eligible_consistency_suppressed") + logtrace.Warn(ctx, "lep6 dispatch: suppressed no-eligible row due to eligible-ticket consistency", logtrace.Fields{ + "epoch_id": epochID, + "target": target, + "bucket": bucket.String(), + "selected_ticket": selectedTicketIDForLog, + }) + return + } + transcriptHashHex, err := deterministic.TranscriptHash(deterministic.TranscriptInputs{ EpochID: epochID, ChallengerSupernodeAccount: d.self, diff --git a/supernode/storage_challenge/lep6_dispatch_test.go b/supernode/storage_challenge/lep6_dispatch_test.go index e854aa27..6c967dd7 100644 --- a/supernode/storage_challenge/lep6_dispatch_test.go +++ b/supernode/storage_challenge/lep6_dispatch_test.go @@ -208,6 +208,36 @@ func newDispatcher( return d, buf } +func TestAppendNoEligiblePreservedWhenOnlySelectedTicketExists(t *testing.T) { + audit := &dispatchAuditModule{} + d, buf := newDispatcher(t, audit, &stubFactory{}, NoTicketProvider{}, stubMetaProvider{}) + anchor := makeAnchor(9, 1000, "target-1") + + d.appendNoEligible(context.Background(), buf, 9, anchor, "target-1", audittypes.StorageProofBucketType_STORAGE_PROOF_BUCKET_TYPE_RECENT, "ticket-existing") + + results := buf.CollectResults(9) + require.Len(t, results, 1, "selected ticket alone is not a chain transcript-history conflict; H6 class-roll fallback still emits NO_ELIGIBLE") + require.Equal(t, audittypes.StorageProofResultClass_STORAGE_PROOF_RESULT_CLASS_NO_ELIGIBLE_TICKET, results[0].ResultClass) +} + +func TestAppendNoEligibleSuppressedWhenBufferedEligibleResultExists(t *testing.T) { + audit := &dispatchAuditModule{} + d, buf := newDispatcher(t, audit, &stubFactory{}, NoTicketProvider{}, stubMetaProvider{}) + anchor := makeAnchor(10, 1000, "target-1") + bucket := audittypes.StorageProofBucketType_STORAGE_PROOF_BUCKET_TYPE_OLD + buf.Append(10, &audittypes.StorageProofResult{ + TargetSupernodeAccount: "target-1", + BucketType: bucket, + ResultClass: audittypes.StorageProofResultClass_STORAGE_PROOF_RESULT_CLASS_PASS, + }) + + d.appendNoEligible(context.Background(), buf, 10, anchor, "target-1", bucket, "") + + results := buf.CollectResults(10) + require.Len(t, results, 1) + require.Equal(t, 
audittypes.StorageProofResultClass_STORAGE_PROOF_RESULT_CLASS_PASS, results[0].ResultClass) +} + func TestDispatchEpoch_ModeUnspecified_NoOp(t *testing.T) { audit := &dispatchAuditModule{ params: &audittypes.QueryParamsResponse{ diff --git a/supernode/storage_challenge/result_buffer.go b/supernode/storage_challenge/result_buffer.go index d66a8704..c12fbf92 100644 --- a/supernode/storage_challenge/result_buffer.go +++ b/supernode/storage_challenge/result_buffer.go @@ -68,9 +68,13 @@ func (b *Buffer) Append(epochID uint64, result *audittypes.StorageProofResult) { if result == nil { return } + b.appendEntry(epochID, result, time.Now()) +} + +func (b *Buffer) appendEntry(epochID uint64, result *audittypes.StorageProofResult, arrivedAt time.Time) { entry := &bufferedResult{ result: result, - arrivedAt: time.Now(), + arrivedAt: arrivedAt, seq: b.seq.Add(1), } b.mu.Lock() @@ -78,8 +82,40 @@ func (b *Buffer) Append(epochID uint64, result *audittypes.StorageProofResult) { b.byEpoch[epochID] = append(b.byEpoch[epochID], entry) } +// RequeueResults puts previously drained results back under epochID. It is used +// by host_reporter when FULL-mode coverage is incomplete, so the next tick can +// retry instead of losing locally generated proofs. +func (b *Buffer) RequeueResults(epochID uint64, results []*audittypes.StorageProofResult) { + now := time.Now() + for _, result := range results { + if result != nil { + b.appendEntry(epochID, result, now) + } + } +} + +// HasEligibleResult reports whether the current in-memory buffer already has a +// non-NO_ELIGIBLE row for (epoch,target,bucket). It is intentionally scoped to +// this process/epoch; the current Lumera audit query interface does not expose +// the chain keeper's transcript-history index used by validateNoEligibleTicketConsistency. +func (b *Buffer) HasEligibleResult(epochID uint64, target string, bucket audittypes.StorageProofBucketType) bool { + b.mu.Lock() + defer b.mu.Unlock() + for _, entry := range b.byEpoch[epochID] { + if entry == nil || entry.result == nil { + continue + } + r := entry.result + if r.TargetSupernodeAccount == target && r.BucketType == bucket && + r.ResultClass != audittypes.StorageProofResultClass_STORAGE_PROOF_RESULT_CLASS_NO_ELIGIBLE_TICKET { + return true + } + } + return false +} + // CollectResults drains and returns the buffered results for epochID, applying -// the LEP-6 16-cap self-throttle. Results buffered for other epochs are left +// the LEP-6 chain cap self-throttle. Results buffered for other epochs are left // intact. The returned slice is sorted deterministically by // (BucketType, EpochId, TicketId) so that downstream signing/serialisation is // stable across challengers and re-runs. diff --git a/supernode/transport/grpc/storage_challenge/handler.go b/supernode/transport/grpc/storage_challenge/handler.go index 76e916b4..6d4c9c3f 100644 --- a/supernode/transport/grpc/storage_challenge/handler.go +++ b/supernode/transport/grpc/storage_challenge/handler.go @@ -6,6 +6,7 @@ import ( "encoding/json" "fmt" "strings" + "sync" "time" audittypes "github.com/LumeraProtocol/lumera/x/audit/v1/types" @@ -22,7 +23,10 @@ import ( "lukechampine.com/blake3" ) -const maxServedSliceBytes = uint64(65_536) +const ( + maxServedSliceBytes = uint64(65_536) + compoundCapsTTL = time.Minute +) // ArtifactReader is the recipient-side abstraction over cascade artifact storage // used to satisfy LEP-6 multi-range compound storage challenges. 
The B.3 wiring @@ -31,6 +35,15 @@ type ArtifactReader interface { ReadArtifactRange(ctx context.Context, class audittypes.StorageProofArtifactClass, key string, start, end uint64) ([]byte, error) } +type AuditParamReader interface { + GetParams(ctx context.Context) (*audittypes.QueryParamsResponse, error) +} + +type compoundCaps struct { + maxRanges uint32 + maxLen uint32 +} + type Server struct { supernode.UnimplementedStorageChallengeServiceServer @@ -45,6 +58,11 @@ type Server struct { // recipient_signature stays empty. keyring keyring.Keyring keyName string + + auditParams AuditParamReader + capsMu sync.RWMutex + caps compoundCaps + capsUntil time.Time } func NewServer(identity string, p2pClient p2p.Client, store queries.LocalStoreInterface) *Server { @@ -67,6 +85,53 @@ func (s *Server) WithRecipientSigner(kr keyring.Keyring, keyName string) *Server return s } +func (s *Server) WithAuditParams(audit AuditParamReader) *Server { + s.auditParams = audit + return s +} + +func (s *Server) WithCompoundCapsForTest(maxRanges, maxLen uint32) *Server { + s.capsMu.Lock() + defer s.capsMu.Unlock() + s.caps = compoundCaps{maxRanges: maxRanges, maxLen: maxLen} + s.capsUntil = time.Now().Add(24 * time.Hour) + return s +} + +func (s *Server) compoundCaps(ctx context.Context) compoundCaps { + fallback := compoundCaps{maxRanges: audittypes.DefaultStorageTruthCompoundRangesPerArtifact, maxLen: audittypes.DefaultStorageTruthCompoundRangeLenBytes} + now := time.Now() + s.capsMu.RLock() + cached, valid := s.caps, now.Before(s.capsUntil) + s.capsMu.RUnlock() + if valid && cached.maxRanges > 0 && cached.maxLen > 0 { + return cached + } + if s.auditParams == nil { + return fallback + } + resp, err := s.auditParams.GetParams(ctx) + if err != nil || resp == nil { + logtrace.Warn(ctx, "storage challenge: failed to refresh chain compound caps; using fallback", logtrace.Fields{"error": fmt.Sprint(err)}) + return fallback + } + caps := compoundCaps{ + maxRanges: resp.Params.StorageTruthCompoundRangesPerArtifact, + maxLen: resp.Params.StorageTruthCompoundRangeLenBytes, + } + if caps.maxRanges == 0 { + caps.maxRanges = audittypes.DefaultStorageTruthCompoundRangesPerArtifact + } + if caps.maxLen == 0 { + caps.maxLen = audittypes.DefaultStorageTruthCompoundRangeLenBytes + } + s.capsMu.Lock() + s.caps = caps + s.capsUntil = now.Add(compoundCapsTTL) + s.capsMu.Unlock() + return caps +} + func (s *Server) GetSliceProof(ctx context.Context, req *supernode.GetSliceProofRequest) (*supernode.GetSliceProofResponse, error) { if req == nil { return nil, fmt.Errorf("nil request") @@ -313,11 +378,10 @@ func (s *Server) GetCompoundProof(ctx context.Context, req *supernode.GetCompoun resp.Error = "at least one range is required" return resp, nil } - // LEP-6 §11 hardening (C6): bound per-call range count to prevent DoS / - // bulk-exfil. Spec k=4; cap 16 leaves headroom for chain-param drift. - if len(req.Ranges) > deterministic.MaxCompoundRanges { + caps := s.compoundCaps(ctx) + if len(req.Ranges) > int(caps.maxRanges) { return nil, status.Errorf(codes.InvalidArgument, - "too many ranges: got %d, max %d", len(req.Ranges), deterministic.MaxCompoundRanges) + "too many ranges: got %d, max %d", len(req.Ranges), caps.maxRanges) } var requestRangeLen uint64 for i, rng := range req.Ranges { @@ -341,16 +405,18 @@ func (s *Server) GetCompoundProof(ctx context.Context, req *supernode.GetCompoun return resp, nil } } - // C6: per-range length cap (defends against giant single-range exfil). 
- if requestRangeLen > deterministic.MaxCompoundRangeLenBytes { + if requestRangeLen > uint64(caps.maxLen) { return nil, status.Errorf(codes.InvalidArgument, - "range length %d exceeds cap %d", requestRangeLen, deterministic.MaxCompoundRangeLenBytes) + "range length %d exceeds cap %d", requestRangeLen, caps.maxLen) } - // C6: aggregate-bytes cap across all ranges (spec aggregate is 1 KiB; cap 16 KiB). aggregate := requestRangeLen * uint64(len(req.Ranges)) - if aggregate > uint64(deterministic.MaxCompoundAggregateBytes) { + aggregateCap := uint64(caps.maxRanges) * uint64(caps.maxLen) + if aggregateCap > uint64(deterministic.MaxCompoundAggregateBytes) { + aggregateCap = uint64(deterministic.MaxCompoundAggregateBytes) + } + if aggregate > aggregateCap { return nil, status.Errorf(codes.InvalidArgument, - "aggregate range bytes %d exceeds cap %d", aggregate, deterministic.MaxCompoundAggregateBytes) + "aggregate range bytes %d exceeds cap %d", aggregate, aggregateCap) } if s.reader == nil { diff --git a/supernode/transport/grpc/storage_challenge/handler_compound_caps_test.go b/supernode/transport/grpc/storage_challenge/handler_compound_caps_test.go index 844cdf67..69ab2def 100644 --- a/supernode/transport/grpc/storage_challenge/handler_compound_caps_test.go +++ b/supernode/transport/grpc/storage_challenge/handler_compound_caps_test.go @@ -4,6 +4,7 @@ import ( "context" "testing" + audittypes "github.com/LumeraProtocol/lumera/x/audit/v1/types" "github.com/LumeraProtocol/supernode/v2/gen/supernode" "github.com/LumeraProtocol/supernode/v2/pkg/storagechallenge/deterministic" "github.com/stretchr/testify/require" @@ -80,7 +81,9 @@ func TestGetCompoundProof_AggregateAtExactCap(t *testing.T) { t.Parallel() reader := &deterministicReader{} - srv := NewServer("recipient-1", &testP2PClient{}, nil).WithArtifactReader(reader) + srv := NewServer("recipient-1", &testP2PClient{}, nil). + WithArtifactReader(reader). + WithCompoundCapsForTest(uint32(deterministic.MaxCompoundRanges), uint32(deterministic.MaxCompoundRangeLenBytes)) // 16 ranges × 1024 bytes/range = 16384 bytes = MaxCompoundAggregateBytes exactly. rl := uint64(deterministic.MaxCompoundAggregateBytes / deterministic.MaxCompoundRanges) @@ -97,3 +100,68 @@ func TestGetCompoundProof_AggregateAtExactCap(t *testing.T) { require.True(t, resp.Ok, "error: %s", resp.Error) require.Equal(t, deterministic.MaxCompoundRanges, reader.calls) } + +func TestGetCompoundProofHonorsChainParamCaps(t *testing.T) { + srv := NewServer("recipient-1", &testP2PClient{}, nil). + WithArtifactReader(&deterministicReader{}). 
+ WithCompoundCapsForTest(4, 256) + req := validCompoundRequestForCaps(5, 128) + _, err := srv.GetCompoundProof(context.Background(), req) + if status.Code(err) != codes.InvalidArgument { + t.Fatalf("expected InvalidArgument for 5 ranges over chain cap 4, got %v", err) + } + + req = validCompoundRequestForCaps(4, 257) + _, err = srv.GetCompoundProof(context.Background(), req) + if status.Code(err) != codes.InvalidArgument { + t.Fatalf("expected InvalidArgument for len 257 over chain cap 256, got %v", err) + } +} + +func validCompoundRequestForCaps(n int, size uint64) *supernode.GetCompoundProofRequest { + ranges := make([]*supernode.ByteRange, 0, n) + for i := 0; i < n; i++ { + start := uint64(i) * size + ranges = append(ranges, &supernode.ByteRange{Start: start, End: start + size}) + } + return &supernode.GetCompoundProofRequest{ + ChallengeId: "challenge-caps", + EpochId: 7, + TicketId: "ticket-caps", + ArtifactClass: uint32(audittypes.StorageProofArtifactClass_STORAGE_PROOF_ARTIFACT_CLASS_SYMBOL), + ArtifactKey: "artifact-caps", + ArtifactSize: uint64(n)*size + 1, + BucketType: uint32(audittypes.StorageProofBucketType_STORAGE_PROOF_BUCKET_TYPE_RECENT), + ArtifactOrdinal: 0, + ArtifactCount: 1, + Ranges: ranges, + } +} + +func TestGetCompoundProof_AggregateBytesCapBranch(t *testing.T) { + t.Parallel() + + reader := &deterministicReader{} + srv := NewServer("recipient-1", &testP2PClient{}, nil). + WithArtifactReader(reader). + WithCompoundCapsForTest(20, 1024) + + // Each range is within the chain-param count/length caps, but the total + // payload exceeds MaxCompoundAggregateBytes so the aggregate guard itself + // must reject before any artifact bytes are read. + rangeLen := uint64(1000) + ranges := make([]*supernode.ByteRange, 0, 17) + for i := uint64(0); i < 17; i++ { + ranges = append(ranges, &supernode.ByteRange{Start: i * (1 << 20), End: i*(1<<20) + rangeLen}) + } + req := compoundRequestWith(ranges, 1<<30) + + resp, err := srv.GetCompoundProof(context.Background(), req) + require.Error(t, err) + require.Nil(t, resp) + st, ok := status.FromError(err) + require.True(t, ok) + require.Equal(t, codes.InvalidArgument, st.Code()) + require.Contains(t, st.Message(), "aggregate") + require.Equal(t, 0, reader.calls) +}
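The cap arithmetic exercised by these tests can be summarised in a few lines. A minimal sketch, assuming the chain params and the 16 KiB spec ceiling quoted above; the package and function names are illustrative and not part of the handler.

package capsketch // hypothetical package, illustration only

// effectiveAggregateCap mirrors the GetCompoundProof guard: the per-call byte
// budget is min(maxRanges*maxLen, MaxCompoundAggregateBytes), so a permissive
// on-chain configuration can never widen the per-call exfiltration surface
// beyond the spec-level ceiling.
func effectiveAggregateCap(maxRanges, maxLen uint32, specCapBytes uint64) uint64 {
	budget := uint64(maxRanges) * uint64(maxLen) // e.g. 20 * 1024 = 20480
	if budget > specCapBytes {                   // spec ceiling: 16384
		budget = specCapBytes
	}
	return budget // 17 ranges * 1000 B = 17000 > 16384 -> InvalidArgument
}

With the chain caps programmed in TestGetCompoundProofHonorsChainParamCaps (4 ranges, 256 bytes) the budget is only 1024 bytes, well under the spec ceiling, so the per-range-count and per-range-length checks are the binding constraints in that test.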