diff --git a/README.md b/README.md
index 814d40e..89fcc14 100644
--- a/README.md
+++ b/README.md
@@ -177,6 +177,9 @@ With the introduction of the rdkafka-ruby based input plugin we hope to support
See also [rdkafka-ruby](https://github.com/appsignal/rdkafka-ruby) and [librdkafka](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md) for more detailed documentation about Kafka consumer options.
+`topics` supports POSIX Extended Regular Expression pattern (not Ruby regex syntax) since v0.19.6. If you want to use regex pattern, use `/pattern/` like `/foo.*/`.
+**Note**: A caret (`^`) is automatically added to the beginning of the pattern.
+
Consuming topic name is used for event tag. So when the target topic name is `app_event`, the tag is `app_event`. If you want to modify tag, use `add_prefix` or `add_suffix` parameter. With `add_prefix kafka`, the tag is `kafka.app_event`.
### Output plugin
diff --git a/lib/fluent/plugin/in_rdkafka_group.rb b/lib/fluent/plugin/in_rdkafka_group.rb
index 5141a91..4e49e65 100644
--- a/lib/fluent/plugin/in_rdkafka_group.rb
+++ b/lib/fluent/plugin/in_rdkafka_group.rb
@@ -70,19 +70,29 @@ def initialize
end
def _config_to_array(config)
- config_array = config.split(',').map {|k| k.strip }
+ config_array = config.split(',').map {|k| _config_regex_pattern(k.strip) }
if config_array.empty?
raise Fluent::ConfigError, "kafka_group: '#{config}' is a required parameter"
end
config_array
end
+ private :_config_to_array
+
+ def _config_regex_pattern(topic)
+ if (m = /^\/(.+)\/$/.match(topic))
+ # librdkafka recognizes string as regex pattern if the topic name starts with '^'.
+ # https://github.com/confluentinc/librdkafka/blob/570c785e9e35812db8f50254bd2f7e0cf47def39/src/rdkafka.h#L4148
+ # https://github.com/confluentinc/librdkafka/blob/e1db7eaa517f0a6438bc846a9c49ede73b9ea211/src/rdkafka_topic.c#L2064
+ return "^#{m[1]}"
+ end
+ topic
+ end
+ private :_config_regex_pattern
def multi_workers_ready?
true
end
- private :_config_to_array
-
def configure(conf)
compat_parameters_convert(conf, :parser)
diff --git a/test/plugin/test_in_rdkafka_group.rb b/test/plugin/test_in_rdkafka_group.rb
new file mode 100644
index 0000000..712616b
--- /dev/null
+++ b/test/plugin/test_in_rdkafka_group.rb
@@ -0,0 +1,123 @@
+require 'helper'
+require 'fluent/test/driver/input'
+require 'securerandom'
+
+class RdkafkaGroupInputTest < Test::Unit::TestCase
+
+ def have_rdkafka
+ begin
+ require 'fluent/plugin/in_rdkafka_group'
+ true
+ rescue LoadError
+ false
+ end
+ end
+
+ def setup
+ omit_unless(have_rdkafka, "rdkafka isn't installed")
+ Fluent::Test.setup
+ end
+
+ TOPIC_NAME = "kafka-input-#{SecureRandom.uuid}"
+
+ CONFIG = %[
+ topics #{TOPIC_NAME}
+ kafka_configs {"bootstrap.servers": "localhost:9092", "group.id": "test_group"}
+
+ @type none
+
+ ]
+
+ def create_driver(conf = CONFIG)
+ Fluent::Test::Driver::Input.new(Fluent::Plugin::RdKafkaGroupInput).configure(conf)
+ end
+
+
+ def test_configure
+ d = create_driver
+ assert_equal [TOPIC_NAME], d.instance.topics
+ assert_equal 'localhost:9092', d.instance.kafka_configs['bootstrap.servers']
+ end
+
+ def test_multi_worker_support
+ d = create_driver
+ assert_true d.instance.multi_workers_ready?
+ end
+
+ class ConsumeTest < self
+ TOPIC_NAME = "kafka-input-#{SecureRandom.uuid}"
+
+ def setup
+ @kafka = Kafka.new(["localhost:9092"], client_id: 'kafka')
+ @producer = @kafka.producer
+ @kafka.create_topic(TOPIC_NAME)
+ end
+
+ def teardown
+ @kafka.delete_topic(TOPIC_NAME)
+ @kafka.close
+ end
+
+ def test_consume
+ conf = %[
+ topics #{TOPIC_NAME}
+ kafka_configs {"bootstrap.servers": "localhost:9092", "group.id": "test_group"}
+
+ @type none
+
+ ]
+
+ d = create_driver(conf)
+
+ d.run(expect_records: 1, timeout: 10) do
+ sleep 0.1
+ @producer.produce("Hello, fluent-plugin-kafka!", topic: TOPIC_NAME)
+ @producer.deliver_messages
+ end
+
+ expected = {'message' => 'Hello, fluent-plugin-kafka!'}
+ assert_equal expected, d.events[0][2]
+ end
+ end
+
+ class ConsumeTopicWithRegexpTest < self
+ TOPIC_NAME1 = "kafka-input-1-#{SecureRandom.uuid}"
+ TOPIC_NAME2 = "kafka-input-2-#{SecureRandom.uuid}"
+ TOPIC_NAME_REGEXP = "/kafka-input-(1|2)-.*/"
+
+ def setup
+ @kafka = Kafka.new(["localhost:9092"], client_id: 'kafka')
+ @producer = @kafka.producer
+ @kafka.create_topic(TOPIC_NAME1)
+ @kafka.create_topic(TOPIC_NAME2)
+ end
+
+ def teardown
+ @kafka.delete_topic(TOPIC_NAME1)
+ @kafka.delete_topic(TOPIC_NAME2)
+ @kafka.close
+ end
+
+ def test_consume_with_regexp
+ conf = %[
+ topics #{TOPIC_NAME_REGEXP}
+ kafka_configs {"bootstrap.servers": "localhost:9092", "group.id": "test_group"}
+
+ @type none
+
+ ]
+ d = create_driver(conf)
+
+ d.run(expect_records: 2, timeout: 10) do
+ sleep 0.1
+ @producer.produce("Hello, fluent-plugin-kafka! in topic 1", topic: TOPIC_NAME1)
+ @producer.produce("Hello, fluent-plugin-kafka! in topic 2", topic: TOPIC_NAME2)
+ @producer.deliver_messages
+ end
+ expected_message_pattern = /Hello, fluent-plugin-kafka! in topic [12]/
+ assert_equal 2, d.events.size
+ assert_match(expected_message_pattern, d.events[0][2]['message'])
+ assert_match(expected_message_pattern, d.events[1][2]['message'])
+ end
+ end
+end