diff --git a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Config/KafkaOptions.cs b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Config/KafkaOptions.cs index e120582c..323824d0 100644 --- a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Config/KafkaOptions.cs +++ b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Config/KafkaOptions.cs @@ -158,6 +158,45 @@ public int MaxBatchSize // public bool? SocketKeepaliveEnable { get; set; } = true; + private string compressionType = "None"; + /// + /// CompressionType for sending message + /// Default is None + /// Producer Only + /// + public string CompressionType { + get => this.compressionType ; + set + { + if (Enum.TryParse(value, out CompressionType type)) + { + this.compressionType = value; + } else + { + throw new InvalidOperationException("CompressionType should be None, Gzip, Snappy, Lz4, Zstd"); + } + } + } + + /// + /// Delay in milliseconds to wait for messages in the producer queue to accumulate before construction message batches to transmit to broker. + /// default: 5 + /// linger.ms in librdkafka + /// Producer Only + /// + public double? LingerMs { get; set; } = 5; + + /// + /// Compression level parameter for algorithm selected by configuration property + /// compression.codec`. Higher values will result in better compression at the cost + /// of more CPU usage. Usable range is algorithm-dependent: [0-9] for gzip; [0-12] + /// for lz4; only 0 for snappy; -1 = codec-dependent default compression level. + /// default: -1 + /// Producer Only + /// + public int? CompressionLevel { get; set; } = -1; + + int subscriberIntervalInSeconds = 1; /// /// Defines the minimum frequency in which messages will be executed by function. Only if the message volume is less than / diff --git a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducerFactory.cs b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducerFactory.cs index 178f3ffc..b330b6e7 100644 --- a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducerFactory.cs +++ b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducerFactory.cs @@ -114,6 +114,11 @@ public ProducerConfig GetProducerConfig(KafkaProducerEntity entity) resolvedSslKeyLocation = entity.Attribute.SslKeyLocation; } var kafkaOptions = this.config.Get(); + CompressionType compressionType; + if (!Enum.TryParse(kafkaOptions?.CompressionType, out compressionType)) { + compressionType = CompressionType.None; + } + var conf = new ProducerConfig() { BootstrapServers = this.config.ResolveSecureSetting(nameResolver, entity.Attribute.BrokerList), @@ -130,7 +135,10 @@ public ProducerConfig GetProducerConfig(KafkaProducerEntity entity) SslCaLocation = resolvedSslCaLocation, Debug = kafkaOptions?.LibkafkaDebug, MetadataMaxAgeMs = kafkaOptions?.MetadataMaxAgeMs, - SocketKeepaliveEnable = kafkaOptions?.SocketKeepaliveEnable + SocketKeepaliveEnable = kafkaOptions?.SocketKeepaliveEnable, + CompressionType = compressionType, + CompressionLevel = kafkaOptions?.CompressionLevel, + LingerMs = kafkaOptions?.LingerMs }; if (entity.Attribute.AuthenticationMode != BrokerAuthenticationMode.NotSet) diff --git a/test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/KafkaOptionsTest.cs b/test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/KafkaOptionsTest.cs new file mode 100644 index 00000000..da12a77d --- /dev/null +++ b/test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/KafkaOptionsTest.cs @@ -0,0 +1,31 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Collections.Generic; +using System.Text; +using Xunit; + +namespace Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests +{ + public class KafkaOptionsTest + { + [Fact] + public void When_Pass_The_Value_Available_For_CompressionType() + { + var options = new KafkaOptions(); + Assert.Equal("None", options.CompressionType); + options.CompressionType = "Gzip"; + Assert.True(true); + } + + [Fact] + public void When_Pass_The_Value_Not_Available_For_CompressionType() + { + var options = new KafkaOptions(); + Assert.Throws(() => { + options.CompressionType = "Not Available"; + }); + } + } +} diff --git a/test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/KafkaProducerFactoryTest.cs b/test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/KafkaProducerFactoryTest.cs index a0221620..c5edf668 100644 --- a/test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/KafkaProducerFactoryTest.cs +++ b/test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/KafkaProducerFactoryTest.cs @@ -183,7 +183,7 @@ public void GetProducerConfig_When_No_Auth_Defined_Should_Contain_Only_BrokerLis var factory = new KafkaProducerFactory(emptyConfiguration, new DefaultNameResolver(emptyConfiguration), NullLoggerProvider.Instance); var config = factory.GetProducerConfig(entity); - Assert.Single(config); + Assert.Equal(2, config.Count()); Assert.Equal("brokers:9092", config.BootstrapServers); } @@ -206,7 +206,7 @@ public void GetProducerConfig_When_Auth_Defined_Should_Contain_Them() var factory = new KafkaProducerFactory(emptyConfiguration, new DefaultNameResolver(emptyConfiguration), NullLoggerProvider.Instance); var config = factory.GetProducerConfig(entity); - Assert.Equal(5, config.Count()); + Assert.Equal(6, config.Count()); Assert.Equal("brokers:9092", config.BootstrapServers); Assert.Equal(SecurityProtocol.SaslSsl, config.SecurityProtocol); Assert.Equal(SaslMechanism.Plain, config.SaslMechanism); @@ -234,7 +234,7 @@ public void GetProducerConfig_When_Ssl_Auth_Defined_Should_Contain_Them() var factory = new KafkaProducerFactory(emptyConfiguration, new DefaultNameResolver(emptyConfiguration), NullLoggerProvider.Instance); var config = factory.GetProducerConfig(entity); - Assert.Equal(6, config.Count()); + Assert.Equal(7, config.Count()); Assert.Equal("brokers:9092", config.BootstrapServers); Assert.Equal(SecurityProtocol.Ssl, config.SecurityProtocol); Assert.Equal("path/to/key", config.SslKeyLocation);