Skip to content

Defer producer configuration building#115

Merged
toizo merged 25 commits into
dfds:masterfrom
nabisobhi:nabi/new-add-producers
Mar 24, 2026
Merged

Defer producer configuration building#115
toizo merged 25 commits into
dfds:masterfrom
nabisobhi:nabi/new-add-producers

Conversation

@nabisobhi
Copy link
Copy Markdown
Contributor

@nabisobhi nabisobhi commented Mar 3, 2026

What changed

Added new AddProducerFor overloads that allow ProducerOptions to be created using the built IServiceProvider.

Previously, producers could only be configured at registration time:

services.AddProducerFor<MyProducer>(options =>
{
    options.WithBootstrapServers("...");
});

This required configuration to be defined before the DI container was available.

Now producers can also be configured using a factory:

services.AddProducerFor<MyProducer>(provider =>
{
    var configuration = provider.GetRequiredService<IConfiguration>();

    var options = new ProducerOptions();
    options.WithBootstrapServers(configuration["Kafka:BootstrapServers"]);

    return options;
});

Implementation

The producer factory is now registered using a DI-aware singleton:

services.AddSingleton(provider =>
{
    var options = optionsFactory(provider);
    return new ProducerFactory<TImplementation>(options);
});

This ensures that:

  • ProducerOptions can be constructed using services from the DI container
  • The ProducerFactory is created once per producer type
  • All producers resolved through DI share the same factory instance

ProducerFactory remains singleton because it holds configuration and infrastructure required to construct producer instances, while the producer services themselves are still registered as transient.

Why

The previous Action<ProducerOptions> approach built configuration too early and could not access services from the DI container.

This created limitations:

  • Producer configuration could not depend on other services (e.g. IConfiguration)
  • Integration tests could not override dependencies cleanly
  • Configuration values were effectively fixed at registration time

By allowing ProducerOptions to be created using IServiceProvider:

  • The full DI container is available when building configuration
  • Overrides (especially in integration tests) are respected
  • Configuration can depend on runtime services

Net Effect

  • Producers can now be configured using runtime dependencies
  • Improved integration test flexibility
  • Better alignment with dependency injection patterns
  • Clear separation between registration-time and runtime configuration

@DFDS-Snyk
Copy link
Copy Markdown

DFDS-Snyk commented Mar 3, 2026

Snyk checks have passed. No issues have been found so far.

Status Scan Engine Critical High Medium Low Total (0)
Open Source Security 0 0 0 0 0 issues
Licenses 0 0 0 0 0 issues
Code Security 0 0 0 0 0 issues

💻 Catch issues earlier using the plugins for VS Code, JetBrains IDEs, Visual Studio, and Eclipse.

@nabisobhi nabisobhi changed the title Defer producer configuration building WIP: Defer producer configuration building Mar 3, 2026
@nabisobhi nabisobhi changed the title WIP: Defer producer configuration building WIP: Add IServiceProvider-based configuration for producers Mar 10, 2026
@nabisobhi nabisobhi changed the title WIP: Add IServiceProvider-based configuration for producers WIP: Defer consumer configuration building Mar 10, 2026
@nabisobhi nabisobhi changed the title WIP: Defer consumer configuration building WIP: Defer producer configuration building Mar 10, 2026
@nabisobhi nabisobhi changed the title WIP: Defer producer configuration building Defer producer configuration building Mar 16, 2026
@nabisobhi nabisobhi marked this pull request as ready for review March 16, 2026 09:51
Comment thread src/Dafda/Configuration/ProducerServiceCollectionExtensions.cs Outdated
Copy link
Copy Markdown
Collaborator

@toizo toizo left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it looks good to me, except the one mistake in the assertion

Comment thread src/Dafda.Tests/Configuration/TestServiceCollectionExtensions.cs Outdated
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR updates Dafda’s producer registration APIs to support deferring ProducerOptions creation until the IServiceProvider is available, enabling DI-dependent configuration (e.g., IConfiguration) while keeping producer services transient and the underlying factory infrastructure singleton.

