From 466e8044d3bc4309ddf6718eb67f2068822e4b93 Mon Sep 17 00:00:00 2001 From: Gunnar Liljas Date: Sat, 31 Oct 2020 23:33:37 +0100 Subject: [PATCH] Enable setting compression via the KafkaAttribute --- README.md | 2 ++ .../Config/MessageCompressionType.cs | 18 +++++++++++++ .../Output/KafkaAttribute.cs | 12 +++++++++ .../Output/KafkaProducerFactory.cs | 8 +++++- .../KafkaProducerFactoryTest.cs | 25 +++++++++++++++++++ 5 files changed, 64 insertions(+), 1 deletion(-) create mode 100644 src/Microsoft.Azure.WebJobs.Extensions.Kafka/Config/MessageCompressionType.cs diff --git a/README.md b/README.md index b9f71dbf..de298659 100644 --- a/README.md +++ b/README.md @@ -322,6 +322,8 @@ The settings exposed here are targeted to more advanced users that want to custo |LibkafkaDebug|debug|Both |MetadataMaxAgeMs|metadata.max.age.ms|Both |SocketKeepaliveEnable|socket.keepalive.enable|Both +|CompressionType|compression.codec|Output +|CompressionLevel|compression.level|Output **NOTE:** `MetadataMaxAgeMs` default is `180000` `SocketKeepaliveEnable` default is `true` otherwise, the default value is the same as the [Configuration properties](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md). The reason of the default settings, refer to this [issue](https://github.com/Azure/azure-functions-kafka-extension/issues/187). diff --git a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Config/MessageCompressionType.cs b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Config/MessageCompressionType.cs new file mode 100644 index 00000000..88668ec8 --- /dev/null +++ b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Config/MessageCompressionType.cs @@ -0,0 +1,18 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +namespace Microsoft.Azure.WebJobs.Extensions.Kafka +{ + /// + /// Defines the message compression type + /// + public enum MessageCompressionType + { + NotSet = -1, + None = 0, + Gzip = 1, + Snappy = 2, + Lz4 = 3, + Zstd = 4 + } +} diff --git a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaAttribute.cs b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaAttribute.cs index 74cc80b0..0756c755 100644 --- a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaAttribute.cs +++ b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaAttribute.cs @@ -139,5 +139,17 @@ public KafkaAttribute() /// ssl.key.password in librdkafka /// public string SslKeyPassword { get; set; } + + /// + /// Compression level parameter for algorithm selected by configuration property + /// compression.level in librdkafka + /// + public int CompressionLevel { get; set; } = -1; + + /// + /// Compression codec to use for compressing message sets. + /// compression.codec in librdkafka + /// + public MessageCompressionType CompressionType { get; set; } = MessageCompressionType.NotSet; } } \ No newline at end of file diff --git a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducerFactory.cs b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducerFactory.cs index 178f3ffc..f9c8f5c9 100644 --- a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducerFactory.cs +++ b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducerFactory.cs @@ -130,9 +130,15 @@ public ProducerConfig GetProducerConfig(KafkaProducerEntity entity) SslCaLocation = resolvedSslCaLocation, Debug = kafkaOptions?.LibkafkaDebug, MetadataMaxAgeMs = kafkaOptions?.MetadataMaxAgeMs, - SocketKeepaliveEnable = kafkaOptions?.SocketKeepaliveEnable + SocketKeepaliveEnable = kafkaOptions?.SocketKeepaliveEnable, + CompressionLevel = entity.Attribute.CompressionLevel }; + if (entity.Attribute.CompressionType != MessageCompressionType.NotSet) + { + conf.CompressionType = (CompressionType)entity.Attribute.CompressionType; + } + if (entity.Attribute.AuthenticationMode != BrokerAuthenticationMode.NotSet) { conf.SaslMechanism = (SaslMechanism)entity.Attribute.AuthenticationMode; diff --git a/test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/KafkaProducerFactoryTest.cs b/test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/KafkaProducerFactoryTest.cs index a0221620..09a7b27a 100644 --- a/test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/KafkaProducerFactoryTest.cs +++ b/test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/KafkaProducerFactoryTest.cs @@ -304,5 +304,30 @@ public void GetProducerConfig_When_Ssl_Locations_Resolve_InAzure_Should_Contain_ Assert.Equal(sslCa.FullName, config.SslCaLocation); Assert.Equal(sslKeyLocation.FullName, config.SslKeyLocation); } + + [Theory] + [InlineData(MessageCompressionType.NotSet, null)] + [InlineData(MessageCompressionType.None, CompressionType.None)] + [InlineData(MessageCompressionType.Gzip, CompressionType.Gzip)] + [InlineData(MessageCompressionType.Snappy, CompressionType.Snappy)] + [InlineData(MessageCompressionType.Lz4, CompressionType.Lz4)] + [InlineData(MessageCompressionType.Zstd, CompressionType.Zstd)] + public void GetProducerConfig_When_CompressionType_Defined_Should_Set_CompressionType(MessageCompressionType sourceType, CompressionType? targetType) + { + var attribute = new KafkaAttribute("brokers:9092", "myTopic") + { + CompressionType = sourceType + }; + + var entity = new KafkaProducerEntity() + { + Attribute = attribute, + ValueType = typeof(ProtoUser), + }; + + var factory = new KafkaProducerFactory(emptyConfiguration, new DefaultNameResolver(emptyConfiguration), NullLoggerProvider.Instance); + var config = factory.GetProducerConfig(entity); + Assert.Equal(targetType, config.CompressionType); + } } } \ No newline at end of file