Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 20 additions & 14 deletions yarn-project/p2p/src/client/factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import {
import { TxValidationCache } from '../msg_validators/tx_validator/tx_validation_cache.js';
import { DummyP2PService } from '../services/dummy_service.js';
import { LibP2PService } from '../services/index.js';
import { P2PMessageProcessor } from '../services/libp2p/p2p_message_processor.js';
import { createFileStoreTxSources } from '../services/tx_collection/file_store_tx_source.js';
import { TxCollection } from '../services/tx_collection/tx_collection.js';
import { NodeRpcTxSource, type TxSource, createNodeRpcTxSources } from '../services/tx_collection/tx_source.js';
Expand Down Expand Up @@ -141,21 +142,33 @@ export async function createP2PClient(
const txValidationCache =
config.txValidationCacheSize > 0 ? new TxValidationCache(config.txValidationCacheSize) : undefined;

const p2pService = await createP2PService(
// The message processor owns received-message handling (validation, persistence, consensus callbacks).
// It is shared between the P2PClient (which registers callbacks on it) and the libp2p service (which
// invokes it when messages arrive), so it is built here and injected into both.
const processor = new P2PMessageProcessor(
config,
mempools,
archiver,
epochCache,
proofVerifier,
worldStateSynchronizer,
epochCache,
blockMinFeesProvider,
telemetry,
logger.createChild('libp2p_service'),
txValidationCache,
);

const p2pService = await createP2PService(
config,
worldStateSynchronizer,
epochCache,
store,
peerStore,
mempools,
processor,
deps.p2pServiceFactory,
packageVersion,
logger.createChild('libp2p_service'),
telemetry,
txValidationCache,
);

const txValidatorForTxCollection = createTxValidatorForOnDemandReceivedTxs(
Expand Down Expand Up @@ -215,6 +228,7 @@ export async function createP2PClient(
archiver,
mempools,
p2pService,
processor,
txCollection,
txFileStore,
epochCache,
Expand All @@ -228,19 +242,15 @@ export async function createP2PClient(

async function createP2PService(
config: P2PConfig & DataStoreConfig,
archiver: L2BlockSource & ContractDataSource,
proofVerifier: ClientProtocolCircuitVerifier,
worldStateSynchronizer: WorldStateSynchronizer,
epochCache: EpochCacheInterface,
blockMinFeesProvider: BlockMinFeesProvider,
store: AztecAsyncKVStore,
peerStore: AztecLMDBStoreV2,
mempools: MemPools,
processor: P2PMessageProcessor,
p2pServiceFactory: P2PClientDeps['p2pServiceFactory'],
packageVersion: string,
logger: Logger,
telemetry: TelemetryClient,
txValidationCache?: TxValidationCache,
) {
if (!config.p2pEnabled) {
logger.verbose('P2P is disabled. Using dummy P2P service.');
Expand All @@ -255,16 +265,12 @@ async function createP2PService(

const p2pService = await (p2pServiceFactory ?? LibP2PService.new)(config, peerId, {
packageVersion,
mempools,
l2BlockSource: archiver,
processor,
epochCache,
proofVerifier,
worldStateSynchronizer,
peerStore,
blockMinFeesProvider,
telemetry,
logger: logger.createChild(`libp2p_service`),
txValidationCache,
});

return p2pService;
Expand Down
2 changes: 2 additions & 0 deletions yarn-project/p2p/src/client/p2p_client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import type { P2PService } from '../index.js';
import { type AttestationPool, createTestAttestationPool } from '../mem_pools/attestation_pool/attestation_pool.js';
import type { MemPools } from '../mem_pools/interface.js';
import type { TxPoolV2 } from '../mem_pools/tx_pool_v2/interfaces.js';
import type { P2PMessageProcessor } from '../services/libp2p/p2p_message_processor.js';
import type { TxCollection } from '../services/tx_collection/tx_collection.js';
import { P2PClient } from './p2p_client.js';

Expand Down Expand Up @@ -68,6 +69,7 @@ describe('P2P Client', () => {
blockSource,
mempools,
p2pService,
mock<P2PMessageProcessor>(),
txCollection,
undefined,
epochCache,
Expand Down
18 changes: 10 additions & 8 deletions yarn-project/p2p/src/client/p2p_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import type { AttestationPoolApi, ProposalsForSlot } from '../mem_pools/attestat
import type { MemPools } from '../mem_pools/interface.js';
import type { TxPoolV2 } from '../mem_pools/tx_pool_v2/interfaces.js';
import type { AuthRequest, StatusMessage } from '../services/index.js';
import type { P2PMessageProcessor } from '../services/libp2p/p2p_message_processor.js';
import { ReqRespSubProtocol, type ReqRespSubProtocolHandler } from '../services/reqresp/interface.js';
import type {
DuplicateAttestationInfo,
Expand Down Expand Up @@ -91,6 +92,7 @@ export class P2PClient extends WithTracer implements P2P {
private l2BlockSource: L2BlockSource & ContractDataSource,
mempools: MemPools,
private p2pService: P2PService,
private processor: P2PMessageProcessor,
private txCollection: TxCollection,
private txFileStore: TxFileStore | undefined,
private epochCache: EpochCacheInterface,
Expand Down Expand Up @@ -406,30 +408,30 @@ export class P2PClient extends WithTracer implements P2P {
return this.attestationPool.hasBlockProposalsForSlot(slot);
}

// REVIEW: https://github.com/AztecProtocol/aztec-packages/issues/7963
// ^ This pattern is not my favorite (md)
// Consensus callbacks are registered directly on the message processor (the main-thread component
// that owns received-message handling), not on the libp2p networking service.
public registerBlockProposalHandler(handler: P2PBlockReceivedCallback): void {
this.p2pService.registerBlockReceivedCallback(handler);
this.processor.registerBlockReceivedCallback(handler);
}

public registerValidatorCheckpointProposalHandler(handler: P2PCheckpointReceivedCallback): void {
this.p2pService.registerValidatorCheckpointReceivedCallback(handler);
this.processor.registerValidatorCheckpointReceivedCallback(handler);
}

public registerAllNodesCheckpointProposalHandler(handler: P2PCheckpointReceivedCallback): void {
this.p2pService.registerAllNodesCheckpointReceivedCallback(handler);
this.processor.registerAllNodesCheckpointReceivedCallback(handler);
}

public registerDuplicateProposalCallback(callback: (info: DuplicateProposalInfo) => void): void {
this.p2pService.registerDuplicateProposalCallback(callback);
this.processor.registerDuplicateProposalCallback(callback);
}

public registerDuplicateAttestationCallback(callback: (info: DuplicateAttestationInfo) => void): void {
this.p2pService.registerDuplicateAttestationCallback(callback);
this.processor.registerDuplicateAttestationCallback(callback);
}

public registerCheckpointAttestationCallback(callback: (attestation: CheckpointAttestation) => void): void {
this.p2pService.registerCheckpointAttestationCallback(callback);
this.processor.registerCheckpointAttestationCallback(callback);
}

public async getPendingTxs(limit?: number, after?: TxHash): Promise<Tx[]> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,7 @@ describe('p2p client integration block txs protocol ', () => {

// Run the response through client1's real validation — this is what BatchTxRequester does in
// production. The peer must not be penalized.
await (client1.p2pService as any).validateRequestedBlockTxsConsistency(
await (client1.p2pService as any).processor.validateRequestedBlockTxsConsistency(
request,
decoded,
client2.p2pService.node.peerId,
Expand Down
38 changes: 1 addition & 37 deletions yarn-project/p2p/src/services/dummy_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,12 @@ import type {
} from './reqresp/interface.js';
import type { GoodByeReason } from './reqresp/protocols/goodbye.js';
import { ReqRespStatus } from './reqresp/status.js';
import {
type P2PBlockReceivedCallback,
type P2PCheckpointAttestationCallback,
type P2PCheckpointReceivedCallback,
type P2PDuplicateAttestationCallback,
type P2PDuplicateProposalCallback,
type P2PService,
type PeerDiscoveryService,
PeerDiscoveryState,
} from './service.js';
import { type P2PService, type PeerDiscoveryService, PeerDiscoveryState } from './service.js';

/**
* A dummy implementation of the P2P Service.
*/
export class DummyP2PService implements P2PService {
private allNodesCheckpointReceivedCallback?: P2PCheckpointReceivedCallback;

updateConfig(_config: Partial<P2PReqRespConfig>): void {}

/** Returns an empty array for peers. */
Expand Down Expand Up @@ -80,31 +69,6 @@ export class DummyP2PService implements P2PService {
*/
public settledTxs(_: TxHash[]) {}

/**
* Register a callback into the validator client for when a block proposal is received
*/
public registerBlockReceivedCallback(_callback: P2PBlockReceivedCallback) {}

/**
* Register a callback into the validator client for when a checkpoint proposal is received
*/
public registerValidatorCheckpointReceivedCallback(_callback: P2PCheckpointReceivedCallback) {}
public registerAllNodesCheckpointReceivedCallback(callback: P2PCheckpointReceivedCallback) {
this.allNodesCheckpointReceivedCallback = callback;
}

/**
* Register a callback for when a duplicate proposal is detected
*/
public registerDuplicateProposalCallback(_callback: P2PDuplicateProposalCallback): void {}

/**
* Register a callback for when a duplicate attestation is detected
*/
public registerDuplicateAttestationCallback(_callback: P2PDuplicateAttestationCallback): void {}

public registerCheckpointAttestationCallback(_callback: P2PCheckpointAttestationCallback): void {}

/**
* Sends a request to a peer.
* @param _protocol - The protocol to send the request on.
Expand Down
Loading
Loading