diff --git a/.github/workflows/ci_build_test.yaml b/.github/workflows/ci_build_test.yaml
index 6d457220..0463770c 100644
--- a/.github/workflows/ci_build_test.yaml
+++ b/.github/workflows/ci_build_test.yaml
@@ -74,6 +74,7 @@ jobs:
       CI_KAFKA_HEADER_INDEX: kafka
       CI_DATAGEN_IMAGE: rock1017/log-generator:latest
       CI_OLD_CONNECTOR_VERSION: v2.0.1
+      SCHEMA_REGISTRY_URL: ${{ secrets.SCHEMA_REGISTRY_URL }}

     steps:
       - name: Checkout
diff --git a/README.md b/README.md
index 8fa0323e..d88a126b 100644
--- a/README.md
+++ b/README.md
@@ -106,6 +106,12 @@ Use the below schema to configure Splunk Connect for Kafka
    "splunk.hec.raw": "",
    "splunk.hec.raw.line.breaker": "",
    "splunk.hec.json.event.enrichment": "",
+   "value.converter": "",
+   "value.converter.schema.registry.url": "",
+   "value.converter.schemas.enable": "",
+   "key.converter": "",
+   "key.converter.schema.registry.url": "",
+   "key.converter.schemas.enable": "",
    "splunk.hec.ack.enabled": "",
    "splunk.hec.ack.poll.interval": "",
    "splunk.hec.ack.poll.threads": "",
@@ -206,6 +212,16 @@ Use the below schema to configure Splunk Connect for Kafka
 | `kerberos.user.principal` | The Kerberos user principal the connector may use to authenticate with Kerberos. | `""` |
 | `kerberos.keytab.path` | The path to the keytab file to use for authentication with Kerberos. | `""` |

+### Protobuf Parameters
+| Name | Description | Default Value |
+|------|-------------|---------------|
+| `value.converter` | Converter class used to convert between Kafka Connect format and the serialized form that is written to Kafka. This controls the format of the values in messages written to or read from Kafka. To use protobuf format, set this field to `io.confluent.connect.protobuf.ProtobufConverter`. | `org.apache.kafka.connect.storage.StringConverter` |
+| `value.converter.schema.registry.url` | Schema Registry URL. | `null` |
+| `value.converter.schemas.enable` | To use protobuf format, set this field to `true`. | `false` |
+| `key.converter` | Converter class used to convert between Kafka Connect format and the serialized form that is written to Kafka. This controls the format of the keys in messages written to or read from Kafka. To use protobuf format, set this field to `io.confluent.connect.protobuf.ProtobufConverter`. | `org.apache.kafka.connect.storage.StringConverter` |
+| `key.converter.schema.registry.url` | Schema Registry URL. | `null` |
+| `key.converter.schemas.enable` | To use protobuf format, set this field to `true`. | `false` |
+
 ## Load balancing
 See [Splunk Docs](https://docs.splunk.com/Documentation/KafkaConnect/latest/User/LoadBalancing) for considerations when using load balancing in your deployment.
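As a usage illustration of the protobuf parameters documented above, here is a minimal sketch of registering a sink connector with these settings through the Kafka Connect REST API. The connector name, topic, HEC URI/token, and both service URLs are placeholder assumptions, not values taken from this change:

import json
import requests

connector = {
    "name": "splunk-sink-protobuf",   # hypothetical connector name
    "config": {
        "connector.class": "com.splunk.kafka.connect.SplunkSinkConnector",
        "topics": "prototopic",
        "splunk.hec.uri": "https://hec.example.com:8088",  # placeholder
        "splunk.hec.token": "<HEC_TOKEN>",                 # placeholder
        "value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
        "value.converter.schema.registry.url": "http://localhost:8081",  # placeholder
        "value.converter.schemas.enable": "true",
    },
}

# Register the connector against a local Kafka Connect worker.
resp = requests.post("http://localhost:8083/connectors",
                     headers={"Content-Type": "application/json"},
                     data=json.dumps(connector))
resp.raise_for_status()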
diff --git a/pom.xml b/pom.xml
index 86d09396..cbe1e2cd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -152,8 +152,21 @@
             <version>3.7</version>
             <scope>compile</scope>
         </dependency>
+        <dependency>
+            <groupId>io.confluent</groupId>
+            <artifactId>kafka-connect-protobuf-converter</artifactId>
+            <version>7.0.1</version>
+        </dependency>
     </dependencies>
+
+    <repositories>
+        <repository>
+            <id>confluent</id>
+            <url>https://packages.confluent.io/maven/</url>
+        </repository>
+    </repositories>
+
diff --git a/test/config.yaml b/test/config.yaml
index ca56821c..2c285147 100644
--- a/test/config.yaml
+++ b/test/config.yaml
@@ -9,6 +9,7 @@ kafka_broker_url: 127.0.0.1:9092
 kafka_connect_url: http://127.0.0.1:8083
 kafka_topic: test-datagen
 kafka_topic_2: kafka_topic_2
+kafka_topic_3: prototopic
 kafka_header_topic: kafka_header_topic
 kafka_header_index: kafka
 connector_path: /usr/local/share/kafka/plugins
diff --git a/test/conftest.py b/test/conftest.py
index 2e41ed82..f7819ff7 100644
--- a/test/conftest.py
+++ b/test/conftest.py
@@ -37,12 +37,13 @@ def setup(request):

 def pytest_configure():
     # Generate message data
-    topics = [config["kafka_topic"], config["kafka_topic_2"], config["kafka_header_topic"],
+    topics = [config["kafka_topic"], config["kafka_topic_2"], config["kafka_header_topic"], config["kafka_topic_3"],
               "test_splunk_hec_malformed_events"]
     create_kafka_topics(config, topics)
     producer = KafkaProducer(bootstrap_servers=config["kafka_broker_url"],
                              value_serializer=lambda v: json.dumps(v).encode('utf-8'))
+    protobuf_producer = KafkaProducer(bootstrap_servers=config["kafka_broker_url"])

     for _ in range(3):
         msg = {"timestamp": config['timestamp']}
@@ -67,7 +68,9 @@ def pytest_configure():
     producer.send("test_splunk_hec_malformed_events", {})
     producer.send("test_splunk_hec_malformed_events", {"&&": "null", "message": ["$$$$****////", 123, None]})
+    protobuf_producer.send(config["kafka_topic_3"], value=b'\x00\x00\x00\x00\x01\x00\n\x011\x12\r10-01-04-3:45\x18\x15%\x00\x00*C*\x02No:\x12\n\x011\x12\x04this\x1a\x07New oneB\x0c\n\x011\x12\x07shampooJ\x04Many')
     producer.flush()
+    protobuf_producer.flush()

     # Launch all connectors for tests
     for param in connect_params:
diff --git a/test/lib/connect_params.py b/test/lib/connect_params.py
index dc0043ac..8d25ff7d 100644
--- a/test/lib/connect_params.py
+++ b/test/lib/connect_params.py
@@ -192,5 +192,12 @@
     {"name": "test_splunk_hec_malformed_events",
      "topics": "test_splunk_hec_malformed_events",
      "splunk_hec_raw": False,
-     "splunk_hec_json_event_enrichment": "chars=hec_malformed_events_{}".format(config['timestamp'])}
+     "splunk_hec_json_event_enrichment": "chars=hec_malformed_events_{}".format(config['timestamp'])},
+    {"name": "test_protobuf_events",
+     "splunk_sourcetypes": "protobuf",
+     "topics": "prototopic",
+     "splunk_hec_raw": False,
+     "value_converter": "io.confluent.connect.protobuf.ProtobufConverter",
+     "value_converter_schema_registry_url": os.environ["SCHEMA_REGISTRY_URL"],
+     "value_converter_schemas_enable": "true"}
 ]
diff --git a/test/lib/connector.template b/test/lib/connector.template
index bf393e43..380d5f3c 100644
--- a/test/lib/connector.template
+++ b/test/lib/connector.template
@@ -44,6 +44,9 @@
     {% if splunk_hec_json_event_formatted %}
     "splunk.hec.json.event.formatted": "{{splunk_hec_json_event_formatted}}",
     {% endif %}
-    "splunk.sourcetypes": "{{splunk_sourcetypes}}"
+    "splunk.sourcetypes": "{{splunk_sourcetypes}}"{% if value_converter %},
+    "value.converter": "{{value_converter}}",
+    "value.converter.schema.registry.url": "{{value_converter_schema_registry_url}}",
+    "value.converter.schemas.enable": "{{value_converter_schemas_enable}}"{% endif %}
   }
 }
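A note on the opaque byte string produced to prototopic in conftest.py above: it is a message pre-serialized in Confluent's Schema Registry wire format (magic byte 0, 4-byte big-endian schema ID, protobuf message indexes, then the protobuf-encoded payload). A sketch of that framing, assuming schema ID 1 to match the fixture's leading bytes:

import struct

MAGIC_BYTE = 0             # Confluent wire-format marker
SCHEMA_ID = 1              # assumption: matches the fixture's \x00\x00\x00\x01
message_indexes = b'\x00'  # single 0 byte: optimized encoding for the first message type
protobuf_payload = b'\n\x011\x12\r10-01-04-3:45'  # truncated stand-in for the real payload

framed = struct.pack('>bI', MAGIC_BYTE, SCHEMA_ID) + message_indexes + protobuf_payload
# framed[:6] == b'\x00\x00\x00\x00\x01\x00', matching the fixture sent in conftest.py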
diff --git a/test/lib/data_gen.py b/test/lib/data_gen.py
index 7cbbb63c..9747139e 100644
--- a/test/lib/data_gen.py
+++ b/test/lib/data_gen.py
@@ -32,7 +32,10 @@ def generate_connector_content(input_disc=None):
         "splunk_header_sourcetype": None,
         "splunk_header_host": None,
         "splunk_hec_json_event_formatted": None,
-        "splunk_sourcetypes": "kafka"
+        "splunk_sourcetypes": "kafka",
+        "value_converter": None,
+        "value_converter_schema_registry_url": None,
+        "value_converter_schemas_enable": "false"
     }

     if input_disc:
diff --git a/test/testcases/test_data_onboarding.py b/test/testcases/test_data_onboarding.py
index 5a930e26..240667f4 100644
--- a/test/testcases/test_data_onboarding.py
+++ b/test/testcases/test_data_onboarding.py
@@ -28,3 +28,19 @@ def test_data_onboarding(self, setup, test_scenario, test_input, expected):
                                           password=setup["splunk_password"])
         logger.info("Splunk received %s events in the last hour", len(events))
         assert len(events) == expected
+
+    @pytest.mark.parametrize("test_scenario, test_input, expected", [
+        ("protobuf", "sourcetype::protobuf", 1),
+    ])
+    def test_proto_data_onboarding(self, setup, test_scenario, test_input, expected):
+        logger.info("testing {0} input={1} expected={2} event(s)".format(test_scenario, test_input, expected))
+        search_query = "index={0} | search {1}".format(setup['splunk_index'],
+                                                       test_input)
+        logger.info(search_query)
+        events = check_events_from_splunk(start_time="-15m@m",
+                                          url=setup["splunkd_url"],
+                                          user=setup["splunk_user"],
+                                          query=["search {}".format(search_query)],
+                                          password=setup["splunk_password"])
+        logger.info("Splunk received %s events in the last 15 minutes", len(events))
+        assert len(events) == expected
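check_events_from_splunk above comes from this repo's test helpers and its internals are not shown in this diff. As a hedged sketch, a search like the one in test_proto_data_onboarding could be run against splunkd's standard export endpoint roughly as follows (the function below is an illustration, not the helper's actual implementation):

import json
import requests

def search_splunk(splunkd_url, user, password, query, start_time="-15m@m"):
    # POST to the standard splunkd export endpoint; it streams one JSON object per line.
    resp = requests.post(
        "{}/services/search/jobs/export".format(splunkd_url),
        auth=(user, password),
        data={"search": query, "earliest_time": start_time, "output_mode": "json"},
        verify=False,  # test instances often use self-signed certificates
    )
    resp.raise_for_status()
    # Keep only the lines that carry an actual result object.
    return [json.loads(line)["result"]
            for line in resp.text.splitlines()
            if line and '"result"' in line]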