diff --git a/yarn-project/p2p/src/client/factory.ts b/yarn-project/p2p/src/client/factory.ts index 3f271e43744f..220281f0ae46 100644 --- a/yarn-project/p2p/src/client/factory.ts +++ b/yarn-project/p2p/src/client/factory.ts @@ -27,6 +27,7 @@ import { import { TxValidationCache } from '../msg_validators/tx_validator/tx_validation_cache.js'; import { DummyP2PService } from '../services/dummy_service.js'; import { LibP2PService } from '../services/index.js'; +import { P2PMessageProcessor } from '../services/libp2p/p2p_message_processor.js'; import { createFileStoreTxSources } from '../services/tx_collection/file_store_tx_source.js'; import { TxCollection } from '../services/tx_collection/tx_collection.js'; import { NodeRpcTxSource, type TxSource, createNodeRpcTxSources } from '../services/tx_collection/tx_source.js'; @@ -141,21 +142,33 @@ export async function createP2PClient( const txValidationCache = config.txValidationCacheSize > 0 ? new TxValidationCache(config.txValidationCacheSize) : undefined; - const p2pService = await createP2PService( + // The message processor owns received-message handling (validation, persistence, consensus callbacks). + // It is shared between the P2PClient (which registers callbacks on it) and the libp2p service (which + // invokes it when messages arrive), so it is built here and injected into both. + const processor = new P2PMessageProcessor( config, + mempools, archiver, + epochCache, proofVerifier, worldStateSynchronizer, - epochCache, blockMinFeesProvider, + telemetry, + logger.createChild('libp2p_service'), + txValidationCache, + ); + + const p2pService = await createP2PService( + config, + worldStateSynchronizer, + epochCache, store, peerStore, - mempools, + processor, deps.p2pServiceFactory, packageVersion, logger.createChild('libp2p_service'), telemetry, - txValidationCache, ); const txValidatorForTxCollection = createTxValidatorForOnDemandReceivedTxs( @@ -215,6 +228,7 @@ export async function createP2PClient( archiver, mempools, p2pService, + processor, txCollection, txFileStore, epochCache, @@ -228,19 +242,15 @@ export async function createP2PClient( async function createP2PService( config: P2PConfig & DataStoreConfig, - archiver: L2BlockSource & ContractDataSource, - proofVerifier: ClientProtocolCircuitVerifier, worldStateSynchronizer: WorldStateSynchronizer, epochCache: EpochCacheInterface, - blockMinFeesProvider: BlockMinFeesProvider, store: AztecAsyncKVStore, peerStore: AztecLMDBStoreV2, - mempools: MemPools, + processor: P2PMessageProcessor, p2pServiceFactory: P2PClientDeps['p2pServiceFactory'], packageVersion: string, logger: Logger, telemetry: TelemetryClient, - txValidationCache?: TxValidationCache, ) { if (!config.p2pEnabled) { logger.verbose('P2P is disabled. Using dummy P2P service.'); @@ -255,16 +265,12 @@ async function createP2PService( const p2pService = await (p2pServiceFactory ?? LibP2PService.new)(config, peerId, { packageVersion, - mempools, - l2BlockSource: archiver, + processor, epochCache, - proofVerifier, worldStateSynchronizer, peerStore, - blockMinFeesProvider, telemetry, logger: logger.createChild(`libp2p_service`), - txValidationCache, }); return p2pService; diff --git a/yarn-project/p2p/src/client/p2p_client.test.ts b/yarn-project/p2p/src/client/p2p_client.test.ts index 01c03e0d206d..adaba314dc52 100644 --- a/yarn-project/p2p/src/client/p2p_client.test.ts +++ b/yarn-project/p2p/src/client/p2p_client.test.ts @@ -18,6 +18,7 @@ import type { P2PService } from '../index.js'; import { type AttestationPool, createTestAttestationPool } from '../mem_pools/attestation_pool/attestation_pool.js'; import type { MemPools } from '../mem_pools/interface.js'; import type { TxPoolV2 } from '../mem_pools/tx_pool_v2/interfaces.js'; +import type { P2PMessageProcessor } from '../services/libp2p/p2p_message_processor.js'; import type { TxCollection } from '../services/tx_collection/tx_collection.js'; import { P2PClient } from './p2p_client.js'; @@ -68,6 +69,7 @@ describe('P2P Client', () => { blockSource, mempools, p2pService, + mock(), txCollection, undefined, epochCache, diff --git a/yarn-project/p2p/src/client/p2p_client.ts b/yarn-project/p2p/src/client/p2p_client.ts index 77fc6e8be11c..5290f096794f 100644 --- a/yarn-project/p2p/src/client/p2p_client.ts +++ b/yarn-project/p2p/src/client/p2p_client.ts @@ -39,6 +39,7 @@ import type { AttestationPoolApi, ProposalsForSlot } from '../mem_pools/attestat import type { MemPools } from '../mem_pools/interface.js'; import type { TxPoolV2 } from '../mem_pools/tx_pool_v2/interfaces.js'; import type { AuthRequest, StatusMessage } from '../services/index.js'; +import type { P2PMessageProcessor } from '../services/libp2p/p2p_message_processor.js'; import { ReqRespSubProtocol, type ReqRespSubProtocolHandler } from '../services/reqresp/interface.js'; import type { DuplicateAttestationInfo, @@ -91,6 +92,7 @@ export class P2PClient extends WithTracer implements P2P { private l2BlockSource: L2BlockSource & ContractDataSource, mempools: MemPools, private p2pService: P2PService, + private processor: P2PMessageProcessor, private txCollection: TxCollection, private txFileStore: TxFileStore | undefined, private epochCache: EpochCacheInterface, @@ -406,30 +408,30 @@ export class P2PClient extends WithTracer implements P2P { return this.attestationPool.hasBlockProposalsForSlot(slot); } - // REVIEW: https://github.com/AztecProtocol/aztec-packages/issues/7963 - // ^ This pattern is not my favorite (md) + // Consensus callbacks are registered directly on the message processor (the main-thread component + // that owns received-message handling), not on the libp2p networking service. public registerBlockProposalHandler(handler: P2PBlockReceivedCallback): void { - this.p2pService.registerBlockReceivedCallback(handler); + this.processor.registerBlockReceivedCallback(handler); } public registerValidatorCheckpointProposalHandler(handler: P2PCheckpointReceivedCallback): void { - this.p2pService.registerValidatorCheckpointReceivedCallback(handler); + this.processor.registerValidatorCheckpointReceivedCallback(handler); } public registerAllNodesCheckpointProposalHandler(handler: P2PCheckpointReceivedCallback): void { - this.p2pService.registerAllNodesCheckpointReceivedCallback(handler); + this.processor.registerAllNodesCheckpointReceivedCallback(handler); } public registerDuplicateProposalCallback(callback: (info: DuplicateProposalInfo) => void): void { - this.p2pService.registerDuplicateProposalCallback(callback); + this.processor.registerDuplicateProposalCallback(callback); } public registerDuplicateAttestationCallback(callback: (info: DuplicateAttestationInfo) => void): void { - this.p2pService.registerDuplicateAttestationCallback(callback); + this.processor.registerDuplicateAttestationCallback(callback); } public registerCheckpointAttestationCallback(callback: (attestation: CheckpointAttestation) => void): void { - this.p2pService.registerCheckpointAttestationCallback(callback); + this.processor.registerCheckpointAttestationCallback(callback); } public async getPendingTxs(limit?: number, after?: TxHash): Promise { diff --git a/yarn-project/p2p/src/client/test/p2p_client.integration_block_txs.test.ts b/yarn-project/p2p/src/client/test/p2p_client.integration_block_txs.test.ts index a0f8c6960b4e..b50ea6c084dc 100644 --- a/yarn-project/p2p/src/client/test/p2p_client.integration_block_txs.test.ts +++ b/yarn-project/p2p/src/client/test/p2p_client.integration_block_txs.test.ts @@ -451,7 +451,7 @@ describe('p2p client integration block txs protocol ', () => { // Run the response through client1's real validation — this is what BatchTxRequester does in // production. The peer must not be penalized. - await (client1.p2pService as any).validateRequestedBlockTxsConsistency( + await (client1.p2pService as any).processor.validateRequestedBlockTxsConsistency( request, decoded, client2.p2pService.node.peerId, diff --git a/yarn-project/p2p/src/services/dummy_service.ts b/yarn-project/p2p/src/services/dummy_service.ts index 7e132b28ef95..00d9c4a1fbb9 100644 --- a/yarn-project/p2p/src/services/dummy_service.ts +++ b/yarn-project/p2p/src/services/dummy_service.ts @@ -22,23 +22,12 @@ import type { } from './reqresp/interface.js'; import type { GoodByeReason } from './reqresp/protocols/goodbye.js'; import { ReqRespStatus } from './reqresp/status.js'; -import { - type P2PBlockReceivedCallback, - type P2PCheckpointAttestationCallback, - type P2PCheckpointReceivedCallback, - type P2PDuplicateAttestationCallback, - type P2PDuplicateProposalCallback, - type P2PService, - type PeerDiscoveryService, - PeerDiscoveryState, -} from './service.js'; +import { type P2PService, type PeerDiscoveryService, PeerDiscoveryState } from './service.js'; /** * A dummy implementation of the P2P Service. */ export class DummyP2PService implements P2PService { - private allNodesCheckpointReceivedCallback?: P2PCheckpointReceivedCallback; - updateConfig(_config: Partial): void {} /** Returns an empty array for peers. */ @@ -80,31 +69,6 @@ export class DummyP2PService implements P2PService { */ public settledTxs(_: TxHash[]) {} - /** - * Register a callback into the validator client for when a block proposal is received - */ - public registerBlockReceivedCallback(_callback: P2PBlockReceivedCallback) {} - - /** - * Register a callback into the validator client for when a checkpoint proposal is received - */ - public registerValidatorCheckpointReceivedCallback(_callback: P2PCheckpointReceivedCallback) {} - public registerAllNodesCheckpointReceivedCallback(callback: P2PCheckpointReceivedCallback) { - this.allNodesCheckpointReceivedCallback = callback; - } - - /** - * Register a callback for when a duplicate proposal is detected - */ - public registerDuplicateProposalCallback(_callback: P2PDuplicateProposalCallback): void {} - - /** - * Register a callback for when a duplicate attestation is detected - */ - public registerDuplicateAttestationCallback(_callback: P2PDuplicateAttestationCallback): void {} - - public registerCheckpointAttestationCallback(_callback: P2PCheckpointAttestationCallback): void {} - /** * Sends a request to a peer. * @param _protocol - The protocol to send the request on. diff --git a/yarn-project/p2p/src/services/libp2p/libp2p_service.test.ts b/yarn-project/p2p/src/services/libp2p/libp2p_service.test.ts index 2b95d58cdce0..f09eab4939b9 100644 --- a/yarn-project/p2p/src/services/libp2p/libp2p_service.test.ts +++ b/yarn-project/p2p/src/services/libp2p/libp2p_service.test.ts @@ -46,6 +46,7 @@ import { BitVector } from '../reqresp/protocols/block_txs/bitvector.js'; import { BlockTxsRequest, BlockTxsResponse } from '../reqresp/protocols/block_txs/block_txs_reqresp.js'; import type { PeerDiscoveryService } from '../service.js'; import { LibP2PService } from './libp2p_service.js'; +import { P2PMessageProcessor } from './p2p_message_processor.js'; describe('LibP2PService', () => { const MOCK_PEER_ID = 'peer-id-123'; @@ -1433,6 +1434,40 @@ interface CreateTestLibP2PServiceOptions { * Test subclass of LibP2PService that exposes protected methods for testing * and allows construction with mocked dependencies. */ +/** + * Test message processor that lets tests control validator outcomes via flags on the owning service. + * The validator factories live on the processor, so the test flags are read through a back-reference. + */ +class TestP2PMessageProcessor extends P2PMessageProcessor { + public testService!: TestLibP2PService; + + /** Override to use test flag for first-stage validators. Returns a failing validator when firstStageValidationPasses is false. */ + protected override createFirstStageMessageValidators(): Promise> { + if (this.testService.firstStageValidationPasses) { + return Promise.resolve({}); + } + return Promise.resolve({ + [this.testService.firstStageFailingValidatorName]: { + validator: { validateTx: () => Promise.resolve({ result: 'invalid' as const, reason: ['Test failure'] }) }, + severity: this.testService.firstStageSeverity, + }, + }); + } + + /** Override to use test flag for second-stage validators. Returns a failing validator when secondStageValidationPasses is false. */ + protected override createSecondStageMessageValidators(): Record { + if (this.testService.secondStageValidationPasses) { + return {}; + } + return { + proofValidator: { + validator: { validateTx: () => Promise.resolve({ result: 'invalid' as const, reason: ['Proof failure'] }) }, + severity: PeerErrorSeverity.LowToleranceError, + }, + }; + } +} + class TestLibP2PService extends LibP2PService { /** Controls whether first-stage gossip validation passes. Set to false to simulate first-stage failure. */ public firstStageValidationPasses = true; @@ -1481,12 +1516,8 @@ class TestLibP2PService extends LibP2PService { verifyProof: () => Promise.resolve({ valid: true, durationMs: 1000, totalDurationMs: 1000 }), }); - super( + const processor = new TestP2PMessageProcessor( mockConfig, - node, - resolvedPeerDiscoveryService, - mockReqResp, - peerManager, mempools, archiver, epochCache, @@ -1497,6 +1528,12 @@ class TestLibP2PService extends LibP2PService { logger, ); + super(mockConfig, node, resolvedPeerDiscoveryService, mockReqResp, peerManager, processor, telemetry, logger); + + // The processor reads validator-control flags off this service; wire the back-reference now that + // super() has run and `this` is available. + processor.testService = this; + this.mockPeerDiscoveryService = resolvedPeerDiscoveryService; this.testEpochCache = epochCache; @@ -1512,39 +1549,13 @@ class TestLibP2PService extends LibP2PService { return super.handleGossipedTx(payloadData, msgId, source); } - /** Override to use test flag for first-stage validators. Returns a failing validator when firstStageValidationPasses is false. */ - protected override createFirstStageMessageValidators(): Promise> { - if (this.firstStageValidationPasses) { - return Promise.resolve({}); - } - return Promise.resolve({ - [this.firstStageFailingValidatorName]: { - validator: { validateTx: () => Promise.resolve({ result: 'invalid' as const, reason: ['Test failure'] }) }, - severity: this.firstStageSeverity, - }, - }); - } - - /** Override to use test flag for second-stage validators. Returns a failing validator when secondStageValidationPasses is false. */ - protected override createSecondStageMessageValidators(): Record { - if (this.secondStageValidationPasses) { - return {}; - } - return { - proofValidator: { - validator: { validateTx: () => Promise.resolve({ result: 'invalid' as const, reason: ['Proof failure'] }) }, - severity: PeerErrorSeverity.LowToleranceError, - }, - }; - } - - /** Exposes the protected validateRequestedBlockTxsConsistency for testing. */ - public override validateRequestedBlockTxsConsistency( + /** Exposes the processor's validateRequestedBlockTxsConsistency for testing. */ + public validateRequestedBlockTxsConsistency( request: BlockTxsRequest, response: BlockTxsResponse, peerId: PeerId, ): Promise { - return super.validateRequestedBlockTxsConsistency(request, response, peerId); + return this.processor.validateRequestedBlockTxsConsistency(request, response, peerId); } /** Exposes the protected processBlockFromPeer for testing. */ @@ -1557,14 +1568,50 @@ class TestLibP2PService extends LibP2PService { return super.handleGossipedCheckpointProposal(payloadData, msgId, source); } - /** Exposes the protected validateAndStoreCheckpointAttestation for testing. */ - public override validateAndStoreCheckpointAttestation(peerId: PeerId, attestation: CheckpointAttestation) { - return super.validateAndStoreCheckpointAttestation(peerId, attestation); + /** Exposes the processor's validateAndStoreCheckpointAttestation for testing. */ + public validateAndStoreCheckpointAttestation(peerId: PeerId, attestation: CheckpointAttestation) { + return this.processor.validateAndStoreCheckpointAttestation(peerId, attestation); + } + + // Consensus callbacks now live on the processor; expose registration through the service for the tests + // that drive the gossip dispatchers directly. + public registerBlockReceivedCallback(cb: Parameters[0]): void { + this.processor.registerBlockReceivedCallback(cb); + } + + public registerValidatorCheckpointReceivedCallback( + cb: Parameters[0], + ): void { + this.processor.registerValidatorCheckpointReceivedCallback(cb); + } + + public registerAllNodesCheckpointReceivedCallback( + cb: Parameters[0], + ): void { + this.processor.registerAllNodesCheckpointReceivedCallback(cb); + } + + public registerDuplicateProposalCallback( + cb: Parameters[0], + ): void { + this.processor.registerDuplicateProposalCallback(cb); + } + + public registerDuplicateAttestationCallback( + cb: Parameters[0], + ): void { + this.processor.registerDuplicateAttestationCallback(cb); + } + + public registerCheckpointAttestationCallback( + cb: Parameters[0], + ): void { + this.processor.registerCheckpointAttestationCallback(cb); } /** Sets the attestation pool on the mempools for test setup. */ public setAttestationPool(attestationPool: MockAttestationPoolForTests): void { - (this.mempools as any).attestationPool = attestationPool; + (this.processor as any).mempools.attestationPool = attestationPool; } } diff --git a/yarn-project/p2p/src/services/libp2p/libp2p_service.ts b/yarn-project/p2p/src/services/libp2p/libp2p_service.ts index c943f74a8c2a..480595bddf98 100644 --- a/yarn-project/p2p/src/services/libp2p/libp2p_service.ts +++ b/yarn-project/p2p/src/services/libp2p/libp2p_service.ts @@ -1,32 +1,24 @@ import type { EpochCacheInterface } from '@aztec/epoch-cache'; -import { BlockNumber, type SlotNumber } from '@aztec/foundation/branded-types'; -import { maxBy, merge } from '@aztec/foundation/collection'; +import { merge } from '@aztec/foundation/collection'; import { type Logger, createLibp2pComponentLogger, createLogger } from '@aztec/foundation/log'; import { RunningPromise } from '@aztec/foundation/running-promise'; import { Timer } from '@aztec/foundation/timer'; import type { AztecAsyncKVStore } from '@aztec/kv-store'; -import { protocolContractsHash } from '@aztec/protocol-contracts'; -import type { EthAddress, L2BlockSource } from '@aztec/stdlib/block'; -import type { ContractDataSource } from '@aztec/stdlib/contract'; -import { type BlockMinFeesProvider, GasFees } from '@aztec/stdlib/gas'; -import type { ClientProtocolCircuitVerifier, PeerInfo, WorldStateSynchronizer } from '@aztec/stdlib/interfaces/server'; +import type { EthAddress } from '@aztec/stdlib/block'; +import type { PeerInfo, WorldStateSynchronizer } from '@aztec/stdlib/interfaces/server'; import { BlockProposal, CheckpointAttestation, CheckpointProposal, - type CheckpointProposalCore, type Gossipable, P2PMessage, PeerErrorSeverity, - PeerErrorSeverityByHarshness, TopicType, createTopicString, getTopicsForConfig, metricsTopicStrToLabels, } from '@aztec/stdlib/p2p'; -import { MerkleTreeId } from '@aztec/stdlib/trees'; -import { Tx, type TxValidationResult } from '@aztec/stdlib/tx'; -import type { UInt64 } from '@aztec/stdlib/types'; +import { Tx } from '@aztec/stdlib/tx'; import { compressComponentVersions } from '@aztec/stdlib/versioning'; import { Attributes, @@ -34,7 +26,6 @@ import { SpanStatusCode, type TelemetryClient, WithTracer, - trackSpan, } from '@aztec/telemetry-client'; import { @@ -58,24 +49,7 @@ import { ENR } from '@nethermindeth/enr'; import { createLibp2p } from 'libp2p'; import type { P2PConfig } from '../../config.js'; -import { CheckpointProposalReceivedCallbackNotRegisteredError } from '../../errors/p2p-service.error.js'; -import type { MemPools } from '../../mem_pools/interface.js'; -import { - BlockProposalValidator, - CheckpointAttestationValidator, - CheckpointProposalValidator, - DoubleSpendTxValidator, - FishermanAttestationValidator, - getDefaultAllowedSetupFunctions, -} from '../../msg_validators/index.js'; import { MessageSeenValidator } from '../../msg_validators/msg_seen_validator/msg_seen_validator.js'; -import { - type TransactionValidator, - createFirstStageTxValidationsForGossipedTransactions, - createSecondStageTxValidationsForGossipedTransactions, - createTxValidatorForBlockProposalReceivedTxs, -} from '../../msg_validators/tx_validator/factory.js'; -import { TxValidationCache } from '../../msg_validators/tx_validator/tx_validation_cache.js'; import { GossipSubEvent } from '../../types/index.js'; import { type PubSubLibp2p, convertToMultiaddr } from '../../util.js'; import { getVersions } from '../../versioning.js'; @@ -91,44 +65,19 @@ import type { BatchTxRequesterLibP2PService } from '../reqresp/batch-tx-requeste import type { P2PReqRespConfig } from '../reqresp/config.js'; import { AuthRequest, - BlockTxsRequest, - BlockTxsResponse, type ReqRespInterface, type ReqRespResponse, ReqRespSubProtocol, type ReqRespSubProtocolHandler, type ReqRespSubProtocolHandlers, StatusMessage, - ValidationError, pingHandler, reqGoodbyeHandler, - reqRespBlockTxsHandler, - reqRespStatusHandler, - reqRespTxHandler, } from '../reqresp/index.js'; import { ReqResp } from '../reqresp/reqresp.js'; -import type { - P2PBlockReceivedCallback, - P2PCheckpointAttestationCallback, - P2PCheckpointReceivedCallback, - P2PDuplicateAttestationCallback, - P2PService, - PeerDiscoveryService, -} from '../service.js'; +import type { P2PService, PeerDiscoveryService } from '../service.js'; import { P2PInstrumentation } from './instrumentation.js'; - -interface ValidationResult { - name: string; - isValid: TxValidationResult; - severity: PeerErrorSeverity; -} - -type ValidationOutcome = { allPassed: true } | { allPassed: false; failure: ValidationResult }; - -// REFACTOR: Unify with the type above -type ReceivedMessageValidationResult = - | { obj: T; result: Exclude; metadata?: M } - | { obj?: T; result: TopicValidatorResult.Reject; metadata?: M; severity: PeerErrorSeverity }; +import { P2PMessageProcessor, type ReceivedMessageValidationResult } from './p2p_message_processor.js'; /** * Lib P2P implementation of the P2PService interface. @@ -137,47 +86,9 @@ export class LibP2PService extends WithTracer implements P2PService { private discoveryRunningPromise?: RunningPromise; private msgIdSeenValidators: Record = {} as Record; - // Message validators - private blockProposalValidator: BlockProposalValidator; - private checkpointProposalValidator: CheckpointProposalValidator; - private checkpointAttestationValidator: CheckpointAttestationValidator; - private protocolVersion = ''; private topicStrings: Record = {} as Record; - /** Callback invoked when a duplicate proposal is detected (triggers slashing). */ - private duplicateProposalCallback?: (info: { - slot: SlotNumber; - proposer: EthAddress; - type: 'checkpoint' | 'block'; - }) => void; - - /** Callback invoked when a duplicate attestation is detected (triggers slashing). */ - private duplicateAttestationCallback?: P2PDuplicateAttestationCallback; - - /** Callback invoked when a valid checkpoint attestation is accepted into the pool. */ - private checkpointAttestationCallback?: P2PCheckpointAttestationCallback; - - /** - * Callback for when a block is received from a peer. - * @param block - The block received from the peer. - * @returns The attestation for the block, if any. - */ - private blockReceivedCallback: P2PBlockReceivedCallback; - - /** - * Callback for when a checkpoint proposal is received from a peer. - * @param checkpoint - The checkpoint proposal received from the peer. - * @returns The attestations for the checkpoint, if any. - */ - private allNodesCheckpointReceivedCallback: P2PCheckpointReceivedCallback; - /** - * Callback for when a checkpoint proposal is received - specifically for validators - from a peer. - * @param checkpoint - The checkpoint proposal received from the peer. - * @returns The attestations for the checkpoint, if any. - */ - private validatorCheckpointReceivedCallback: P2PCheckpointReceivedCallback; - private gossipSubEventHandler: (e: CustomEvent) => void; private ipChangedHandler?: (ip: string) => void; @@ -189,21 +100,18 @@ export class LibP2PService extends WithTracer implements P2PService { protected logger: Logger; + /** Handles the content of received messages: validation against node state, persistence, and consensus callbacks. */ + protected processor: P2PMessageProcessor; + constructor( private config: P2PConfig, protected node: PubSubLibp2p, private peerDiscoveryService: PeerDiscoveryService, private reqresp: ReqRespInterface, protected peerManager: PeerManagerInterface, - protected mempools: MemPools, - protected archiver: L2BlockSource & ContractDataSource, - private epochCache: EpochCacheInterface, - private proofVerifier: ClientProtocolCircuitVerifier, - private worldStateSynchronizer: WorldStateSynchronizer, - private blockMinFeesProvider: BlockMinFeesProvider, + processor: P2PMessageProcessor, telemetry: TelemetryClient, logger: Logger = createLogger('p2p:libp2p_service'), - private txValidationCache?: TxValidationCache, ) { super(telemetry, 'LibP2PService'); this.telemetry = telemetry; @@ -233,50 +141,10 @@ export class LibP2PService extends WithTracer implements P2PService { this.protocolVersion, ); - const p2pPropagationTime = config.attestationPropagationTime; - const proposalValidatorOpts = { - txsPermitted: !config.disableTransactions, - maxTxsPerBlock: config.validateMaxTxsPerBlock ?? config.validateMaxTxsPerCheckpoint, - maxBlocksPerCheckpoint: config.maxBlocksPerCheckpoint, - p2pPropagationTime, - skipSlotValidation: config.skipProposalSlotValidation, - signatureContext: { - chainId: config.l1ChainId, - rollupAddress: config.rollupAddress, - }, - }; - this.blockProposalValidator = new BlockProposalValidator(epochCache, proposalValidatorOpts); - this.checkpointProposalValidator = new CheckpointProposalValidator(epochCache, proposalValidatorOpts); - const attestationValidatorOpts = { - l1PublishingTime: config.l1PublishingTime, - p2pPropagationTime, - signatureContext: proposalValidatorOpts.signatureContext, - }; - this.checkpointAttestationValidator = config.fishermanMode - ? new FishermanAttestationValidator(epochCache, mempools.attestationPool, telemetry, attestationValidatorOpts) - : new CheckpointAttestationValidator(epochCache, attestationValidatorOpts); - this.gossipSubEventHandler = this.handleGossipSubEvent.bind(this); - this.blockReceivedCallback = async (block: BlockProposal): Promise => { - this.logger.warn( - `Handler for block received not yet registered on P2P service. Received block ${block.blockNumber} for slot ${block.slotNumber} from peer.`, - { p2pMessageIdentifier: await block.p2pMessageLoggingIdentifier() }, - ); - return true; - }; - - this.allNodesCheckpointReceivedCallback = ( - _checkpoint: CheckpointProposalCore, - ): Promise => { - throw new CheckpointProposalReceivedCallbackNotRegisteredError(); - }; - - this.validatorCheckpointReceivedCallback = ( - _checkpoint: CheckpointProposalCore, - ): Promise => { - return Promise.resolve(undefined); - }; + this.processor = processor; + this.processor.setNetwork(this); } public updateConfig(config: Partial>) { @@ -294,32 +162,16 @@ export class LibP2PService extends WithTracer implements P2PService { config: P2PConfig, peerId: PeerId, deps: { - mempools: MemPools; - l2BlockSource: L2BlockSource & ContractDataSource; + processor: P2PMessageProcessor; epochCache: EpochCacheInterface; - proofVerifier: ClientProtocolCircuitVerifier; worldStateSynchronizer: WorldStateSynchronizer; peerStore: AztecAsyncKVStore; - blockMinFeesProvider: BlockMinFeesProvider; telemetry: TelemetryClient; logger: Logger; packageVersion: string; - txValidationCache?: TxValidationCache; }, ) { - const { - worldStateSynchronizer, - epochCache, - l2BlockSource, - mempools, - proofVerifier, - peerStore, - blockMinFeesProvider, - telemetry, - logger, - packageVersion, - txValidationCache, - } = deps; + const { processor, worldStateSynchronizer, epochCache, peerStore, telemetry, logger, packageVersion } = deps; const { p2pPort, maxPeerCount, listenAddress } = config; const bindAddrTcp = convertToMultiaddr(listenAddress, p2pPort, 'tcp'); @@ -512,22 +364,7 @@ export class LibP2PService extends WithTracer implements P2PService { node.services.pubsub.score.params.appSpecificScore = (peerId: string) => peerManager.shouldDisableP2PGossip(peerId) ? -Infinity : peerManager.getPeerScore(peerId); - return new LibP2PService( - config, - node, - peerDiscoveryService, - reqresp, - peerManager, - mempools, - l2BlockSource, - epochCache, - proofVerifier, - worldStateSynchronizer, - blockMinFeesProvider, - telemetry, - logger, - txValidationCache, - ); + return new LibP2PService(config, node, peerDiscoveryService, reqresp, peerManager, processor, telemetry, logger); } /** @@ -546,30 +383,16 @@ export class LibP2PService extends WithTracer implements P2PService { } const announceTcpMultiaddr = p2pIp ? convertToMultiaddr(p2pIp, p2pPort, 'tcp') : undefined; - // Create request response protocol handlers - const txHandler = reqRespTxHandler(this.mempools); + // Create request response protocol handlers. Handlers that serve data from node state (status, tx, + // block txs) come from the message processor; lifecycle handlers (ping, goodbye) live here. const goodbyeHandler = reqGoodbyeHandler(this.peerManager); - const statusHandler = reqRespStatusHandler(this.protocolVersion, this.worldStateSynchronizer, this.logger); const requestResponseHandlers: Partial = { [ReqRespSubProtocol.PING]: pingHandler, - [ReqRespSubProtocol.STATUS]: statusHandler.bind(this), [ReqRespSubProtocol.GOODBYE]: goodbyeHandler.bind(this), + ...this.processor.createReqRespDataHandlers(this.protocolVersion), }; - if (!this.config.disableTransactions) { - const blockTxsHandler = reqRespBlockTxsHandler( - this.mempools.attestationPool, - this.archiver, - this.mempools.txPool, - ); - requestResponseHandlers[ReqRespSubProtocol.BLOCK_TXS] = blockTxsHandler.bind(this); - } - - if (!this.config.disableTransactions) { - requestResponseHandlers[ReqRespSubProtocol.TX] = txHandler.bind(this); - } - await this.peerManager.initializePeers(); await this.reqresp.start(requestResponseHandlers); @@ -710,41 +533,6 @@ export class LibP2PService extends WithTracer implements P2PService { return this.peerDiscoveryService.getEnr(); } - public registerBlockReceivedCallback(callback: P2PBlockReceivedCallback) { - this.blockReceivedCallback = callback; - } - - public registerValidatorCheckpointReceivedCallback(callback: P2PCheckpointReceivedCallback) { - this.validatorCheckpointReceivedCallback = callback; - } - - public registerAllNodesCheckpointReceivedCallback(callback: P2PCheckpointReceivedCallback) { - this.allNodesCheckpointReceivedCallback = callback; - } - - /** - * Registers a callback to be invoked when a duplicate proposal is detected. - * This callback is triggered on the first duplicate (when count goes from 1 to 2). - */ - public registerDuplicateProposalCallback( - callback: (info: { slot: SlotNumber; proposer: EthAddress; type: 'checkpoint' | 'block' }) => void, - ): void { - this.duplicateProposalCallback = callback; - } - - /** - * Registers a callback to be invoked when a duplicate attestation is detected. - * A validator signing attestations for different proposals at the same slot. - * This callback is triggered on the first duplicate (when count goes from 1 to 2). - */ - public registerDuplicateAttestationCallback(callback: P2PDuplicateAttestationCallback): void { - this.duplicateAttestationCallback = callback; - } - - public registerCheckpointAttestationCallback(callback: P2PCheckpointAttestationCallback): void { - this.checkpointAttestationCallback = callback; - } - /** * Subscribes to a topic. * @param topic - The topic to subscribe to. @@ -1003,84 +791,12 @@ export class LibP2PService extends WithTracer implements P2PService { } protected async handleGossipedTx(payloadData: Buffer, msgId: string, source: PeerId) { - const validationFunc: () => Promise> = async () => { + const validationFunc: () => Promise> = () => { const tx = this.tryDeserialize(() => Tx.fromBuffer(payloadData), msgId, source); if (!tx) { - return { result: TopicValidatorResult.Reject, severity: PeerErrorSeverity.LowToleranceError }; - } - - const currentBlockNumber = await this.archiver.getBlockNumber(); - const { ts: nextSlotTimestamp } = this.epochCache.getEpochAndSlotInNextL1Slot(); - - // Stage 1: fast validators (metadata, data, timestamps, double-spend, gas, phases, block header) - const firstStageValidators = await this.createFirstStageMessageValidators(currentBlockNumber, nextSlotTimestamp); - const firstStageOutcome = await this.runValidations(tx, firstStageValidators); - if (!firstStageOutcome.allPassed) { - const { name } = firstStageOutcome.failure; - let { severity } = firstStageOutcome.failure; - - // Double spend validator has a special case handler. We perform more detailed examination - // as to how recently the nullifier was entered into the tree and if the transaction should - // have 'known' the nullifier existed. This determines the severity of the penalty applied to the peer. - if (name === 'doubleSpendValidator') { - const txBlockNumber = BlockNumber(currentBlockNumber + 1); - severity = await this.handleDoubleSpendFailure(tx, txBlockNumber); - } - - this.logger.verbose(`Rejecting gossiped tx ${tx.getTxHash().toString()}: stage 1 validation failed`, { - validator: name, - severity, - source: source.toString(), - }); - return { result: TopicValidatorResult.Reject, severity }; - } - - // Pool pre-check: see if the pool would accept this tx before doing expensive proof verification - const canAdd = await this.mempools.txPool.canAddPendingTx(tx); - if (canAdd === 'ignored') { - this.logger.verbose(`Ignoring gossiped tx ${tx.getTxHash().toString()}: pool pre-check returned ignored`, { - source: source.toString(), - }); - return { result: TopicValidatorResult.Ignore, obj: tx }; - } - - // Stage 2: expensive proof verification - const secondStageValidators = this.createSecondStageMessageValidators(); - const secondStageOutcome = await this.runValidations(tx, secondStageValidators); - if (!secondStageOutcome.allPassed) { - const { severity, name } = secondStageOutcome.failure; - this.logger.verbose(`Rejecting gossiped tx ${tx.getTxHash().toString()}: stage 2 validation failed`, { - validator: name, - severity, - source: source.toString(), - }); - return { result: TopicValidatorResult.Reject, severity }; - } - - // Pool add: persist the tx - const txHash = tx.getTxHash(); - const addResult = await this.mempools.txPool.addPendingTxs([tx], { source: 'gossip' }); - - const wasAccepted = addResult.accepted.some(h => h.equals(txHash)); - const wasIgnored = addResult.ignored.some(h => h.equals(txHash)); - - this.logger.verbose(`Validate propagated tx ${txHash.toString()}`, { - wasAccepted, - wasIgnored, - [Attributes.P2P_ID]: source.toString(), - }); - - if (wasAccepted) { - return { result: TopicValidatorResult.Accept, obj: tx }; - } else if (wasIgnored) { - return { result: TopicValidatorResult.Ignore, obj: tx }; - } else { - this.logger.warn(`Gossiped tx ${txHash.toString()} unexpectedly rejected by pool`, { - source: source.toString(), - txHash: txHash.toString(), - }); - return { result: TopicValidatorResult.Reject, severity: PeerErrorSeverity.HighToleranceError }; + return Promise.resolve({ result: TopicValidatorResult.Reject, severity: PeerErrorSeverity.LowToleranceError }); } + return this.processor.validateAndStoreTx(tx, source); }; const { result, obj: tx } = await this.validateReceivedMessage(validationFunc, msgId, source, TopicType.tx); @@ -1117,7 +833,7 @@ export class LibP2PService extends WithTracer implements P2PService { severity: PeerErrorSeverity.LowToleranceError, }); } - return this.validateAndStoreCheckpointAttestation(source, attestation); + return this.processor.validateAndStoreCheckpointAttestation(source, attestation); }, msgId, source, @@ -1139,83 +855,13 @@ export class LibP2PService extends WithTracer implements P2PService { ); } - /** Validates a checkpoint attestation and adds it to the pool. Penalizes the peer if validation fails. */ - @trackSpan('Libp2pService.validateAndStoreCheckpointAttestation', (_peerId, attestation) => ({ - [Attributes.SLOT_NUMBER]: attestation.payload.header.slotNumber.toString(), - })) - protected async validateAndStoreCheckpointAttestation( - peerId: PeerId, - attestation: CheckpointAttestation, - ): Promise> { - const validationResult = await this.checkpointAttestationValidator.validate(attestation); - - if (validationResult.result === 'reject') { - this.logger.warn(`Penalizing peer ${peerId} for checkpoint attestation validation failure`); - return { result: TopicValidatorResult.Reject, severity: validationResult.severity }; - } - - if (validationResult.result === 'ignore') { - return { result: TopicValidatorResult.Ignore, obj: attestation }; - } - - // Try to add the attestation: this handles existence check, cap check, and adding in one call - // count is the number of attestations by this signer for this slot (for duplicate detection) - const slot = attestation.payload.header.slotNumber; - const { added, alreadyExists, count } = - await this.mempools.attestationPool.tryAddCheckpointAttestation(attestation); - - this.logger.trace(`Validate propagated checkpoint attestation`, { - added, - alreadyExists, - count, - [Attributes.SLOT_NUMBER]: slot.toString(), - [Attributes.P2P_ID]: peerId.toString(), - }); - - // Exact same attestation received, no need to re-broadcast - if (alreadyExists) { - return { result: TopicValidatorResult.Ignore, obj: attestation }; - } - - // Could not add (cap reached for signer), penalize and do not re-broadcast - if (!added) { - this.logger.warn(`Rejecting checkpoint attestation due to cap`, { - slot: slot.toString(), - archive: attestation.archive.toString(), - source: peerId.toString(), - attester: attestation.getSender()?.toString(), - count, - }); - return { result: TopicValidatorResult.Reject, severity: PeerErrorSeverity.HighToleranceError }; - } - - // Check if this is a duplicate attestation (signer attested to a different proposal at the same slot) - // count is the number of attestations by this signer for this slot - if (count === 2) { - const attester = attestation.getSender(); - if (attester) { - this.logger.warn(`Detected duplicate attestation (equivocation) at slot ${slot}`, { - slot: slot.toString(), - archive: attestation.archive.toString(), - source: peerId.toString(), - attester: attester.toString(), - }); - this.duplicateAttestationCallback?.({ slot, attester }); - } - } - - // Attestation was added successfully - accept it so other nodes can also detect the equivocation - this.checkpointAttestationCallback?.(attestation); - return { result: TopicValidatorResult.Accept, obj: attestation }; - } - protected async processBlockFromPeer(payloadData: Buffer, msgId: string, source: PeerId): Promise { const { result, obj: block, metadata: { isEquivocated } = {}, } = await this.validateReceivedMessage( - () => this.validateAndStoreBlockProposal(source, BlockProposal.fromBuffer(payloadData)), + () => this.processor.validateAndStoreBlockProposal(source, BlockProposal.fromBuffer(payloadData)), msgId, source, TopicType.block_proposal, @@ -1226,104 +872,7 @@ export class LibP2PService extends WithTracer implements P2PService { return; } - await this.processValidBlockProposal(block, source); - } - - /** Validates a block proposal. Triggers a penalization to the peer that sent it if invalid. Adds to the mempool if valid. */ - @trackSpan('Libp2pService.validateAndStoreBlockProposal', (_peerId, block) => ({ - [Attributes.BLOCK_NUMBER]: block.blockNumber.toString(), - [Attributes.SLOT_NUMBER]: block.slotNumber.toString(), - })) - protected async validateAndStoreBlockProposal( - peerId: PeerId, - block: BlockProposal, - ): Promise> { - const validationResult = await this.blockProposalValidator.validate(block); - - if (validationResult.result === 'reject') { - this.logger.warn(`Penalizing peer ${peerId} for block proposal validation failure`); - return { result: TopicValidatorResult.Reject, severity: validationResult.severity }; - } - - if (validationResult.result === 'ignore') { - return { result: TopicValidatorResult.Ignore, obj: block }; - } - - // Try to add the proposal: this handles existence check, cap check, and adding in one call - const { added, alreadyExists, count } = await this.mempools.attestationPool.tryAddBlockProposal(block); - const isEquivocated = count !== undefined && count > 1; - - // Duplicate proposal received, no need to re-broadcast - if (alreadyExists) { - this.logger.debug(`Ignoring duplicate block proposal received`, { - ...block.toBlockInfo(), - indexWithinCheckpoint: block.indexWithinCheckpoint, - proposer: block.getSender()?.toString(), - source: peerId.toString(), - }); - return { result: TopicValidatorResult.Ignore, obj: block, metadata: { isEquivocated } }; - } - - // Too many blocks received for this slot and index, penalize peer and do not re-broadcast - if (!added) { - this.logger.warn(`Penalizing peer for block proposal exceeding per-position cap`, { - ...block.toBlockInfo(), - indexWithinCheckpoint: block.indexWithinCheckpoint, - count, - proposer: block.getSender()?.toString(), - source: peerId.toString(), - }); - return { - result: TopicValidatorResult.Reject, - metadata: { isEquivocated }, - severity: PeerErrorSeverity.HighToleranceError, - }; - } - - // If this was a duplicate proposal, do not process it, but do invoke the duplicate callback, - // and do re-broadcast it so other nodes in the network know to slash the proposer - if (isEquivocated) { - const proposer = block.getSender(); - this.logger.warn(`Detected duplicate block proposal (equivocation) at slot ${block.slotNumber}`, { - ...block.toBlockInfo(), - source: peerId.toString(), - proposer: proposer?.toString(), - }); - // Invoke the duplicate callback on the first duplicate spotted only - if (proposer && count === 2) { - this.duplicateProposalCallback?.({ slot: block.slotNumber, proposer, type: 'block' }); - } - return { result: TopicValidatorResult.Accept, obj: block, metadata: { isEquivocated } }; - } - - // Otherwise, we're good to go! - return { result: TopicValidatorResult.Accept, obj: block }; - } - - // REFACTOR(palla): This method should be moved to the p2p_client or to a separate component, - // should not be here as it does not deal with p2p networking. - @trackSpan('Libp2pService.processValidBlockProposal', async block => ({ - [Attributes.SLOT_NUMBER]: block.slotNumber, - [Attributes.BLOCK_ARCHIVE]: block.archive.toString(), - [Attributes.P2P_ID]: await block.p2pMessageLoggingIdentifier().then(i => i.toString()), - })) - protected async processValidBlockProposal(block: BlockProposal, sender: PeerId) { - const slot = block.slotNumber; - this.logger.verbose(`Received block proposal for slot ${slot} from external peer ${sender.toString()}.`, { - p2pMessageIdentifier: await block.p2pMessageLoggingIdentifier(), - source: sender.toString(), - ...block.toBlockInfo(), - }); - - // Mark the txs in this proposal as protected - await this.mempools.txPool.protectTxs(block.txHashes, block.blockHeader); - - // Call the block received callback to validate the proposal. - // Note: Validators do NOT attest to individual blocks, only to checkpoint proposals. - const isValid = await this.blockReceivedCallback(block, sender); - if (!isValid) { - this.logger.info(`Block proposal validation failed for block ${block.blockNumber}`, block.toBlockInfo()); - } + await this.processor.processValidBlockProposal(block, source); } /** @@ -1336,7 +885,7 @@ export class LibP2PService extends WithTracer implements P2PService { obj: checkpoint, metadata: { isEquivocated, processBlock } = {}, } = await this.validateReceivedMessage( - () => this.validateAndStoreCheckpointProposal(source, CheckpointProposal.fromBuffer(payloadData)), + () => this.processor.validateAndStoreCheckpointProposal(source, CheckpointProposal.fromBuffer(payloadData)), msgId, source, TopicType.checkpoint_proposal, @@ -1345,14 +894,14 @@ export class LibP2PService extends WithTracer implements P2PService { // Process checkpoint proposal if valid and not equivocated. const processCheckpointFn = () => result === TopicValidatorResult.Accept && checkpoint && !isEquivocated - ? this.processValidCheckpointProposal(checkpoint.toCore(), source) + ? this.processor.processValidCheckpointProposal(checkpoint.toCore(), source) : Promise.resolve(); // If the checkpoint contained a valid last block, we process it even if the checkpoint itself is to be rejected // TODO(palla/mbps): Is this ok? Should we be considering a block from a checkpoint that was equivocated? const processBlockFn = () => processBlock && checkpoint && checkpoint.getBlockProposal() - ? this.processValidBlockProposal(checkpoint.getBlockProposal()!, source) + ? this.processor.processValidBlockProposal(checkpoint.getBlockProposal()!, source) : Promise.resolve(); // A node that skips checkpoint validation attests without re-executing the embedded last block, so run @@ -1371,146 +920,6 @@ export class LibP2PService extends WithTracer implements P2PService { await processCheckpointFn(); } - /** - * Validates a checkpoint proposal. Penalizes peer if validation fails. Adds the checkpoint and - * its last block (if present) to the mempool if valid. Triggers equivocation detection on both. - */ - @trackSpan('Libp2pService.validateAndStoreCheckpointProposal', (_peerId, checkpoint) => ({ - [Attributes.SLOT_NUMBER]: checkpoint.slotNumber.toString(), - })) - protected async validateAndStoreCheckpointProposal( - peerId: PeerId, - checkpoint: CheckpointProposal, - ): Promise> { - const validationResult = await this.checkpointProposalValidator.validate(checkpoint); - - if (validationResult.result === 'reject') { - this.logger.warn(`Penalizing peer ${peerId} for checkpoint proposal validation failure`); - return { result: TopicValidatorResult.Reject, severity: validationResult.severity }; - } - - if (validationResult.result === 'ignore') { - return { result: TopicValidatorResult.Ignore, obj: checkpoint }; - } - - // Extract and try to add the block proposal first if present - const blockProposal = checkpoint.getBlockProposal(); - let processBlock = false; - if (blockProposal) { - this.logger.debug(`Validating block proposal from propagated checkpoint`, { - [Attributes.SLOT_NUMBER]: checkpoint.slotNumber.toString(), - [Attributes.P2P_ID]: peerId.toString(), - }); - const blockProposalResult = await this.validateAndStoreBlockProposal(peerId, blockProposal); - const { obj, metadata: { isEquivocated } = {} } = blockProposalResult; - if (blockProposalResult.result === TopicValidatorResult.Reject || !obj || isEquivocated) { - this.logger.debug(`Rejecting checkpoint due to invalid last block proposal`, { - [Attributes.SLOT_NUMBER]: checkpoint.slotNumber.toString(), - [Attributes.P2P_ID]: peerId.toString(), - isEquivocated, - result: blockProposalResult.result, - }); - return { - result: TopicValidatorResult.Reject, - severity: - 'severity' in blockProposalResult ? blockProposalResult.severity : PeerErrorSeverity.MidToleranceError, - }; - } else if (blockProposalResult.result === TopicValidatorResult.Accept && obj && !isEquivocated) { - processBlock = true; - } - } - - // Try to add the checkpoint proposal core: this handles existence check, cap check, and adding in one call - const checkpointCore = checkpoint.toCore(); - const tryAddResult = await this.mempools.attestationPool.tryAddCheckpointProposal(checkpointCore); - const { added, alreadyExists, count } = tryAddResult; - const isEquivocated = count !== undefined && count > 1; - - // Duplicate proposal received, do not re-broadcast - if (alreadyExists) { - this.logger.debug(`Ignoring duplicate checkpoint proposal received`, { - ...checkpoint.toCheckpointInfo(), - source: peerId.toString(), - }); - return { - result: TopicValidatorResult.Ignore, - obj: checkpoint, - metadata: { isEquivocated, processBlock }, - }; - } - - // Too many checkpoint proposals received for this slot, penalize peer and do not re-broadcast - // Note: We still return the checkpoint obj so the lastBlock can be processed if valid - if (!added) { - this.logger.warn(`Penalizing peer for checkpoint proposal exceeding per-slot cap`, { - ...checkpoint.toCheckpointInfo(), - count, - source: peerId.toString(), - }); - return { - result: TopicValidatorResult.Reject, - obj: checkpoint, - metadata: { isEquivocated, processBlock }, - severity: PeerErrorSeverity.HighToleranceError, - }; - } - - // If this was a duplicate proposal, do not process it, but do invoke the duplicate callback, - // and do re-broadcast it so other nodes in the network know to slash the proposer - if (isEquivocated) { - const proposer = checkpoint.getSender(); - this.logger.warn(`Detected duplicate checkpoint proposal (equivocation) at slot ${checkpoint.slotNumber}`, { - ...checkpoint.toCheckpointInfo(), - source: peerId.toString(), - proposer: proposer?.toString(), - }); - // Invoke the duplicate callback on the first duplicate spotted only - if (proposer && count === 2) { - this.duplicateProposalCallback?.({ slot: checkpoint.slotNumber, proposer, type: 'checkpoint' }); - } - return { - result: TopicValidatorResult.Accept, - obj: checkpoint, - metadata: { isEquivocated, processBlock }, - }; - } - - // Otherwise, we're good to go! - return { result: TopicValidatorResult.Accept, obj: checkpoint, metadata: { processBlock, isEquivocated } }; - } - - /** - * Process a validated checkpoint proposal. - * Note: The proposal was already added to the pool by tryAddCheckpointProposal in handleGossipedCheckpointProposal. - */ - @trackSpan('Libp2pService.processValidCheckpointProposal', async checkpoint => ({ - [Attributes.SLOT_NUMBER]: checkpoint.slotNumber, - [Attributes.BLOCK_ARCHIVE]: checkpoint.archive.toString(), - [Attributes.P2P_ID]: await checkpoint.p2pMessageLoggingIdentifier().then(i => i.toString()), - })) - protected async processValidCheckpointProposal(checkpoint: CheckpointProposalCore, sender: PeerId) { - const slot = checkpoint.slotNumber; - this.logger.verbose(`Received checkpoint proposal for slot ${slot} from external peer ${sender.toString()}.`, { - p2pMessageIdentifier: await checkpoint.p2pMessageLoggingIdentifier(), - slot: checkpoint.slotNumber, - archive: checkpoint.archive.toString(), - source: sender.toString(), - }); - - await this.allNodesCheckpointReceivedCallback(checkpoint, sender); - - // Call the checkpoint received callback with the core version (without lastBlock) - // to validate and potentially generate attestations - const attestations = await this.validatorCheckpointReceivedCallback(checkpoint, sender); - if (attestations && attestations.length > 0) { - // If the callback returned attestations, add them to the pool and propagate them - await this.mempools.attestationPool.addOwnCheckpointAttestations(attestations); - for (const attestation of attestations) { - await this.propagate(attestation); - } - } - } - /** * Propagates provided message to peers. * @param message - The message to propagate. @@ -1523,116 +932,6 @@ export class LibP2PService extends WithTracer implements P2PService { }); } - /** - * Validate the requested block transactions request-response consistency. - * It does NOT validate the transactions themselves. - * @param request - The block transactions request. - * @param response - The block transactions response. - * @param peerId - The ID of the peer that made the request. - * @returns True if the request-response is consistent, false otherwise. - */ - @trackSpan('Libp2pService.validateRequestedBlockTxsConsistency', request => ({ - [Attributes.BLOCK_ARCHIVE]: request.archiveRoot.toString(), - })) - protected async validateRequestedBlockTxsConsistency( - request: BlockTxsRequest, - response: BlockTxsResponse, - peerId: PeerId, - ): Promise { - try { - // A response with archiveRoot=Fr.zero is the documented "I don't have the block" signal from - // reqRespBlockTxsHandler (block_txs_handler.ts:54-58): the peer lacked the block in its - // attestation pool and archiver, but matched the requested hashes against its tx pool and - // shipped what it found. This is legitimate behaviour, not misbehaviour — we just can't verify - // membership/order without the block, so we drop the response without penalising the peer. - if (response.archiveRoot.isZero()) { - this.logger.debug(`Peer ${peerId.toString()} signalled missing block with Fr.zero archive root`); - return false; - } - - if (!response.archiveRoot.equals(request.archiveRoot)) { - this.peerManager.penalizePeer(peerId, PeerErrorSeverity.MidToleranceError); - throw new ValidationError( - `Received block txs for unexpected archive root: expected ${request.archiveRoot.toString()}, got ${response.archiveRoot.toString()}`, - ); - } - - if (response.txIndices.getLength() !== request.txIndices.getLength()) { - this.peerManager.penalizePeer(peerId, PeerErrorSeverity.MidToleranceError); - throw new ValidationError( - `Received block txs with mismatched bitvector length: expected ${request.txIndices.getLength()}, got ${response.txIndices.getLength()}`, - ); - } - - // Check no duplicates and not exceeding returnable count - const requestedIndices = new Set(request.txIndices.getTrueIndices()); - const availableIndices = new Set(response.txIndices.getTrueIndices()); - const maxReturnable = [...requestedIndices].filter(i => availableIndices.has(i)).length; - - const returnedHashes = await Promise.all(response.txs.map(tx => tx.getTxHash().toString())); - const uniqueReturned = new Set(returnedHashes.map(h => h.toString())); - if (uniqueReturned.size !== returnedHashes.length) { - this.peerManager.penalizePeer(peerId, PeerErrorSeverity.MidToleranceError); - throw new ValidationError(`Received duplicate txs in block txs response`); - } - if (response.txs.length > maxReturnable) { - this.peerManager.penalizePeer(peerId, PeerErrorSeverity.MidToleranceError); - throw new ValidationError( - `Received more txs (${response.txs.length}) than requested-and-available (${maxReturnable})`, - ); - } - - // To verify membership/order of the returned txs we need the canonical tx hash list for the - // block. Prefer the block proposal (held while a block is in flight), but fall back to the - // archiver for blocks we only know as mined — e.g. a prover collecting txs to prove a block it - // never received a proposal for. This mirrors the responder side (reqRespBlockTxsHandler), - // which serves from proposal-or-archiver. - const proposal = await this.mempools.attestationPool.getBlockProposalByArchive(request.archiveRoot.toString()); - const blockTxHashes = - proposal?.txHashes ?? - (await this.archiver.getBlock({ archive: request.archiveRoot }))?.body.txEffects.map(e => e.txHash); - - if (blockTxHashes) { - // Build intersected indices - const intersectIdx = request.txIndices.getTrueIndices().filter(i => response.txIndices.isSet(i)); - - // Enforce subset membership and preserve increasing order by index. - const hashToIndexInBlock = new Map( - blockTxHashes.map((h, i) => [h.toString(), i] as [string, number]), - ); - const allowedIndexSet = new Set(intersectIdx); - const indices = returnedHashes.map(h => hashToIndexInBlock.get(h)); - const allAllowed = indices.every(idx => idx !== undefined && allowedIndexSet.has(idx)); - const strictlyIncreasing = indices.every((idx, i) => (i === 0 ? idx !== undefined : idx! > indices[i - 1]!)); - if (!allAllowed || !strictlyIncreasing) { - this.peerManager.penalizePeer(peerId, PeerErrorSeverity.LowToleranceError); - throw new ValidationError('Returned txs do not match expected subset/order for requested indices'); - } - } else { - // Neither a local proposal nor an archived block: we cannot verify membership/order of the - // returned txs. This is a local-state gap, not a peer fault, so we do not penalize. - this.logger.warn( - `Block ${request.archiveRoot.toString()} not found in attestation pool or archiver; cannot validate membership/order of returned txs`, - ); - return false; - } - - return true; - } catch (e: any) { - if (e instanceof ValidationError) { - this.logger.warn(`Failed validation for requested block txs from peer ${peerId.toString()}`); - } else { - this.logger.error(`Error during validation of requested block txs`, e); - } - - return false; - } - } - - private getGasFees(): Promise { - return this.blockMinFeesProvider.getCurrentMinFees(); - } - /** * Get the BatchTxRequesterLibP2PService dependencies for creating BatchTxRequester instances */ @@ -1640,145 +939,19 @@ export class LibP2PService extends WithTracer implements P2PService { return { reqResp: this.reqresp, connectionSampler: this.reqresp.getConnectionSampler(), - txValidatorConfig: { - l1ChainId: this.config.l1ChainId, - rollupVersion: this.config.rollupVersion, - proofVerifier: this.proofVerifier, - txValidationCache: this.txValidationCache, - }, + txValidatorConfig: this.processor.getBatchTxValidatorConfig(), peerScoring: this.peerManager, - validateRequestedBlockTxsConsistency: this.validateRequestedBlockTxsConsistency.bind(this), + validateRequestedBlockTxsConsistency: this.processor.validateRequestedBlockTxsConsistency.bind(this.processor), }; } - public async validateTxsReceivedInBlockProposal(txs: Tx[]): Promise { - const validator = createTxValidatorForBlockProposalReceivedTxs( - this.proofVerifier, - { l1ChainId: this.config.l1ChainId, rollupVersion: this.config.rollupVersion }, - this.logger.getBindings(), - this.txValidationCache, - ); - - const results = await Promise.all( - txs.map(async tx => { - const result = await validator.validateTx(tx); - return result.result !== 'invalid'; - }), - ); - if (results.some(value => value === false)) { - throw new Error('Invalid tx detected'); - } + public validateTxsReceivedInBlockProposal(txs: Tx[]): Promise { + return this.processor.validateTxsReceivedInBlockProposal(txs); } - /** Creates the first stage (fast) validators for gossiped transactions. */ - protected async createFirstStageMessageValidators( - currentBlockNumber: BlockNumber, - nextSlotTimestamp: UInt64, - ): Promise> { - const gasFees = await this.getGasFees(); - const allowedInSetup = [ - ...(await getDefaultAllowedSetupFunctions()), - ...(this.config.txPublicSetupAllowListExtend ?? []), - ]; - const blockNumber = BlockNumber(currentBlockNumber + 1); - const l1Constants = await this.archiver.getL1Constants(); - - return createFirstStageTxValidationsForGossipedTransactions( - nextSlotTimestamp, - blockNumber, - this.worldStateSynchronizer, - gasFees, - this.config.l1ChainId, - this.config.rollupVersion, - protocolContractsHash, - this.archiver, - !this.config.disableTransactions, - allowedInSetup, - this.logger.getBindings(), - { - rollupManaLimit: l1Constants.rollupManaLimit, - maxBlockL2Gas: this.config.validateMaxL2BlockGas, - maxBlockDAGas: this.config.validateMaxDABlockGas, - }, - ); - } - - /** Creates the second stage (expensive proof verification) validators for gossiped transactions. */ - protected createSecondStageMessageValidators(): Record { - return createSecondStageTxValidationsForGossipedTransactions(this.proofVerifier, this.logger.getBindings()); - } - - /** - * Run validations on a tx. - * @param tx - The tx to validate. - * @param messageValidators - The message validators to run. - * @returns The validation outcome. - */ - private async runValidations( - tx: Tx, - messageValidators: Record, - ): Promise { - const validationPromises = Object.entries(messageValidators).map(async ([name, { validator, severity }]) => { - const { result } = await validator.validateTx(tx); - return { name, isValid: result !== 'invalid', severity }; - }); - - // A promise that resolves when all validations have been run - const allValidations = await Promise.all(validationPromises); - const failures = allValidations.filter(x => !x.isValid); - if (failures.length > 0) { - // Pick the most severe failure (lowest tolerance = harshest penalty) - const failed = maxBy(failures, f => PeerErrorSeverityByHarshness.indexOf(f.severity))!; - return { - allPassed: false, - failure: { - isValid: { result: 'invalid' as const, reason: ['Failed validation'] }, - name: failed.name, - severity: failed.severity, - }, - }; - } else { - return { - allPassed: true, - }; - } - } - - /** - * Handle a double spend failure. - * - * Double spend failures are managed on their own because they are a special case. - * We must check if the double spend is recent or old, if it is past a threshold, then we heavily penalize the peer. - * - * @param tx - The tx that failed the double spend validator. - * @param blockNumber - The block number of the tx. - * @param peerId - The peer ID of the peer that sent the tx. - * @returns Severity - */ - private async handleDoubleSpendFailure(tx: Tx, blockNumber: BlockNumber): Promise { - if (blockNumber <= this.config.doubleSpendSeverePeerPenaltyWindow) { - return PeerErrorSeverity.HighToleranceError; - } - - const snapshotValidator = new DoubleSpendTxValidator( - { - nullifiersExist: async (nullifiers: Buffer[]) => { - const merkleTree = this.worldStateSynchronizer.getSnapshot( - BlockNumber(blockNumber - this.config.doubleSpendSeverePeerPenaltyWindow), - ); - const indices = await merkleTree.findLeafIndices(MerkleTreeId.NULLIFIER_TREE, nullifiers); - return indices.map(index => index !== undefined); - }, - }, - this.logger.getBindings(), - ); - - const validSnapshot = await snapshotValidator.validateTx(tx); - if (validSnapshot.result !== 'valid') { - return PeerErrorSeverity.LowToleranceError; - } - - return PeerErrorSeverity.HighToleranceError; + /** Applies a peer-scoring penalty. Exposed for the message processor to penalize peers during validation. */ + public penalizePeer(peerId: PeerId, severity: PeerErrorSeverity): void { + this.peerManager.penalizePeer(peerId, severity); } public getPeerScore(peerId: PeerId): number { diff --git a/yarn-project/p2p/src/services/libp2p/p2p_message_processor.ts b/yarn-project/p2p/src/services/libp2p/p2p_message_processor.ts new file mode 100644 index 000000000000..378799a51553 --- /dev/null +++ b/yarn-project/p2p/src/services/libp2p/p2p_message_processor.ts @@ -0,0 +1,895 @@ +import type { EpochCacheInterface } from '@aztec/epoch-cache'; +import { BlockNumber, type SlotNumber } from '@aztec/foundation/branded-types'; +import { maxBy } from '@aztec/foundation/collection'; +import { type Logger, createLogger } from '@aztec/foundation/log'; +import { protocolContractsHash } from '@aztec/protocol-contracts'; +import type { EthAddress, L2BlockSource } from '@aztec/stdlib/block'; +import type { ContractDataSource } from '@aztec/stdlib/contract'; +import { type BlockMinFeesProvider, GasFees } from '@aztec/stdlib/gas'; +import type { ClientProtocolCircuitVerifier, WorldStateSynchronizer } from '@aztec/stdlib/interfaces/server'; +import { + BlockProposal, + CheckpointAttestation, + CheckpointProposal, + type CheckpointProposalCore, + type Gossipable, + PeerErrorSeverity, + PeerErrorSeverityByHarshness, +} from '@aztec/stdlib/p2p'; +import { MerkleTreeId } from '@aztec/stdlib/trees'; +import { Tx, type TxValidationResult } from '@aztec/stdlib/tx'; +import type { UInt64 } from '@aztec/stdlib/types'; +import { Attributes, type TelemetryClient, WithTracer, trackSpan } from '@aztec/telemetry-client'; + +import { type PeerId, TopicValidatorResult } from '@libp2p/interface'; + +import type { P2PConfig } from '../../config.js'; +import { CheckpointProposalReceivedCallbackNotRegisteredError } from '../../errors/p2p-service.error.js'; +import type { MemPools } from '../../mem_pools/interface.js'; +import { + BlockProposalValidator, + CheckpointAttestationValidator, + CheckpointProposalValidator, + DoubleSpendTxValidator, + FishermanAttestationValidator, + getDefaultAllowedSetupFunctions, +} from '../../msg_validators/index.js'; +import { + type TransactionValidator, + createFirstStageTxValidationsForGossipedTransactions, + createSecondStageTxValidationsForGossipedTransactions, + createTxValidatorForBlockProposalReceivedTxs, +} from '../../msg_validators/tx_validator/factory.js'; +import type { TxValidationCache } from '../../msg_validators/tx_validator/tx_validation_cache.js'; +import type { BatchRequestTxValidatorConfig } from '../reqresp/batch-tx-requester/tx_validator.js'; +import { + BlockTxsRequest, + BlockTxsResponse, + ReqRespSubProtocol, + type ReqRespSubProtocolHandlers, + ValidationError, + reqRespBlockTxsHandler, + reqRespStatusHandler, + reqRespTxHandler, +} from '../reqresp/index.js'; +import type { + P2PBlockReceivedCallback, + P2PCheckpointAttestationCallback, + P2PCheckpointReceivedCallback, + P2PDuplicateAttestationCallback, +} from '../service.js'; + +interface ValidationResult { + name: string; + isValid: TxValidationResult; + severity: PeerErrorSeverity; +} + +type ValidationOutcome = { allPassed: true } | { allPassed: false; failure: ValidationResult }; + +// REFACTOR: Unify with the type above +export type ReceivedMessageValidationResult = + | { obj: T; result: Exclude; metadata?: M } + | { obj?: T; result: TopicValidatorResult.Reject; metadata?: M; severity: PeerErrorSeverity }; + +/** + * The subset of libp2p network operations the message processor invokes while handling received + * messages: re-broadcasting (propagate) and peer scoring (penalizePeer). Kept as a narrow interface + * so the processor stays decoupled from the libp2p node itself. + */ +export interface P2PNetwork { + propagate(message: T): Promise; + penalizePeer(peerId: PeerId, severity: PeerErrorSeverity): void; +} + +/** + * Handles the content of P2P messages received over gossip and request/response: validation against + * node state (world state, archiver, mempools), persistence into the mempools, and dispatch to the + * consensus callbacks. This is the part of the P2P stack that depends on main-thread node state, as + * opposed to the libp2p networking machinery in {@link LibP2PService}. + */ +export class P2PMessageProcessor extends WithTracer { + // Message validators + private blockProposalValidator: BlockProposalValidator; + private checkpointProposalValidator: CheckpointProposalValidator; + private checkpointAttestationValidator: CheckpointAttestationValidator; + + /** Callback invoked when a duplicate proposal is detected (triggers slashing). */ + private duplicateProposalCallback?: (info: { + slot: SlotNumber; + proposer: EthAddress; + type: 'checkpoint' | 'block'; + }) => void; + + /** Callback invoked when a duplicate attestation is detected (triggers slashing). */ + private duplicateAttestationCallback?: P2PDuplicateAttestationCallback; + + /** Callback invoked when a valid checkpoint attestation is accepted into the pool. */ + private checkpointAttestationCallback?: P2PCheckpointAttestationCallback; + + /** + * Callback for when a block is received from a peer. + * @param block - The block received from the peer. + * @returns The attestation for the block, if any. + */ + private blockReceivedCallback: P2PBlockReceivedCallback; + + /** + * Callback for when a checkpoint proposal is received from a peer. + * @param checkpoint - The checkpoint proposal received from the peer. + * @returns The attestations for the checkpoint, if any. + */ + private allNodesCheckpointReceivedCallback: P2PCheckpointReceivedCallback; + /** + * Callback for when a checkpoint proposal is received - specifically for validators - from a peer. + * @param checkpoint - The checkpoint proposal received from the peer. + * @returns The attestations for the checkpoint, if any. + */ + private validatorCheckpointReceivedCallback: P2PCheckpointReceivedCallback; + + protected logger: Logger; + + private network?: P2PNetwork; + + constructor( + private config: P2PConfig, + protected mempools: MemPools, + protected archiver: L2BlockSource & ContractDataSource, + private epochCache: EpochCacheInterface, + private proofVerifier: ClientProtocolCircuitVerifier, + private worldStateSynchronizer: WorldStateSynchronizer, + private blockMinFeesProvider: BlockMinFeesProvider, + telemetry: TelemetryClient, + logger: Logger = createLogger('p2p:libp2p_service'), + private txValidationCache?: TxValidationCache, + ) { + super(telemetry, 'P2PMessageProcessor'); + + // Create child logger with fisherman prefix if in fisherman mode + this.logger = config.fishermanMode ? logger.createChild('[FISHERMAN]') : logger; + + const p2pPropagationTime = config.attestationPropagationTime; + const proposalValidatorOpts = { + txsPermitted: !config.disableTransactions, + maxTxsPerBlock: config.validateMaxTxsPerBlock ?? config.validateMaxTxsPerCheckpoint, + maxBlocksPerCheckpoint: config.maxBlocksPerCheckpoint, + p2pPropagationTime, + skipSlotValidation: config.skipProposalSlotValidation, + signatureContext: { + chainId: config.l1ChainId, + rollupAddress: config.rollupAddress, + }, + }; + this.blockProposalValidator = new BlockProposalValidator(epochCache, proposalValidatorOpts); + this.checkpointProposalValidator = new CheckpointProposalValidator(epochCache, proposalValidatorOpts); + const attestationValidatorOpts = { + l1PublishingTime: config.l1PublishingTime, + p2pPropagationTime, + signatureContext: proposalValidatorOpts.signatureContext, + }; + this.checkpointAttestationValidator = config.fishermanMode + ? new FishermanAttestationValidator(epochCache, mempools.attestationPool, telemetry, attestationValidatorOpts) + : new CheckpointAttestationValidator(epochCache, attestationValidatorOpts); + + this.blockReceivedCallback = async (block: BlockProposal): Promise => { + this.logger.warn( + `Handler for block received not yet registered on P2P service. Received block ${block.blockNumber} for slot ${block.slotNumber} from peer.`, + { p2pMessageIdentifier: await block.p2pMessageLoggingIdentifier() }, + ); + return true; + }; + + this.allNodesCheckpointReceivedCallback = ( + _checkpoint: CheckpointProposalCore, + ): Promise => { + throw new CheckpointProposalReceivedCallbackNotRegisteredError(); + }; + + this.validatorCheckpointReceivedCallback = ( + _checkpoint: CheckpointProposalCore, + ): Promise => { + return Promise.resolve(undefined); + }; + } + + /** Wires up the libp2p network operations the processor invokes. Called once during construction of the service. */ + public setNetwork(network: P2PNetwork): void { + this.network = network; + } + + private get net(): P2PNetwork { + if (!this.network) { + throw new Error('P2PMessageProcessor network not set'); + } + return this.network; + } + + public registerBlockReceivedCallback(callback: P2PBlockReceivedCallback) { + this.blockReceivedCallback = callback; + } + + public registerValidatorCheckpointReceivedCallback(callback: P2PCheckpointReceivedCallback) { + this.validatorCheckpointReceivedCallback = callback; + } + + public registerAllNodesCheckpointReceivedCallback(callback: P2PCheckpointReceivedCallback) { + this.allNodesCheckpointReceivedCallback = callback; + } + + /** + * Registers a callback to be invoked when a duplicate proposal is detected. + * This callback is triggered on the first duplicate (when count goes from 1 to 2). + */ + public registerDuplicateProposalCallback( + callback: (info: { slot: SlotNumber; proposer: EthAddress; type: 'checkpoint' | 'block' }) => void, + ): void { + this.duplicateProposalCallback = callback; + } + + /** + * Registers a callback to be invoked when a duplicate attestation is detected. + * A validator signing attestations for different proposals at the same slot. + * This callback is triggered on the first duplicate (when count goes from 1 to 2). + */ + public registerDuplicateAttestationCallback(callback: P2PDuplicateAttestationCallback): void { + this.duplicateAttestationCallback = callback; + } + + public registerCheckpointAttestationCallback(callback: P2PCheckpointAttestationCallback): void { + this.checkpointAttestationCallback = callback; + } + + /** + * Validates a gossiped transaction against node state and, if valid, persists it to the tx pool. + * @returns The gossip validation result, indicating whether to re-broadcast the tx. + */ + public async validateAndStoreTx(tx: Tx, source: PeerId): Promise> { + const currentBlockNumber = await this.archiver.getBlockNumber(); + const { ts: nextSlotTimestamp } = this.epochCache.getEpochAndSlotInNextL1Slot(); + + // Stage 1: fast validators (metadata, data, timestamps, double-spend, gas, phases, block header) + const firstStageValidators = await this.createFirstStageMessageValidators(currentBlockNumber, nextSlotTimestamp); + const firstStageOutcome = await this.runValidations(tx, firstStageValidators); + if (!firstStageOutcome.allPassed) { + const { name } = firstStageOutcome.failure; + let { severity } = firstStageOutcome.failure; + + // Double spend validator has a special case handler. We perform more detailed examination + // as to how recently the nullifier was entered into the tree and if the transaction should + // have 'known' the nullifier existed. This determines the severity of the penalty applied to the peer. + if (name === 'doubleSpendValidator') { + const txBlockNumber = BlockNumber(currentBlockNumber + 1); + severity = await this.handleDoubleSpendFailure(tx, txBlockNumber); + } + + this.logger.verbose(`Rejecting gossiped tx ${tx.getTxHash().toString()}: stage 1 validation failed`, { + validator: name, + severity, + source: source.toString(), + }); + return { result: TopicValidatorResult.Reject, severity }; + } + + // Pool pre-check: see if the pool would accept this tx before doing expensive proof verification + const canAdd = await this.mempools.txPool.canAddPendingTx(tx); + if (canAdd === 'ignored') { + this.logger.verbose(`Ignoring gossiped tx ${tx.getTxHash().toString()}: pool pre-check returned ignored`, { + source: source.toString(), + }); + return { result: TopicValidatorResult.Ignore, obj: tx }; + } + + // Stage 2: expensive proof verification + const secondStageValidators = this.createSecondStageMessageValidators(); + const secondStageOutcome = await this.runValidations(tx, secondStageValidators); + if (!secondStageOutcome.allPassed) { + const { severity, name } = secondStageOutcome.failure; + this.logger.verbose(`Rejecting gossiped tx ${tx.getTxHash().toString()}: stage 2 validation failed`, { + validator: name, + severity, + source: source.toString(), + }); + return { result: TopicValidatorResult.Reject, severity }; + } + + // Pool add: persist the tx + const txHash = tx.getTxHash(); + const addResult = await this.mempools.txPool.addPendingTxs([tx], { source: 'gossip' }); + + const wasAccepted = addResult.accepted.some(h => h.equals(txHash)); + const wasIgnored = addResult.ignored.some(h => h.equals(txHash)); + + this.logger.verbose(`Validate propagated tx ${txHash.toString()}`, { + wasAccepted, + wasIgnored, + [Attributes.P2P_ID]: source.toString(), + }); + + if (wasAccepted) { + return { result: TopicValidatorResult.Accept, obj: tx }; + } else if (wasIgnored) { + return { result: TopicValidatorResult.Ignore, obj: tx }; + } else { + this.logger.warn(`Gossiped tx ${txHash.toString()} unexpectedly rejected by pool`, { + source: source.toString(), + txHash: txHash.toString(), + }); + return { result: TopicValidatorResult.Reject, severity: PeerErrorSeverity.HighToleranceError }; + } + } + + /** Validates a checkpoint attestation and adds it to the pool. Penalizes the peer if validation fails. */ + @trackSpan('Libp2pService.validateAndStoreCheckpointAttestation', (_peerId, attestation) => ({ + [Attributes.SLOT_NUMBER]: attestation.payload.header.slotNumber.toString(), + })) + public async validateAndStoreCheckpointAttestation( + peerId: PeerId, + attestation: CheckpointAttestation, + ): Promise> { + const validationResult = await this.checkpointAttestationValidator.validate(attestation); + + if (validationResult.result === 'reject') { + this.logger.warn(`Penalizing peer ${peerId} for checkpoint attestation validation failure`); + return { result: TopicValidatorResult.Reject, severity: validationResult.severity }; + } + + if (validationResult.result === 'ignore') { + return { result: TopicValidatorResult.Ignore, obj: attestation }; + } + + // Try to add the attestation: this handles existence check, cap check, and adding in one call + // count is the number of attestations by this signer for this slot (for duplicate detection) + const slot = attestation.payload.header.slotNumber; + const { added, alreadyExists, count } = + await this.mempools.attestationPool.tryAddCheckpointAttestation(attestation); + + this.logger.trace(`Validate propagated checkpoint attestation`, { + added, + alreadyExists, + count, + [Attributes.SLOT_NUMBER]: slot.toString(), + [Attributes.P2P_ID]: peerId.toString(), + }); + + // Exact same attestation received, no need to re-broadcast + if (alreadyExists) { + return { result: TopicValidatorResult.Ignore, obj: attestation }; + } + + // Could not add (cap reached for signer), penalize and do not re-broadcast + if (!added) { + this.logger.warn(`Rejecting checkpoint attestation due to cap`, { + slot: slot.toString(), + archive: attestation.archive.toString(), + source: peerId.toString(), + attester: attestation.getSender()?.toString(), + count, + }); + return { result: TopicValidatorResult.Reject, severity: PeerErrorSeverity.HighToleranceError }; + } + + // Check if this is a duplicate attestation (signer attested to a different proposal at the same slot) + // count is the number of attestations by this signer for this slot + if (count === 2) { + const attester = attestation.getSender(); + if (attester) { + this.logger.warn(`Detected duplicate attestation (equivocation) at slot ${slot}`, { + slot: slot.toString(), + archive: attestation.archive.toString(), + source: peerId.toString(), + attester: attester.toString(), + }); + this.duplicateAttestationCallback?.({ slot, attester }); + } + } + + // Attestation was added successfully - accept it so other nodes can also detect the equivocation + this.checkpointAttestationCallback?.(attestation); + return { result: TopicValidatorResult.Accept, obj: attestation }; + } + + /** Validates a block proposal. Triggers a penalization to the peer that sent it if invalid. Adds to the mempool if valid. */ + @trackSpan('Libp2pService.validateAndStoreBlockProposal', (_peerId, block) => ({ + [Attributes.BLOCK_NUMBER]: block.blockNumber.toString(), + [Attributes.SLOT_NUMBER]: block.slotNumber.toString(), + })) + public async validateAndStoreBlockProposal( + peerId: PeerId, + block: BlockProposal, + ): Promise> { + const validationResult = await this.blockProposalValidator.validate(block); + + if (validationResult.result === 'reject') { + this.logger.warn(`Penalizing peer ${peerId} for block proposal validation failure`); + return { result: TopicValidatorResult.Reject, severity: validationResult.severity }; + } + + if (validationResult.result === 'ignore') { + return { result: TopicValidatorResult.Ignore, obj: block }; + } + + // Try to add the proposal: this handles existence check, cap check, and adding in one call + const { added, alreadyExists, count } = await this.mempools.attestationPool.tryAddBlockProposal(block); + const isEquivocated = count !== undefined && count > 1; + + // Duplicate proposal received, no need to re-broadcast + if (alreadyExists) { + this.logger.debug(`Ignoring duplicate block proposal received`, { + ...block.toBlockInfo(), + indexWithinCheckpoint: block.indexWithinCheckpoint, + proposer: block.getSender()?.toString(), + source: peerId.toString(), + }); + return { result: TopicValidatorResult.Ignore, obj: block, metadata: { isEquivocated } }; + } + + // Too many blocks received for this slot and index, penalize peer and do not re-broadcast + if (!added) { + this.logger.warn(`Penalizing peer for block proposal exceeding per-position cap`, { + ...block.toBlockInfo(), + indexWithinCheckpoint: block.indexWithinCheckpoint, + count, + proposer: block.getSender()?.toString(), + source: peerId.toString(), + }); + return { + result: TopicValidatorResult.Reject, + metadata: { isEquivocated }, + severity: PeerErrorSeverity.HighToleranceError, + }; + } + + // If this was a duplicate proposal, do not process it, but do invoke the duplicate callback, + // and do re-broadcast it so other nodes in the network know to slash the proposer + if (isEquivocated) { + const proposer = block.getSender(); + this.logger.warn(`Detected duplicate block proposal (equivocation) at slot ${block.slotNumber}`, { + ...block.toBlockInfo(), + source: peerId.toString(), + proposer: proposer?.toString(), + }); + // Invoke the duplicate callback on the first duplicate spotted only + if (proposer && count === 2) { + this.duplicateProposalCallback?.({ slot: block.slotNumber, proposer, type: 'block' }); + } + return { result: TopicValidatorResult.Accept, obj: block, metadata: { isEquivocated } }; + } + + // Otherwise, we're good to go! + return { result: TopicValidatorResult.Accept, obj: block }; + } + + // REFACTOR(palla): This method should be moved to the p2p_client or to a separate component, + // should not be here as it does not deal with p2p networking. + @trackSpan('Libp2pService.processValidBlockProposal', async block => ({ + [Attributes.SLOT_NUMBER]: block.slotNumber, + [Attributes.BLOCK_ARCHIVE]: block.archive.toString(), + [Attributes.P2P_ID]: await block.p2pMessageLoggingIdentifier().then(i => i.toString()), + })) + public async processValidBlockProposal(block: BlockProposal, sender: PeerId) { + const slot = block.slotNumber; + this.logger.verbose(`Received block proposal for slot ${slot} from external peer ${sender.toString()}.`, { + p2pMessageIdentifier: await block.p2pMessageLoggingIdentifier(), + source: sender.toString(), + ...block.toBlockInfo(), + }); + + // Mark the txs in this proposal as protected + await this.mempools.txPool.protectTxs(block.txHashes, block.blockHeader); + + // Call the block received callback to validate the proposal. + // Note: Validators do NOT attest to individual blocks, only to checkpoint proposals. + const isValid = await this.blockReceivedCallback(block, sender); + if (!isValid) { + this.logger.info(`Block proposal validation failed for block ${block.blockNumber}`, block.toBlockInfo()); + } + } + + /** + * Validates a checkpoint proposal. Penalizes peer if validation fails. Adds the checkpoint and + * its last block (if present) to the mempool if valid. Triggers equivocation detection on both. + */ + @trackSpan('Libp2pService.validateAndStoreCheckpointProposal', (_peerId, checkpoint) => ({ + [Attributes.SLOT_NUMBER]: checkpoint.slotNumber.toString(), + })) + public async validateAndStoreCheckpointProposal( + peerId: PeerId, + checkpoint: CheckpointProposal, + ): Promise> { + const validationResult = await this.checkpointProposalValidator.validate(checkpoint); + + if (validationResult.result === 'reject') { + this.logger.warn(`Penalizing peer ${peerId} for checkpoint proposal validation failure`); + return { result: TopicValidatorResult.Reject, severity: validationResult.severity }; + } + + if (validationResult.result === 'ignore') { + return { result: TopicValidatorResult.Ignore, obj: checkpoint }; + } + + // Extract and try to add the block proposal first if present + const blockProposal = checkpoint.getBlockProposal(); + let processBlock = false; + if (blockProposal) { + this.logger.debug(`Validating block proposal from propagated checkpoint`, { + [Attributes.SLOT_NUMBER]: checkpoint.slotNumber.toString(), + [Attributes.P2P_ID]: peerId.toString(), + }); + const blockProposalResult = await this.validateAndStoreBlockProposal(peerId, blockProposal); + const { obj, metadata: { isEquivocated } = {} } = blockProposalResult; + if (blockProposalResult.result === TopicValidatorResult.Reject || !obj || isEquivocated) { + this.logger.debug(`Rejecting checkpoint due to invalid last block proposal`, { + [Attributes.SLOT_NUMBER]: checkpoint.slotNumber.toString(), + [Attributes.P2P_ID]: peerId.toString(), + isEquivocated, + result: blockProposalResult.result, + }); + return { + result: TopicValidatorResult.Reject, + severity: + 'severity' in blockProposalResult ? blockProposalResult.severity : PeerErrorSeverity.MidToleranceError, + }; + } else if (blockProposalResult.result === TopicValidatorResult.Accept && obj && !isEquivocated) { + processBlock = true; + } + } + + // Try to add the checkpoint proposal core: this handles existence check, cap check, and adding in one call + const checkpointCore = checkpoint.toCore(); + const tryAddResult = await this.mempools.attestationPool.tryAddCheckpointProposal(checkpointCore); + const { added, alreadyExists, count } = tryAddResult; + const isEquivocated = count !== undefined && count > 1; + + // Duplicate proposal received, do not re-broadcast + if (alreadyExists) { + this.logger.debug(`Ignoring duplicate checkpoint proposal received`, { + ...checkpoint.toCheckpointInfo(), + source: peerId.toString(), + }); + return { + result: TopicValidatorResult.Ignore, + obj: checkpoint, + metadata: { isEquivocated, processBlock }, + }; + } + + // Too many checkpoint proposals received for this slot, penalize peer and do not re-broadcast + // Note: We still return the checkpoint obj so the lastBlock can be processed if valid + if (!added) { + this.logger.warn(`Penalizing peer for checkpoint proposal exceeding per-slot cap`, { + ...checkpoint.toCheckpointInfo(), + count, + source: peerId.toString(), + }); + return { + result: TopicValidatorResult.Reject, + obj: checkpoint, + metadata: { isEquivocated, processBlock }, + severity: PeerErrorSeverity.HighToleranceError, + }; + } + + // If this was a duplicate proposal, do not process it, but do invoke the duplicate callback, + // and do re-broadcast it so other nodes in the network know to slash the proposer + if (isEquivocated) { + const proposer = checkpoint.getSender(); + this.logger.warn(`Detected duplicate checkpoint proposal (equivocation) at slot ${checkpoint.slotNumber}`, { + ...checkpoint.toCheckpointInfo(), + source: peerId.toString(), + proposer: proposer?.toString(), + }); + // Invoke the duplicate callback on the first duplicate spotted only + if (proposer && count === 2) { + this.duplicateProposalCallback?.({ slot: checkpoint.slotNumber, proposer, type: 'checkpoint' }); + } + return { + result: TopicValidatorResult.Accept, + obj: checkpoint, + metadata: { isEquivocated, processBlock }, + }; + } + + // Otherwise, we're good to go! + return { result: TopicValidatorResult.Accept, obj: checkpoint, metadata: { processBlock, isEquivocated } }; + } + + /** + * Process a validated checkpoint proposal. + * Note: The proposal was already added to the pool by tryAddCheckpointProposal in handleGossipedCheckpointProposal. + */ + @trackSpan('Libp2pService.processValidCheckpointProposal', async checkpoint => ({ + [Attributes.SLOT_NUMBER]: checkpoint.slotNumber, + [Attributes.BLOCK_ARCHIVE]: checkpoint.archive.toString(), + [Attributes.P2P_ID]: await checkpoint.p2pMessageLoggingIdentifier().then(i => i.toString()), + })) + public async processValidCheckpointProposal(checkpoint: CheckpointProposalCore, sender: PeerId) { + const slot = checkpoint.slotNumber; + this.logger.verbose(`Received checkpoint proposal for slot ${slot} from external peer ${sender.toString()}.`, { + p2pMessageIdentifier: await checkpoint.p2pMessageLoggingIdentifier(), + slot: checkpoint.slotNumber, + archive: checkpoint.archive.toString(), + source: sender.toString(), + }); + + await this.allNodesCheckpointReceivedCallback(checkpoint, sender); + + // Call the checkpoint received callback with the core version (without lastBlock) + // to validate and potentially generate attestations + const attestations = await this.validatorCheckpointReceivedCallback(checkpoint, sender); + if (attestations && attestations.length > 0) { + // If the callback returned attestations, add them to the pool and propagate them + await this.mempools.attestationPool.addOwnCheckpointAttestations(attestations); + for (const attestation of attestations) { + await this.net.propagate(attestation); + } + } + } + + /** + * Validate the requested block transactions request-response consistency. + * It does NOT validate the transactions themselves. + * @param request - The block transactions request. + * @param response - The block transactions response. + * @param peerId - The ID of the peer that made the request. + * @returns True if the request-response is consistent, false otherwise. + */ + @trackSpan('Libp2pService.validateRequestedBlockTxsConsistency', request => ({ + [Attributes.BLOCK_ARCHIVE]: request.archiveRoot.toString(), + })) + public async validateRequestedBlockTxsConsistency( + request: BlockTxsRequest, + response: BlockTxsResponse, + peerId: PeerId, + ): Promise { + try { + // A response with archiveRoot=Fr.zero is the documented "I don't have the block" signal from + // reqRespBlockTxsHandler (block_txs_handler.ts:54-58): the peer lacked the block in its + // attestation pool and archiver, but matched the requested hashes against its tx pool and + // shipped what it found. This is legitimate behaviour, not misbehaviour — we just can't verify + // membership/order without the block, so we drop the response without penalising the peer. + if (response.archiveRoot.isZero()) { + this.logger.debug(`Peer ${peerId.toString()} signalled missing block with Fr.zero archive root`); + return false; + } + + if (!response.archiveRoot.equals(request.archiveRoot)) { + this.net.penalizePeer(peerId, PeerErrorSeverity.MidToleranceError); + throw new ValidationError( + `Received block txs for unexpected archive root: expected ${request.archiveRoot.toString()}, got ${response.archiveRoot.toString()}`, + ); + } + + if (response.txIndices.getLength() !== request.txIndices.getLength()) { + this.net.penalizePeer(peerId, PeerErrorSeverity.MidToleranceError); + throw new ValidationError( + `Received block txs with mismatched bitvector length: expected ${request.txIndices.getLength()}, got ${response.txIndices.getLength()}`, + ); + } + + // Check no duplicates and not exceeding returnable count + const requestedIndices = new Set(request.txIndices.getTrueIndices()); + const availableIndices = new Set(response.txIndices.getTrueIndices()); + const maxReturnable = [...requestedIndices].filter(i => availableIndices.has(i)).length; + + const returnedHashes = await Promise.all(response.txs.map(tx => tx.getTxHash().toString())); + const uniqueReturned = new Set(returnedHashes.map(h => h.toString())); + if (uniqueReturned.size !== returnedHashes.length) { + this.net.penalizePeer(peerId, PeerErrorSeverity.MidToleranceError); + throw new ValidationError(`Received duplicate txs in block txs response`); + } + if (response.txs.length > maxReturnable) { + this.net.penalizePeer(peerId, PeerErrorSeverity.MidToleranceError); + throw new ValidationError( + `Received more txs (${response.txs.length}) than requested-and-available (${maxReturnable})`, + ); + } + + // To verify membership/order of the returned txs we need the canonical tx hash list for the + // block. Prefer the block proposal (held while a block is in flight), but fall back to the + // archiver for blocks we only know as mined — e.g. a prover collecting txs to prove a block it + // never received a proposal for. This mirrors the responder side (reqRespBlockTxsHandler), + // which serves from proposal-or-archiver. + const proposal = await this.mempools.attestationPool.getBlockProposalByArchive(request.archiveRoot.toString()); + const blockTxHashes = + proposal?.txHashes ?? + (await this.archiver.getBlock({ archive: request.archiveRoot }))?.body.txEffects.map(e => e.txHash); + + if (blockTxHashes) { + // Build intersected indices + const intersectIdx = request.txIndices.getTrueIndices().filter(i => response.txIndices.isSet(i)); + + // Enforce subset membership and preserve increasing order by index. + const hashToIndexInBlock = new Map( + blockTxHashes.map((h, i) => [h.toString(), i] as [string, number]), + ); + const allowedIndexSet = new Set(intersectIdx); + const indices = returnedHashes.map(h => hashToIndexInBlock.get(h)); + const allAllowed = indices.every(idx => idx !== undefined && allowedIndexSet.has(idx)); + const strictlyIncreasing = indices.every((idx, i) => (i === 0 ? idx !== undefined : idx! > indices[i - 1]!)); + if (!allAllowed || !strictlyIncreasing) { + this.net.penalizePeer(peerId, PeerErrorSeverity.LowToleranceError); + throw new ValidationError('Returned txs do not match expected subset/order for requested indices'); + } + } else { + // Neither a local proposal nor an archived block: we cannot verify membership/order of the + // returned txs. This is a local-state gap, not a peer fault, so we do not penalize. + this.logger.warn( + `Block ${request.archiveRoot.toString()} not found in attestation pool or archiver; cannot validate membership/order of returned txs`, + ); + return false; + } + + return true; + } catch (e: any) { + if (e instanceof ValidationError) { + this.logger.warn(`Failed validation for requested block txs from peer ${peerId.toString()}`); + } else { + this.logger.error(`Error during validation of requested block txs`, e); + } + + return false; + } + } + + public async validateTxsReceivedInBlockProposal(txs: Tx[]): Promise { + const validator = createTxValidatorForBlockProposalReceivedTxs( + this.proofVerifier, + { l1ChainId: this.config.l1ChainId, rollupVersion: this.config.rollupVersion }, + this.logger.getBindings(), + this.txValidationCache, + ); + + const results = await Promise.all( + txs.map(async tx => { + const result = await validator.validateTx(tx); + return result.result !== 'invalid'; + }), + ); + if (results.some(value => value === false)) { + throw new Error('Invalid tx detected'); + } + } + + /** Builds the request/response sub-protocol handlers that serve data from node state (status, tx, block txs). */ + public createReqRespDataHandlers(protocolVersion: string): Partial { + const handlers: Partial = { + [ReqRespSubProtocol.STATUS]: reqRespStatusHandler(protocolVersion, this.worldStateSynchronizer, this.logger), + }; + + if (!this.config.disableTransactions) { + handlers[ReqRespSubProtocol.BLOCK_TXS] = reqRespBlockTxsHandler( + this.mempools.attestationPool, + this.archiver, + this.mempools.txPool, + ); + handlers[ReqRespSubProtocol.TX] = reqRespTxHandler(this.mempools); + } + + return handlers; + } + + /** Returns the tx validator configuration used by the batch tx requester. */ + public getBatchTxValidatorConfig(): BatchRequestTxValidatorConfig { + return { + l1ChainId: this.config.l1ChainId, + rollupVersion: this.config.rollupVersion, + proofVerifier: this.proofVerifier, + txValidationCache: this.txValidationCache, + }; + } + + private getGasFees(): Promise { + return this.blockMinFeesProvider.getCurrentMinFees(); + } + + /** Creates the first stage (fast) validators for gossiped transactions. */ + protected async createFirstStageMessageValidators( + currentBlockNumber: BlockNumber, + nextSlotTimestamp: UInt64, + ): Promise> { + const gasFees = await this.getGasFees(); + const allowedInSetup = [ + ...(await getDefaultAllowedSetupFunctions()), + ...(this.config.txPublicSetupAllowListExtend ?? []), + ]; + const blockNumber = BlockNumber(currentBlockNumber + 1); + const l1Constants = await this.archiver.getL1Constants(); + + return createFirstStageTxValidationsForGossipedTransactions( + nextSlotTimestamp, + blockNumber, + this.worldStateSynchronizer, + gasFees, + this.config.l1ChainId, + this.config.rollupVersion, + protocolContractsHash, + this.archiver, + !this.config.disableTransactions, + allowedInSetup, + this.logger.getBindings(), + { + rollupManaLimit: l1Constants.rollupManaLimit, + maxBlockL2Gas: this.config.validateMaxL2BlockGas, + maxBlockDAGas: this.config.validateMaxDABlockGas, + }, + ); + } + + /** Creates the second stage (expensive proof verification) validators for gossiped transactions. */ + protected createSecondStageMessageValidators(): Record { + return createSecondStageTxValidationsForGossipedTransactions(this.proofVerifier, this.logger.getBindings()); + } + + /** + * Run validations on a tx. + * @param tx - The tx to validate. + * @param messageValidators - The message validators to run. + * @returns The validation outcome. + */ + private async runValidations( + tx: Tx, + messageValidators: Record, + ): Promise { + const validationPromises = Object.entries(messageValidators).map(async ([name, { validator, severity }]) => { + const { result } = await validator.validateTx(tx); + return { name, isValid: result !== 'invalid', severity }; + }); + + // A promise that resolves when all validations have been run + const allValidations = await Promise.all(validationPromises); + const failures = allValidations.filter(x => !x.isValid); + if (failures.length > 0) { + // Pick the most severe failure (lowest tolerance = harshest penalty) + const failed = maxBy(failures, f => PeerErrorSeverityByHarshness.indexOf(f.severity))!; + return { + allPassed: false, + failure: { + isValid: { result: 'invalid' as const, reason: ['Failed validation'] }, + name: failed.name, + severity: failed.severity, + }, + }; + } else { + return { + allPassed: true, + }; + } + } + + /** + * Handle a double spend failure. + * + * Double spend failures are managed on their own because they are a special case. + * We must check if the double spend is recent or old, if it is past a threshold, then we heavily penalize the peer. + * + * @param tx - The tx that failed the double spend validator. + * @param blockNumber - The block number of the tx. + * @param peerId - The peer ID of the peer that sent the tx. + * @returns Severity + */ + private async handleDoubleSpendFailure(tx: Tx, blockNumber: BlockNumber): Promise { + if (blockNumber <= this.config.doubleSpendSeverePeerPenaltyWindow) { + return PeerErrorSeverity.HighToleranceError; + } + + const snapshotValidator = new DoubleSpendTxValidator( + { + nullifiersExist: async (nullifiers: Buffer[]) => { + const merkleTree = this.worldStateSynchronizer.getSnapshot( + BlockNumber(blockNumber - this.config.doubleSpendSeverePeerPenaltyWindow), + ); + const indices = await merkleTree.findLeafIndices(MerkleTreeId.NULLIFIER_TREE, nullifiers); + return indices.map(index => index !== undefined); + }, + }, + this.logger.getBindings(), + ); + + const validSnapshot = await snapshotValidator.validateTx(tx); + if (validSnapshot.result !== 'valid') { + return PeerErrorSeverity.LowToleranceError; + } + + return PeerErrorSeverity.HighToleranceError; + } +} diff --git a/yarn-project/p2p/src/services/service.ts b/yarn-project/p2p/src/services/service.ts index a2f550da5259..03581f06423d 100644 --- a/yarn-project/p2p/src/services/service.ts +++ b/yarn-project/p2p/src/services/service.ts @@ -95,28 +95,6 @@ export interface P2PService { */ propagate(message: T): Promise; - // Leaky abstraction: fix https://github.com/AztecProtocol/aztec-packages/issues/7963 - registerBlockReceivedCallback(callback: P2PBlockReceivedCallback): void; - - registerValidatorCheckpointReceivedCallback(callback: P2PCheckpointReceivedCallback): void; - - registerAllNodesCheckpointReceivedCallback(callback: P2PCheckpointReceivedCallback): void; - - /** - * Registers a callback invoked when a duplicate proposal is detected (equivocation). - * The callback is triggered on the first duplicate (when count goes from 1 to 2). - */ - registerDuplicateProposalCallback(callback: P2PDuplicateProposalCallback): void; - - /** - * Registers a callback invoked when a duplicate attestation is detected (equivocation). - * A validator signing attestations for different proposals at the same slot. - * The callback is triggered on the first duplicate (when count goes from 1 to 2). - */ - registerDuplicateAttestationCallback(callback: P2PDuplicateAttestationCallback): void; - - registerCheckpointAttestationCallback(callback: P2PCheckpointAttestationCallback): void; - getEnr(): ENR | undefined; getPeers(includePending?: boolean): PeerInfo[]; diff --git a/yarn-project/p2p/src/test-helpers/mock-pubsub.ts b/yarn-project/p2p/src/test-helpers/mock-pubsub.ts index ac1f1109b9ac..d1ee01b0865e 100644 --- a/yarn-project/p2p/src/test-helpers/mock-pubsub.ts +++ b/yarn-project/p2p/src/test-helpers/mock-pubsub.ts @@ -1,12 +1,5 @@ -import type { EpochCacheInterface } from '@aztec/epoch-cache'; -import { type Logger, createLogger } from '@aztec/foundation/log'; +import { createLogger } from '@aztec/foundation/log'; import { sleep } from '@aztec/foundation/sleep'; -import type { AztecAsyncKVStore } from '@aztec/kv-store'; -import type { L2BlockSource } from '@aztec/stdlib/block'; -import type { ContractDataSource } from '@aztec/stdlib/contract'; -import type { BlockMinFeesProvider } from '@aztec/stdlib/gas'; -import type { ClientProtocolCircuitVerifier, WorldStateSynchronizer } from '@aztec/stdlib/interfaces/server'; -import type { TelemetryClient } from '@aztec/telemetry-client'; import type { GossipsubEvents, GossipsubMessage } from '@chainsafe/libp2p-gossipsub'; import type { MsgIdStr, PeerIdStr, PublishOpts, TopicStr } from '@chainsafe/libp2p-gossipsub/types'; @@ -18,8 +11,6 @@ import { TypedEventEmitter, } from '@libp2p/interface'; -import type { P2PConfig } from '../config.js'; -import type { MemPools } from '../mem_pools/interface.js'; import { DummyPeerDiscoveryService, DummyPeerManager, LibP2PService } from '../services/index.js'; import type { P2PReqRespConfig } from '../services/reqresp/config.js'; import type { ConnectionSampler } from '../services/reqresp/connection-sampler/connection_sampler.js'; @@ -43,22 +34,7 @@ type GossipSubService = PubSubLibp2p['services']['pubsub']; export function getMockPubSubP2PServiceFactory( network: MockGossipSubNetwork, ): (...args: Parameters<(typeof LibP2PService)['new']>) => Promise { - return ( - config: P2PConfig, - peerId: PeerId, - deps: { - packageVersion: string; - mempools: MemPools; - l2BlockSource: L2BlockSource & ContractDataSource; - epochCache: EpochCacheInterface; - proofVerifier: ClientProtocolCircuitVerifier; - worldStateSynchronizer: WorldStateSynchronizer; - peerStore: AztecAsyncKVStore; - blockMinFeesProvider: BlockMinFeesProvider; - telemetry: TelemetryClient; - logger: Logger; - }, - ) => { + return (...[config, peerId, deps]: Parameters<(typeof LibP2PService)['new']>) => { deps.logger.verbose('Creating mock PubSub service'); const libp2p = new MockPubSub(peerId, network); const peerManager = new DummyPeerManager(peerId, network); @@ -70,12 +46,7 @@ export function getMockPubSubP2PServiceFactory( peerDiscoveryService, reqresp, peerManager, - deps.mempools, - deps.l2BlockSource, - deps.epochCache, - deps.proofVerifier, - deps.worldStateSynchronizer, - deps.blockMinFeesProvider, + deps.processor, deps.telemetry, deps.logger, ); diff --git a/yarn-project/p2p/src/test-helpers/reqresp-nodes.ts b/yarn-project/p2p/src/test-helpers/reqresp-nodes.ts index 8f9bbbbda460..57bd0c127a66 100644 --- a/yarn-project/p2p/src/test-helpers/reqresp-nodes.ts +++ b/yarn-project/p2p/src/test-helpers/reqresp-nodes.ts @@ -36,6 +36,7 @@ import type { MemPools } from '../mem_pools/interface.js'; import { DiscV5Service } from '../services/discv5/discV5_service.js'; import { APP_SPECIFIC_WEIGHT } from '../services/gossipsub/scoring.js'; import { LibP2PService } from '../services/libp2p/libp2p_service.js'; +import { P2PMessageProcessor } from '../services/libp2p/p2p_message_processor.js'; import { PeerManager } from '../services/peer-manager/peer_manager.js'; import { PeerScoring } from '../services/peer-manager/peer_scoring.js'; import type { P2PReqRespConfig } from '../services/reqresp/config.js'; @@ -162,12 +163,8 @@ export async function createTestLibP2PService( p2pNode.services.pubsub.score.params.appSpecificScore = (peerId: string) => peerManager.shouldDisableP2PGossip(peerId) ? -Infinity : peerManager.getPeerScore(peerId); - return new LibP2PService( + const processor = new P2PMessageProcessor( config, - p2pNode as PubSubLibp2p, - discoveryService, - reqresp, - peerManager, mempools, archiver, epochCache, @@ -176,6 +173,16 @@ export async function createTestLibP2PService( { getCurrentMinFees: () => Promise.resolve(GasFees.empty()) }, telemetry, ); + + return new LibP2PService( + config, + p2pNode as PubSubLibp2p, + discoveryService, + reqresp, + peerManager, + processor, + telemetry, + ); } /** diff --git a/yarn-project/p2p/src/testbench/p2p_client_testbench_worker.ts b/yarn-project/p2p/src/testbench/p2p_client_testbench_worker.ts index 274c86272f13..ec5d9a72d366 100644 --- a/yarn-project/p2p/src/testbench/p2p_client_testbench_worker.ts +++ b/yarn-project/p2p/src/testbench/p2p_client_testbench_worker.ts @@ -4,7 +4,6 @@ * Used when running testbench commands. */ import { MockL2BlockSource } from '@aztec/archiver/test'; -import type { EpochCacheInterface } from '@aztec/epoch-cache'; import { BlockNumber } from '@aztec/foundation/branded-types'; import { SecretValue } from '@aztec/foundation/config'; import { Secp256k1Signer } from '@aztec/foundation/crypto/secp256k1-signer'; @@ -15,10 +14,8 @@ import { DateProvider, Timer } from '@aztec/foundation/timer'; import { openTmpStore } from '@aztec/kv-store/lmdb-v2'; import { getVKTreeRoot } from '@aztec/noir-protocol-circuits-types/vk-tree'; import { protocolContractsHash } from '@aztec/protocol-contracts'; -import type { L2BlockSource } from '@aztec/stdlib/block'; -import type { ContractDataSource } from '@aztec/stdlib/contract'; import { GasFees } from '@aztec/stdlib/gas'; -import type { ClientProtocolCircuitVerifier, WorldStateSynchronizer } from '@aztec/stdlib/interfaces/server'; +import type { ClientProtocolCircuitVerifier } from '@aztec/stdlib/interfaces/server'; import type { DataStoreConfig } from '@aztec/stdlib/kv-store'; import { type BlockProposal, P2PMessage } from '@aztec/stdlib/p2p'; import { ChonkProof } from '@aztec/stdlib/proofs'; @@ -35,6 +32,7 @@ import type { P2PConfig } from '../config.js'; import { createP2PClient } from '../index.js'; import type { MemPools } from '../mem_pools/index.js'; import { LibP2PService } from '../services/index.js'; +import type { P2PMessageProcessor } from '../services/libp2p/p2p_message_processor.js'; import type { PeerManager } from '../services/peer-manager/peer_manager.js'; import { BatchTxRequester } from '../services/reqresp/batch-tx-requester/batch_tx_requester.js'; import type { BatchTxRequesterLibP2PService } from '../services/reqresp/batch-tx-requester/interface.js'; @@ -94,30 +92,13 @@ class TestLibP2PService extends LibP2PService { peerDiscoveryService: PeerDiscoveryService, reqresp: ReqResp, peerManager: PeerManager, - mempools: MemPools, - archiver: L2BlockSource & ContractDataSource, - epochCache: EpochCacheInterface, - proofVerifier: ClientProtocolCircuitVerifier, - worldStateSynchronizer: WorldStateSynchronizer, + processor: P2PMessageProcessor, + private readonly mempools: MemPools, telemetry: TelemetryClient, logger = createLogger('p2p:test:libp2p_service'), disableTxValidation = true, ) { - super( - config, - node, - peerDiscoveryService, - reqresp, - peerManager, - mempools, - archiver, - epochCache, - proofVerifier, - worldStateSynchronizer, - { getCurrentMinFees: () => Promise.resolve(GasFees.empty()) }, - telemetry, - logger, - ); + super(config, node, peerDiscoveryService, reqresp, peerManager, processor, telemetry, logger); this.disableTxValidation = disableTxValidation; } @@ -407,11 +388,8 @@ process.on('message', async msg => { (client as any).p2pService.peerDiscoveryService, (client as any).p2pService.reqresp, (client as any).p2pService.peerManager, - (client as any).p2pService.mempools, - (client as any).p2pService.archiver, - epochCache, - proofVerifier, - worldState, + (client as any).p2pService.processor, + (client as any).p2pService.processor.mempools, telemetry as TelemetryClient, workerLogger, true,