From 51243169b75ca316f91f5482de47efa329c18efb Mon Sep 17 00:00:00 2001
From: Tsuyoshi Ushio
Date: Wed, 10 Feb 2021 22:23:54 -0800
Subject: [PATCH 1/3] First implementation of compression

---
 .../Output/KafkaAttribute.cs       | 24 +++++++++++++++++++
 .../Output/KafkaProducerFactory.cs |  5 +++-
 2 files changed, 28 insertions(+), 1 deletion(-)

diff --git a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaAttribute.cs b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaAttribute.cs
index 74cc80b0..9e19d0ae 100644
--- a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaAttribute.cs
+++ b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaAttribute.cs
@@ -5,6 +5,7 @@
 using Avro.Specific;
 using Confluent.Kafka;
 using Microsoft.Azure.WebJobs.Description;
+using Microsoft.Azure.WebJobs.Extensions.Kafka.Config;
 
 namespace Microsoft.Azure.WebJobs.Extensions.Kafka
 {
@@ -139,5 +140,28 @@ public KafkaAttribute()
         /// ssl.key.password in librdkafka
         /// </summary>
         public string SslKeyPassword { get; set; }
+
+        /// <summary>
+        /// CompressionType for sending message
+        /// Default is None
+        /// </summary>
+        /// <value></value>
+        public CompressionType CompressionType { get; set; } = CompressionType.None;
+
+        /// <summary>
+        /// Delay in milliseconds to wait for messages in the producer queue to accumulate before constructing message batches to transmit to broker.
+        /// default: 5
+        /// linger.ms in librdkafka
+        /// </summary>
+        public double? LingerMs { get; set; } = 5;
+
+        /// <summary>
+        /// Compression level parameter for algorithm selected by configuration property
+        /// `compression.codec`. Higher values will result in better compression at the cost
+        /// of more CPU usage. Usable range is algorithm-dependent: [0-9] for gzip; [0-12]
+        /// for lz4; only 0 for snappy; -1 = codec-dependent default compression level.
+        /// default: -1
+        /// </summary>
+        public int? CompressionLevel { get; set; } = -1;
     }
 }
\ No newline at end of file
diff --git a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducerFactory.cs b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducerFactory.cs
index 178f3ffc..ca557371 100644
--- a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducerFactory.cs
+++ b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducerFactory.cs
@@ -130,7 +130,10 @@ public ProducerConfig GetProducerConfig(KafkaProducerEntity entity)
                 SslCaLocation = resolvedSslCaLocation,
                 Debug = kafkaOptions?.LibkafkaDebug,
                 MetadataMaxAgeMs = kafkaOptions?.MetadataMaxAgeMs,
-                SocketKeepaliveEnable = kafkaOptions?.SocketKeepaliveEnable
+                SocketKeepaliveEnable = kafkaOptions?.SocketKeepaliveEnable,
+                CompressionType = entity.Attribute.CompressionType,
+                CompressionLevel = entity.Attribute.CompressionLevel,
+                LingerMs = entity.Attribute.LingerMs
             };
 
             if (entity.Attribute.AuthenticationMode != BrokerAuthenticationMode.NotSet)

From a87799a71595e0130dcf43ab8253c409c1eb688e Mon Sep 17 00:00:00 2001
From: Tsuyoshi Ushio
Date: Wed, 10 Feb 2021 23:47:09 -0800
Subject: [PATCH 2/3] Change config from Attribute to host.json

---
 .../Config/KafkaOptions.cs         | 39 +++++++++++++++++++
 .../Output/KafkaAttribute.cs       | 24 ------------
 .../Output/KafkaProducerFactory.cs | 11 ++++--
 .../KafkaOptionsTest.cs            | 31 +++++++++++++++
 4 files changed, 78 insertions(+), 27 deletions(-)
 create mode 100644 test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/KafkaOptionsTest.cs

diff --git a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Config/KafkaOptions.cs b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Config/KafkaOptions.cs
index e120582c..323824d0 100644
--- a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Config/KafkaOptions.cs
+++ b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Config/KafkaOptions.cs
@@ -158,6 +158,45 @@ public int MaxBatchSize
         /// </summary>
         public bool? SocketKeepaliveEnable { get; set; } = true;
 
+        private string compressionType = "None";
+        /// <summary>
+        /// CompressionType for sending message
+        /// Default is None
+        /// Producer Only
+        /// </summary>
+        public string CompressionType {
+            get => this.compressionType;
+            set
+            {
+                if (Enum.TryParse(value, out CompressionType type))
+                {
+                    this.compressionType = value;
+                } else
+                {
+                    throw new InvalidOperationException("CompressionType should be None, Gzip, Snappy, Lz4, Zstd");
+                }
+            }
+        }
+
+        /// <summary>
+        /// Delay in milliseconds to wait for messages in the producer queue to accumulate before constructing message batches to transmit to broker.
+        /// default: 5
+        /// linger.ms in librdkafka
+        /// Producer Only
+        /// </summary>
+        public double? LingerMs { get; set; } = 5;
+
+        /// <summary>
+        /// Compression level parameter for algorithm selected by configuration property
+        /// `compression.codec`. Higher values will result in better compression at the cost
+        /// of more CPU usage. Usable range is algorithm-dependent: [0-9] for gzip; [0-12]
+        /// for lz4; only 0 for snappy; -1 = codec-dependent default compression level.
+        /// default: -1
+        /// Producer Only
+        /// </summary>
+        public int? CompressionLevel { get; set; } = -1;
+
+
         int subscriberIntervalInSeconds = 1;
         /// <summary>
         /// Defines the minimum frequency in which messages will be executed by function. Only if the message volume is less than
diff --git a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaAttribute.cs b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaAttribute.cs
index 9e19d0ae..74cc80b0 100644
--- a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaAttribute.cs
+++ b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaAttribute.cs
@@ -5,7 +5,6 @@
 using Avro.Specific;
 using Confluent.Kafka;
 using Microsoft.Azure.WebJobs.Description;
-using Microsoft.Azure.WebJobs.Extensions.Kafka.Config;
 
 namespace Microsoft.Azure.WebJobs.Extensions.Kafka
 {
@@ -140,28 +139,5 @@ public KafkaAttribute()
         /// ssl.key.password in librdkafka
         /// </summary>
         public string SslKeyPassword { get; set; }
-
-        /// <summary>
-        /// CompressionType for sending message
-        /// Default is None
-        /// </summary>
-        /// <value></value>
-        public CompressionType CompressionType { get; set; } = CompressionType.None;
-
-        /// <summary>
-        /// Delay in milliseconds to wait for messages in the producer queue to accumulate before constructing message batches to transmit to broker.
-        /// default: 5
-        /// linger.ms in librdkafka
-        /// </summary>
-        public double? LingerMs { get; set; } = 5;
-
-        /// <summary>
-        /// Compression level parameter for algorithm selected by configuration property
-        /// `compression.codec`. Higher values will result in better compression at the cost
-        /// of more CPU usage. Usable range is algorithm-dependent: [0-9] for gzip; [0-12]
-        /// for lz4; only 0 for snappy; -1 = codec-dependent default compression level.
-        /// default: -1
-        /// </summary>
-        public int? CompressionLevel { get; set; } = -1;
     }
 }
\ No newline at end of file
diff --git a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducerFactory.cs b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducerFactory.cs
index ca557371..b330b6e7 100644
--- a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducerFactory.cs
+++ b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducerFactory.cs
@@ -114,6 +114,11 @@ public ProducerConfig GetProducerConfig(KafkaProducerEntity entity)
                 resolvedSslKeyLocation = entity.Attribute.SslKeyLocation;
             }
             var kafkaOptions = this.config.Get<KafkaOptions>();
+            CompressionType compressionType;
+            if (!Enum.TryParse(kafkaOptions?.CompressionType, out compressionType)) {
+                compressionType = CompressionType.None;
+            }
+
             var conf = new ProducerConfig()
             {
                 BootstrapServers = this.config.ResolveSecureSetting(nameResolver, entity.Attribute.BrokerList),
@@ -131,9 +136,9 @@ public ProducerConfig GetProducerConfig(KafkaProducerEntity entity)
                 Debug = kafkaOptions?.LibkafkaDebug,
                 MetadataMaxAgeMs = kafkaOptions?.MetadataMaxAgeMs,
                 SocketKeepaliveEnable = kafkaOptions?.SocketKeepaliveEnable,
-                CompressionType = entity.Attribute.CompressionType,
-                CompressionLevel = entity.Attribute.CompressionLevel,
-                LingerMs = entity.Attribute.LingerMs
+                CompressionType = compressionType,
+                CompressionLevel = kafkaOptions?.CompressionLevel,
+                LingerMs = kafkaOptions?.LingerMs
             };
 
             if (entity.Attribute.AuthenticationMode != BrokerAuthenticationMode.NotSet)
diff --git a/test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/KafkaOptionsTest.cs b/test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/KafkaOptionsTest.cs
new file mode 100644
index 00000000..da12a77d
--- /dev/null
+++ b/test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/KafkaOptionsTest.cs
@@ -0,0 +1,31 @@
+// Copyright (c) .NET Foundation. All rights reserved.
+// Licensed under the MIT License. See License.txt in the project root for license information.
+
+using System;
+using System.Collections.Generic;
+using System.Text;
+using Xunit;
+
+namespace Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests
+{
+    public class KafkaOptionsTest
+    {
+        [Fact]
+        public void When_Pass_The_Value_Available_For_CompressionType()
+        {
+            var options = new KafkaOptions();
+            Assert.Equal("None", options.CompressionType);
+            options.CompressionType = "Gzip";
+            Assert.True(true);
+        }
+
+        [Fact]
+        public void When_Pass_The_Value_Not_Available_For_CompressionType()
+        {
+            var options = new KafkaOptions();
+            Assert.Throws<InvalidOperationException>(() => {
+                options.CompressionType = "Not Available";
+            });
+        }
+    }
+}

From 3b920c4611b46ba068060c7ac86367557396d499 Mon Sep 17 00:00:00 2001
From: Tsuyoshi Ushio
Date: Thu, 11 Feb 2021 00:25:26 -0800
Subject: [PATCH 3/3] fix ci issue

---
 .../KafkaProducerFactoryTest.cs | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/KafkaProducerFactoryTest.cs b/test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/KafkaProducerFactoryTest.cs
index a0221620..c5edf668 100644
--- a/test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/KafkaProducerFactoryTest.cs
+++ b/test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/KafkaProducerFactoryTest.cs
@@ -183,7 +183,7 @@ public void GetProducerConfig_When_No_Auth_Defined_Should_Contain_Only_BrokerLis
             var factory = new KafkaProducerFactory(emptyConfiguration, new DefaultNameResolver(emptyConfiguration), NullLoggerProvider.Instance);
             var config = factory.GetProducerConfig(entity);
 
-            Assert.Single(config);
+            Assert.Equal(2, config.Count());
             Assert.Equal("brokers:9092", config.BootstrapServers);
         }
 
@@ -206,7 +206,7 @@ public void GetProducerConfig_When_Auth_Defined_Should_Contain_Them()
             var factory = new KafkaProducerFactory(emptyConfiguration, new DefaultNameResolver(emptyConfiguration), NullLoggerProvider.Instance);
             var config = factory.GetProducerConfig(entity);
 
-            Assert.Equal(5, config.Count());
+            Assert.Equal(6, config.Count());
             Assert.Equal("brokers:9092", config.BootstrapServers);
             Assert.Equal(SecurityProtocol.SaslSsl, config.SecurityProtocol);
             Assert.Equal(SaslMechanism.Plain, config.SaslMechanism);
@@ -234,7 +234,7 @@ public void GetProducerConfig_When_Ssl_Auth_Defined_Should_Contain_Them()
             var factory = new KafkaProducerFactory(emptyConfiguration, new DefaultNameResolver(emptyConfiguration), NullLoggerProvider.Instance);
             var config = factory.GetProducerConfig(entity);
 
-            Assert.Equal(6, config.Count());
+            Assert.Equal(7, config.Count());
             Assert.Equal("brokers:9092", config.BootstrapServers);
             Assert.Equal(SecurityProtocol.Ssl, config.SecurityProtocol);
             Assert.Equal("path/to/key", config.SslKeyLocation);
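For reviewers, a minimal sketch of how the settings introduced in PATCH 2 could be exercised outside the Functions host. This is not part of the patch series: the "extensions" > "kafka" section name, the host.json keys in the comment, and the using for KafkaOptions' namespace are assumptions for illustration; only the KafkaOptions members shown in the diffs above come from the patch.

```csharp
// Hypothetical usage sketch (not part of the patch series): bind an in-memory
// equivalent of host.json's "extensions" > "kafka" section onto KafkaOptions
// and read back the new producer settings.
using System;
using System.Collections.Generic;
using Microsoft.Extensions.Configuration;
using Microsoft.Azure.WebJobs.Extensions.Kafka; // assumed namespace of KafkaOptions

public static class CompressionOptionsSketch
{
    public static KafkaOptions BindFromHostJsonShape()
    {
        // Roughly equivalent to a host.json of:
        // { "version": "2.0", "extensions": { "kafka": {
        //     "compressionType": "Gzip", "compressionLevel": 5, "lingerMs": 100 } } }
        var configuration = new ConfigurationBuilder()
            .AddInMemoryCollection(new Dictionary<string, string>
            {
                ["extensions:kafka:CompressionType"] = "Gzip",
                ["extensions:kafka:CompressionLevel"] = "5",
                ["extensions:kafka:LingerMs"] = "100"
            })
            .Build();

        var options = new KafkaOptions();
        configuration.GetSection("extensions:kafka").Bind(options);

        // The CompressionType setter added in PATCH 2 validates the value, so a
        // typo such as "Gzipp" would surface here as an InvalidOperationException
        // instead of being passed through to librdkafka.
        Console.WriteLine($"{options.CompressionType}, level {options.CompressionLevel}, linger {options.LingerMs} ms");
        return options;
    }
}
```

If the host binds KafkaOptions from a different section name, only the configuration keys change; the validation and the defaults ("None", -1, 5) come from KafkaOptions itself, as added in PATCH 2.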