Defer producer configuration building#115
Conversation
✅ Snyk checks have passed. No issues have been found so far.
💻 Catch issues earlier using the plugins for VS Code, JetBrains IDEs, Visual Studio, and Eclipse. |
toizo
left a comment
There was a problem hiding this comment.
it looks good to me, except the one mistake in the assertion
There was a problem hiding this comment.
Pull request overview
This PR updates Dafda’s producer registration APIs to support deferring ProducerOptions creation until the IServiceProvider is available, enabling DI-dependent configuration (e.g., IConfiguration) while keeping producer services transient and the underlying factory infrastructure singleton.
Changes:
- Add
AddProducerForoverloads that acceptFunc<IServiceProvider, ProducerOptions>for DI-aware producer configuration. - Refactor producer infrastructure: replace the old shared
ProducerFactoryimplementation with a typed singleton factory per producer implementation type. - Update Kafka producer factory plumbing to accept
IServiceProviderand adjust tests accordingly.
Reviewed changes
Copilot reviewed 12 out of 12 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| src/Dafda/Producing/ProducerFactoryException.cs | Update XML docs to reference all AddProducerFor overloads that can throw duplicate-registration exceptions. |
| src/Dafda/Producing/ProducerFactory.cs | Remove legacy producer factory implementation. |
| src/Dafda/Producing/Producer.cs | Expose internal KafkaProducer for test verification and update usage accordingly. |
| src/Dafda/Configuration/ProducerServiceCollectionExtensions.cs | Add DI-aware overloads and introduce typed singleton ProducerFactory<TImplementation> used by transient producer services. |
| src/Dafda/Configuration/ProducerOptions.cs | Make ProducerOptions instantiable for factory-based configuration; update internal wiring for builder/registry. |
| src/Dafda/Configuration/ProducerConfigurationBuilder.cs | Change Kafka producer factory delegate to Func<IServiceProvider, KafkaProducer> and construct default producer using DI-resolved ILoggerFactory. |
| src/Dafda/Configuration/ProducerConfiguration.cs | Update stored Kafka producer factory delegate type to accept IServiceProvider. |
| src/Dafda/Configuration/OutboxServiceCollectionExtensions.cs | Update outbox producer creation to invoke KafkaProducerFactory with IServiceProvider. |
| src/Dafda/Configuration/OutboxProducerOptions.cs | Update internal Kafka producer factory delegate type to accept IServiceProvider. |
| src/Dafda.Tests/Producing/TestProducerFactory.cs | Remove tests for legacy ProducerFactory implementation. |
| src/Dafda.Tests/Configuration/TestServiceCollectionExtensions.cs | Add coverage for DI-aware producer options factories and singleton Kafka producer reuse; update name assertions. |
| src/Dafda.Tests/Builders/ProducerFactoryBuilder.cs | Remove legacy test builder tied to deleted factory. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 12 out of 12 changed files in this pull request and generated 1 comment.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 12 out of 12 changed files in this pull request and generated 4 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| public Producer CreateProducerInstance(IServiceProvider provider) | ||
| { | ||
| var outgoingMessageRegistry = new OutgoingMessageRegistry(); | ||
| var configurationBuilder = new ProducerConfigurationBuilder(); | ||
| var consumerOptions = new ProducerOptions(configurationBuilder, outgoingMessageRegistry); | ||
| options?.Invoke(consumerOptions); | ||
| // Use LazyInitializer to ensure thread-safe, once-only construction while | ||
| // capturing `provider` in the factory delegate. | ||
| LazyInitializer.EnsureInitialized( | ||
| ref _kafkaProducer, | ||
| () => new Lazy<KafkaProducer>(() => _configuration.KafkaProducerFactory(provider), isThreadSafe: true)); | ||
|
|
There was a problem hiding this comment.
ProducerFactory<TService> is registered as a singleton, but the lazy initialization captures the provider passed to CreateProducerInstance. If the first resolution happens from a scoped provider (e.g., inside an ASP.NET request), that scoped IServiceProvider (and potentially scoped services) will be retained by the singleton for the lifetime of the app, causing a scope leak and potentially resolving scoped dependencies into a singleton KafkaProducer. Consider capturing the root provider at singleton construction time (e.g., pass it into ProducerFactory<TService> via the AddSingleton(provider => ...) delegate) and use that stored root provider when creating the KafkaProducer instead of the per-resolution provider.
There was a problem hiding this comment.
Very complex and edge case, but agree 😆
| if (services.Any(d => d.ServiceType == typeof(TService))) | ||
| { | ||
| throw new ProducerFactoryException( | ||
| $"A producer has already been registered for service type \"{typeof(TService).FullName}\". Each producer must use a unique service type."); |
There was a problem hiding this comment.
The duplicate-registration guard checks whether any service descriptor already exists for TService, not whether an existing producer registration exists. In that case the exception message ("A producer has already been registered...") can be misleading because the existing registration might be unrelated. Consider rewording to indicate the service type is already registered (or tightening the check to detect Dafda producer registrations specifically).
| $"A producer has already been registered for service type \"{typeof(TService).FullName}\". Each producer must use a unique service type."); | |
| $"Service type \"{typeof(TService).FullName}\" is already registered in the dependency injection container. Each Dafda producer must use a unique service type."); |
There was a problem hiding this comment.
I think it makes more sense, change it.
| ThrowIfProducerServiceAlreadyRegisteredFor<TService>(services); | ||
|
|
There was a problem hiding this comment.
ThrowIfProducerServiceAlreadyRegisteredFor introduces new behavior (throwing ProducerFactoryException on duplicates), but the updated test suite no longer has coverage asserting that registering the same producer service type twice fails. Consider adding a test that calls AddProducerFor<T> twice for the same TService and asserts the expected ProducerFactoryException is thrown.
| /// Exception thrown when multiple identical producers are added via any overload of | ||
| /// <see cref="ProducerServiceCollectionExtensions.AddProducerFor{TClient}(Microsoft.Extensions.DependencyInjection.IServiceCollection,System.Action{Dafda.Configuration.ProducerOptions})"/>, | ||
| /// <see cref="ProducerServiceCollectionExtensions.AddProducerFor{TService,TImplementation}(Microsoft.Extensions.DependencyInjection.IServiceCollection,System.Action{Dafda.Configuration.ProducerOptions})"/>, | ||
| /// <see cref="ProducerServiceCollectionExtensions.AddProducerFor{TClient}(Microsoft.Extensions.DependencyInjection.IServiceCollection,System.Func{System.IServiceProvider,Dafda.Configuration.ProducerOptions})"/>, or | ||
| /// <see cref="ProducerServiceCollectionExtensions.AddProducerFor{TService,TImplementation}(Microsoft.Extensions.DependencyInjection.IServiceCollection,System.Func{System.IServiceProvider,Dafda.Configuration.ProducerOptions})"/>. |
There was a problem hiding this comment.
The exception is now thrown when AddProducerFor detects that TService is already present in the IServiceCollection, even if that existing registration is not a Dafda producer registration. The summary currently says "multiple identical producers" were added, which may be inaccurate/misleading in that scenario. Consider updating the XML doc to describe the actual condition (duplicate service type registration for a producer).
There was a problem hiding this comment.
It helps better debugging it for the user, I believe. Updated it 👍🏻
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 12 out of 12 changed files in this pull request and generated 3 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| internal class ProducerConfiguration | ||
| { | ||
| public ProducerConfiguration(IDictionary<string, string> configuration, MessageIdGenerator messageIdGenerator, Func<ILoggerFactory, KafkaProducer> kafkaProducerFactory) | ||
| public ProducerConfiguration(IDictionary<string, string> configuration, MessageIdGenerator messageIdGenerator, Func<IServiceProvider, KafkaProducer> kafkaProducerFactory) |
There was a problem hiding this comment.
ProducerConfiguration no longer references ILoggerFactory, but the file still imports Microsoft.Extensions.Logging. With TreatWarningsAsErrors=true, this will cause a build failure (CS8019). Remove the unused using directive.
What changed
Added new
AddProducerForoverloads that allowProducerOptionsto be created using the builtIServiceProvider.Previously, producers could only be configured at registration time:
This required configuration to be defined before the DI container was available.
Now producers can also be configured using a factory:
Implementation
The producer factory is now registered using a DI-aware singleton:
This ensures that:
ProducerOptionscan be constructed using services from the DI containerProducerFactoryis created once per producer typeProducerFactoryremains singleton because it holds configuration and infrastructure required to construct producer instances, while the producer services themselves are still registered as transient.Why
The previous
Action<ProducerOptions>approach built configuration too early and could not access services from the DI container.This created limitations:
IConfiguration)By allowing
ProducerOptionsto be created usingIServiceProvider:Net Effect