Changes:

  • Add AddProducerFor overloads that accept Func<IServiceProvider, ProducerOptions> for DI-aware producer configuration.
  • Refactor producer infrastructure: replace the old shared ProducerFactory implementation with a typed singleton factory per producer implementation type.
  • Update Kafka producer factory plumbing to accept IServiceProvider and adjust tests accordingly.

Reviewed changes

Copilot reviewed 12 out of 12 changed files in this pull request and generated 4 comments.

Show a summary per file
File Description
src/Dafda/Producing/ProducerFactoryException.cs Update XML docs to reference all AddProducerFor overloads that can throw duplicate-registration exceptions.
src/Dafda/Producing/ProducerFactory.cs Remove legacy producer factory implementation.
src/Dafda/Producing/Producer.cs Expose internal KafkaProducer for test verification and update usage accordingly.
src/Dafda/Configuration/ProducerServiceCollectionExtensions.cs Add DI-aware overloads and introduce typed singleton ProducerFactory<TImplementation> used by transient producer services.
src/Dafda/Configuration/ProducerOptions.cs Make ProducerOptions instantiable for factory-based configuration; update internal wiring for builder/registry.
src/Dafda/Configuration/ProducerConfigurationBuilder.cs Change Kafka producer factory delegate to Func<IServiceProvider, KafkaProducer> and construct default producer using DI-resolved ILoggerFactory.
src/Dafda/Configuration/ProducerConfiguration.cs Update stored Kafka producer factory delegate type to accept IServiceProvider.
src/Dafda/Configuration/OutboxServiceCollectionExtensions.cs Update outbox producer creation to invoke KafkaProducerFactory with IServiceProvider.
src/Dafda/Configuration/OutboxProducerOptions.cs Update internal Kafka producer factory delegate type to accept IServiceProvider.
src/Dafda.Tests/Producing/TestProducerFactory.cs Remove tests for legacy ProducerFactory implementation.
src/Dafda.Tests/Configuration/TestServiceCollectionExtensions.cs Add coverage for DI-aware producer options factories and singleton Kafka producer reuse; update name assertions.
src/Dafda.Tests/Builders/ProducerFactoryBuilder.cs Remove legacy test builder tied to deleted factory.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread src/Dafda/Configuration/ProducerServiceCollectionExtensions.cs
Comment thread src/Dafda/Configuration/ProducerServiceCollectionExtensions.cs
Comment thread src/Dafda/Configuration/ProducerServiceCollectionExtensions.cs
Comment thread src/Dafda.Tests/Configuration/TestServiceCollectionExtensions.cs Outdated
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 12 out of 12 changed files in this pull request and generated 1 comment.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread src/Dafda/Configuration/ProducerServiceCollectionExtensions.cs Outdated
Copy link
Copy Markdown
Collaborator

@toizo toizo left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good to me now

Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 12 out of 12 changed files in this pull request and generated 4 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines 112 to 119
public Producer CreateProducerInstance(IServiceProvider provider)
{
var outgoingMessageRegistry = new OutgoingMessageRegistry();
var configurationBuilder = new ProducerConfigurationBuilder();
var consumerOptions = new ProducerOptions(configurationBuilder, outgoingMessageRegistry);
options?.Invoke(consumerOptions);
// Use LazyInitializer to ensure thread-safe, once-only construction while
// capturing `provider` in the factory delegate.
LazyInitializer.EnsureInitialized(
ref _kafkaProducer,
() => new Lazy<KafkaProducer>(() => _configuration.KafkaProducerFactory(provider), isThreadSafe: true));

Copy link

Copilot AI Mar 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ProducerFactory<TService> is registered as a singleton, but the lazy initialization captures the provider passed to CreateProducerInstance. If the first resolution happens from a scoped provider (e.g., inside an ASP.NET request), that scoped IServiceProvider (and potentially scoped services) will be retained by the singleton for the lifetime of the app, causing a scope leak and potentially resolving scoped dependencies into a singleton KafkaProducer. Consider capturing the root provider at singleton construction time (e.g., pass it into ProducerFactory<TService> via the AddSingleton(provider => ...) delegate) and use that stored root provider when creating the KafkaProducer instead of the per-resolution provider.

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very complex and edge case, but agree 😆

if (services.Any(d => d.ServiceType == typeof(TService)))
{
throw new ProducerFactoryException(
$"A producer has already been registered for service type \"{typeof(TService).FullName}\". Each producer must use a unique service type.");
Copy link

Copilot AI Mar 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The duplicate-registration guard checks whether any service descriptor already exists for TService, not whether an existing producer registration exists. In that case the exception message ("A producer has already been registered...") can be misleading because the existing registration might be unrelated. Consider rewording to indicate the service type is already registered (or tightening the check to detect Dafda producer registrations specifically).

Suggested change
$"A producer has already been registered for service type \"{typeof(TService).FullName}\". Each producer must use a unique service type.");
$"Service type \"{typeof(TService).FullName}\" is already registered in the dependency injection container. Each Dafda producer must use a unique service type.");

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it makes more sense, change it.

