Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Abstract confluent builders into factories #624

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@ public static IClusterConfigurationBuilder WithSchemaRegistry(
{
var config = new SchemaRegistryConfig();
handler(config);
cluster.DependencyConfigurator.AddSingleton<ISchemaRegistryClient>(_ => new CachedSchemaRegistryClient(config));

cluster.DependencyConfigurator
.AddSingleton<ISchemaRegistryClientFactory, SchemaRegistryClientFactory>()
.AddSingleton(resolver => resolver.Resolve<ISchemaRegistryClientFactory>().CreateSchemaRegistryClient(config));

return cluster;
}
}
16 changes: 16 additions & 0 deletions src/KafkaFlow.SchemaRegistry/ISchemaRegistryClientFactory.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
using Confluent.SchemaRegistry;

namespace KafkaFlow;

/// <summary>
/// Factory for creating an <see cref="ISchemaRegistryClient"/>.
/// </summary>
public interface ISchemaRegistryClientFactory
{
/// <summary>
/// Creates an instance of <see cref="ISchemaRegistryClient"/>.
/// </summary>
/// <param name="config">The schema registry config</param>
/// <returns>An <see cref="ISchemaRegistryClient"/>.</returns>
ISchemaRegistryClient CreateSchemaRegistryClient(SchemaRegistryConfig config);
}
11 changes: 11 additions & 0 deletions src/KafkaFlow.SchemaRegistry/SchemaRegistryClientFactory.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
using Confluent.SchemaRegistry;

namespace KafkaFlow;

internal class SchemaRegistryClientFactory : ISchemaRegistryClientFactory
{
public ISchemaRegistryClient CreateSchemaRegistryClient(SchemaRegistryConfig config)
{
return new CachedSchemaRegistryClient(config);
}
}
11 changes: 11 additions & 0 deletions src/KafkaFlow/Clusters/AdminClientBuilderFactory.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
using Confluent.Kafka;

namespace KafkaFlow.Clusters;

internal class AdminClientBuilderFactory : IAdminClientBuilderFactory
{
public IAdminClientBuilder CreateAdminClientBuilder(AdminClientConfig config)
{
return new AdminClientBuilderWrapper(config);
}
}
26 changes: 26 additions & 0 deletions src/KafkaFlow/Clusters/AdminClientBuilderWrapper.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
using System;
using Confluent.Kafka;

namespace KafkaFlow.Clusters;

internal class AdminClientBuilderWrapper : IAdminClientBuilder
{
private readonly AdminClientBuilder _builder;

public AdminClientBuilderWrapper(AdminClientConfig config)
{
_builder = new AdminClientBuilder(config);
}

public IAdminClientBuilder SetOAuthBearerTokenRefreshHandler(Action<IProducer<Null, Null>, string> oAuthBearerTokenRefreshHandler)
{
this._builder.SetOAuthBearerTokenRefreshHandler(oAuthBearerTokenRefreshHandler);

return this;
}

public IAdminClient Build()
{
return this._builder.Build();
}
}
4 changes: 2 additions & 2 deletions src/KafkaFlow/Clusters/ClusterManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ internal class ClusterManager : IClusterManager, IDisposable

private readonly ConcurrentDictionary<string, TopicMetadata> _topicMetadataCache = new();

