diff --git a/.github/workflows/ci_build_test.yaml b/.github/workflows/ci_build_test.yaml
index 6d457220..f78e4ae4 100644
--- a/.github/workflows/ci_build_test.yaml
+++ b/.github/workflows/ci_build_test.yaml
@@ -74,6 +74,7 @@ jobs:
       CI_KAFKA_HEADER_INDEX: kafka
       CI_DATAGEN_IMAGE: rock1017/log-generator:latest
       CI_OLD_CONNECTOR_VERSION: v2.0.1
+      SCHEMA_REGISTRY_URL: ${{ secrets.SCHEMA_REGISTRY_URL }}

     steps:
       - name: Checkout
diff --git a/README.md b/README.md
index 8fa0323e..03a69c91 100644
--- a/README.md
+++ b/README.md
@@ -106,6 +106,12 @@ Use the below schema to configure Splunk Connect for Kafka
    "splunk.hec.raw": "",
    "splunk.hec.raw.line.breaker": "",
    "splunk.hec.json.event.enrichment": "",
+   "value.converter": "",
+   "value.converter.schema.registry.url": "",
+   "value.converter.schemas.enable": "",
+   "key.converter": "",
+   "key.converter.schema.registry.url": "",
+   "key.converter.schemas.enable": "",
    "splunk.hec.ack.enabled": "",
    "splunk.hec.ack.poll.interval": "",
    "splunk.hec.ack.poll.threads": "",
@@ -206,6 +212,16 @@ Use the below schema to configure Splunk Connect for Kafka
 | `kerberos.user.principal` | The Kerberos user principal the connector may use to authenticate with Kerberos. | `""` |
 | `kerberos.keytab.path` | The path to the keytab file to use for authentication with Kerberos. | `""` |

+### Protobuf Parameters
+| Name | Description | Default Value |
+|-------- |----------------------------|-----------------------|
+| `value.converter` | Converter class used to convert between Kafka Connect format and the serialized form that is written to Kafka. This controls the format of the values in messages written to or read from Kafka. For using protobuf format, set the value of this field to `io.confluent.connect.protobuf.ProtobufConverter` | `org.apache.kafka.connect.storage.StringConverter` |
+| `value.converter.schema.registry.url` | Schema Registry URL. | `""` |
+| `value.converter.schemas.enable` | For using protobuf format, set the value of this field to `true` | `false` |
+| `key.converter` | Converter class used to convert between Kafka Connect format and the serialized form that is written to Kafka. This controls the format of the keys in messages written to or read from Kafka. For using protobuf format, set the value of this field to `io.confluent.connect.protobuf.ProtobufConverter` | `org.apache.kafka.connect.storage.StringConverter` |
+| `key.converter.schema.registry.url` | Schema Registry URL. | `""` |
+| `key.converter.schemas.enable` | For using protobuf format, set the value of this field to `true` | `false` |
+
 ## Load balancing

 See [Splunk Docs](https://docs.splunk.com/Documentation/KafkaConnect/latest/User/LoadBalancing) for considerations when using load balancing in your deployment.
diff --git a/pom.xml b/pom.xml
index 7b8e59b5..afa8da98 100644
--- a/pom.xml
+++ b/pom.xml
@@ -152,8 +152,21 @@
             <version>3.7</version>
             <scope>compile</scope>
+
+        <dependency>
+            <groupId>io.confluent</groupId>
+            <artifactId>kafka-connect-protobuf-converter</artifactId>
+            <version>7.0.1</version>
+        </dependency>
+
+    <repositories>
+        <repository>
+            <id>confluent</id>
+            <url>https://packages.confluent.io/maven/</url>
+        </repository>
+    </repositories>
+
diff --git a/test/conftest.py b/test/conftest.py
index 2e41ed82..03ff71dc 100644
--- a/test/conftest.py
+++ b/test/conftest.py
@@ -37,12 +37,13 @@ def setup(request):

 def pytest_configure():
     # Generate message data
-    topics = [config["kafka_topic"], config["kafka_topic_2"], config["kafka_header_topic"],
+    topics = [config["kafka_topic"], config["kafka_topic_2"], config["kafka_header_topic"], "prototopic",
               "test_splunk_hec_malformed_events"]
     create_kafka_topics(config, topics)
     producer = KafkaProducer(bootstrap_servers=config["kafka_broker_url"],
                              value_serializer=lambda v: json.dumps(v).encode('utf-8'))
+    protobuf_producer = KafkaProducer(bootstrap_servers=config["kafka_broker_url"])

     for _ in range(3):
         msg = {"timestamp": config['timestamp']}
@@ -67,7 +68,9 @@ def pytest_configure():
     producer.send("test_splunk_hec_malformed_events", {})
     producer.send("test_splunk_hec_malformed_events", {"&&": "null", "message": ["$$$$****////", 123, None]})
+    protobuf_producer.send("prototopic", value=b'\x00\x00\x00\x00\x01\x00\n\x011\x12\r10-01-04-3:45\x18\x15%\x00\x00*C*\x02No:\x12\n\x011\x12\x04this\x1a\x07New oneB\x0c\n\x011\x12\x07shampooJ\x04Many')
     producer.flush()
+    protobuf_producer.flush()

     # Launch all connectors for tests
     for param in connect_params:
diff --git a/test/lib/connect_params.py b/test/lib/connect_params.py
index dc0043ac..e960ad7a 100644
--- a/test/lib/connect_params.py
+++ b/test/lib/connect_params.py
@@ -192,5 +192,13 @@
     {"name": "test_splunk_hec_malformed_events",
      "topics": "test_splunk_hec_malformed_events",
      "splunk_hec_raw": False,
-     "splunk_hec_json_event_enrichment": "chars=hec_malformed_events_{}".format(config['timestamp'])}
+     "splunk_hec_json_event_enrichment": "chars=hec_malformed_events_{}".format(config['timestamp'])},
+    {"name": "test_protobuf_events",
+     "splunk_sourcetypes": "protobuf",
+     "splunk_hec_raw": False,
+     "topics": "prototopic",
+     "value_converter": "io.confluent.connect.protobuf.ProtobufConverter",
+     "value_converter_schema_registry_url": os.environ.get("SCHEMA_REGISTRY_URL", ""),
+     "value_converter_schemas_enable": "true"
+     }
 ]