Comment on lines +25 to +26
ThrowIfProducerServiceAlreadyRegisteredFor<TService>(services);

Copy link

Copilot AI Mar 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ThrowIfProducerServiceAlreadyRegisteredFor introduces new behavior (throwing ProducerFactoryException on duplicates), but the updated test suite no longer has coverage asserting that registering the same producer service type twice fails. Consider adding a test that calls AddProducerFor<T> twice for the same TService and asserts the expected ProducerFactoryException is thrown.

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added some tests

Comment on lines +7 to +11
/// Exception thrown when multiple identical producers are added via any overload of
/// <see cref="ProducerServiceCollectionExtensions.AddProducerFor{TClient}(Microsoft.Extensions.DependencyInjection.IServiceCollection,System.Action{Dafda.Configuration.ProducerOptions})"/>,
/// <see cref="ProducerServiceCollectionExtensions.AddProducerFor{TService,TImplementation}(Microsoft.Extensions.DependencyInjection.IServiceCollection,System.Action{Dafda.Configuration.ProducerOptions})"/>,
/// <see cref="ProducerServiceCollectionExtensions.AddProducerFor{TClient}(Microsoft.Extensions.DependencyInjection.IServiceCollection,System.Func{System.IServiceProvider,Dafda.Configuration.ProducerOptions})"/>, or
/// <see cref="ProducerServiceCollectionExtensions.AddProducerFor{TService,TImplementation}(Microsoft.Extensions.DependencyInjection.IServiceCollection,System.Func{System.IServiceProvider,Dafda.Configuration.ProducerOptions})"/>.
Copy link

Copilot AI Mar 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The exception is now thrown when AddProducerFor detects that TService is already present in the IServiceCollection, even if that existing registration is not a Dafda producer registration. The summary currently says "multiple identical producers" were added, which may be inaccurate/misleading in that scenario. Consider updating the XML doc to describe the actual condition (duplicate service type registration for a producer).

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It helps better debugging it for the user, I believe. Updated it 👍🏻

Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 12 out of 12 changed files in this pull request and generated 3 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

internal class ProducerConfiguration
{
public ProducerConfiguration(IDictionary<string, string> configuration, MessageIdGenerator messageIdGenerator, Func<ILoggerFactory, KafkaProducer> kafkaProducerFactory)
public ProducerConfiguration(IDictionary<string, string> configuration, MessageIdGenerator messageIdGenerator, Func<IServiceProvider, KafkaProducer> kafkaProducerFactory)
Copy link

Copilot AI Mar 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ProducerConfiguration no longer references ILoggerFactory, but the file still imports Microsoft.Extensions.Logging. With TreatWarningsAsErrors=true, this will cause a build failure (CS8019). Remove the unused using directive.

Copilot uses AI. Check for mistakes.
Comment thread src/Dafda/Configuration/OutboxProducerOptions.cs
Comment thread src/Dafda/Configuration/ProducerServiceCollectionExtensions.cs
@toizo toizo merged commit 7ad2069 into dfds:master Mar 24, 2026
4 checks passed
@nabisobhi nabisobhi deleted the nabi/new-add-producers branch March 24, 2026 14:04
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants