From c697d82495847eda2252f3c26deb96ea702b5525 Mon Sep 17 00:00:00 2001 From: Phil Windle Date: Sun, 31 May 2026 20:59:35 +0000 Subject: [PATCH] refactor(p2p): split message processing from libp2p networking Extract received-message handling (validation, persistence, consensus callbacks, and the reqresp data handlers) out of LibP2PService into a new P2PMessageProcessor. The processor is built in the factory (the composition root) and injected into both the libp2p service and the P2PClient; consensus callbacks now register directly on the processor rather than on the networking service. LibP2PService is reduced to the networking layer. No behaviour change. This draws the boundary along the already-mostly-async P2PService interface in preparation for moving the libp2p stack onto a worker thread. --- yarn-project/p2p/src/client/factory.ts | 34 +- .../p2p/src/client/p2p_client.test.ts | 2 + yarn-project/p2p/src/client/p2p_client.ts | 18 +- .../p2p_client.integration_block_txs.test.ts | 2 +- .../p2p/src/services/dummy_service.ts | 38 +- .../services/libp2p/libp2p_service.test.ts | 123 ++- .../p2p/src/services/libp2p/libp2p_service.ts | 895 +----------------- .../services/libp2p/p2p_message_processor.ts | 895 ++++++++++++++++++ yarn-project/p2p/src/services/service.ts | 22 - .../p2p/src/test-helpers/mock-pubsub.ts | 35 +- .../p2p/src/test-helpers/reqresp-nodes.ts | 17 +- .../testbench/p2p_client_testbench_worker.ts | 36 +- 12 files changed, 1070 insertions(+), 1047 deletions(-) create mode 100644 yarn-project/p2p/src/services/libp2p/p2p_message_processor.ts diff --git a/yarn-project/p2p/src/client/factory.ts b/yarn-project/p2p/src/client/factory.ts index 3f271e43744f..220281f0ae46 100644 --- a/yarn-project/p2p/src/client/factory.ts +++ b/yarn-project/p2p/src/client/factory.ts @@ -27,6 +27,7 @@ import { import { TxValidationCache } from '../msg_validators/tx_validator/tx_validation_cache.js'; import { DummyP2PService } from '../services/dummy_service.js'; import { LibP2PService } from '../services/index.js'; +import { P2PMessageProcessor } from '../services/libp2p/p2p_message_processor.js'; import { createFileStoreTxSources } from '../services/tx_collection/file_store_tx_source.js'; import { TxCollection } from '../services/tx_collection/tx_collection.js'; import { NodeRpcTxSource, type TxSource, createNodeRpcTxSources } from '../services/tx_collection/tx_source.js'; @@ -141,21 +142,33 @@ export async function createP2PClient( const txValidationCache = config.txValidationCacheSize > 0 ? new TxValidationCache(config.txValidationCacheSize) : undefined; - const p2pService = await createP2PService( + // The message processor owns received-message handling (validation, persistence, consensus callbacks). + // It is shared between the P2PClient (which registers callbacks on it) and the libp2p service (which + // invokes it when messages arrive), so it is built here and injected into both. + const processor = new P2PMessageProcessor( config, + mempools, archiver, + epochCache, proofVerifier, worldStateSynchronizer, - epochCache, blockMinFeesProvider, + telemetry, + logger.createChild('libp2p_service'), + txValidationCache, + ); + + const p2pService = await createP2PService( + config, + worldStateSynchronizer, + epochCache, store, peerStore, - mempools, + processor, deps.p2pServiceFactory, packageVersion, logger.createChild('libp2p_service'), telemetry, - txValidationCache, ); const txValidatorForTxCollection = createTxValidatorForOnDemandReceivedTxs( @@ -215,6 +228,7 @@ export async function createP2PClient( archiver, mempools, p2pService, + processor, txCollection, txFileStore, epochCache, @@ -228,19 +242,15 @@ export async function createP2PClient( async function createP2PService( config: P2PConfig & DataStoreConfig, - archiver: L2BlockSource & ContractDataSource, - proofVerifier: ClientProtocolCircuitVerifier, worldStateSynchronizer: WorldStateSynchronizer, epochCache: EpochCacheInterface, - blockMinFeesProvider: BlockMinFeesProvider, store: AztecAsyncKVStore, peerStore: AztecLMDBStoreV2, - mempools: MemPools, + processor: P2PMessageProcessor, p2pServiceFactory: P2PClientDeps['p2pServiceFactory'], packageVersion: string, logger: Logger, telemetry: TelemetryClient, - txValidationCache?: TxValidationCache, ) { if (!config.p2pEnabled) { logger.verbose('P2P is disabled. Using dummy P2P service.'); @@ -255,16 +265,12 @@ async function createP2PService( const p2pService = await (p2pServiceFactory ?? LibP2PService.new)(config, peerId, { packageVersion, - mempools, - l2BlockSource: archiver, + processor, epochCache, - proofVerifier, worldStateSynchronizer, peerStore, - blockMinFeesProvider, telemetry, logger: logger.createChild(`libp2p_service`), - txValidationCache, }); return p2pService; diff --git a/yarn-project/p2p/src/client/p2p_client.test.ts b/yarn-project/p2p/src/client/p2p_client.test.ts index 01c03e0d206d..adaba314dc52 100644 --- a/yarn-project/p2p/src/client/p2p_client.test.ts +++ b/yarn-project/p2p/src/client/p2p_client.test.ts @@ -18,6 +18,7 @@ import type { P2PService } from '../index.js'; import { type AttestationPool, createTestAttestationPool } from '../mem_pools/attestation_pool/attestation_pool.js'; import type { MemPools } from '../mem_pools/interface.js'; import type { TxPoolV2 } from '../mem_pools/tx_pool_v2/interfaces.js'; +import type { P2PMessageProcessor } from '../services/libp2p/p2p_message_processor.js'; import type { TxCollection } from '../services/tx_collection/tx_collection.js'; import { P2PClient } from './p2p_client.js'; @@ -68,6 +69,7 @@ describe('P2P Client', () => { blockSource, mempools, p2pService, + mock(), txCollection, undefined, epochCache, diff --git a/yarn-project/p2p/src/client/p2p_client.ts b/yarn-project/p2p/src/client/p2p_client.ts index 77fc6e8be11c..5290f096794f 100644 --- a/yarn-project/p2p/src/client/p2p_client.ts +++ b/yarn-project/p2p/src/client/p2p_client.ts @@ -39,6 +39,7 @@ import type { AttestationPoolApi, ProposalsForSlot } from '../mem_pools/attestat import type { MemPools } from '../mem_pools/interface.js'; import type { TxPoolV2 } from '../mem_pools/tx_pool_v2/interfaces.js'; import type { AuthRequest, StatusMessage } from '../services/index.js'; +import type { P2PMessageProcessor } from '../services/libp2p/p2p_message_processor.js'; import { ReqRespSubProtocol, type ReqRespSubProtocolHandler } from '../services/reqresp/interface.js'; import type { DuplicateAttestationInfo, @@ -91,6 +92,7 @@ export class P2PClient extends WithTracer implements P2P { private l2BlockSource: L2BlockSource & ContractDataSource, mempools: MemPools, private p2pService: P2PService, + private processor: P2PMessageProcessor, private txCollection: TxCollection, private txFileStore: TxFileStore | undefined, private epochCache: EpochCacheInterface, @@ -406,30 +408,30 @@ export class P2PClient extends WithTracer implements P2P { return this.attestationPool.hasBlockProposalsForSlot(slot); } - // REVIEW: https://github.com/AztecProtocol/aztec-packages/issues/7963 - // ^ This pattern is not my favorite (md) + // Consensus callbacks are registered directly on the message processor (the main-thread component + // that owns received-message handling), not on the libp2p networking service. public registerBlockProposalHandler(handler: P2PBlockReceivedCallback): void { - this.p2pService.registerBlockReceivedCallback(handler); + this.processor.registerBlockReceivedCallback(handler); } public registerValidatorCheckpointProposalHandler(handler: P2PCheckpointReceivedCallback): void { - this.p2pService.registerValidatorCheckpointReceivedCallback(handler); + this.processor.registerValidatorCheckpointReceivedCallback(handler); } public registerAllNodesCheckpointProposalHandler(handler: P2PCheckpointReceivedCallback): void { - this.p2pService.registerAllNodesCheckpointReceivedCallback(handler); + this.processor.registerAllNodesCheckpointReceivedCallback(handler); } public registerDuplicateProposalCallback(callback: (info: DuplicateProposalInfo) => void): void { - this.p2pService.registerDuplicateProposalCallback(callback); + this.processor.registerDuplicateProposalCallback(callback); } public registerDuplicateAttestationCallback(callback: (info: DuplicateAttestationInfo) => void): void { - this.p2pService.registerDuplicateAttestationCallback(callback); + this.processor.registerDuplicateAttestationCallback(callback); } public registerCheckpointAttestationCallback(callback: (attestation: CheckpointAttestation) => void): void { - this.p2pService.registerCheckpointAttestationCallback(callback); + this.processor.registerCheckpointAttestationCallback(callback); } public async getPendingTxs(limit?: number, after?: TxHash): Promise { diff --git a/yarn-project/p2p/src/client/test/p2p_client.integration_block_txs.test.ts b/yarn-project/p2p/src/client/test/p2p_client.integration_block_txs.test.ts index a0f8c6960b4e..b50ea6c084dc 100644 --- a/yarn-project/p2p/src/client/test/p2p_client.integration_block_txs.test.ts +++ b/yarn-project/p2p/src/client/test/p2p_client.integration_block_txs.test.ts @@ -451,7 +451,7 @@ describe('p2p client integration block txs protocol ', () => { // Run the response through client1's real validation — this is what BatchTxRequester does in // production. The peer must not be penalized. - await (client1.p2pService as any).validateRequestedBlockTxsConsistency( + await (client1.p2pService as any).processor.validateRequestedBlockTxsConsistency( request, decoded, client2.p2pService.node.peerId, diff --git a/yarn-project/p2p/src/services/dummy_service.ts b/yarn-project/p2p/src/services/dummy_service.ts index 7e132b28ef95..00d9c4a1fbb9 100644 --- a/yarn-project/p2p/src/services/dummy_service.ts +++ b/yarn-project/p2p/src/services/dummy_service.ts @@ -22,23 +22,12 @@ import type { } from './reqresp/interface.js'; import type { GoodByeReason } from './reqresp/protocols/goodbye.js'; import { ReqRespStatus } from './reqresp/status.js'; -import { - type P2PBlockReceivedCallback, - type P2PCheckpointAttestationCallback, - type P2PCheckpointReceivedCallback, - type P2PDuplicateAttestationCallback, - type P2PDuplicateProposalCallback, - type P2PService, - type PeerDiscoveryService, - PeerDiscoveryState, -} from './service.js'; +import { type P2PService, type PeerDiscoveryService, PeerDiscoveryState } from './service.js'; /** * A dummy implementation of the P2P Service. */ export class DummyP2PService implements P2PService { - private allNodesCheckpointReceivedCallback?: P2PCheckpointReceivedCallback; - updateConfig(_config: Partial): void {} /** Returns an empty array for peers. */ @@ -80,31 +69,6 @@ export class DummyP2PService implements P2PService { */ public settledTxs(_: TxHash[]) {} - /** - * Register a callback into the validator client for when a block proposal is received - */ - public registerBlockReceivedCallback(_callback: P2PBlockReceivedCallback) {} - - /** - * Register a callback into the validator client for when a checkpoint proposal is received - */ - public registerValidatorCheckpointReceivedCallback(_callback: P2PCheckpointReceivedCallback) {} - public registerAllNodesCheckpointReceivedCallback(callback: P2PCheckpointReceivedCallback) { - this.allNodesCheckpointReceivedCallback = callback; - } - - /** - * Register a callback for when a duplicate proposal is detected - */ - public registerDuplicateProposalCallback(_callback: P2PDuplicateProposalCallback): void {} - - /** - * Register a callback for when a duplicate attestation is detected - */ - public registerDuplicateAttestationCallback(_callback: P2PDuplicateAttestationCallback): void {} - - public registerCheckpointAttestationCallback(_callback: P2PCheckpointAttestationCallback): void {} - /** * Sends a request to a peer. * @param _protocol - The protocol to send the request on. diff --git a/yarn-project/p2p/src/services/libp2p/libp2p_service.test.ts b/yarn-project/p2p/src/services/libp2p/libp2p_service.test.ts index 2b95d58cdce0..f09eab4939b9 100644 --- a/yarn-project/p2p/src/services/libp2p/libp2p_service.test.ts +++ b/yarn-project/p2p/src/services/libp2p/libp2p_service.test.ts @@ -46,6 +46,7 @@ import { BitVector } from '../reqresp/protocols/block_txs/bitvector.js'; import { BlockTxsRequest, BlockTxsResponse } from '../reqresp/protocols/block_txs/block_txs_reqresp.js'; import type { PeerDiscoveryService } from '../service.js'; import { LibP2PService } from './libp2p_service.js'; +import { P2PMessageProcessor } from './p2p_message_processor.js'; describe('LibP2PService', () => { const MOCK_PEER_ID = 'peer-id-123'; @@ -1433,6 +1434,40 @@ interface CreateTestLibP2PServiceOptions { * Test subclass of LibP2PService that exposes protected methods for testing * and allows construction with mocked dependencies. */ +/** + * Test message processor that lets tests control validator outcomes via flags on the owning service. + * The validator factories live on the processor, so the test flags are read through a back-reference. + */ +class TestP2PMessageProcessor extends P2PMessageProcessor { + public testService!: TestLibP2PService; + + /** Override to use test flag for first-stage validators. Returns a failing validator when firstStageValidationPasses is false. */ + protected override createFirstStageMessageValidators(): Promise> { + if (this.testService.firstStageValidationPasses) { + return Promise.resolve({}); + } + return Promise.resolve({ + [this.testService.firstStageFailingValidatorName]: { + validator: { validateTx: () => Promise.resolve({ result: 'invalid' as const, reason: ['Test failure'] }) }, + severity: this.testService.firstStageSeverity, + }, + }); + } + + /** Override to use test flag for second-stage validators. Returns a failing validator when secondStageValidationPasses is false. */ + protected override createSecondStageMessageValidators(): Record { + if (this.testService.secondStageValidationPasses) { + return {}; + } + return { + proofValidator: { + validator: { validateTx: () => Promise.resolve({ result: 'invalid' as const, reason: ['Proof failure'] }) }, + severity: PeerErrorSeverity.LowToleranceError, + }, + }; + } +} + class TestLibP2PService extends LibP2PService { /** Controls whether first-stage gossip validation passes. Set to false to simulate first-stage failure. */ public firstStageValidationPasses = true; @@ -1481,12 +1516,8 @@ class TestLibP2PService extends LibP2PService { verifyProof: () => Promise.resolve({ valid: true, durationMs: 1000, totalDurationMs: 1000 }), }); - super( + const processor = new TestP2PMessageProcessor( mockConfig, - node, - resolvedPeerDiscoveryService, - mockReqResp, - peerManager, mempools, archiver, epochCache, @@ -1497,6 +1528,12 @@ class TestLibP2PService extends LibP2PService { logger, ); + super(mockConfig, node, resolvedPeerDiscoveryService, mockReqResp, peerManager, processor, telemetry, logger); + + // The processor reads validator-control flags off this service; wire the back-reference now that + // super() has run and `this` is available. + processor.testService = this; + this.mockPeerDiscoveryService = resolvedPeerDiscoveryService; this.testEpochCache = epochCache; @@ -1512,39 +1549,13 @@ class TestLibP2PService extends LibP2PService { return super.handleGossipedTx(payloadData, msgId, source); } - /** Override to use test flag for first-stage validators. Returns a failing validator when firstStageValidationPasses is false. */ - protected override createFirstStageMessageValidators(): Promise> { - if (this.firstStageValidationPasses) { - return Promise.resolve({}); - } - return Promise.resolve({ - [this.firstStageFailingValidatorName]: { - validator: { validateTx: () => Promise.resolve({ result: 'invalid' as const, reason: ['Test failure'] }) }, - severity: this.firstStageSeverity, - }, - }); - } - - /** Override to use test flag for second-stage validators. Returns a failing validator when secondStageValidationPasses is false. */ - protected override createSecondStageMessageValidators(): Record { - if (this.secondStageValidationPasses) { - return {}; - } - return { - proofValidator: { - validator: { validateTx: () => Promise.resolve({ result: 'invalid' as const, reason: ['Proof failure'] }) }, - severity: PeerErrorSeverity.LowToleranceError, - }, - }; - } - - /** Exposes the protected validateRequestedBlockTxsConsistency for testing. */ - public override validateRequestedBlockTxsConsistency( + /** Exposes the processor's validateRequestedBlockTxsConsistency for testing. */ + public validateRequestedBlockTxsConsistency( request: BlockTxsRequest, response: BlockTxsResponse, peerId: PeerId, ): Promise { - return super.validateRequestedBlockTxsConsistency(request, response, peerId); + return this.processor.validateRequestedBlockTxsConsistency(request, response, peerId); } /** Exposes the protected processBlockFromPeer for testing. */ @@ -1557,14 +1568,50 @@ class TestLibP2PService extends LibP2PService { return super.handleGossipedCheckpointProposal(payloadData, msgId, source); } - /** Exposes the protected validateAndStoreCheckpointAttestation for testing. */ - public override validateAndStoreCheckpointAttestation(peerId: PeerId, attestation: CheckpointAttestation) { - return super.validateAndStoreCheckpointAttestation(peerId, attestation); + /** Exposes the processor's validateAndStoreCheckpointAttestation for testing. */ + public validateAndStoreCheckpointAttestation(peerId: PeerId, attestation: CheckpointAttestation) { + return this.processor.validateAndStoreCheckpointAttestation(peerId, attestation); + } + + // Consensus callbacks now live on the processor; expose registration through the service for the tests + // that drive the gossip dispatchers directly. + public registerBlockReceivedCallback(cb: Parameters[0]): void { + this.processor.registerBlockReceivedCallback(cb); + } + + public registerValidatorCheckpointReceivedCallback( + cb: Parameters[0], + ): void { + this.processor.registerValidatorCheckpointReceivedCallback(cb); + } + + public registerAllNodesCheckpointReceivedCallback( + cb: Parameters[0], + ): void { + this.processor.registerAllNodesCheckpointReceivedCallback(cb); + } + + public registerDuplicateProposalCallback( + cb: Parameters[0], + ): void { + this.processor.registerDuplicateProposalCallback(cb); + } + + public registerDuplicateAttestationCallback( + cb: Parameters[0], + ): void { + this.processor.registerDuplicateAttestationCallback(cb); + } + + public registerCheckpointAttestationCallback( + cb: Parameters[0], + ): void { + this.processor.registerCheckpointAttestationCallback(cb); } /** Sets the attestation pool on the mempools for test setup. */ public setAttestationPool(attestationPool: MockAttestationPoolForTests): void { - (this.mempools as any).attestationPool = attestationPool; + (this.processor as any).mempools.attestationPool = attestationPool; } } diff --git a/yarn-project/p2p/src/services/libp2p/libp2p_service.ts b/yarn-project/p2p/src/services/libp2p/libp2p_service.ts index c943f74a8c2a..480595bddf98 100644 --- a/yarn-project/p2p/src/services/libp2p/libp2p_service.ts +++ b/yarn-project/p2p/src/services/libp2p/libp2p_service.ts @@ -1,32 +1,24 @@ import type { EpochCacheInterface } from '@aztec/epoch-cache'; -import { BlockNumber, type SlotNumber } from '@aztec/foundation/branded-types'; -import { maxBy, merge } from '@aztec/foundation/collection'; +import { merge } from '@aztec/foundation/collection'; import { type Logger, createLibp2pComponentLogger, createLogger } from '@aztec/foundation/log'; import { RunningPromise } from '@aztec/foundation/running-promise'; import { Timer } from '@aztec/foundation/timer'; import type { AztecAsyncKVStore } from '@aztec/kv-store'; -import { protocolContractsHash } from '@aztec/protocol-contracts'; -import type { EthAddress, L2BlockSource } from '@aztec/stdlib/block'; -import type { ContractDataSource } from '@aztec/stdlib/contract'; -import { type BlockMinFeesProvider, GasFees } from '@aztec/stdlib/gas'; -import type { ClientProtocolCircuitVerifier, PeerInfo, WorldStateSynchronizer } from '@aztec/stdlib/interfaces/server'; +import type { EthAddress } from '@aztec/stdlib/block'; +import type { PeerInfo, WorldStateSynchronizer } from '@aztec/stdlib/interfaces/server'; import { BlockProposal, CheckpointAttestation, CheckpointProposal, - type CheckpointProposalCore, type Gossipable, P2PMessage, PeerErrorSeverity, - PeerErrorSeverityByHarshness, TopicType, createTopicString, getTopicsForConfig, metricsTopicStrToLabels, } from '@aztec/stdlib/p2p'; -import { MerkleTreeId } from '@aztec/stdlib/trees'; -import { Tx, type TxValidationResult } from '@aztec/stdlib/tx'; -import type { UInt64 } from '@aztec/stdlib/types'; +import { Tx } from '@aztec/stdlib/tx'; import { compressComponentVersions } from '@aztec/stdlib/versioning'; import { Attributes, @@ -34,7 +26,6 @@ import { SpanStatusCode, type TelemetryClient, WithTracer, - trackSpan, } from '@aztec/telemetry-client'; import { @@ -58,24 +49,7 @@ import { ENR } from '@nethermindeth/enr'; import { createLibp2p } from 'libp2p'; import type { P2PConfig } from '../../config.js'; -import { CheckpointProposalReceivedCallbackNotRegisteredError } from '../../errors/p2p-service.error.js'; -import type { MemPools } from '../../mem_pools/interface.js'; -import { - BlockProposalValidator, - CheckpointAttestationValidator, - CheckpointProposalValidator, - DoubleSpendTxValidator, - FishermanAttestationValidator, - getDefaultAllowedSetupFunctions, -} from '../../msg_validators/index.js'; import { MessageSeenValidator } from '../../msg_validators/msg_seen_validator/msg_seen_validator.js'; -import { - type TransactionValidator, - createFirstStageTxValidationsForGossipedTransactions, - createSecondStageTxValidationsForGossipedTransactions, - createTxValidatorForBlockProposalReceivedTxs, -} from '../../msg_validators/tx_validator/factory.js'; -import { TxValidationCache } from '../../msg_validators/tx_validator/tx_validation_cache.js'; import { GossipSubEvent } from '../../types/index.js'; import { type PubSubLibp2p, convertToMultiaddr } from '../../util.js'; import { getVersions } from '../../versioning.js'; @@ -91,44 +65,19 @@ import type { BatchTxRequesterLibP2PService } from '../reqresp/batch-tx-requeste import type { P2PReqRespConfig } from '../reqresp/config.js'; import { AuthRequest, - BlockTxsRequest, - BlockTxsResponse, type ReqRespInterface, type ReqRespResponse, ReqRespSubProtocol, type ReqRespSubProtocolHandler, type ReqRespSubProtocolHandlers, StatusMessage, - ValidationError, pingHandler, reqGoodbyeHandler, - reqRespBlockTxsHandler, - reqRespStatusHandler, - reqRespTxHandler, } from '../reqresp/index.js'; import { ReqResp } from '../reqresp/reqresp.js'; -import type { - P2PBlockReceivedCallback, - P2PCheckpointAttestationCallback, - P2PCheckpointReceivedCallback, - P2PDuplicateAttestationCallback, - P2PService, - PeerDiscoveryService, -} from '../service.js'; +import type { P2PService, PeerDiscoveryService } from '../service.js'; import { P2PInstrumentation } from './instrumentation.js'; - -interface ValidationResult { - name: string; - isValid: TxValidationResult; - severity: PeerErrorSeverity; -} - -type ValidationOutcome = { allPassed: true } | { allPassed: false; failure: ValidationResult }; - -// REFACTOR: Unify with the type above -type ReceivedMessageValidationResult = - | { obj: T; result: Exclude; metadata?: M } - | { obj?: T; result: TopicValidatorResult.Reject; metadata?: M; severity: PeerErrorSeverity }; +import { P2PMessageProcessor, type ReceivedMessageValidationResult } from './p2p_message_processor.js'; /** * Lib P2P implementation of the P2PService interface. @@ -137,47 +86,9 @@ export class LibP2PService extends WithTracer implements P2PService { private discoveryRunningPromise?: RunningPromise; private msgIdSeenValidators: Record = {} as Record; - // Message validators - private blockProposalValidator: BlockProposalValidator; - private checkpointProposalValidator: CheckpointProposalValidator; - private checkpointAttestationValidator: CheckpointAttestationValidator; - private protocolVersion = ''; private topicStrings: Record = {} as Record; - /** Callback invoked when a duplicate proposal is detected (triggers slashing). */ - private duplicateProposalCallback?: (info: { - slot: SlotNumber; - proposer: EthAddress; - type: 'checkpoint' | 'block'; - }) => void; - - /** Callback invoked when a duplicate attestation is detected (triggers slashing). */ - private duplicateAttestationCallback?: P2PDuplicateAttestationCallback; - - /** Callback invoked when a valid checkpoint attestation is accepted into the pool. */ - private checkpointAttestationCallback?: P2PCheckpointAttestationCallback; - - /** - * Callback for when a block is received from a peer. - * @param block - The block received from the peer. - * @returns The attestation for the block, if any. - */ - private blockReceivedCallback: P2PBlockReceivedCallback; - - /** - * Callback for when a checkpoint proposal is received from a peer. - * @param checkpoint - The checkpoint proposal received from the peer. - * @returns The attestations for the checkpoint, if any. - */ - private allNodesCheckpointReceivedCallback: P2PCheckpointReceivedCallback; - /** - * Callback for when a checkpoint proposal is received - specifically for validators - from a peer. - * @param checkpoint - The checkpoint proposal received from the peer. - * @returns The attestations for the checkpoint, if any. - */ - private validatorCheckpointReceivedCallback: P2PCheckpointReceivedCallback; - private gossipSubEventHandler: (e: CustomEvent) => void; private ipChangedHandler?: (ip: string) => void; @@ -189,21 +100,18 @@ export class LibP2PService extends WithTracer implements P2PService { protected logger: Logger; + /** Handles the content of received messages: validation against node state, persistence, and consensus callbacks. */ + protected processor: P2PMessageProcessor; + constructor( private config: P2PConfig, protected node: PubSubLibp2p, private peerDiscoveryService: PeerDiscoveryService, private reqresp: ReqRespInterface, protected peerManager: PeerManagerInterface, - protected mempools: MemPools, - protected archiver: L2BlockSource & ContractDataSource, - private epochCache: EpochCacheInterface, - private proofVerifier: ClientProtocolCircuitVerifier, - private worldStateSynchronizer: WorldStateSynchronizer, - private blockMinFeesProvider: BlockMinFeesProvider, + processor: P2PMessageProcessor, telemetry: TelemetryClient, logger: Logger = createLogger('p2p:libp2p_service'), - private txValidationCache?: TxValidationCache, ) { super(telemetry, 'LibP2PService'); this.telemetry = telemetry; @@ -233,50 +141,10 @@ export class LibP2PService extends WithTracer implements P2PService { this.protocolVersion, ); - const p2pPropagationTime = config.attestationPropagationTime; - const proposalValidatorOpts = { - txsPermitted: !config.disableTransactions, - maxTxsPerBlock: config.validateMaxTxsPerBlock ?? config.validateMaxTxsPerCheckpoint, - maxBlocksPerCheckpoint: config.maxBlocksPerCheckpoint, - p2pPropagationTime, - skipSlotValidation: config.skipProposalSlotValidation, - signatureContext: { - chainId: config.l1ChainId, - rollupAddress: config.rollupAddress, - }, - }; - this.blockProposalValidator = new BlockProposalValidator(epochCache, proposalValidatorOpts); - this.checkpointProposalValidator = new CheckpointProposalValidator(epochCache, proposalValidatorOpts); - const attestationValidatorOpts = { - l1PublishingTime: config.l1PublishingTime, - p2pPropagationTime, - signatureContext: proposalValidatorOpts.signatureContext, - }; - this.checkpointAttestationValidator = config.fishermanMode - ? new FishermanAttestationValidator(epochCache, mempools.attestationPool, telemetry, attestationValidatorOpts) - : new CheckpointAttestationValidator(epochCache, attestationValidatorOpts); - this.gossipSubEventHandler = this.handleGossipSubEvent.bind(this); - this.blockReceivedCallback = async (block: BlockProposal): Promise => { - this.logger.warn( - `Handler for block received not yet registered on P2P service. Received block ${block.blockNumber} for slot ${block.slotNumber} from peer.`, - { p2pMessageIdentifier: await block.p2pMessageLoggingIdentifier() }, - ); - return true; - }; - - this.allNodesCheckpointReceivedCallback = ( - _checkpoint: CheckpointProposalCore, - ): Promise => { - throw new CheckpointProposalReceivedCallbackNotRegisteredError(); - }; - - this.validatorCheckpointReceivedCallback = ( - _checkpoint: CheckpointProposalCore, - ): Promise => { - return Promise.resolve(undefined); - }; + this.processor = processor; + this.processor.setNetwork(this); } public updateConfig(config: Partial>) { @@ -294,32 +162,16 @@ export class LibP2PService extends WithTracer implements P2PService { config: P2PConfig, peerId: PeerId, deps: { - mempools: MemPools; - l2BlockSource: L2BlockSource & ContractDataSource; + processor: P2PMessageProcessor; epochCache: EpochCacheInterface; - proofVerifier: ClientProtocolCircuitVerifier; worldStateSynchronizer: WorldStateSynchronizer; peerStore: AztecAsyncKVStore; - blockMinFeesProvider: BlockMinFeesProvider; telemetry: TelemetryClient; logger: Logger; packageVersion: string; - txValidationCache?: TxValidationCache; }, ) { - const { - worldStateSynchronizer, - epochCache, - l2BlockSource, - mempools, - proofVerifier, - peerStore, - blockMinFeesProvider, - telemetry, - logger, - packageVersion, - txValidationCache, - } = deps; + const { processor, worldStateSynchronizer, epochCache, peerStore, telemetry, logger, packageVersion } = deps; const { p2pPort, maxPeerCount, listenAddress } = config; const bindAddrTcp = convertToMultiaddr(listenAddress, p2pPort, 'tcp'); @@ -512,22 +364,7 @@ export class LibP2PService extends WithTracer implements P2PService { node.services.pubsub.score.params.appSpecificScore = (peerId: string) => peerManager.shouldDisableP2PGossip(peerId) ? -Infinity : peerManager.getPeerScore(peerId); - return new LibP2PService( - config, - node, - peerDiscoveryService, - reqresp, - peerManager, - mempools, - l2BlockSource, - epochCache, - proofVerifier, - worldStateSynchronizer, - blockMinFeesProvider, - telemetry, - logger, - txValidationCache, - ); + return new LibP2PService(config, node, peerDiscoveryService, reqresp, peerManager, processor, telemetry, logger); } /** @@ -546,30 +383,16 @@ export class LibP2PService extends WithTracer implements P2PService { } const announceTcpMultiaddr = p2pIp ? convertToMultiaddr(p2pIp, p2pPort, 'tcp') : undefined; - // Create request response protocol handlers - const txHandler = reqRespTxHandler(this.mempools); + // Create request response protocol handlers. Handlers that serve data from node state (status, tx, + // block txs) come from the message processor; lifecycle handlers (ping, goodbye) live here. const goodbyeHandler = reqGoodbyeHandler(this.peerManager); - const statusHandler = reqRespStatusHandler(this.protocolVersion, this.worldStateSynchronizer, this.logger); const requestResponseHandlers: Partial = { [ReqRespSubProtocol.PING]: pingHandler, - [ReqRespSubProtocol.STATUS]: statusHandler.bind(this), [ReqRespSubProtocol.GOODBYE]: goodbyeHandler.bind(this), + ...this.processor.createReqRespDataHandlers(this.protocolVersion), }; - if (!this.config.disableTransactions) { - const blockTxsHandler = reqRespBlockTxsHandler( - this.mempools.attestationPool, - this.archiver, - this.mempools.txPool, - ); - requestResponseHandlers[ReqRespSubProtocol.BLOCK_TXS] = blockTxsHandler.bind(this); - } - - if (!this.config.disableTransactions) { - requestResponseHandlers[ReqRespSubProtocol.TX] = txHandler.bind(this); - } - await this.peerManager.initializePeers(); await this.reqresp.start(requestResponseHandlers); @@ -710,41 +533,6 @@ export class LibP2PService extends WithTracer implements P2PService { return this.peerDiscoveryService.getEnr(); } - public registerBlockReceivedCallback(callback: P2PBlockReceivedCallback) { - this.blockReceivedCallback = callback; - } - - public registerValidatorCheckpointReceivedCallback(callback: P2PCheckpointReceivedCallback) { - this.validatorCheckpointReceivedCallback = callback; - } - - public registerAllNodesCheckpointReceivedCallback(callback: P2PCheckpointReceivedCallback) { - this.allNodesCheckpointReceivedCallback = callback; - } - - /** - * Registers a callback to be invoked when a duplicate proposal is detected. - * This callback is triggered on the first duplicate (when count goes from 1 to 2). - */ - public registerDuplicateProposalCallback( - callback: (info: { slot: SlotNumber; proposer: EthAddress; type: 'checkpoint' | 'block' }) => void, - ): void { - this.duplicateProposalCallback = callback; - } - - /** - * Registers a callback to be invoked when a duplicate attestation is detected. - * A validator signing attestations for different proposals at the same slot. - * This callback is triggered on the first duplicate (when count goes from 1 to 2). - */ - public registerDuplicateAttestationCallback(callback: P2PDuplicateAttestationCallback): void { - this.duplicateAttestationCallback = callback; - } - - public registerCheckpointAttestationCallback(callback: P2PCheckpointAttestationCallback): void { - this.checkpointAttestationCallback = callback; - } - /** * Subscribes to a topic. * @param topic - The topic to subscribe to. @@ -1003,84 +791,12 @@ export class LibP2PService extends WithTracer implements P2PService { } protected async handleGossipedTx(payloadData: Buffer, msgId: string, source: PeerId) { - const validationFunc: () => Promise> = async () => { + const validationFunc: () => Promise> = () => { const tx = this.tryDeserialize(() => Tx.fromBuffer(payloadData), msgId, source); if (!tx) { - return { result: TopicValidatorResult.Reject, severity: PeerErrorSeverity.LowToleranceError }; - } - - const currentBlockNumber = await this.archiver.getBlockNumber(); - const { ts: nextSlotTimestamp } = this.epochCache.getEpochAndSlotInNextL1Slot(); - - // Stage 1: fast validators (metadata, data, timestamps, double-spend, gas, phases, block header) - const firstStageValidators = await this.createFirstStageMessageValidators(currentBlockNumber, nextSlotTimestamp); - const firstStageOutcome = await this.runValidations(tx, firstStageValidators); - if (!firstStageOutcome.allPassed) { - const { name } = firstStageOutcome.failure; - let { severity } = firstStageOutcome.failure; - - // Double spend validator has a special case handler. We perform more detailed examination - // as to how recently the nullifier was entered into the tree and if the transaction should - // have 'known' the nullifier existed. This determines the severity of the penalty applied to the peer. - if (name === 'doubleSpendValidator') { - const txBlockNumber = BlockNumber(currentBlockNumber + 1); - severity = await this.handleDoubleSpendFailure(tx, txBlockNumber); - } - - this.logger.verbose(`Rejecting gossiped tx ${tx.getTxHash().toString()}: stage 1 validation failed`, { - validator: name, - severity, - source: source.toString(), - }); - return { result: TopicValidatorResult.Reject, severity }; - } - - // Pool pre-check: see if the pool would accept this tx before doing expensive proof verification - const canAdd = await this.mempools.txPool.canAddPendingTx(tx); - if (canAdd === 'ignored') { - this.logger.verbose(`Ignoring gossiped tx ${tx.getTxHash().toString()}: pool pre-check returned ignored`, { - source: source.toString(), - }); - return { result: TopicValidatorResult.Ignore, obj: tx }; - } - - // Stage 2: expensive proof verification - const secondStageValidators = this.createSecondStageMessageValidators(); - const secondStageOutcome = await this.runValidations(tx, secondStageValidators); - if (!secondStageOutcome.allPassed) { - const { severity, name } = secondStageOutcome.failure; - this.logger.verbose(`Rejecting gossiped tx ${tx.getTxHash().toString()}: stage 2 validation failed`, { - validator: name, - severity, - source: source.toString(), - }); - return { result: TopicValidatorResult.Reject, severity }; - } - - // Pool add: persist the tx - const txHash = tx.getTxHash(); - const addResult = await this.mempools.txPool.addPendingTxs([tx], { source: 'gossip' }); - - const wasAccepted = addResult.accepted.some(h => h.equals(txHash)); - const wasIgnored = addResult.ignored.some(h => h.equals(txHash)); - - this.logger.verbose(`Validate propagated tx ${txHash.toString()}`, { - wasAccepted, - wasIgnored, - [Attributes.P2P_ID]: source.toString(), - }); - - if (wasAccepted) { - return { result: TopicValidatorResult.Accept, obj: tx }; - } else if (wasIgnored) { - return { result: TopicValidatorResult.Ignore, obj: tx }; - } else { - this.logger.warn(`Gossiped tx ${txHash.toString()} unexpectedly rejected by pool`, { - source: source.toString(), - txHash: txHash.toString(), - }); - return { result: TopicValidatorResult.Reject, severity: PeerErrorSeverity.HighToleranceError }; + return Promise.resolve({ result: TopicValidatorResult.Reject, severity: PeerErrorSeverity.LowToleranceError }); } + return this.processor.validateAndStoreTx(tx, source); }; const { result, obj: tx } = await this.validateReceivedMessage(validationFunc, msgId, source, TopicType.tx); @@ -1117,7 +833,7 @@ export class LibP2PService extends WithTracer implements P2PService { severity: PeerErrorSeverity.LowToleranceError, }); } - return this.validateAndStoreCheckpointAttestation(source, attestation); + return this.processor.validateAndStoreCheckpointAttestation(source, attestation); }, msgId, source, @@ -1139,83 +855,13 @@ export class LibP2PService extends WithTracer implements P2PService { ); } - /** Validates a checkpoint attestation and adds it to the pool. Penalizes the peer if validation fails. */ - @trackSpan('Libp2pService.validateAndStoreCheckpointAttestation', (_peerId, attestation) => ({ - [Attributes.SLOT_NUMBER]: attestation.payload.header.slotNumber.toString(), - })) - protected async validateAndStoreCheckpointAttestation( - peerId: PeerId, - attestation: CheckpointAttestation, - ): Promise> { - const validationResult = await this.checkpointAttestationValidator.validate(attestation); - - if (validationResult.result === 'reject') { - this.logger.warn(`Penalizing peer ${peerId} for checkpoint attestation validation failure`); - return { result: TopicValidatorResult.Reject, severity: validationResult.severity }; - } - - if (validationResult.result === 'ignore') { - return { result: TopicValidatorResult.Ignore, obj: attestation }; - } - - // Try to add the attestation: this handles existence check, cap check, and adding in one call - // count is the number of attestations by this signer for this slot (for duplicate detection) - const slot = attestation.payload.header.slotNumber; - const { added, alreadyExists, count } = - await this.mempools.attestationPool.tryAddCheckpointAttestation(attestation); - - this.logger.trace(`Validate propagated checkpoint attestation`, { - added, - alreadyExists, - count, - [Attributes.SLOT_NUMBER]: slot.toString(), - [Attributes.P2P_ID]: peerId.toString(), - }); - - // Exact same attestation received, no need to re-broadcast - if (alreadyExists) { - return { result: TopicValidatorResult.Ignore, obj: attestation }; - } - - // Could not add (cap reached for signer), penalize and do not re-broadcast - if (!added) { - this.logger.warn(`Rejecting checkpoint attestation due to cap`, { - slot: slot.toString(), - archive: attestation.archive.toString(), - source: peerId.toString(), - attester: attestation.getSender()?.toString(), - count, - }); - return { result: TopicValidatorResult.Reject, severity: PeerErrorSeverity.HighToleranceError }; - } - - // Check if this is a duplicate attestation (signer attested to a different proposal at the same slot) - // count is the number of attestations by this signer for this slot - if (count === 2) { - const attester = attestation.getSender(); - if (attester) { - this.logger.warn(`Detected duplicate attestation (equivocation) at slot ${slot}`, { - slot: slot.toString(), - archive: attestation.archive.toString(), - source: peerId.toString(), - attester: attester.toString(), - }); - this.duplicateAttestationCallback?.({ slot, attester }); - } - } - - // Attestation was added successfully - accept it so other nodes can also detect the equivocation - this.checkpointAttestationCallback?.(attestation); - return { result: TopicValidatorResult.Accept, obj: attestation }; - } - protected async processBlockFromPeer(payloadData: Buffer, msgId: string, source: PeerId): Promise { const { result, obj: block, metadata: { isEquivocated } = {}, } = await this.validateReceivedMessage( - () => this.validateAndStoreBlockProposal(source, BlockProposal.fromBuffer(payloadData)), + () => this.processor.validateAndStoreBlockProposal(source, BlockProposal.fromBuffer(payloadData)), msgId, source, TopicType.block_proposal, @@ -1226,104 +872,7 @@ export class LibP2PService extends WithTracer implements P2PService { return; } - await this.processValidBlockProposal(block, source); - } - - /** Validates a block proposal. Triggers a penalization to the peer that sent it if invalid. Adds to the mempool if valid. */ - @trackSpan('Libp2pService.validateAndStoreBlockProposal', (_peerId, block) => ({ - [Attributes.BLOCK_NUMBER]: block.blockNumber.toString(), - [Attributes.SLOT_NUMBER]: block.slotNumber.toString(), - })) - protected async validateAndStoreBlockProposal( - peerId: PeerId, - block: BlockProposal, - ): Promise> { - const validationResult = await this.blockProposalValidator.validate(block); - - if (validationResult.result === 'reject') { - this.logger.warn(`Penalizing peer ${peerId} for block proposal validation failure`); - return { result: TopicValidatorResult.Reject, severity: validationResult.severity }; - } - - if (validationResult.result === 'ignore') { - return { result: TopicValidatorResult.Ignore, obj: block }; - } - - // Try to add the proposal: this handles existence check, cap check, and adding in one call - const { added, alreadyExists, count } = await this.mempools.attestationPool.tryAddBlockProposal(block); - const isEquivocated = count !== undefined && count > 1; - - // Duplicate proposal received, no need to re-broadcast - if (alreadyExists) { - this.logger.debug(`Ignoring duplicate block proposal received`, { - ...block.toBlockInfo(), - indexWithinCheckpoint: block.indexWithinCheckpoint, - proposer: block.getSender()?.toString(), - source: peerId.toString(), - }); - return { result: TopicValidatorResult.Ignore, obj: block, metadata: { isEquivocated } }; - } - - // Too many blocks received for this slot and index, penalize peer and do not re-broadcast - if (!added) { - this.logger.warn(`Penalizing peer for block proposal exceeding per-position cap`, { - ...block.toBlockInfo(), - indexWithinCheckpoint: block.indexWithinCheckpoint, - count, - proposer: block.getSender()?.toString(), - source: peerId.toString(), - }); - return { - result: TopicValidatorResult.Reject, - metadata: { isEquivocated }, - severity: PeerErrorSeverity.HighToleranceError, - }; - } - - // If this was a duplicate proposal, do not process it, but do invoke the duplicate callback, - // and do re-broadcast it so other nodes in the network know to slash the proposer - if (isEquivocated) { - const proposer = block.getSender(); - this.logger.warn(`Detected duplicate block proposal (equivocation) at slot ${block.slotNumber}`, { - ...block.toBlockInfo(), - source: peerId.toString(), - proposer: proposer?.toString(), - }); - // Invoke the duplicate callback on the first duplicate spotted only - if (proposer && count === 2) { - this.duplicateProposalCallback?.({ slot: block.slotNumber, proposer, type: 'block' }); - } - return { result: TopicValidatorResult.Accept, obj: block, metadata: { isEquivocated } }; - } - - // Otherwise, we're good to go! - return { result: TopicValidatorResult.Accept, obj: block }; - } - - // REFACTOR(palla): This method should be moved to the p2p_client or to a separate component, - // should not be here as it does not deal with p2p networking. - @trackSpan('Libp2pService.processValidBlockProposal', async block => ({ - [Attributes.SLOT_NUMBER]: block.slotNumber, - [Attributes.BLOCK_ARCHIVE]: block.archive.toString(), - [Attributes.P2P_ID]: await block.p2pMessageLoggingIdentifier().then(i => i.toString()), - })) - protected async processValidBlockProposal(block: BlockProposal, sender: PeerId) { - const slot = block.slotNumber; - this.logger.verbose(`Received block proposal for slot ${slot} from external peer ${sender.toString()}.`, { - p2pMessageIdentifier: await block.p2pMessageLoggingIdentifier(), - source: sender.toString(), - ...block.toBlockInfo(), - }); - - // Mark the txs in this proposal as protected - await this.mempools.txPool.protectTxs(block.txHashes, block.blockHeader); - - // Call the block received callback to validate the proposal. - // Note: Validators do NOT attest to individual blocks, only to checkpoint proposals. - const isValid = await this.blockReceivedCallback(block, sender); - if (!isValid) { - this.logger.info(`Block proposal validation failed for block ${block.blockNumber}`, block.toBlockInfo()); - } + await this.processor.processValidBlockProposal(block, source); } /** @@ -1336,7 +885,7 @@ export class LibP2PService extends WithTracer implements P2PService { obj: checkpoint, metadata: { isEquivocated, processBlock } = {}, } = await this.validateReceivedMessage( - () => this.validateAndStoreCheckpointProposal(source, CheckpointProposal.fromBuffer(payloadData)), + () => this.processor.validateAndStoreCheckpointProposal(source, CheckpointProposal.fromBuffer(payloadData)), msgId, source, TopicType.checkpoint_proposal, @@ -1345,14 +894,14 @@ export class LibP2PService extends WithTracer implements P2PService { // Process checkpoint proposal if valid and not equivocated. const processCheckpointFn = () => result === TopicValidatorResult.Accept && checkpoint && !isEquivocated - ? this.processValidCheckpointProposal(checkpoint.toCore(), source) + ? this.processor.processValidCheckpointProposal(checkpoint.toCore(), source) : Promise.resolve(); // If the checkpoint contained a valid last block, we process it even if the checkpoint itself is to be rejected // TODO(palla/mbps): Is this ok? Should we be considering a block from a checkpoint that was equivocated? const processBlockFn = () => processBlock && checkpoint && checkpoint.getBlockProposal() - ? this.processValidBlockProposal(checkpoint.getBlockProposal()!, source) + ? this.processor.processValidBlockProposal(checkpoint.getBlockProposal()!, source) : Promise.resolve(); // A node that skips checkpoint validation attests without re-executing the embedded last block, so run @@ -1371,146 +920,6 @@ export class LibP2PService extends WithTracer implements P2PService { await processCheckpointFn(); } - /** - * Validates a checkpoint proposal. Penalizes peer if validation fails. Adds the checkpoint and - * its last block (if present) to the mempool if valid. Triggers equivocation detection on both. - */ - @trackSpan('Libp2pService.validateAndStoreCheckpointProposal', (_peerId, checkpoint) => ({ - [Attributes.SLOT_NUMBER]: checkpoint.slotNumber.toString(), - })) - protected async validateAndStoreCheckpointProposal( - peerId: PeerId, - checkpoint: CheckpointProposal, - ): Promise> { - const validationResult = await this.checkpointProposalValidator.validate(checkpoint); - - if (validationResult.result === 'reject') { - this.logger.warn(`Penalizing peer ${peerId} for checkpoint proposal validation failure`); - return { result: TopicValidatorResult.Reject, severity: validationResult.severity }; - } - - if (validationResult.result === 'ignore') { - return { result: TopicValidatorResult.Ignore, obj: checkpoint }; - } - - // Extract and try to add the block proposal first if present - const blockProposal = checkpoint.getBlockProposal(); - let processBlock = false; - if (blockProposal) { - this.logger.debug(`Validating block proposal from propagated checkpoint`, { - [Attributes.SLOT_NUMBER]: checkpoint.slotNumber.toString(), - [Attributes.P2P_ID]: peerId.toString(), - }); - const blockProposalResult = await this.validateAndStoreBlockProposal(peerId, blockProposal); - const { obj, metadata: { isEquivocated } = {} } = blockProposalResult; - if (blockProposalResult.result === TopicValidatorResult.Reject || !obj || isEquivocated) { - this.logger.debug(`Rejecting checkpoint due to invalid last block proposal`, { - [Attributes.SLOT_NUMBER]: checkpoint.slotNumber.toString(), - [Attributes.P2P_ID]: peerId.toString(), - isEquivocated, - result: blockProposalResult.result, - }); - return { - result: TopicValidatorResult.Reject, - severity: - 'severity' in blockProposalResult ? blockProposalResult.severity : PeerErrorSeverity.MidToleranceError, - }; - } else if (blockProposalResult.result === TopicValidatorResult.Accept && obj && !isEquivocated) { - processBlock = true; - } - } - - // Try to add the checkpoint proposal core: this handles existence check, cap check, and adding in one call - const checkpointCore = checkpoint.toCore(); - const tryAddResult = await this.mempools.attestationPool.tryAddCheckpointProposal(checkpointCore); - const { added, alreadyExists, count } = tryAddResult; - const isEquivocated = count !== undefined && count > 1; - - // Duplicate proposal received, do not re-broadcast - if (alreadyExists) { - this.logger.debug(`Ignoring duplicate checkpoint proposal received`, { - ...checkpoint.toCheckpointInfo(), - source: peerId.toString(), - }); - return { - result: TopicValidatorResult.Ignore, - obj: checkpoint, - metadata: { isEquivocated, processBlock }, - }; - } - - // Too many checkpoint proposals received for this slot, penalize peer and do not re-broadcast - // Note: We still return the checkpoint obj so the lastBlock can be processed if valid - if (!added) { - this.logger.warn(`Penalizing peer for checkpoint proposal exceeding per-slot cap`, { - ...checkpoint.toCheckpointInfo(), - count, - source: peerId.toString(), - }); - return { - result: TopicValidatorResult.Reject, - obj: checkpoint, - metadata: { isEquivocated, processBlock }, - severity: PeerErrorSeverity.HighToleranceError, - }; - } - - // If this was a duplicate proposal, do not process it, but do invoke the duplicate callback, - // and do re-broadcast it so other nodes in the network know to slash the proposer - if (isEquivocated) { - const proposer = checkpoint.getSender(); - this.logger.warn(`Detected duplicate checkpoint proposal (equivocation) at slot ${checkpoint.slotNumber}`, { - ...checkpoint.toCheckpointInfo(), - source: peerId.toString(), - proposer: proposer?.toString(), - }); - // Invoke the duplicate callback on the first duplicate spotted only - if (proposer && count === 2) { - this.duplicateProposalCallback?.({ slot: checkpoint.slotNumber, proposer, type: 'checkpoint' }); - } - return { - result: TopicValidatorResult.Accept, - obj: checkpoint, - metadata: { isEquivocated, processBlock }, - }; - } - - // Otherwise, we're good to go! - return { result: TopicValidatorResult.Accept, obj: checkpoint, metadata: { processBlock, isEquivocated } }; - } - - /** - * Process a validated checkpoint proposal. - * Note: The proposal was already added to the pool by tryAddCheckpointProposal in handleGossipedCheckpointProposal. - */ - @trackSpan('Libp2pService.processValidCheckpointProposal', async checkpoint => ({ - [Attributes.SLOT_NUMBER]: checkpoint.slotNumber, - [Attributes.BLOCK_ARCHIVE]: checkpoint.archive.toString(), - [Attributes.P2P_ID]: await checkpoint.p2pMessageLoggingIdentifier().then(i => i.toString()), - })) - protected async processValidCheckpointProposal(checkpoint: CheckpointProposalCore, sender: PeerId) { - const slot = checkpoint.slotNumber; - this.logger.verbose(`Received checkpoint proposal for slot ${slot} from external peer ${sender.toString()}.`, { - p2pMessageIdentifier: await checkpoint.p2pMessageLoggingIdentifier(), - slot: checkpoint.slotNumber, - archive: checkpoint.archive.toString(), - source: sender.toString(), - }); - - await this.allNodesCheckpointReceivedCallback(checkpoint, sender); - - // Call the checkpoint received callback with the core version (without lastBlock) - // to validate and potentially generate attestations - const attestations = await this.validatorCheckpointReceivedCallback(checkpoint, sender); - if (attestations && attestations.length > 0) { - // If the callback returned attestations, add them to the pool and propagate them - await this.mempools.attestationPool.addOwnCheckpointAttestations(attestations); - for (const attestation of attestations) { - await this.propagate(attestation); - } - } - } - /** * Propagates provided message to peers. * @param message - The message to propagate. @@ -1523,116 +932,6 @@ export class LibP2PService extends WithTracer implements P2PService { }); } - /** - * Validate the requested block transactions request-response consistency. - * It does NOT validate the transactions themselves. - * @param request - The block transactions request. - * @param response - The block transactions response. - * @param peerId - The ID of the peer that made the request. - * @returns True if the request-response is consistent, false otherwise. - */ - @trackSpan('Libp2pService.validateRequestedBlockTxsConsistency', request => ({ - [Attributes.BLOCK_ARCHIVE]: request.archiveRoot.toString(), - })) - protected async validateRequestedBlockTxsConsistency( - request: BlockTxsRequest, - response: BlockTxsResponse, - peerId: PeerId, - ): Promise { - try { - // A response with archiveRoot=Fr.zero is the documented "I don't have the block" signal from - // reqRespBlockTxsHandler (block_txs_handler.ts:54-58): the peer lacked the block in its - // attestation pool and archiver, but matched the requested hashes against its tx pool and - // shipped what it found. This is legitimate behaviour, not misbehaviour — we just can't verify - // membership/order without the block, so we drop the response without penalising the peer. - if (response.archiveRoot.isZero()) { - this.logger.debug(`Peer ${peerId.toString()} signalled missing block with Fr.zero archive root`); - return false; - } - - if (!response.archiveRoot.equals(request.archiveRoot)) { - this.peerManager.penalizePeer(peerId, PeerErrorSeverity.MidToleranceError); - throw new ValidationError( - `Received block txs for unexpected archive root: expected ${request.archiveRoot.toString()}, got ${response.archiveRoot.toString()}`, - ); - } - - if (response.txIndices.getLength() !== request.txIndices.getLength()) { - this.peerManager.penalizePeer(peerId, PeerErrorSeverity.MidToleranceError); - throw new ValidationError( - `Received block txs with mismatched bitvector length: expected ${request.txIndices.getLength()}, got ${response.txIndices.getLength()}`, - ); - } - - // Check no duplicates and not exceeding returnable count - const requestedIndices = new Set(request.txIndices.getTrueIndices()); - const availableIndices = new Set(response.txIndices.getTrueIndices()); - const maxReturnable = [...requestedIndices].filter(i => availableIndices.has(i)).length; - - const returnedHashes = await Promise.all(response.txs.map(tx => tx.getTxHash().toString())); - const uniqueReturned = new Set(returnedHashes.map(h => h.toString())); - if (uniqueReturned.size !== returnedHashes.length) { - this.peerManager.penalizePeer(peerId, PeerErrorSeverity.MidToleranceError); - throw new ValidationError(`Received duplicate txs in block txs response`); - } - if (response.txs.length > maxReturnable) { - this.peerManager.penalizePeer(peerId, PeerErrorSeverity.MidToleranceError); - throw new ValidationError( - `Received more txs (${response.txs.length}) than requested-and-available (${maxReturnable})`, - ); - } - - // To verify membership/order of the returned txs we need the canonical tx hash list for the - // block. Prefer the block proposal (held while a block is in flight), but fall back to the - // archiver for blocks we only know as mined — e.g. a prover collecting txs to prove a block it - // never received a proposal for. This mirrors the responder side (reqRespBlockTxsHandler), - // which serves from proposal-or-archiver. - const proposal = await this.mempools.attestationPool.getBlockProposalByArchive(request.archiveRoot.toString()); - const blockTxHashes = - proposal?.txHashes ?? - (await this.archiver.getBlock({ archive: request.archiveRoot }))?.body.txEffects.map(e => e.txHash); - - if (blockTxHashes) { - // Build intersected indices - const intersectIdx = request.txIndices.getTrueIndices().filter(i => response.txIndices.isSet(i)); - - // Enforce subset membership and preserve increasing order by index. - const hashToIndexInBlock = new Map( - blockTxHashes.map((h, i) => [h.toString(), i] as [string, number]), - ); - const allowedIndexSet = new Set(intersectIdx); - const indices = returnedHashes.map(h => hashToIndexInBlock.get(h)); - const allAllowed = indices.every(idx => idx !== undefined && allowedIndexSet.has(idx)); - const strictlyIncreasing = indices.every((idx, i) => (i === 0 ? idx !== undefined : idx! > indices[i - 1]!)); - if (!allAllowed || !strictlyIncreasing) { - this.peerManager.penalizePeer(peerId, PeerErrorSeverity.LowToleranceError); - throw new ValidationError('Returned txs do not match expected subset/order for requested indices'); - } - } else { - // Neither a local proposal nor an archived block: we cannot verify membership/order of the - // returned txs. This is a local-state gap, not a peer fault, so we do not penalize. - this.logger.warn( - `Block ${request.archiveRoot.toString()} not found in attestation pool or archiver; cannot validate membership/order of returned txs`, - ); - return false; - } - - return true; - } catch (e: any) { - if (e instanceof ValidationError) { - this.logger.warn(`Failed validation for requested block txs from peer ${peerId.toString()}`); - } else { - this.logger.error(`Error during validation of requested block txs`, e); - } - - return false; - } - } - - private getGasFees(): Promise { - return this.blockMinFeesProvider.getCurrentMinFees(); - } - /** * Get the BatchTxRequesterLibP2PService dependencies for creating BatchTxRequester instances */ @@ -1640,145 +939,19 @@ export class LibP2PService extends WithTracer implements P2PService { return { reqResp: this.reqresp, connectionSampler: this.reqresp.getConnectionSampler(), - txValidatorConfig: { - l1ChainId: this.config.l1ChainId, - rollupVersion: this.config.rollupVersion, - proofVerifier: this.proofVerifier, - txValidationCache: this.txValidationCache, - }, + txValidatorConfig: this.processor.getBatchTxValidatorConfig(), peerScoring: this.peerManager, - validateRequestedBlockTxsConsistency: this.validateRequestedBlockTxsConsistency.bind(this), + validateRequestedBlockTxsConsistency: this.processor.validateRequestedBlockTxsConsistency.bind(this.processor), }; } - public async validateTxsReceivedInBlockProposal(txs: Tx[]): Promise { - const validator = createTxValidatorForBlockProposalReceivedTxs( - this.proofVerifier, - { l1ChainId: this.config.l1ChainId, rollupVersion: this.config.rollupVersion }, - this.logger.getBindings(), - this.txValidationCache, - ); - - const results = await Promise.all( - txs.map(async tx => { - const result = await validator.validateTx(tx); - return result.result !== 'invalid'; - }), - ); - if (results.some(value => value === false)) { - throw new Error('Invalid tx detected'); - } + public validateTxsReceivedInBlockProposal(txs: Tx[]): Promise { + return this.processor.validateTxsReceivedInBlockProposal(txs); } - /** Creates the first stage (fast) validators for gossiped transactions. */ - protected async createFirstStageMessageValidators( - currentBlockNumber: BlockNumber, - nextSlotTimestamp: UInt64, - ): Promise> { - const gasFees = await this.getGasFees(); - const allowedInSetup = [ - ...(await getDefaultAllowedSetupFunctions()), - ...(this.config.txPublicSetupAllowListExtend ?? []), - ]; - const blockNumber = BlockNumber(currentBlockNumber + 1); - const l1Constants = await this.archiver.getL1Constants(); - - return createFirstStageTxValidationsForGossipedTransactions( - nextSlotTimestamp, - blockNumber, - this.worldStateSynchronizer, - gasFees, - this.config.l1ChainId, - this.config.rollupVersion, - protocolContractsHash, - this.archiver, - !this.config.disableTransactions, - allowedInSetup, - this.logger.getBindings(), - { - rollupManaLimit: l1Constants.rollupManaLimit, - maxBlockL2Gas: this.config.validateMaxL2BlockGas, - maxBlockDAGas: this.config.validateMaxDABlockGas, - }, - ); - } - - /** Creates the second stage (expensive proof verification) validators for gossiped transactions. */ - protected createSecondStageMessageValidators(): Record { - return createSecondStageTxValidationsForGossipedTransactions(this.proofVerifier, this.logger.getBindings()); - } - - /** - * Run validations on a tx. - * @param tx - The tx to validate. - * @param messageValidators - The message validators to run. - * @returns The validation outcome. - */ - private async runValidations( - tx: Tx, - messageValidators: Record, - ): Promise { - const validationPromises = Object.entries(messageValidators).map(async ([name, { validator, severity }]) => { - const { result } = await validator.validateTx(tx); - return { name, isValid: result !== 'invalid', severity }; - }); - - // A promise that resolves when all validations have been run - const allValidations = await Promise.all(validationPromises); - const failures = allValidations.filter(x => !x.isValid); - if (failures.length > 0) { - // Pick the most severe failure (lowest tolerance = harshest penalty) - const failed = maxBy(failures, f => PeerErrorSeverityByHarshness.indexOf(f.severity))!; - return { - allPassed: false, - failure: { - isValid: { result: 'invalid' as const, reason: ['Failed validation'] }, - name: failed.name, - severity: failed.severity, - }, - }; - } else { - return { - allPassed: true, - }; - } - } - - /** - * Handle a double spend failure. - * - * Double spend failures are managed on their own because they are a special case. - * We must check if the double spend is recent or old, if it is past a threshold, then we heavily penalize the peer. - * - * @param tx - The tx that failed the double spend validator. - * @param blockNumber - The block number of the tx. - * @param peerId - The peer ID of the peer that sent the tx. - * @returns Severity - */ - private async handleDoubleSpendFailure(tx: Tx, blockNumber: BlockNumber): Promise { - if (blockNumber <= this.config.doubleSpendSeverePeerPenaltyWindow) { - return PeerErrorSeverity.HighToleranceError; - } - - const snapshotValidator = new DoubleSpendTxValidator( - { - nullifiersExist: async (nullifiers: Buffer[]) => { - const merkleTree = this.worldStateSynchronizer.getSnapshot( - BlockNumber(blockNumber - this.config.doubleSpendSeverePeerPenaltyWindow), - ); - const indices = await merkleTree.findLeafIndices(MerkleTreeId.NULLIFIER_TREE, nullifiers); - return indices.map(index => index !== undefined); - }, - }, - this.logger.getBindings(), - ); - - const validSnapshot = await snapshotValidator.validateTx(tx); - if (validSnapshot.result !== 'valid') { - return PeerErrorSeverity.LowToleranceError; - } - - return PeerErrorSeverity.HighToleranceError; + /** Applies a peer-scoring penalty. Exposed for the message processor to penalize peers during validation. */ + public penalizePeer(peerId: PeerId, severity: PeerErrorSeverity): void { + this.peerManager.penalizePeer(peerId, severity); } public getPeerScore(peerId: PeerId): number { diff --git a/yarn-project/p2p/src/services/libp2p/p2p_message_processor.ts b/yarn-project/p2p/src/services/libp2p/p2p_message_processor.ts new file mode 100644 index 000000000000..378799a51553 --- /dev/null +++ b/yarn-project/p2p/src/services/libp2p/p2p_message_processor.ts @@ -0,0 +1,895 @@ +import type { EpochCacheInterface } from '@aztec/epoch-cache'; +import { BlockNumber, type SlotNumber } from '@aztec/foundation/branded-types'; +import { maxBy } from '@aztec/foundation/collection'; +import { type Logger, createLogger } from '@aztec/foundation/log'; +import { protocolContractsHash } from '@aztec/protocol-contracts'; +import type { EthAddress, L2BlockSource } from '@aztec/stdlib/block'; +import type { ContractDataSource } from '@aztec/stdlib/contract'; +import { type BlockMinFeesProvider, GasFees } from '@aztec/stdlib/gas'; +import type { ClientProtocolCircuitVerifier, WorldStateSynchronizer } from '@aztec/stdlib/interfaces/server'; +import { + BlockProposal, + CheckpointAttestation, + CheckpointProposal, + type CheckpointProposalCore, + type Gossipable, + PeerErrorSeverity, + PeerErrorSeverityByHarshness, +} from '@aztec/stdlib/p2p'; +import { MerkleTreeId } from '@aztec/stdlib/trees'; +import { Tx, type TxValidationResult } from '@aztec/stdlib/tx'; +import type { UInt64 } from '@aztec/stdlib/types'; +import { Attributes, type TelemetryClient, WithTracer, trackSpan } from '@aztec/telemetry-client'; + +import { type PeerId, TopicValidatorResult } from '@libp2p/interface'; + +import type { P2PConfig } from '../../config.js'; +import { CheckpointProposalReceivedCallbackNotRegisteredError } from '../../errors/p2p-service.error.js'; +import type { MemPools } from '../../mem_pools/interface.js'; +import { + BlockProposalValidator, + CheckpointAttestationValidator, + CheckpointProposalValidator, + DoubleSpendTxValidator, + FishermanAttestationValidator, + getDefaultAllowedSetupFunctions, +} from '../../msg_validators/index.js'; +import { + type TransactionValidator, + createFirstStageTxValidationsForGossipedTransactions, + createSecondStageTxValidationsForGossipedTransactions, + createTxValidatorForBlockProposalReceivedTxs, +} from '../../msg_validators/tx_validator/factory.js'; +import type { TxValidationCache } from '../../msg_validators/tx_validator/tx_validation_cache.js'; +import type { BatchRequestTxValidatorConfig } from '../reqresp/batch-tx-requester/tx_validator.js'; +import { + BlockTxsRequest, + BlockTxsResponse, + ReqRespSubProtocol, + type ReqRespSubProtocolHandlers, + ValidationError, + reqRespBlockTxsHandler, + reqRespStatusHandler, + reqRespTxHandler, +} from '../reqresp/index.js'; +import type { + P2PBlockReceivedCallback, + P2PCheckpointAttestationCallback, + P2PCheckpointReceivedCallback, + P2PDuplicateAttestationCallback, +} from '../service.js'; + +interface ValidationResult { + name: string; + isValid: TxValidationResult; + severity: PeerErrorSeverity; +} + +type ValidationOutcome = { allPassed: true } | { allPassed: false; failure: ValidationResult }; + +// REFACTOR: Unify with the type above +export type ReceivedMessageValidationResult = + | { obj: T; result: Exclude; metadata?: M } + | { obj?: T; result: TopicValidatorResult.Reject; metadata?: M; severity: PeerErrorSeverity }; + +/** + * The subset of libp2p network operations the message processor invokes while handling received + * messages: re-broadcasting (propagate) and peer scoring (penalizePeer). Kept as a narrow interface + * so the processor stays decoupled from the libp2p node itself. + */ +export interface P2PNetwork { + propagate(message: T): Promise; + penalizePeer(peerId: PeerId, severity: PeerErrorSeverity): void; +} + +/** + * Handles the content of P2P messages received over gossip and request/response: validation against + * node state (world state, archiver, mempools), persistence into the mempools, and dispatch to the + * consensus callbacks. This is the part of the P2P stack that depends on main-thread node state, as + * opposed to the libp2p networking machinery in {@link LibP2PService}. + */ +export class P2PMessageProcessor extends WithTracer { + // Message validators + private blockProposalValidator: BlockProposalValidator; + private checkpointProposalValidator: CheckpointProposalValidator; + private checkpointAttestationValidator: CheckpointAttestationValidator; + + /** Callback invoked when a duplicate proposal is detected (triggers slashing). */ + private duplicateProposalCallback?: (info: { + slot: SlotNumber; + proposer: EthAddress; + type: 'checkpoint' | 'block'; + }) => void; + + /** Callback invoked when a duplicate attestation is detected (triggers slashing). */ + private duplicateAttestationCallback?: P2PDuplicateAttestationCallback; + + /** Callback invoked when a valid checkpoint attestation is accepted into the pool. */ + private checkpointAttestationCallback?: P2PCheckpointAttestationCallback; + + /** + * Callback for when a block is received from a peer. + * @param block - The block received from the peer. + * @returns The attestation for the block, if any. + */ + private blockReceivedCallback: P2PBlockReceivedCallback; + + /** + * Callback for when a checkpoint proposal is received from a peer. + * @param checkpoint - The checkpoint proposal received from the peer. + * @returns The attestations for the checkpoint, if any. + */ + private allNodesCheckpointReceivedCallback: P2PCheckpointReceivedCallback; + /** + * Callback for when a checkpoint proposal is received - specifically for validators - from a peer. + * @param checkpoint - The checkpoint proposal received from the peer. + * @returns The attestations for the checkpoint, if any. + */ + private validatorCheckpointReceivedCallback: P2PCheckpointReceivedCallback; + + protected logger: Logger; + + private network?: P2PNetwork; + + constructor( + private config: P2PConfig, + protected mempools: MemPools, + protected archiver: L2BlockSource & ContractDataSource, + private epochCache: EpochCacheInterface, + private proofVerifier: ClientProtocolCircuitVerifier, + private worldStateSynchronizer: WorldStateSynchronizer, + private blockMinFeesProvider: BlockMinFeesProvider, + telemetry: TelemetryClient, + logger: Logger = createLogger('p2p:libp2p_service'), + private txValidationCache?: TxValidationCache, + ) { + super(telemetry, 'P2PMessageProcessor'); + + // Create child logger with fisherman prefix if in fisherman mode + this.logger = config.fishermanMode ? logger.createChild('[FISHERMAN]') : logger; + + const p2pPropagationTime = config.attestationPropagationTime; + const proposalValidatorOpts = { + txsPermitted: !config.disableTransactions, + maxTxsPerBlock: config.validateMaxTxsPerBlock ?? config.validateMaxTxsPerCheckpoint, + maxBlocksPerCheckpoint: config.maxBlocksPerCheckpoint, + p2pPropagationTime, + skipSlotValidation: config.skipProposalSlotValidation, + signatureContext: { + chainId: config.l1ChainId, + rollupAddress: config.rollupAddress, + }, + }; + this.blockProposalValidator = new BlockProposalValidator(epochCache, proposalValidatorOpts); + this.checkpointProposalValidator = new CheckpointProposalValidator(epochCache, proposalValidatorOpts); + const attestationValidatorOpts = { + l1PublishingTime: config.l1PublishingTime, + p2pPropagationTime, + signatureContext: proposalValidatorOpts.signatureContext, + }; + this.checkpointAttestationValidator = config.fishermanMode + ? new FishermanAttestationValidator(epochCache, mempools.attestationPool, telemetry, attestationValidatorOpts) + : new CheckpointAttestationValidator(epochCache, attestationValidatorOpts); + + this.blockReceivedCallback = async (block: BlockProposal): Promise => { + this.logger.warn( + `Handler for block received not yet registered on P2P service. Received block ${block.blockNumber} for slot ${block.slotNumber} from peer.`, + { p2pMessageIdentifier: await block.p2pMessageLoggingIdentifier() }, + ); + return true; + }; + + this.allNodesCheckpointReceivedCallback = ( + _checkpoint: CheckpointProposalCore, + ): Promise => { + throw new CheckpointProposalReceivedCallbackNotRegisteredError(); + }; + + this.validatorCheckpointReceivedCallback = ( + _checkpoint: CheckpointProposalCore, + ): Promise => { + return Promise.resolve(undefined); + }; + } + + /** Wires up the libp2p network operations the processor invokes. Called once during construction of the service. */ + public setNetwork(network: P2PNetwork): void { + this.network = network; + } + + private get net(): P2PNetwork { + if (!this.network) { + throw new Error('P2PMessageProcessor network not set'); + } + return this.network; + } + + public registerBlockReceivedCallback(callback: P2PBlockReceivedCallback) { + this.blockReceivedCallback = callback; + } + + public registerValidatorCheckpointReceivedCallback(callback: P2PCheckpointReceivedCallback) { + this.validatorCheckpointReceivedCallback = callback; + } + + public registerAllNodesCheckpointReceivedCallback(callback: P2PCheckpointReceivedCallback) { + this.allNodesCheckpointReceivedCallback = callback; + } + + /** + * Registers a callback to be invoked when a duplicate proposal is detected. + * This callback is triggered on the first duplicate (when count goes from 1 to 2). + */ + public registerDuplicateProposalCallback( + callback: (info: { slot: SlotNumber; proposer: EthAddress; type: 'checkpoint' | 'block' }) => void, + ): void { + this.duplicateProposalCallback = callback; + } + + /** + * Registers a callback to be invoked when a duplicate attestation is detected. + * A validator signing attestations for different proposals at the same slot. + * This callback is triggered on the first duplicate (when count goes from 1 to 2). + */ + public registerDuplicateAttestationCallback(callback: P2PDuplicateAttestationCallback): void { + this.duplicateAttestationCallback = callback; + } + + public registerCheckpointAttestationCallback(callback: P2PCheckpointAttestationCallback): void { + this.checkpointAttestationCallback = callback; + } + + /** + * Validates a gossiped transaction against node state and, if valid, persists it to the tx pool. + * @returns The gossip validation result, indicating whether to re-broadcast the tx. + */ + public async validateAndStoreTx(tx: Tx, source: PeerId): Promise> { + const currentBlockNumber = await this.archiver.getBlockNumber(); + const { ts: nextSlotTimestamp } = this.epochCache.getEpochAndSlotInNextL1Slot(); + + // Stage 1: fast validators (metadata, data, timestamps, double-spend, gas, phases, block header) + const firstStageValidators = await this.createFirstStageMessageValidators(currentBlockNumber, nextSlotTimestamp); + const firstStageOutcome = await this.runValidations(tx, firstStageValidators); + if (!firstStageOutcome.allPassed) { + const { name } = firstStageOutcome.failure; + let { severity } = firstStageOutcome.failure; + + // Double spend validator has a special case handler. We perform more detailed examination + // as to how recently the nullifier was entered into the tree and if the transaction should + // have 'known' the nullifier existed. This determines the severity of the penalty applied to the peer. + if (name === 'doubleSpendValidator') { + const txBlockNumber = BlockNumber(currentBlockNumber + 1); + severity = await this.handleDoubleSpendFailure(tx, txBlockNumber); + } + + this.logger.verbose(`Rejecting gossiped tx ${tx.getTxHash().toString()}: stage 1 validation failed`, { + validator: name, + severity, + source: source.toString(), + }); + return { result: TopicValidatorResult.Reject, severity }; + } + + // Pool pre-check: see if the pool would accept this tx before doing expensive proof verification + const canAdd = await this.mempools.txPool.canAddPendingTx(tx); + if (canAdd === 'ignored') { + this.logger.verbose(`Ignoring gossiped tx ${tx.getTxHash().toString()}: pool pre-check returned ignored`, { + source: source.toString(), + }); + return { result: TopicValidatorResult.Ignore, obj: tx }; + } + + // Stage 2: expensive proof verification + const secondStageValidators = this.createSecondStageMessageValidators(); + const secondStageOutcome = await this.runValidations(tx, secondStageValidators); + if (!secondStageOutcome.allPassed) { + const { severity, name } = secondStageOutcome.failure; + this.logger.verbose(`Rejecting gossiped tx ${tx.getTxHash().toString()}: stage 2 validation failed`, { + validator: name, + severity, + source: source.toString(), + }); + return { result: TopicValidatorResult.Reject, severity }; + } + + // Pool add: persist the tx + const txHash = tx.getTxHash(); + const addResult = await this.mempools.txPool.addPendingTxs([tx], { source: 'gossip' }); + + const wasAccepted = addResult.accepted.some(h => h.equals(txHash)); + const wasIgnored = addResult.ignored.some(h => h.equals(txHash)); + + this.logger.verbose(`Validate propagated tx ${txHash.toString()}`, { + wasAccepted, + wasIgnored, + [Attributes.P2P_ID]: source.toString(), + }); + + if (wasAccepted) { + return { result: TopicValidatorResult.Accept, obj: tx }; + } else if (wasIgnored) { + return { result: TopicValidatorResult.Ignore, obj: tx }; + } else { + this.logger.warn(`Gossiped tx ${txHash.toString()} unexpectedly rejected by pool`, { + source: source.toString(), + txHash: txHash.toString(), + }); + return { result: TopicValidatorResult.Reject, severity: PeerErrorSeverity.HighToleranceError }; + } + } + + /** Validates a checkpoint attestation and adds it to the pool. Penalizes the peer if validation fails. */ + @trackSpan('Libp2pService.validateAndStoreCheckpointAttestation', (_peerId, attestation) => ({ + [Attributes.SLOT_NUMBER]: attestation.payload.header.slotNumber.toString(), + })) + public async validateAndStoreCheckpointAttestation( + peerId: PeerId, + attestation: CheckpointAttestation, + ): Promise> { + const validationResult = await this.checkpointAttestationValidator.validate(attestation); + + if (validationResult.result === 'reject') { + this.logger.warn(`Penalizing peer ${peerId} for checkpoint attestation validation failure`); + return { result: TopicValidatorResult.Reject, severity: validationResult.severity }; + } + + if (validationResult.result === 'ignore') { + return { result: TopicValidatorResult.Ignore, obj: attestation }; + } + + // Try to add the attestation: this handles existence check, cap check, and adding in one call + // count is the number of attestations by this signer for this slot (for duplicate detection) + const slot = attestation.payload.header.slotNumber; + const { added, alreadyExists, count } = + await this.mempools.attestationPool.tryAddCheckpointAttestation(attestation); + + this.logger.trace(`Validate propagated checkpoint attestation`, { + added, + alreadyExists, + count, + [Attributes.SLOT_NUMBER]: slot.toString(), + [Attributes.P2P_ID]: peerId.toString(), + }); + + // Exact same attestation received, no need to re-broadcast + if (alreadyExists) { + return { result: TopicValidatorResult.Ignore, obj: attestation }; + } + + // Could not add (cap reached for signer), penalize and do not re-broadcast + if (!added) { + this.logger.warn(`Rejecting checkpoint attestation due to cap`, { + slot: slot.toString(), + archive: attestation.archive.toString(), + source: peerId.toString(), + attester: attestation.getSender()?.toString(), + count, + }); + return { result: TopicValidatorResult.Reject, severity: PeerErrorSeverity.HighToleranceError }; + } + + // Check if this is a duplicate attestation (signer attested to a different proposal at the same slot) + // count is the number of attestations by this signer for this slot + if (count === 2) { + const attester = attestation.getSender(); + if (attester) { + this.logger.warn(`Detected duplicate attestation (equivocation) at slot ${slot}`, { + slot: slot.toString(), + archive: attestation.archive.toString(), + source: peerId.toString(), + attester: attester.toString(), + }); + this.duplicateAttestationCallback?.({ slot, attester }); + } + } + + // Attestation was added successfully - accept it so other nodes can also detect the equivocation + this.checkpointAttestationCallback?.(attestation); + return { result: TopicValidatorResult.Accept, obj: attestation }; + } + + /** Validates a block proposal. Triggers a penalization to the peer that sent it if invalid. Adds to the mempool if valid. */ + @trackSpan('Libp2pService.validateAndStoreBlockProposal', (_peerId, block) => ({ + [Attributes.BLOCK_NUMBER]: block.blockNumber.toString(), + [Attributes.SLOT_NUMBER]: block.slotNumber.toString(), + })) + public async validateAndStoreBlockProposal( + peerId: PeerId, + block: BlockProposal, + ): Promise> { + const validationResult = await this.blockProposalValidator.validate(block); + + if (validationResult.result === 'reject') { + this.logger.warn(`Penalizing peer ${peerId} for block proposal validation failure`); + return { result: TopicValidatorResult.Reject, severity: validationResult.severity }; + } + + if (validationResult.result === 'ignore') { + return { result: TopicValidatorResult.Ignore, obj: block }; + } + + // Try to add the proposal: this handles existence check, cap check, and adding in one call + const { added, alreadyExists, count } = await this.mempools.attestationPool.tryAddBlockProposal(block); + const isEquivocated = count !== undefined && count > 1; + + // Duplicate proposal received, no need to re-broadcast + if (alreadyExists) { + this.logger.debug(`Ignoring duplicate block proposal received`, { + ...block.toBlockInfo(), + indexWithinCheckpoint: block.indexWithinCheckpoint, + proposer: block.getSender()?.toString(), + source: peerId.toString(), + }); + return { result: TopicValidatorResult.Ignore, obj: block, metadata: { isEquivocated } }; + } + + // Too many blocks received for this slot and index, penalize peer and do not re-broadcast + if (!added) { + this.logger.warn(`Penalizing peer for block proposal exceeding per-position cap`, { + ...block.toBlockInfo(), + indexWithinCheckpoint: block.indexWithinCheckpoint, + count, + proposer: block.getSender()?.toString(), + source: peerId.toString(), + }); + return { + result: TopicValidatorResult.Reject, + metadata: { isEquivocated }, + severity: PeerErrorSeverity.HighToleranceError, + }; + } + + // If this was a duplicate proposal, do not process it, but do invoke the duplicate callback, + // and do re-broadcast it so other nodes in the network know to slash the proposer + if (isEquivocated) { + const proposer = block.getSender(); + this.logger.warn(`Detected duplicate block proposal (equivocation) at slot ${block.slotNumber}`, { + ...block.toBlockInfo(), + source: peerId.toString(), + proposer: proposer?.toString(), + }); + // Invoke the duplicate callback on the first duplicate spotted only + if (proposer && count === 2) { + this.duplicateProposalCallback?.({ slot: block.slotNumber, proposer, type: 'block' }); + } + return { result: TopicValidatorResult.Accept, obj: block, metadata: { isEquivocated } }; + } + + // Otherwise, we're good to go! + return { result: TopicValidatorResult.Accept, obj: block }; + } + + // REFACTOR(palla): This method should be moved to the p2p_client or to a separate component, + // should not be here as it does not deal with p2p networking. + @trackSpan('Libp2pService.processValidBlockProposal', async block => ({ + [Attributes.SLOT_NUMBER]: block.slotNumber, + [Attributes.BLOCK_ARCHIVE]: block.archive.toString(), + [Attributes.P2P_ID]: await block.p2pMessageLoggingIdentifier().then(i => i.toString()), + })) + public async processValidBlockProposal(block: BlockProposal, sender: PeerId) { + const slot = block.slotNumber; + this.logger.verbose(`Received block proposal for slot ${slot} from external peer ${sender.toString()}.`, { + p2pMessageIdentifier: await block.p2pMessageLoggingIdentifier(), + source: sender.toString(), + ...block.toBlockInfo(), + }); + + // Mark the txs in this proposal as protected + await this.mempools.txPool.protectTxs(block.txHashes, block.blockHeader); + + // Call the block received callback to validate the proposal. + // Note: Validators do NOT attest to individual blocks, only to checkpoint proposals. + const isValid = await this.blockReceivedCallback(block, sender); + if (!isValid) { + this.logger.info(`Block proposal validation failed for block ${block.blockNumber}`, block.toBlockInfo()); + } + } + + /** + * Validates a checkpoint proposal. Penalizes peer if validation fails. Adds the checkpoint and + * its last block (if present) to the mempool if valid. Triggers equivocation detection on both. + */ + @trackSpan('Libp2pService.validateAndStoreCheckpointProposal', (_peerId, checkpoint) => ({ + [Attributes.SLOT_NUMBER]: checkpoint.slotNumber.toString(), + })) + public async validateAndStoreCheckpointProposal( + peerId: PeerId, + checkpoint: CheckpointProposal, + ): Promise> { + const validationResult = await this.checkpointProposalValidator.validate(checkpoint); + + if (validationResult.result === 'reject') { + this.logger.warn(`Penalizing peer ${peerId} for checkpoint proposal validation failure`); + return { result: TopicValidatorResult.Reject, severity: validationResult.severity }; + } + + if (validationResult.result === 'ignore') { + return { result: TopicValidatorResult.Ignore, obj: checkpoint }; + } + + // Extract and try to add the block proposal first if present + const blockProposal = checkpoint.getBlockProposal(); + let processBlock = false; + if (blockProposal) { + this.logger.debug(`Validating block proposal from propagated checkpoint`, { + [Attributes.SLOT_NUMBER]: checkpoint.slotNumber.toString(), + [Attributes.P2P_ID]: peerId.toString(), + }); + const blockProposalResult = await this.validateAndStoreBlockProposal(peerId, blockProposal); + const { obj, metadata: { isEquivocated } = {} } = blockProposalResult; + if (blockProposalResult.result === TopicValidatorResult.Reject || !obj || isEquivocated) { + this.logger.debug(`Rejecting checkpoint due to invalid last block proposal`, { + [Attributes.SLOT_NUMBER]: checkpoint.slotNumber.toString(), + [Attributes.P2P_ID]: peerId.toString(), + isEquivocated, + result: blockProposalResult.result, + }); + return { + result: TopicValidatorResult.Reject, + severity: + 'severity' in blockProposalResult ? blockProposalResult.severity : PeerErrorSeverity.MidToleranceError, + }; + } else if (blockProposalResult.result === TopicValidatorResult.Accept && obj && !isEquivocated) { + processBlock = true; + } + } + + // Try to add the checkpoint proposal core: this handles existence check, cap check, and adding in one call + const checkpointCore = checkpoint.toCore(); + const tryAddResult = await this.mempools.attestationPool.tryAddCheckpointProposal(checkpointCore); + const { added, alreadyExists, count } = tryAddResult; + const isEquivocated = count !== undefined && count > 1; + + // Duplicate proposal received, do not re-broadcast + if (alreadyExists) { + this.logger.debug(`Ignoring duplicate checkpoint proposal received`, { + ...checkpoint.toCheckpointInfo(), + source: peerId.toString(), + }); + return { + result: TopicValidatorResult.Ignore, + obj: checkpoint, + metadata: { isEquivocated, processBlock }, + }; + } + + // Too many checkpoint proposals received for this slot, penalize peer and do not re-broadcast + // Note: We still return the checkpoint obj so the lastBlock can be processed if valid + if (!added) { + this.logger.warn(`Penalizing peer for checkpoint proposal exceeding per-slot cap`, { + ...checkpoint.toCheckpointInfo(), + count, + source: peerId.toString(), + }); + return { + result: TopicValidatorResult.Reject, + obj: checkpoint, + metadata: { isEquivocated, processBlock }, + severity: PeerErrorSeverity.HighToleranceError, + }; + } + + // If this was a duplicate proposal, do not process it, but do invoke the duplicate callback, + // and do re-broadcast it so other nodes in the network know to slash the proposer + if (isEquivocated) { + const proposer = checkpoint.getSender(); + this.logger.warn(`Detected duplicate checkpoint proposal (equivocation) at slot ${checkpoint.slotNumber}`, { + ...checkpoint.toCheckpointInfo(), + source: peerId.toString(), + proposer: proposer?.toString(), + }); + // Invoke the duplicate callback on the first duplicate spotted only + if (proposer && count === 2) { + this.duplicateProposalCallback?.({ slot: checkpoint.slotNumber, proposer, type: 'checkpoint' }); + } + return { + result: TopicValidatorResult.Accept, + obj: checkpoint, + metadata: { isEquivocated, processBlock }, + }; + } + + // Otherwise, we're good to go! + return { result: TopicValidatorResult.Accept, obj: checkpoint, metadata: { processBlock, isEquivocated } }; + } + + /** + * Process a validated checkpoint proposal. + * Note: The proposal was already added to the pool by tryAddCheckpointProposal in handleGossipedCheckpointProposal. + */ + @trackSpan('Libp2pService.processValidCheckpointProposal', async checkpoint => ({ + [Attributes.SLOT_NUMBER]: checkpoint.slotNumber, + [Attributes.BLOCK_ARCHIVE]: checkpoint.archive.toString(), + [Attributes.P2P_ID]: await checkpoint.p2pMessageLoggingIdentifier().then(i => i.toString()), + })) + public async processValidCheckpointProposal(checkpoint: CheckpointProposalCore, sender: PeerId) { + const slot = checkpoint.slotNumber; + this.logger.verbose(`Received checkpoint proposal for slot ${slot} from external peer ${sender.toString()}.`, { + p2pMessageIdentifier: await checkpoint.p2pMessageLoggingIdentifier(), + slot: checkpoint.slotNumber, + archive: checkpoint.archive.toString(), + source: sender.toString(), + }); + + await this.allNodesCheckpointReceivedCallback(checkpoint, sender); + + // Call the checkpoint received callback with the core version (without lastBlock) + // to validate and potentially generate attestations + const attestations = await this.validatorCheckpointReceivedCallback(checkpoint, sender); + if (attestations && attestations.length > 0) { + // If the callback returned attestations, add them to the pool and propagate them + await this.mempools.attestationPool.addOwnCheckpointAttestations(attestations); + for (const attestation of attestations) { + await this.net.propagate(attestation); + } + } + } + + /** + * Validate the requested block transactions request-response consistency. + * It does NOT validate the transactions themselves. + * @param request - The block transactions request. + * @param response - The block transactions response. + * @param peerId - The ID of the peer that made the request. + * @returns True if the request-response is consistent, false otherwise. + */ + @trackSpan('Libp2pService.validateRequestedBlockTxsConsistency', request => ({ + [Attributes.BLOCK_ARCHIVE]: request.archiveRoot.toString(), + })) + public async validateRequestedBlockTxsConsistency( + request: BlockTxsRequest, + response: BlockTxsResponse, + peerId: PeerId, + ): Promise { + try { + // A response with archiveRoot=Fr.zero is the documented "I don't have the block" signal from + // reqRespBlockTxsHandler (block_txs_handler.ts:54-58): the peer lacked the block in its + // attestation pool and archiver, but matched the requested hashes against its tx pool and + // shipped what it found. This is legitimate behaviour, not misbehaviour — we just can't verify + // membership/order without the block, so we drop the response without penalising the peer. + if (response.archiveRoot.isZero()) { + this.logger.debug(`Peer ${peerId.toString()} signalled missing block with Fr.zero archive root`); + return false; + } + + if (!response.archiveRoot.equals(request.archiveRoot)) { + this.net.penalizePeer(peerId, PeerErrorSeverity.MidToleranceError); + throw new ValidationError( + `Received block txs for unexpected archive root: expected ${request.archiveRoot.toString()}, got ${response.archiveRoot.toString()}`, + ); + } + + if (response.txIndices.getLength() !== request.txIndices.getLength()) { + this.net.penalizePeer(peerId, PeerErrorSeverity.MidToleranceError); + throw new ValidationError( + `Received block txs with mismatched bitvector length: expected ${request.txIndices.getLength()}, got ${response.txIndices.getLength()}`, + ); + } + + // Check no duplicates and not exceeding returnable count + const requestedIndices = new Set(request.txIndices.getTrueIndices()); + const availableIndices = new Set(response.txIndices.getTrueIndices()); + const maxReturnable = [...requestedIndices].filter(i => availableIndices.has(i)).length; + + const returnedHashes = await Promise.all(response.txs.map(tx => tx.getTxHash().toString())); + const uniqueReturned = new Set(returnedHashes.map(h => h.toString())); + if (uniqueReturned.size !== returnedHashes.length) { + this.net.penalizePeer(peerId, PeerErrorSeverity.MidToleranceError); + throw new ValidationError(`Received duplicate txs in block txs response`); + } + if (response.txs.length > maxReturnable) { + this.net.penalizePeer(peerId, PeerErrorSeverity.MidToleranceError); + throw new ValidationError( + `Received more txs (${response.txs.length}) than requested-and-available (${maxReturnable})`, + ); + } + + // To verify membership/order of the returned txs we need the canonical tx hash list for the + // block. Prefer the block proposal (held while a block is in flight), but fall back to the + // archiver for blocks we only know as mined — e.g. a prover collecting txs to prove a block it + // never received a proposal for. This mirrors the responder side (reqRespBlockTxsHandler), + // which serves from proposal-or-archiver. + const proposal = await this.mempools.attestationPool.getBlockProposalByArchive(request.archiveRoot.toString()); + const blockTxHashes = + proposal?.txHashes ?? + (await this.archiver.getBlock({ archive: request.archiveRoot }))?.body.txEffects.map(e => e.txHash); + + if (blockTxHashes) { + // Build intersected indices + const intersectIdx = request.txIndices.getTrueIndices().filter(i => response.txIndices.isSet(i)); + + // Enforce subset membership and preserve increasing order by index. + const hashToIndexInBlock = new Map( + blockTxHashes.map((h, i) => [h.toString(), i] as [string, number]), + ); + const allowedIndexSet = new Set(intersectIdx); + const indices = returnedHashes.map(h => hashToIndexInBlock.get(h)); + const allAllowed = indices.every(idx => idx !== undefined && allowedIndexSet.has(idx)); + const strictlyIncreasing = indices.every((idx, i) => (i === 0 ? idx !== undefined : idx! > indices[i - 1]!)); + if (!allAllowed || !strictlyIncreasing) { + this.net.penalizePeer(peerId, PeerErrorSeverity.LowToleranceError); + throw new ValidationError('Returned txs do not match expected subset/order for requested indices'); + } + } else { + // Neither a local proposal nor an archived block: we cannot verify membership/order of the + // returned txs. This is a local-state gap, not a peer fault, so we do not penalize. + this.logger.warn( + `Block ${request.archiveRoot.toString()} not found in attestation pool or archiver; cannot validate membership/order of returned txs`, + ); + return false; + } + + return true; + } catch (e: any) { + if (e instanceof ValidationError) { + this.logger.warn(`Failed validation for requested block txs from peer ${peerId.toString()}`); + } else { + this.logger.error(`Error during validation of requested block txs`, e); + } + + return false; + } + } + + public async validateTxsReceivedInBlockProposal(txs: Tx[]): Promise { + const validator = createTxValidatorForBlockProposalReceivedTxs( + this.proofVerifier, + { l1ChainId: this.config.l1ChainId, rollupVersion: this.config.rollupVersion }, + this.logger.getBindings(), + this.txValidationCache, + ); + + const results = await Promise.all( + txs.map(async tx => { + const result = await validator.validateTx(tx); + return result.result !== 'invalid'; + }), + ); + if (results.some(value => value === false)) { + throw new Error('Invalid tx detected'); + } + } + + /** Builds the request/response sub-protocol handlers that serve data from node state (status, tx, block txs). */ + public createReqRespDataHandlers(protocolVersion: string): Partial { + const handlers: Partial = { + [ReqRespSubProtocol.STATUS]: reqRespStatusHandler(protocolVersion, this.worldStateSynchronizer, this.logger), + }; + + if (!this.config.disableTransactions) { + handlers[ReqRespSubProtocol.BLOCK_TXS] = reqRespBlockTxsHandler( + this.mempools.attestationPool, + this.archiver, + this.mempools.txPool, + ); + handlers[ReqRespSubProtocol.TX] = reqRespTxHandler(this.mempools); + } + + return handlers; + } + + /** Returns the tx validator configuration used by the batch tx requester. */ + public getBatchTxValidatorConfig(): BatchRequestTxValidatorConfig { + return { + l1ChainId: this.config.l1ChainId, + rollupVersion: this.config.rollupVersion, + proofVerifier: this.proofVerifier, + txValidationCache: this.txValidationCache, + }; + } + + private getGasFees(): Promise { + return this.blockMinFeesProvider.getCurrentMinFees(); + } + + /** Creates the first stage (fast) validators for gossiped transactions. */ + protected async createFirstStageMessageValidators( + currentBlockNumber: BlockNumber, + nextSlotTimestamp: UInt64, + ): Promise> { + const gasFees = await this.getGasFees(); + const allowedInSetup = [ + ...(await getDefaultAllowedSetupFunctions()), + ...(this.config.txPublicSetupAllowListExtend ?? []), + ]; + const blockNumber = BlockNumber(currentBlockNumber + 1); + const l1Constants = await this.archiver.getL1Constants(); + + return createFirstStageTxValidationsForGossipedTransactions( + nextSlotTimestamp, + blockNumber, + this.worldStateSynchronizer, + gasFees, + this.config.l1ChainId, + this.config.rollupVersion, + protocolContractsHash, + this.archiver, + !this.config.disableTransactions, + allowedInSetup, + this.logger.getBindings(), + { + rollupManaLimit: l1Constants.rollupManaLimit, + maxBlockL2Gas: this.config.validateMaxL2BlockGas, + maxBlockDAGas: this.config.validateMaxDABlockGas, + }, + ); + } + + /** Creates the second stage (expensive proof verification) validators for gossiped transactions. */ + protected createSecondStageMessageValidators(): Record { + return createSecondStageTxValidationsForGossipedTransactions(this.proofVerifier, this.logger.getBindings()); + } + + /** + * Run validations on a tx. + * @param tx - The tx to validate. + * @param messageValidators - The message validators to run. + * @returns The validation outcome. + */ + private async runValidations( + tx: Tx, + messageValidators: Record, + ): Promise { + const validationPromises = Object.entries(messageValidators).map(async ([name, { validator, severity }]) => { + const { result } = await validator.validateTx(tx); + return { name, isValid: result !== 'invalid', severity }; + }); + + // A promise that resolves when all validations have been run + const allValidations = await Promise.all(validationPromises); + const failures = allValidations.filter(x => !x.isValid); + if (failures.length > 0) { + // Pick the most severe failure (lowest tolerance = harshest penalty) + const failed = maxBy(failures, f => PeerErrorSeverityByHarshness.indexOf(f.severity))!; + return { + allPassed: false, + failure: { + isValid: { result: 'invalid' as const, reason: ['Failed validation'] }, + name: failed.name, + severity: failed.severity, + }, + }; + } else { + return { + allPassed: true, + }; + } + } + + /** + * Handle a double spend failure. + * + * Double spend failures are managed on their own because they are a special case. + * We must check if the double spend is recent or old, if it is past a threshold, then we heavily penalize the peer. + * + * @param tx - The tx that failed the double spend validator. + * @param blockNumber - The block number of the tx. + * @param peerId - The peer ID of the peer that sent the tx. + * @returns Severity + */ + private async handleDoubleSpendFailure(tx: Tx, blockNumber: BlockNumber): Promise { + if (blockNumber <= this.config.doubleSpendSeverePeerPenaltyWindow) { + return PeerErrorSeverity.HighToleranceError; + } + + const snapshotValidator = new DoubleSpendTxValidator( + { + nullifiersExist: async (nullifiers: Buffer[]) => { + const merkleTree = this.worldStateSynchronizer.getSnapshot( + BlockNumber(blockNumber - this.config.doubleSpendSeverePeerPenaltyWindow), + ); + const indices = await merkleTree.findLeafIndices(MerkleTreeId.NULLIFIER_TREE, nullifiers); + return indices.map(index => index !== undefined); + }, + }, + this.logger.getBindings(), + ); + + const validSnapshot = await snapshotValidator.validateTx(tx); + if (validSnapshot.result !== 'valid') { + return PeerErrorSeverity.LowToleranceError; + } + + return PeerErrorSeverity.HighToleranceError; + } +} diff --git a/yarn-project/p2p/src/services/service.ts b/yarn-project/p2p/src/services/service.ts index a2f550da5259..03581f06423d 100644 --- a/yarn-project/p2p/src/services/service.ts +++ b/yarn-project/p2p/src/services/service.ts @@ -95,28 +95,6 @@ export interface P2PService { */ propagate(message: T): Promise; - // Leaky abstraction: fix https://github.com/AztecProtocol/aztec-packages/issues/7963 - registerBlockReceivedCallback(callback: P2PBlockReceivedCallback): void; - - registerValidatorCheckpointReceivedCallback(callback: P2PCheckpointReceivedCallback): void; - - registerAllNodesCheckpointReceivedCallback(callback: P2PCheckpointReceivedCallback): void; - - /** - * Registers a callback invoked when a duplicate proposal is detected (equivocation). - * The callback is triggered on the first duplicate (when count goes from 1 to 2). - */ - registerDuplicateProposalCallback(callback: P2PDuplicateProposalCallback): void; - - /** - * Registers a callback invoked when a duplicate attestation is detected (equivocation). - * A validator signing attestations for different proposals at the same slot. - * The callback is triggered on the first duplicate (when count goes from 1 to 2). - */ - registerDuplicateAttestationCallback(callback: P2PDuplicateAttestationCallback): void; - - registerCheckpointAttestationCallback(callback: P2PCheckpointAttestationCallback): void; - getEnr(): ENR | undefined; getPeers(includePending?: boolean): PeerInfo[]; diff --git a/yarn-project/p2p/src/test-helpers/mock-pubsub.ts b/yarn-project/p2p/src/test-helpers/mock-pubsub.ts index ac1f1109b9ac..d1ee01b0865e 100644 --- a/yarn-project/p2p/src/test-helpers/mock-pubsub.ts +++ b/yarn-project/p2p/src/test-helpers/mock-pubsub.ts @@ -1,12 +1,5 @@ -import type { EpochCacheInterface } from '@aztec/epoch-cache'; -import { type Logger, createLogger } from '@aztec/foundation/log'; +import { createLogger } from '@aztec/foundation/log'; import { sleep } from '@aztec/foundation/sleep'; -import type { AztecAsyncKVStore } from '@aztec/kv-store'; -import type { L2BlockSource } from '@aztec/stdlib/block'; -import type { ContractDataSource } from '@aztec/stdlib/contract'; -import type { BlockMinFeesProvider } from '@aztec/stdlib/gas'; -import type { ClientProtocolCircuitVerifier, WorldStateSynchronizer } from '@aztec/stdlib/interfaces/server'; -import type { TelemetryClient } from '@aztec/telemetry-client'; import type { GossipsubEvents, GossipsubMessage } from '@chainsafe/libp2p-gossipsub'; import type { MsgIdStr, PeerIdStr, PublishOpts, TopicStr } from '@chainsafe/libp2p-gossipsub/types'; @@ -18,8 +11,6 @@ import { TypedEventEmitter, } from '@libp2p/interface'; -import type { P2PConfig } from '../config.js'; -import type { MemPools } from '../mem_pools/interface.js'; import { DummyPeerDiscoveryService, DummyPeerManager, LibP2PService } from '../services/index.js'; import type { P2PReqRespConfig } from '../services/reqresp/config.js'; import type { ConnectionSampler } from '../services/reqresp/connection-sampler/connection_sampler.js'; @@ -43,22 +34,7 @@ type GossipSubService = PubSubLibp2p['services']['pubsub']; export function getMockPubSubP2PServiceFactory( network: MockGossipSubNetwork, ): (...args: Parameters<(typeof LibP2PService)['new']>) => Promise { - return ( - config: P2PConfig, - peerId: PeerId, - deps: { - packageVersion: string; - mempools: MemPools; - l2BlockSource: L2BlockSource & ContractDataSource; - epochCache: EpochCacheInterface; - proofVerifier: ClientProtocolCircuitVerifier; - worldStateSynchronizer: WorldStateSynchronizer; - peerStore: AztecAsyncKVStore; - blockMinFeesProvider: BlockMinFeesProvider; - telemetry: TelemetryClient; - logger: Logger; - }, - ) => { + return (...[config, peerId, deps]: Parameters<(typeof LibP2PService)['new']>) => { deps.logger.verbose('Creating mock PubSub service'); const libp2p = new MockPubSub(peerId, network); const peerManager = new DummyPeerManager(peerId, network); @@ -70,12 +46,7 @@ export function getMockPubSubP2PServiceFactory( peerDiscoveryService, reqresp, peerManager, - deps.mempools, - deps.l2BlockSource, - deps.epochCache, - deps.proofVerifier, - deps.worldStateSynchronizer, - deps.blockMinFeesProvider, + deps.processor, deps.telemetry, deps.logger, ); diff --git a/yarn-project/p2p/src/test-helpers/reqresp-nodes.ts b/yarn-project/p2p/src/test-helpers/reqresp-nodes.ts index 8f9bbbbda460..57bd0c127a66 100644 --- a/yarn-project/p2p/src/test-helpers/reqresp-nodes.ts +++ b/yarn-project/p2p/src/test-helpers/reqresp-nodes.ts @@ -36,6 +36,7 @@ import type { MemPools } from '../mem_pools/interface.js'; import { DiscV5Service } from '../services/discv5/discV5_service.js'; import { APP_SPECIFIC_WEIGHT } from '../services/gossipsub/scoring.js'; import { LibP2PService } from '../services/libp2p/libp2p_service.js'; +import { P2PMessageProcessor } from '../services/libp2p/p2p_message_processor.js'; import { PeerManager } from '../services/peer-manager/peer_manager.js'; import { PeerScoring } from '../services/peer-manager/peer_scoring.js'; import type { P2PReqRespConfig } from '../services/reqresp/config.js'; @@ -162,12 +163,8 @@ export async function createTestLibP2PService( p2pNode.services.pubsub.score.params.appSpecificScore = (peerId: string) => peerManager.shouldDisableP2PGossip(peerId) ? -Infinity : peerManager.getPeerScore(peerId); - return new LibP2PService( + const processor = new P2PMessageProcessor( config, - p2pNode as PubSubLibp2p, - discoveryService, - reqresp, - peerManager, mempools, archiver, epochCache, @@ -176,6 +173,16 @@ export async function createTestLibP2PService( { getCurrentMinFees: () => Promise.resolve(GasFees.empty()) }, telemetry, ); + + return new LibP2PService( + config, + p2pNode as PubSubLibp2p, + discoveryService, + reqresp, + peerManager, + processor, + telemetry, + ); } /** diff --git a/yarn-project/p2p/src/testbench/p2p_client_testbench_worker.ts b/yarn-project/p2p/src/testbench/p2p_client_testbench_worker.ts index 274c86272f13..ec5d9a72d366 100644 --- a/yarn-project/p2p/src/testbench/p2p_client_testbench_worker.ts +++ b/yarn-project/p2p/src/testbench/p2p_client_testbench_worker.ts @@ -4,7 +4,6 @@ * Used when running testbench commands. */ import { MockL2BlockSource } from '@aztec/archiver/test'; -import type { EpochCacheInterface } from '@aztec/epoch-cache'; import { BlockNumber } from '@aztec/foundation/branded-types'; import { SecretValue } from '@aztec/foundation/config'; import { Secp256k1Signer } from '@aztec/foundation/crypto/secp256k1-signer'; @@ -15,10 +14,8 @@ import { DateProvider, Timer } from '@aztec/foundation/timer'; import { openTmpStore } from '@aztec/kv-store/lmdb-v2'; import { getVKTreeRoot } from '@aztec/noir-protocol-circuits-types/vk-tree'; import { protocolContractsHash } from '@aztec/protocol-contracts'; -import type { L2BlockSource } from '@aztec/stdlib/block'; -import type { ContractDataSource } from '@aztec/stdlib/contract'; import { GasFees } from '@aztec/stdlib/gas'; -import type { ClientProtocolCircuitVerifier, WorldStateSynchronizer } from '@aztec/stdlib/interfaces/server'; +import type { ClientProtocolCircuitVerifier } from '@aztec/stdlib/interfaces/server'; import type { DataStoreConfig } from '@aztec/stdlib/kv-store'; import { type BlockProposal, P2PMessage } from '@aztec/stdlib/p2p'; import { ChonkProof } from '@aztec/stdlib/proofs'; @@ -35,6 +32,7 @@ import type { P2PConfig } from '../config.js'; import { createP2PClient } from '../index.js'; import type { MemPools } from '../mem_pools/index.js'; import { LibP2PService } from '../services/index.js'; +import type { P2PMessageProcessor } from '../services/libp2p/p2p_message_processor.js'; import type { PeerManager } from '../services/peer-manager/peer_manager.js'; import { BatchTxRequester } from '../services/reqresp/batch-tx-requester/batch_tx_requester.js'; import type { BatchTxRequesterLibP2PService } from '../services/reqresp/batch-tx-requester/interface.js'; @@ -94,30 +92,13 @@ class TestLibP2PService extends LibP2PService { peerDiscoveryService: PeerDiscoveryService, reqresp: ReqResp, peerManager: PeerManager, - mempools: MemPools, - archiver: L2BlockSource & ContractDataSource, - epochCache: EpochCacheInterface, - proofVerifier: ClientProtocolCircuitVerifier, - worldStateSynchronizer: WorldStateSynchronizer, + processor: P2PMessageProcessor, + private readonly mempools: MemPools, telemetry: TelemetryClient, logger = createLogger('p2p:test:libp2p_service'), disableTxValidation = true, ) { - super( - config, - node, - peerDiscoveryService, - reqresp, - peerManager, - mempools, - archiver, - epochCache, - proofVerifier, - worldStateSynchronizer, - { getCurrentMinFees: () => Promise.resolve(GasFees.empty()) }, - telemetry, - logger, - ); + super(config, node, peerDiscoveryService, reqresp, peerManager, processor, telemetry, logger); this.disableTxValidation = disableTxValidation; } @@ -407,11 +388,8 @@ process.on('message', async msg => { (client as any).p2pService.peerDiscoveryService, (client as any).p2pService.reqresp, (client as any).p2pService.peerManager, - (client as any).p2pService.mempools, - (client as any).p2pService.archiver, - epochCache, - proofVerifier, - worldState, + (client as any).p2pService.processor, + (client as any).p2pService.processor.mempools, telemetry as TelemetryClient, workerLogger, true,