public ClusterManager(ILogHandler logHandler, ClusterConfiguration configuration)
public ClusterManager(IAdminClientBuilderFactory clientBuilderFactory, ILogHandler logHandler, ClusterConfiguration configuration)
{
_logHandler = logHandler;
_configuration = configuration;
Expand All @@ -33,7 +33,7 @@ public ClusterManager(ILogHandler logHandler, ClusterConfiguration configuration

config.ReadSecurityInformationFrom(configuration);

var adminClientBuilder = new AdminClientBuilder(config);
var adminClientBuilder = clientBuilderFactory.CreateAdminClientBuilder(config);

var security = configuration.GetSecurityInformation();

Expand Down
15 changes: 15 additions & 0 deletions src/KafkaFlow/Clusters/IAdminClientBuilder.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
using System;
using Confluent.Kafka;

namespace KafkaFlow.Clusters;

/// <inheritdoc cref="AdminClientBuilder"/>
public interface IAdminClientBuilder
{
/// <inheritdoc cref="AdminClientBuilder.SetOAuthBearerTokenRefreshHandler"/>
IAdminClientBuilder SetOAuthBearerTokenRefreshHandler(
Action<IProducer<Null, Null>, string> oAuthBearerTokenRefreshHandler);

/// <inheritdoc cref="AdminClientBuilder.Build"/>
IAdminClient Build();
}
16 changes: 16 additions & 0 deletions src/KafkaFlow/Clusters/IAdminClientBuilderFactory.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
using Confluent.Kafka;

namespace KafkaFlow.Clusters;

/// <summary>
/// Factory for creating an <see cref="AdminClientBuilder"/>.
/// </summary>
public interface IAdminClientBuilderFactory
{
/// <summary>
/// Creates an instance of <see cref="AdminClientBuilder"/>.
/// </summary>
/// <param name="config">The admin client config</param>
/// <returns>An <see cref="AdminClientBuilder"/>.</returns>
IAdminClientBuilder CreateAdminClientBuilder(AdminClientConfig config);
}
5 changes: 4 additions & 1 deletion src/KafkaFlow/Configuration/KafkaConfigurationBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,15 @@ public KafkaConfiguration Build()
{
_dependencyConfigurator.AddSingleton<IClusterManager>(
resolver =>
new ClusterManager(resolver.Resolve<ILogHandler>(), cluster));
new ClusterManager(resolver.Resolve<IAdminClientBuilderFactory>(), resolver.Resolve<ILogHandler>(), cluster));
}

_dependencyConfigurator
.AddTransient(typeof(ILogHandler), _logHandlerType)
.AddSingleton<IDateTimeProvider, DateTimeProvider>()
.AddSingleton<IAdminClientBuilderFactory, AdminClientBuilderFactory>()
.AddSingleton<IProducerBuilderFactory, ProducerBuilderFactory>()
.AddSingleton<IConsumerBuilderFactory, ConsumerBuilderFactory>()
.AddSingleton<IConsumerAccessor>(new ConsumerAccessor())
.AddSingleton<IConsumerManagerFactory>(new ConsumerManagerFactory())
.AddSingleton<IClusterManagerAccessor, ClusterManagerAccessor>()
Expand Down
4 changes: 3 additions & 1 deletion src/KafkaFlow/Consumers/Consumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ private readonly List<Action<IDependencyResolver, IConsumer<byte[], byte[]>, Lis
private readonly ConsumerFlowManager _flowManager;
private readonly Event _maxPollIntervalExceeded;

private readonly IConsumerBuilderFactory _consumerBuilderFactory;
private IConsumer<byte[], byte[]> _consumer;

public Consumer(
Expand All @@ -42,6 +43,7 @@ public Consumer(
this.Configuration = configuration;
_flowManager = new ConsumerFlowManager(this, _logHandler);
_maxPollIntervalExceeded = new(_logHandler);
_consumerBuilderFactory = dependencyResolver.Resolve<IConsumerBuilderFactory>();
_stopTheWorldStrategy = Configuration.GetKafkaConfig().PartitionAssignmentStrategy.IsStopTheWorldStrategy();

foreach (var handler in this.Configuration.StatisticsHandlers)
Expand Down Expand Up @@ -249,7 +251,7 @@ private void EnsureConsumer()

var kafkaConfig = this.Configuration.GetKafkaConfig();

var consumerBuilder = new ConsumerBuilder<byte[], byte[]>(kafkaConfig)
var consumerBuilder = this._consumerBuilderFactory.CreateConsumerBuilder(kafkaConfig)
.SetPartitionsAssignedHandler(FirePartitionsAssignedHandlers)
.SetPartitionsRevokedHandler(FirePartitionRevokedHandlers)
.SetErrorHandler((consumer, error) => _errorsHandlers.ForEach(x => x(consumer, error)))
Expand Down
11 changes: 11 additions & 0 deletions src/KafkaFlow/Consumers/ConsumerBuilderFactory.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
using Confluent.Kafka;

namespace KafkaFlow.Consumers;

internal class ConsumerBuilderFactory : IConsumerBuilderFactory
{
public IConsumerBuilder CreateConsumerBuilder(ConsumerConfig config)
{
return new ConsumerBuilderWrapper(config);
}
}
55 changes: 55 additions & 0 deletions src/KafkaFlow/Consumers/ConsumerBuilderWrapper.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
using System;
using System.Collections.Generic;
using Confluent.Kafka;

namespace KafkaFlow.Consumers;

internal class ConsumerBuilderWrapper : IConsumerBuilder
{
private readonly ConsumerBuilder<byte[], byte[]> _builder;

public ConsumerBuilderWrapper(ConsumerConfig config)
{
_builder = new ConsumerBuilder<byte[], byte[]>(config);
}

public IConsumerBuilder SetPartitionsAssignedHandler(Action<IConsumer<byte[], byte[]>, List<TopicPartition>> partitionAssignmentHandler)
{
this._builder.SetPartitionsAssignedHandler(partitionAssignmentHandler);

return this;
}

public IConsumerBuilder SetPartitionsRevokedHandler(Action<IConsumer<byte[], byte[]>, List<Confluent.Kafka.TopicPartitionOffset>> partitionsRevokedHandler)
{
this._builder.SetPartitionsRevokedHandler(partitionsRevokedHandler);

return this;
}

public IConsumerBuilder SetErrorHandler(Action<IConsumer<byte[], byte[]>, Error> errorHandler)
{
this._builder.SetErrorHandler(errorHandler);

return this;
}

public IConsumerBuilder SetStatisticsHandler(Action<IConsumer<byte[], byte[]>, string> statisticsHandler)
{
this._builder.SetStatisticsHandler(statisticsHandler);

return this;
}

public IConsumerBuilder SetOAuthBearerTokenRefreshHandler(Action<IConsumer<byte[], byte[]>, string> oAuthBearerTokenRefreshHandler)
{
this._builder.SetOAuthBearerTokenRefreshHandler(oAuthBearerTokenRefreshHandler);

return this;
}

public IConsumer<byte[], byte[]> Build()
{
return this._builder.Build();
}
}
32 changes: 32 additions & 0 deletions src/KafkaFlow/Consumers/IConsumerBuilder.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
using System;
using System.Collections.Generic;
using Confluent.Kafka;

namespace KafkaFlow.Consumers;

/// <inheritdoc cref="ConsumerBuilder{TKey,TValue}"/>
public interface IConsumerBuilder
{
/// <inheritdoc cref="ConsumerBuilder{TKey,TValue}.SetPartitionsAssignedHandler(Action{IConsumer{TKey,TValue},List{TopicPartition}})"/>
IConsumerBuilder SetPartitionsAssignedHandler(
Action<IConsumer<byte[], byte[]>, List<TopicPartition>> partitionAssignmentHandler);

/// <inheritdoc cref="ConsumerBuilder{TKey,TValue}.SetPartitionsRevokedHandler(Action{IConsumer{TKey,TValue},List{Confluent.Kafka.TopicPartitionOffset}})"/>
IConsumerBuilder SetPartitionsRevokedHandler(
Action<IConsumer<byte[], byte[]>, List<Confluent.Kafka.TopicPartitionOffset>> partitionsRevokedHandler);

/// <inheritdoc cref="ConsumerBuilder{TKey,TValue}.SetErrorHandler"/>
IConsumerBuilder SetErrorHandler(
Action<IConsumer<byte[], byte[]>, Error> errorHandler);

/// <inheritdoc cref="ConsumerBuilder{TKey,TValue}.SetStatisticsHandler"/>
IConsumerBuilder SetStatisticsHandler(
Action<IConsumer<byte[], byte[]>, string> statisticsHandler);

/// <inheritdoc cref="ConsumerBuilder{TKey,TValue}.SetOAuthBearerTokenRefreshHandler"/>
IConsumerBuilder SetOAuthBearerTokenRefreshHandler(
Action<IConsumer<byte[], byte[]>, string> oAuthBearerTokenRefreshHandler);

/// <inheritdoc cref="ConsumerBuilder{TKey,TValue}.Build"/>
IConsumer<byte[], byte[]> Build();
}
16 changes: 16 additions & 0 deletions src/KafkaFlow/Consumers/IConsumerBuilderFactory.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
using Confluent.Kafka;

namespace KafkaFlow.Consumers;

/// <summary>
/// Factory for creating an <see cref="ConsumerBuilder{TKey,TValue}"/>.
/// </summary>
public interface IConsumerBuilderFactory
{
/// <summary>
/// Creates an instance of <see cref="ConsumerBuilder{TKey,TValue}"/>.
/// </summary>
/// <param name="config">The consumer config</param>
/// <returns>An <see cref="ConsumerBuilder{TKey,TValue}"/>.</returns>
IConsumerBuilder CreateConsumerBuilder(ConsumerConfig config);
}
23 changes: 23 additions & 0 deletions src/KafkaFlow/Producers/IProducerBuilder.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
using System;
using Confluent.Kafka;

namespace KafkaFlow.Producers;

/// <inheritdoc cref="ProducerBuilder{TKey,TValue}"/>
public interface IProducerBuilder
{
/// <inheritdoc cref="ProducerBuilder{TKey,TValue}.SetErrorHandler"/>
IProducerBuilder SetErrorHandler(
Action<IProducer<byte[], byte[]>, Error> errorHandler);

/// <inheritdoc cref="ProducerBuilder{TKey,TValue}.SetStatisticsHandler"/>
IProducerBuilder SetStatisticsHandler(
Action<IProducer<byte[], byte[]>, string> statisticsHandler);

/// <inheritdoc cref="ProducerBuilder{TKey,TValue}.SetOAuthBearerTokenRefreshHandler"/>
IProducerBuilder SetOAuthBearerTokenRefreshHandler(
Action<IProducer<byte[], byte[]>, string> oAuthBearerTokenRefreshHandler);

/// <inheritdoc cref="ProducerBuilder{TKey,TValue}.Build"/>
IProducer<byte[], byte[]> Build();
}
16 changes: 16 additions & 0 deletions src/KafkaFlow/Producers/IProducerBuilderFactory.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
using Confluent.Kafka;

namespace KafkaFlow.Producers;

/// <summary>
/// Factory for creating an <see cref="ProducerBuilder{TKey,TValue}"/>.
/// </summary>
public interface IProducerBuilderFactory
{
/// <summary>
/// Creates an instance of <see cref="ProducerBuilder{TKey,TValue}"/>.
/// </summary>
/// <param name="config">The producer config</param>
/// <returns>An <see cref="ProducerBuilder{TKey,TValue}"/>.</returns>
IProducerBuilder CreateProducerBuilder(ProducerConfig config);
}
6 changes: 4 additions & 2 deletions src/KafkaFlow/Producers/MessageProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@ internal class MessageProducer : IMessageProducer, IDisposable
private readonly IProducerConfiguration _configuration;
private readonly MiddlewareExecutor _middlewareExecutor;
private readonly GlobalEvents _globalEvents;

private readonly object _producerCreationSync = new();

private readonly IProducerBuilderFactory _producerBuilderFactory;
private volatile IProducer<byte[], byte[]> _producer;

public MessageProducer(
Expand All @@ -28,6 +29,7 @@ public MessageProducer(
_configuration = configuration;
_middlewareExecutor = new MiddlewareExecutor(configuration.MiddlewaresConfigurations);
_globalEvents = dependencyResolver.Resolve<GlobalEvents>();
_producerBuilderFactory = dependencyResolver.Resolve<IProducerBuilderFactory>();
}

public string ProducerName => _configuration.Name;
Expand Down Expand Up @@ -250,7 +252,7 @@ private IProducer<byte[], byte[]> EnsureProducer()
return _producer;
}

var producerBuilder = new ProducerBuilder<byte[], byte[]>(_configuration.GetKafkaConfig())
var producerBuilder = this._producerBuilderFactory.CreateProducerBuilder(_configuration.GetKafkaConfig())
.SetErrorHandler(
(_, error) =>
{
Expand Down
11 changes: 11 additions & 0 deletions src/KafkaFlow/Producers/ProducerBuilderFactory.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
using Confluent.Kafka;

namespace KafkaFlow.Producers;

internal class ProducerBuilderFactory : IProducerBuilderFactory
{
public IProducerBuilder CreateProducerBuilder(ProducerConfig config)
{
return new ProducerBuilderWrapper(config);
}
}
Loading
Loading