diff --git a/test/lib/connector.template b/test/lib/connector.template
index bf393e43..380d5f3c 100644
--- a/test/lib/connector.template
+++ b/test/lib/connector.template
@@ -44,6 +44,9 @@
 {% if splunk_hec_json_event_formatted %}
     "splunk.hec.json.event.formatted": "{{splunk_hec_json_event_formatted}}",
 {% endif %}
-    "splunk.sourcetypes": "{{splunk_sourcetypes}}"
+    "splunk.sourcetypes": "{{splunk_sourcetypes}}",
+    "value.converter": "{{value_converter}}",
+    "value.converter.schema.registry.url": "{{value_converter_schema_registry_url}}",
+    "value.converter.schemas.enable": "{{value_converter_schemas_enable}}"
     }
 }
diff --git a/test/lib/data_gen.py b/test/lib/data_gen.py
index 7cbbb63c..77b227bd 100644
--- a/test/lib/data_gen.py
+++ b/test/lib/data_gen.py
@@ -32,7 +32,10 @@ def generate_connector_content(input_disc=None):
         "splunk_header_sourcetype": None,
         "splunk_header_host": None,
         "splunk_hec_json_event_formatted": None,
-        "splunk_sourcetypes": "kafka"
+        "splunk_sourcetypes": "kafka",
+        "value_converter": "org.apache.kafka.connect.storage.StringConverter",
+        "value_converter_schema_registry_url": "",
+        "value_converter_schemas_enable": "false"
     }

     if input_disc:
diff --git a/test/testcases/test_data_onboarding.py b/test/testcases/test_data_onboarding.py
index 5a930e26..240667f4 100644
--- a/test/testcases/test_data_onboarding.py
+++ b/test/testcases/test_data_onboarding.py
@@ -28,3 +28,19 @@ def test_data_onboarding(self, setup, test_scenario, test_input, expected):
                                           password=setup["splunk_password"])
         logger.info("Splunk received %s events in the last hour", len(events))
         assert len(events) == expected
+
+    @pytest.mark.parametrize("test_scenario, test_input, expected", [
+        ("protobuf", "sourcetype::protobuf", 1),
+    ])
+    def test_proto_data_onboarding(self, setup, test_scenario, test_input, expected):
+        logger.info("testing {0} input={1} expected={2} event(s)".format(test_scenario, test_input, expected))
+        search_query = "index={0} | search {1}".format(setup['splunk_index'],
+                                                       test_input)
+        logger.info(search_query)
+        events = check_events_from_splunk(start_time="-15m@m",
+                                          url=setup["splunkd_url"],
+                                          user=setup["splunk_user"],
+                                          query=["search {}".format(search_query)],
+                                          password=setup["splunk_password"])
+        logger.info("Splunk received %s events in the last 15 minutes", len(events))
+        assert len(events) == expected