Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,45 @@ public int MaxBatchSize
// </summary>
public bool? SocketKeepaliveEnable { get; set; } = true;

private string compressionType = "None";
/// <summary>
/// CompressionType for sending message
/// Default is None
/// Producer Only
/// </summary>
public string CompressionType {
get => this.compressionType ;
set
{
if (Enum.TryParse<CompressionType>(value, out CompressionType type))
{
this.compressionType = value;
} else
{
throw new InvalidOperationException("CompressionType should be None, Gzip, Snappy, Lz4, Zstd");
}
}
}

/// <summary>
/// Delay in milliseconds to wait for messages in the producer queue to accumulate before construction message batches to transmit to broker.
/// default: 5
/// linger.ms in librdkafka
/// Producer Only
/// </summary>
public double? LingerMs { get; set; } = 5;

/// <summary>
/// Compression level parameter for algorithm selected by configuration property
/// compression.codec`. Higher values will result in better compression at the cost
/// of more CPU usage. Usable range is algorithm-dependent: [0-9] for gzip; [0-12]
/// for lz4; only 0 for snappy; -1 = codec-dependent default compression level.
/// default: -1
/// Producer Only
/// </summary>
public int? CompressionLevel { get; set; } = -1;


int subscriberIntervalInSeconds = 1;
/// <summary>
/// Defines the minimum frequency in which messages will be executed by function. Only if the message volume is less than <see cref="MaxBatchSize"/> / <see cref="SubscriberIntervalInSeconds"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,11 @@ public ProducerConfig GetProducerConfig(KafkaProducerEntity entity)
resolvedSslKeyLocation = entity.Attribute.SslKeyLocation;
}
var kafkaOptions = this.config.Get<KafkaOptions>();
CompressionType compressionType;
if (!Enum.TryParse<CompressionType>(kafkaOptions?.CompressionType, out compressionType)) {
compressionType = CompressionType.None;
}

var conf = new ProducerConfig()
{
BootstrapServers = this.config.ResolveSecureSetting(nameResolver, entity.Attribute.BrokerList),
Expand All @@ -130,7 +135,10 @@ public ProducerConfig GetProducerConfig(KafkaProducerEntity entity)
SslCaLocation = resolvedSslCaLocation,
Debug = kafkaOptions?.LibkafkaDebug,
MetadataMaxAgeMs = kafkaOptions?.MetadataMaxAgeMs,
SocketKeepaliveEnable = kafkaOptions?.SocketKeepaliveEnable
SocketKeepaliveEnable = kafkaOptions?.SocketKeepaliveEnable,
CompressionType = compressionType,
CompressionLevel = kafkaOptions?.CompressionLevel,
LingerMs = kafkaOptions?.LingerMs
};

if (entity.Attribute.AuthenticationMode != BrokerAuthenticationMode.NotSet)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Collections.Generic;
using System.Text;
using Xunit;

namespace Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests
{
public class KafkaOptionsTest
{
[Fact]
public void When_Pass_The_Value_Available_For_CompressionType()
{
var options = new KafkaOptions();
Assert.Equal("None", options.CompressionType);
options.CompressionType = "Gzip";
Assert.True(true);
}

[Fact]
public void When_Pass_The_Value_Not_Available_For_CompressionType()
{
var options = new KafkaOptions();
Assert.Throws<InvalidOperationException>(() => {
options.CompressionType = "Not Available";
});
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ public void GetProducerConfig_When_No_Auth_Defined_Should_Contain_Only_BrokerLis

var factory = new KafkaProducerFactory(emptyConfiguration, new DefaultNameResolver(emptyConfiguration), NullLoggerProvider.Instance);
var config = factory.GetProducerConfig(entity);
Assert.Single(config);
Assert.Equal(2, config.Count());
Assert.Equal("brokers:9092", config.BootstrapServers);
}

Expand All @@ -206,7 +206,7 @@ public void GetProducerConfig_When_Auth_Defined_Should_Contain_Them()

var factory = new KafkaProducerFactory(emptyConfiguration, new DefaultNameResolver(emptyConfiguration), NullLoggerProvider.Instance);
var config = factory.GetProducerConfig(entity);
Assert.Equal(5, config.Count());
Assert.Equal(6, config.Count());
Assert.Equal("brokers:9092", config.BootstrapServers);
Assert.Equal(SecurityProtocol.SaslSsl, config.SecurityProtocol);
Assert.Equal(SaslMechanism.Plain, config.SaslMechanism);
Expand Down Expand Up @@ -234,7 +234,7 @@ public void GetProducerConfig_When_Ssl_Auth_Defined_Should_Contain_Them()

var factory = new KafkaProducerFactory(emptyConfiguration, new DefaultNameResolver(emptyConfiguration), NullLoggerProvider.Instance);
var config = factory.GetProducerConfig(entity);
Assert.Equal(6, config.Count());
Assert.Equal(7, config.Count());
Assert.Equal("brokers:9092", config.BootstrapServers);
Assert.Equal(SecurityProtocol.Ssl, config.SecurityProtocol);
Assert.Equal("path/to/key", config.SslKeyLocation);
Expand Down