diff --git a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaAttribute.cs b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaAttribute.cs
index 74cc80b0..40ed2d83 100644
--- a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaAttribute.cs
+++ b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaAttribute.cs
@@ -13,8 +13,10 @@ namespace Microsoft.Azure.WebJobs.Extensions.Kafka
///
[AttributeUsage(AttributeTargets.Parameter | AttributeTargets.ReturnValue)]
[Binding]
- public sealed class KafkaAttribute : Attribute
+ public sealed class KafkaAttribute : Attribute, IAttributeWithConfigurationString
{
+ private readonly string configurationString;
+
///
/// Initialize a new instance of the
///
@@ -26,6 +28,11 @@ public KafkaAttribute(string brokerList, string topic)
Topic = topic;
}
+ public KafkaAttribute(string brokerList, string topic, string configurationString) : this(brokerList, topic)
+ {
+ this.configurationString = configurationString;
+ }
+
///
/// Initializes a new instance of the class.
///
@@ -139,5 +146,7 @@ public KafkaAttribute()
/// ssl.key.password in librdkafka
///
public string SslKeyPassword { get; set; }
+
+ string IAttributeWithConfigurationString.ConfigurationString => configurationString;
}
}
\ No newline at end of file
diff --git a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaAttributeBindingProvider.cs b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaAttributeBindingProvider.cs
index 6b26139f..d08958b4 100644
--- a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaAttributeBindingProvider.cs
+++ b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaAttributeBindingProvider.cs
@@ -44,7 +44,8 @@ public Task TryCreateAsync(BindingProviderContext context)
{
return Task.FromResult(null);
}
-
+
+ new AttributeEnricher().Enrich(attribute, config, nameResolver);
var argumentBinding = InnerProvider.TryCreate(parameter);
var keyAndValueTypes = SerializationHelper.GetKeyAndValueTypes(attribute.AvroSchema, parameter.ParameterType, typeof(Null));
diff --git a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Trigger/AttributeEnricher.cs b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Trigger/AttributeEnricher.cs
new file mode 100644
index 00000000..fa4fa027
--- /dev/null
+++ b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Trigger/AttributeEnricher.cs
@@ -0,0 +1,48 @@
+// Copyright (c) .NET Foundation. All rights reserved.
+// Licensed under the MIT License. See License.txt in the project root for license information.
+
+using Microsoft.Extensions.Configuration;
+using System;
+using System.Data.Common;
+using System.Globalization;
+using System.Linq;
+
+namespace Microsoft.Azure.WebJobs.Extensions.Kafka
+{
+ internal class AttributeEnricher
+ {
+ public virtual void Enrich(T attribute, IConfiguration config, INameResolver nameResolver) where T : IAttributeWithConfigurationString
+ {
+
+ if (!string.IsNullOrWhiteSpace(attribute.ConfigurationString))
+ {
+ var builder = new DbConnectionStringBuilder();
+ builder.ConnectionString = attribute.ConfigurationString;
+ foreach (string key in builder.Keys)
+ {
+ var prop = typeof(T).GetProperties().Where(x => x.CanWrite).Where(x => x.Name.Equals(key, StringComparison.OrdinalIgnoreCase)).FirstOrDefault();
+ if (prop != null)
+ {
+ var propertyType = prop.PropertyType;
+ if (propertyType.IsGenericType && propertyType.GetGenericTypeDefinition() == typeof(Nullable<>))
+ {
+ propertyType = propertyType.GetGenericArguments()[0];
+ }
+ var value = Convert.ToString(builder[key]);
+ value = config.ResolveSecureSetting(nameResolver, value);
+ object propValue = null;
+ if (propertyType.IsEnum)
+ {
+ propValue = Enum.Parse(propertyType, value, true);
+ }
+ else
+ {
+ propValue = Convert.ChangeType(value, propertyType, CultureInfo.InvariantCulture);
+ }
+ prop.SetValue(attribute, propValue);
+ }
+ }
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Trigger/IAttributeWithConfigurationString.cs b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Trigger/IAttributeWithConfigurationString.cs
new file mode 100644
index 00000000..e13914f5
--- /dev/null
+++ b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Trigger/IAttributeWithConfigurationString.cs
@@ -0,0 +1,10 @@
+// Copyright (c) .NET Foundation. All rights reserved.
+// Licensed under the MIT License. See License.txt in the project root for license information.
+
+namespace Microsoft.Azure.WebJobs.Extensions.Kafka
+{
+ internal interface IAttributeWithConfigurationString
+ {
+ internal string ConfigurationString { get; }
+ }
+}
\ No newline at end of file
diff --git a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Trigger/KafkaTriggerAttribute.cs b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Trigger/KafkaTriggerAttribute.cs
index e8cb85cf..c72f11b8 100644
--- a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Trigger/KafkaTriggerAttribute.cs
+++ b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Trigger/KafkaTriggerAttribute.cs
@@ -13,8 +13,9 @@ namespace Microsoft.Azure.WebJobs.Extensions.Kafka
///
[AttributeUsage(AttributeTargets.Parameter | AttributeTargets.ReturnValue)]
[Binding]
- public class KafkaTriggerAttribute : Attribute
+ public class KafkaTriggerAttribute : Attribute, IAttributeWithConfigurationString
{
+ private string configurationString;
public KafkaTriggerAttribute(string brokerList, string topic)
{
@@ -22,6 +23,11 @@ public KafkaTriggerAttribute(string brokerList, string topic)
this.Topic = topic;
}
+ public KafkaTriggerAttribute(string brokerList, string topic, string configurationString) : this(brokerList, topic)
+ {
+ this.configurationString = configurationString;
+ }
+
///
/// Gets or sets the topic
///
@@ -41,7 +47,7 @@ public KafkaTriggerAttribute(string brokerList, string topic)
/// Gets or sets the consumer group
///
public string ConsumerGroup { get; set; }
-
+
///
/// Gets or sets the Avro schema.
@@ -107,6 +113,7 @@ public KafkaTriggerAttribute(string brokerList, string topic)
///
public string SslKeyPassword { get; set; }
+ string IAttributeWithConfigurationString.ConfigurationString => configurationString;
bool IsValidValueType(Type value)
{
diff --git a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Trigger/KafkaTriggerAttributeBindingProvider.cs b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Trigger/KafkaTriggerAttributeBindingProvider.cs
index 22218234..32161528 100644
--- a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Trigger/KafkaTriggerAttributeBindingProvider.cs
+++ b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Trigger/KafkaTriggerAttributeBindingProvider.cs
@@ -47,6 +47,8 @@ public Task TryCreateAsync(TriggerBindingProviderContext contex
return Task.FromResult(null);
}
+ new AttributeEnricher().Enrich(attribute, config, nameResolver);
+
var consumerConfig = CreateConsumerConfiguration(attribute);
var keyAndValueTypes = SerializationHelper.GetKeyAndValueTypes(attribute.AvroSchema, parameter.ParameterType, typeof(Ignore));
diff --git a/test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/AttributeEnricherTest.cs b/test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/AttributeEnricherTest.cs
new file mode 100644
index 00000000..c4bba2a2
--- /dev/null
+++ b/test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/AttributeEnricherTest.cs
@@ -0,0 +1,59 @@
+// Copyright (c) .NET Foundation. All rights reserved.
+// Licensed under the MIT License. See License.txt in the project root for license information.
+
+using Microsoft.Extensions.Configuration;
+using System;
+using System.Collections.Generic;
+using System.Text;
+using Xunit;
+
+namespace Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests
+{
+ public class AttributeEnricherTest
+ {
+ private AttributeEnricher enricher;
+ private IConfigurationRoot config;
+ private DefaultNameResolver nameResolver;
+
+ public AttributeEnricherTest()
+ {
+ this.enricher = new AttributeEnricher();
+ this.config = new ConfigurationBuilder()
+ .AddInMemoryCollection(new Dictionary
+ {
+ {"Protocol", "SaslPlaintext"}
+ }).Build();
+ this.nameResolver = new DefaultNameResolver(config);
+ }
+
+ [Fact]
+ public void When_ConfigurationString_Provided_Populates_Properties()
+ {
+ var attr = new KafkaTriggerAttribute("", "",
+ @"
+ Username=""test user"";
+ AuthenticationMode=Plain;
+ Protocol=%Protocol%
+ "
+ );
+
+ enricher.Enrich(attr, config, nameResolver);
+
+ Assert.Equal(attr.Username, "test user");
+ Assert.Equal(attr.AuthenticationMode, BrokerAuthenticationMode.Plain);
+ Assert.Equal(attr.Protocol, BrokerProtocol.SaslPlaintext);
+ }
+
+ [Fact]
+ public void When_ConfigurationString_Provided_Populates_NullableProperties()
+ {
+ var attr = new KafkaAttribute("", "",
+ "MessageTimeoutMs=3000"
+ );
+
+ enricher.Enrich(attr, config, nameResolver);
+
+ Assert.Equal(attr.MessageTimeoutMs, 3000);
+ }
+ }
+}