diff --git a/README.md b/README.md index 013b4125..0ace6728 100644 --- a/README.md +++ b/README.md @@ -323,6 +323,7 @@ The settings exposed here are targeted to more advanced users that want to custo |LibkafkaDebug|debug|Both |MetadataMaxAgeMs|metadata.max.age.ms|Both |SocketKeepaliveEnable|socket.keepalive.enable|Both +|LingerMs|linger.ms|Output **NOTE:** `MetadataMaxAgeMs` default is `180000` `SocketKeepaliveEnable` default is `true` otherwise, the default value is the same as the [Configuration properties](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md). The reason of the default settings, refer to this [issue](https://github.com/Azure/azure-functions-kafka-extension/issues/187). **NOTE:** `AutoOffsetReset` default is Earliest. Allowed Values are `Earliest` and `Latest`. diff --git a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaAttribute.cs b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaAttribute.cs index 5e13d211..dea4d300 100644 --- a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaAttribute.cs +++ b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaAttribute.cs @@ -139,5 +139,11 @@ public KafkaAttribute() /// ssl.key.password in librdkafka /// public string SslKeyPassword { get; set; } + + /// + /// Linger.MS property provides the time between batches of messages + /// being sent to cluster. Larger value allows more batching results in high throughput. + /// + public int LingerMs { get; set; } = 5; } } \ No newline at end of file diff --git a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducerFactory.cs b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducerFactory.cs index fa574680..0f4ef14b 100644 --- a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducerFactory.cs +++ b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducerFactory.cs @@ -131,7 +131,8 @@ public ProducerConfig GetProducerConfig(KafkaProducerEntity entity) SslCaLocation = resolvedSslCaLocation, Debug = kafkaOptions?.LibkafkaDebug, MetadataMaxAgeMs = kafkaOptions?.MetadataMaxAgeMs, - SocketKeepaliveEnable = kafkaOptions?.SocketKeepaliveEnable + SocketKeepaliveEnable = kafkaOptions?.SocketKeepaliveEnable, + LingerMs = entity.Attribute.LingerMs }; if (entity.Attribute.AuthenticationMode != BrokerAuthenticationMode.NotSet) diff --git a/test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/KafkaProducerFactoryTest.cs b/test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/KafkaProducerFactoryTest.cs index 69b9d6fd..2c299bde 100644 --- a/test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/KafkaProducerFactoryTest.cs +++ b/test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/KafkaProducerFactoryTest.cs @@ -209,7 +209,7 @@ public void GetProducerConfig_When_Auth_Defined_Should_Contain_Them() var factory = new KafkaProducerFactory(emptyConfiguration, new DefaultNameResolver(emptyConfiguration), NullLoggerFactory.Instance); var config = factory.GetProducerConfig(entity); - Assert.Equal(11, config.Count()); + Assert.Equal(12, config.Count()); Assert.Equal("brokers:9092", config.BootstrapServers); Assert.Equal(SecurityProtocol.SaslSsl, config.SecurityProtocol); Assert.Equal(SaslMechanism.Plain, config.SaslMechanism); @@ -237,7 +237,7 @@ public void GetProducerConfig_When_Ssl_Auth_Defined_Should_Contain_Them() var factory = new KafkaProducerFactory(emptyConfiguration, new DefaultNameResolver(emptyConfiguration), NullLoggerFactory.Instance); var config = factory.GetProducerConfig(entity); - Assert.Equal(12, config.Count()); + Assert.Equal(13, config.Count()); Assert.Equal("brokers:9092", config.BootstrapServers); Assert.Equal(SecurityProtocol.Ssl, config.SecurityProtocol); Assert.Equal("path/to/key", config.SslKeyLocation);