From 9580828c6fd1139cefb56f357b83f1808a13a5c8 Mon Sep 17 00:00:00 2001 From: shrohilla Date: Sat, 10 Dec 2022 13:19:58 +0530 Subject: [PATCH 1/3] initial commit for adding support for Linger.ms --- .../Output/KafkaAttribute.cs | 6 ++++++ .../Output/KafkaProducerFactory.cs | 3 ++- .../KafkaProducerFactoryTest.cs | 4 ++-- 3 files changed, 10 insertions(+), 3 deletions(-) diff --git a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaAttribute.cs b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaAttribute.cs index 5e13d211..b70f9099 100644 --- a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaAttribute.cs +++ b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaAttribute.cs @@ -139,5 +139,11 @@ public KafkaAttribute() /// ssl.key.password in librdkafka /// public string SslKeyPassword { get; set; } + + /// + /// Linger.MS property provides the time between batches of messages + /// being sent to cluster. Larger value allows more batching results in high throughput. + /// + public int LingerMs { get; set; } = 0; } } \ No newline at end of file diff --git a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducerFactory.cs b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducerFactory.cs index fa574680..0f4ef14b 100644 --- a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducerFactory.cs +++ b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducerFactory.cs @@ -131,7 +131,8 @@ public ProducerConfig GetProducerConfig(KafkaProducerEntity entity) SslCaLocation = resolvedSslCaLocation, Debug = kafkaOptions?.LibkafkaDebug, MetadataMaxAgeMs = kafkaOptions?.MetadataMaxAgeMs, - SocketKeepaliveEnable = kafkaOptions?.SocketKeepaliveEnable + SocketKeepaliveEnable = kafkaOptions?.SocketKeepaliveEnable, + LingerMs = entity.Attribute.LingerMs }; if (entity.Attribute.AuthenticationMode != BrokerAuthenticationMode.NotSet) diff --git a/test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/KafkaProducerFactoryTest.cs b/test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/KafkaProducerFactoryTest.cs index 69b9d6fd..2c299bde 100644 --- a/test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/KafkaProducerFactoryTest.cs +++ b/test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/KafkaProducerFactoryTest.cs @@ -209,7 +209,7 @@ public void GetProducerConfig_When_Auth_Defined_Should_Contain_Them() var factory = new KafkaProducerFactory(emptyConfiguration, new DefaultNameResolver(emptyConfiguration), NullLoggerFactory.Instance); var config = factory.GetProducerConfig(entity); - Assert.Equal(11, config.Count()); + Assert.Equal(12, config.Count()); Assert.Equal("brokers:9092", config.BootstrapServers); Assert.Equal(SecurityProtocol.SaslSsl, config.SecurityProtocol); Assert.Equal(SaslMechanism.Plain, config.SaslMechanism); @@ -237,7 +237,7 @@ public void GetProducerConfig_When_Ssl_Auth_Defined_Should_Contain_Them() var factory = new KafkaProducerFactory(emptyConfiguration, new DefaultNameResolver(emptyConfiguration), NullLoggerFactory.Instance); var config = factory.GetProducerConfig(entity); - Assert.Equal(12, config.Count()); + Assert.Equal(13, config.Count()); Assert.Equal("brokers:9092", config.BootstrapServers); Assert.Equal(SecurityProtocol.Ssl, config.SecurityProtocol); Assert.Equal("path/to/key", config.SslKeyLocation); From f6983e37f09ca2e65d622f31cdb731f890dbd9e9 Mon Sep 17 00:00:00 2001 From: shrohilla Date: Sat, 10 Dec 2022 13:31:26 +0530 Subject: [PATCH 2/3] addign the documentation --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index 013b4125..0ace6728 100644 --- a/README.md +++ b/README.md @@ -323,6 +323,7 @@ The settings exposed here are targeted to more advanced users that want to custo |LibkafkaDebug|debug|Both |MetadataMaxAgeMs|metadata.max.age.ms|Both |SocketKeepaliveEnable|socket.keepalive.enable|Both +|LingerMs|linger.ms|Output **NOTE:** `MetadataMaxAgeMs` default is `180000` `SocketKeepaliveEnable` default is `true` otherwise, the default value is the same as the [Configuration properties](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md). The reason of the default settings, refer to this [issue](https://github.com/Azure/azure-functions-kafka-extension/issues/187). **NOTE:** `AutoOffsetReset` default is Earliest. Allowed Values are `Earliest` and `Latest`. From 9e218ce886ab9a5473aeebdcf36e81249b21451c Mon Sep 17 00:00:00 2001 From: shrohilla Date: Sat, 10 Dec 2022 13:37:41 +0530 Subject: [PATCH 3/3] updating to 5ms as default value --- .../Output/KafkaAttribute.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaAttribute.cs b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaAttribute.cs index b70f9099..dea4d300 100644 --- a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaAttribute.cs +++ b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaAttribute.cs @@ -144,6 +144,6 @@ public KafkaAttribute() /// Linger.MS property provides the time between batches of messages /// being sent to cluster. Larger value allows more batching results in high throughput. /// - public int LingerMs { get; set; } = 0; + public int LingerMs { get; set; } = 5; } } \ No newline at end of file