diff --git a/Gemfile b/Gemfile
index db01f5a..d5c272e 100644
--- a/Gemfile
+++ b/Gemfile
@@ -2,5 +2,3 @@ source 'https://rubygems.org'
 
 # Specify your gem's dependencies in fluent-plugin-kafka.gemspec
 gemspec
-
-gem 'rdkafka', ENV['RDKAFKA_VERSION_MIN_RANGE'], ENV['RDKAFKA_VERSION_MAX_RANGE'] if ENV['USE_RDKAFKA']
diff --git a/README.md b/README.md
index 814d40e..b472c94 100644
--- a/README.md
+++ b/README.md
@@ -61,6 +61,36 @@ See [Authentication using SASL](https://github.com/zendesk/ruby-kafka#authentication-using-sasl) for more details.
 Set username, password, scram_mechanism and sasl_over_ssl for SASL/Plain or Scram authentication.
 See [Authentication using SASL](https://github.com/zendesk/ruby-kafka#authentication-using-sasl) for more details.
 
+##### with MSK IAM Authentication (only for `rdkafka2` output type)
+Authentication and authorization with an MSK cluster are facilitated through a base64-encoded signed URL, which is generated by the [aws-msk-iam-sasl-signer-ruby](https://github.com/bruce-szalwinski-he/aws-msk-iam-sasl-signer-ruby) library.
+
+**Configuration Example**
+To enable this feature, configure your Fluentd output as follows:
+
+```
+<match **>
+  @type rdkafka2
+  # Kafka brokers to connect to (typically on port 9098 or 9198 for IAM authentication)
+  brokers <broker_host>:9098
+  # Topic to write events to
+  topic_key test-topic-1
+  default_topic test-topic-1
+
+  # AWS Region (required)
+  aws_msk_region us-east-1
+
+  # Use a shared producer for the connection (required)
+  share_producer true
+
+  # MSK IAM authentication settings (required)
+  rdkafka_options {
+    "security.protocol": "sasl_ssl",
+    "sasl.mechanisms": "OAUTHBEARER"
+  }
+</match>
+```
+With this configuration, Fluentd will handle the token refresh and manage the connection to your MSK cluster using AWS IAM authentication.
+
 ### Input plugin (@type 'kafka')
 
 Consume events by single consumer.
diff --git a/fluent-plugin-kafka.gemspec b/fluent-plugin-kafka.gemspec
index 8fee639..b401256 100644
--- a/fluent-plugin-kafka.gemspec
+++ b/fluent-plugin-kafka.gemspec
@@ -23,6 +23,13 @@ Gem::Specification.new do |gem|
   # gems that aren't default gems as of Ruby 3.4
   gem.add_dependency("bigdecimal", ["~> 3.1"])
 
+  if ENV['USE_RDKAFKA']
+    gem.add_dependency 'rdkafka', [ENV['RDKAFKA_VERSION_MIN_RANGE'], ENV['RDKAFKA_VERSION_MAX_RANGE']]
+    if Gem::Version.new(RUBY_VERSION) >= Gem::Version.new('3.0')
+      gem.add_dependency 'aws-msk-iam-sasl-signer', '~> 0.1.1'
+    end
+  end
+
   gem.add_development_dependency "rake", ">= 0.9.2"
   gem.add_development_dependency "test-unit", ">= 3.0.8"
   gem.add_development_dependency "test-unit-rr", "~> 1.0"
diff --git a/lib/fluent/plugin/out_rdkafka2.rb b/lib/fluent/plugin/out_rdkafka2.rb
index b306cfb..b3a63be 100644
--- a/lib/fluent/plugin/out_rdkafka2.rb
+++ b/lib/fluent/plugin/out_rdkafka2.rb
@@ -20,6 +20,10 @@
   raise "unable to patch rdkafka."
 end
 
+if Gem::Version.create(RUBY_VERSION) >= Gem::Version.create('3.0')
+  require 'aws-msk-iam-sasl-signer'
+end
+
 module Fluent::Plugin
   class Fluent::Rdkafka2Output < Output
     Fluent::Plugin.register_output('rdkafka2', self)
@@ -100,6 +104,7 @@ class Fluent::Rdkafka2Output < Output
     config_param :service_name, :string, :default => nil, :desc => 'Used for sasl.kerberos.service.name'
     config_param :unrecoverable_error_codes, :array, :default => ["topic_authorization_failed", "msg_size_too_large"], :desc => 'Handle some of the error codes should be unrecoverable if specified'
+    config_param :aws_msk_region, :string, :default => nil, :desc => 'AWS region for MSK'
 
     config_section :buffer do
       config_set_default :chunk_keys, ["topic"]
@@ -209,6 +214,10 @@ def add(level, message = nil)
       config = build_config
       @rdkafka = Rdkafka::Config.new(config)
 
+      if config[:"security.protocol"] == "sasl_ssl" && config[:"sasl.mechanisms"] == "OAUTHBEARER"
+        Rdkafka::Config.oauthbearer_token_refresh_callback = method(:refresh_token)
+      end
+
       if @default_topic.nil?
         if @use_default_for_unknown_topic || @use_default_for_unknown_partition_error
           raise Fluent::ConfigError, "default_topic must be set when use_default_for_unknown_topic or use_default_for_unknown_partition_error is true"
@@ -296,9 +305,39 @@ def build_config
       config
     end
 
+    def refresh_token(_config, _client_name)
+      log.info("+--- Refreshing token")
+      client = get_producer
+      # This will happen once upon initialization and is expected to fail, as the producer isn't set yet.
+      # We will set the token manually after creation and after that this refresh method will work.
+      unless client
+        log.info("Could not get shared client handle, unable to set/refresh token (this is expected one time on startup)")
+        return
+      end
+      signer = AwsMskIamSaslSigner::MSKTokenProvider.new(region: @aws_msk_region)
+      token = signer.generate_auth_token
+
+      if token
+        client.oauthbearer_set_token(
+          token: token.token,
+          lifetime_ms: token.expiration_time_ms,
+          principal_name: "kafka-cluster"
+        )
+      else
+        client.oauthbearer_set_token_failure(
+          "Failed to generate token."
+        )
+      end
+    end
+
     def start
       if @share_producer
         @shared_producer = @rdkafka.producer
+        log.info("Created shared producer")
+        if @aws_msk_region
+          refresh_token(nil, nil)
+          log.info("Set initial token for shared producer")
+        end
       else
         @producers = {}
         @producers_mutex = Mutex.new