diff --git a/README.md b/README.md
index 013b4125..8a5000c9 100644
--- a/README.md
+++ b/README.md
@@ -323,6 +323,8 @@ The settings exposed here are targeted to more advanced users that want to custo
|LibkafkaDebug|debug|Both
|MetadataMaxAgeMs|metadata.max.age.ms|Both
|SocketKeepaliveEnable|socket.keepalive.enable|Both
+|CompressionType|compression.codec|Output
+|CompressionLevel|compression.level|Output
**NOTE:** `MetadataMaxAgeMs` default is `180000` `SocketKeepaliveEnable` default is `true` otherwise, the default value is the same as the [Configuration properties](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md). The reason of the default settings, refer to this [issue](https://github.com/Azure/azure-functions-kafka-extension/issues/187).
**NOTE:** `AutoOffsetReset` default is Earliest. Allowed Values are `Earliest` and `Latest`.
diff --git a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Config/MessageCompressionType.cs b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Config/MessageCompressionType.cs
new file mode 100644
index 00000000..88668ec8
--- /dev/null
+++ b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Config/MessageCompressionType.cs
@@ -0,0 +1,18 @@
+// Copyright (c) .NET Foundation. All rights reserved.
+// Licensed under the MIT License. See License.txt in the project root for license information.
+
+namespace Microsoft.Azure.WebJobs.Extensions.Kafka
+{
+ ///
+ /// Defines the message compression type
+ ///
+ public enum MessageCompressionType
+ {
+ NotSet = -1,
+ None = 0,
+ Gzip = 1,
+ Snappy = 2,
+ Lz4 = 3,
+ Zstd = 4
+ }
+}
diff --git a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaAttribute.cs b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaAttribute.cs
index 5e13d211..631964de 100644
--- a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaAttribute.cs
+++ b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaAttribute.cs
@@ -139,5 +139,17 @@ public KafkaAttribute()
/// ssl.key.password in librdkafka
///
public string SslKeyPassword { get; set; }
+
+ ///
+ /// Compression level parameter for algorithm selected by configuration property
+ /// compression.level in librdkafka
+ ///
+ public int CompressionLevel { get; set; } = -1;
+
+ ///
+ /// Compression codec to use for compressing message sets.
+ /// compression.codec in librdkafka
+ ///
+ public MessageCompressionType CompressionType { get; set; } = MessageCompressionType.NotSet;
}
}
\ No newline at end of file
diff --git a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducerFactory.cs b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducerFactory.cs
index fa574680..596431f1 100644
--- a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducerFactory.cs
+++ b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducerFactory.cs
@@ -131,9 +131,15 @@ public ProducerConfig GetProducerConfig(KafkaProducerEntity entity)
SslCaLocation = resolvedSslCaLocation,
Debug = kafkaOptions?.LibkafkaDebug,
MetadataMaxAgeMs = kafkaOptions?.MetadataMaxAgeMs,
- SocketKeepaliveEnable = kafkaOptions?.SocketKeepaliveEnable
+ SocketKeepaliveEnable = kafkaOptions?.SocketKeepaliveEnable,
+ CompressionLevel = entity.Attribute.CompressionLevel
};
+ if (entity.Attribute.CompressionType != MessageCompressionType.NotSet)
+ {
+ conf.CompressionType = (CompressionType)entity.Attribute.CompressionType;
+ }
+
if (entity.Attribute.AuthenticationMode != BrokerAuthenticationMode.NotSet)
{
conf.SaslMechanism = (SaslMechanism)entity.Attribute.AuthenticationMode;
diff --git a/test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/KafkaProducerFactoryTest.cs b/test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/KafkaProducerFactoryTest.cs
index 69b9d6fd..2573ed29 100644
--- a/test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/KafkaProducerFactoryTest.cs
+++ b/test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/KafkaProducerFactoryTest.cs
@@ -307,5 +307,30 @@ public void GetProducerConfig_When_Ssl_Locations_Resolve_InAzure_Should_Contain_
Assert.Equal(sslCa.FullName, config.SslCaLocation);
Assert.Equal(sslKeyLocation.FullName, config.SslKeyLocation);
}
+
+ [Theory]
+ [InlineData(MessageCompressionType.NotSet, null)]
+ [InlineData(MessageCompressionType.None, CompressionType.None)]
+ [InlineData(MessageCompressionType.Gzip, CompressionType.Gzip)]
+ [InlineData(MessageCompressionType.Snappy, CompressionType.Snappy)]
+ [InlineData(MessageCompressionType.Lz4, CompressionType.Lz4)]
+ [InlineData(MessageCompressionType.Zstd, CompressionType.Zstd)]
+ public void GetProducerConfig_When_CompressionType_Defined_Should_Set_CompressionType(MessageCompressionType sourceType, CompressionType? targetType)
+ {
+ var attribute = new KafkaAttribute("brokers:9092", "myTopic")
+ {
+ CompressionType = sourceType
+ };
+
+ var entity = new KafkaProducerEntity()
+ {
+ Attribute = attribute,
+ ValueType = typeof(ProtoUser),
+ };
+
+ var factory = new KafkaProducerFactory(emptyConfiguration, new DefaultNameResolver(emptyConfiguration), NullLoggerProvider.Instance);
+ var config = factory.GetProducerConfig(entity);
+ Assert.Equal(targetType, config.CompressionType);
+ }
}
}
\ No newline at end of file