Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@ namespace Microsoft.Azure.WebJobs.Extensions.Kafka
/// </summary>
[AttributeUsage(AttributeTargets.Parameter | AttributeTargets.ReturnValue)]
[Binding]
public sealed class KafkaAttribute : Attribute
public sealed class KafkaAttribute : Attribute, IAttributeWithConfigurationString
{
private readonly string configurationString;

/// <summary>
/// Initialize a new instance of the <see cref="KafkaAttribute"/>
/// </summary>
Expand All @@ -26,6 +28,11 @@ public KafkaAttribute(string brokerList, string topic)
Topic = topic;
}

public KafkaAttribute(string brokerList, string topic, string configurationString) : this(brokerList, topic)
{
this.configurationString = configurationString;
}

/// <summary>
/// Initializes a new instance of the <see cref="T:Microsoft.Azure.WebJobs.Extensions.Kafka.KafkaAttribute"/> class.
/// </summary>
Expand Down Expand Up @@ -139,5 +146,7 @@ public KafkaAttribute()
/// ssl.key.password in librdkafka
/// </summary>
public string SslKeyPassword { get; set; }

string IAttributeWithConfigurationString.ConfigurationString => configurationString;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ public Task<IBinding> TryCreateAsync(BindingProviderContext context)
{
return Task.FromResult<IBinding>(null);
}


new AttributeEnricher().Enrich(attribute, config, nameResolver);

var argumentBinding = InnerProvider.TryCreate(parameter);
var keyAndValueTypes = SerializationHelper.GetKeyAndValueTypes(attribute.AvroSchema, parameter.ParameterType, typeof(Null));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using Microsoft.Extensions.Configuration;
using System;
using System.Data.Common;
using System.Globalization;
using System.Linq;

namespace Microsoft.Azure.WebJobs.Extensions.Kafka
{
internal class AttributeEnricher
{
public virtual void Enrich<T>(T attribute, IConfiguration config, INameResolver nameResolver) where T : IAttributeWithConfigurationString
{

if (!string.IsNullOrWhiteSpace(attribute.ConfigurationString))
{
var builder = new DbConnectionStringBuilder();
builder.ConnectionString = attribute.ConfigurationString;
foreach (string key in builder.Keys)
{
var prop = typeof(T).GetProperties().Where(x => x.CanWrite).Where(x => x.Name.Equals(key, StringComparison.OrdinalIgnoreCase)).FirstOrDefault();
if (prop != null)
{
var propertyType = prop.PropertyType;
if (propertyType.IsGenericType && propertyType.GetGenericTypeDefinition() == typeof(Nullable<>))
{
propertyType = propertyType.GetGenericArguments()[0];
}
var value = Convert.ToString(builder[key]);
value = config.ResolveSecureSetting(nameResolver, value);
object propValue = null;
if (propertyType.IsEnum)
{
propValue = Enum.Parse(propertyType, value, true);
}
else
{
propValue = Convert.ChangeType(value, propertyType, CultureInfo.InvariantCulture);
}
prop.SetValue(attribute, propValue);
}
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

namespace Microsoft.Azure.WebJobs.Extensions.Kafka
{
internal interface IAttributeWithConfigurationString
{
internal string ConfigurationString { get; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,21 @@ namespace Microsoft.Azure.WebJobs.Extensions.Kafka
/// </summary>
[AttributeUsage(AttributeTargets.Parameter | AttributeTargets.ReturnValue)]
[Binding]
public class KafkaTriggerAttribute : Attribute
public class KafkaTriggerAttribute : Attribute, IAttributeWithConfigurationString
{
private string configurationString;

public KafkaTriggerAttribute(string brokerList, string topic)
{
this.BrokerList = brokerList;
this.Topic = topic;
}

public KafkaTriggerAttribute(string brokerList, string topic, string configurationString) : this(brokerList, topic)
{
this.configurationString = configurationString;
}

/// <summary>
/// Gets or sets the topic
/// </summary>
Expand All @@ -41,7 +47,7 @@ public KafkaTriggerAttribute(string brokerList, string topic)
/// Gets or sets the consumer group
/// </summary>
public string ConsumerGroup { get; set; }


/// <summary>
/// Gets or sets the Avro schema.
Expand Down Expand Up @@ -107,6 +113,7 @@ public KafkaTriggerAttribute(string brokerList, string topic)
/// </summary>
public string SslKeyPassword { get; set; }

string IAttributeWithConfigurationString.ConfigurationString => configurationString;

bool IsValidValueType(Type value)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ public Task<ITriggerBinding> TryCreateAsync(TriggerBindingProviderContext contex
return Task.FromResult<ITriggerBinding>(null);
}

new AttributeEnricher().Enrich(attribute, config, nameResolver);

var consumerConfig = CreateConsumerConfiguration(attribute);

var keyAndValueTypes = SerializationHelper.GetKeyAndValueTypes(attribute.AvroSchema, parameter.ParameterType, typeof(Ignore));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using Microsoft.Extensions.Configuration;
using System;
using System.Collections.Generic;
using System.Text;
using Xunit;

namespace Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests
{
public class AttributeEnricherTest
{
private AttributeEnricher enricher;
private IConfigurationRoot config;
private DefaultNameResolver nameResolver;

public AttributeEnricherTest()
{
this.enricher = new AttributeEnricher();
this.config = new ConfigurationBuilder()
.AddInMemoryCollection(new Dictionary<string, string>
{
{"Protocol", "SaslPlaintext"}
}).Build();
this.nameResolver = new DefaultNameResolver(config);
}

[Fact]
public void When_ConfigurationString_Provided_Populates_Properties()
{
var attr = new KafkaTriggerAttribute("", "",
@"
Username=""test user"";
AuthenticationMode=Plain;
Protocol=%Protocol%
"
);

enricher.Enrich(attr, config, nameResolver);

Assert.Equal(attr.Username, "test user");
Assert.Equal(attr.AuthenticationMode, BrokerAuthenticationMode.Plain);
Assert.Equal(attr.Protocol, BrokerProtocol.SaslPlaintext);
}

[Fact]
public void When_ConfigurationString_Provided_Populates_NullableProperties()
{
var attr = new KafkaAttribute("", "",
"MessageTimeoutMs=3000"
);

enricher.Enrich(attr, config, nameResolver);

Assert.Equal(attr.MessageTimeoutMs, 3000);
}
}
}