1 change: 1 addition & 0 deletions .github/workflows/ci_build_test.yaml
@@ -74,6 +74,7 @@ jobs:
CI_KAFKA_HEADER_INDEX: kafka
CI_DATAGEN_IMAGE: rock1017/log-generator:latest
CI_OLD_CONNECTOR_VERSION: v2.0.1
SCHEMA_REGISTRY_URL: ${{ secrets.SCHEMA_REGISTRY_URL }}

steps:
- name: Checkout
16 changes: 16 additions & 0 deletions README.md
@@ -106,6 +106,12 @@ Use the below schema to configure Splunk Connect for Kafka
"splunk.hec.raw": "<true|false>",
"splunk.hec.raw.line.breaker": "<line breaker separator>",
"splunk.hec.json.event.enrichment": "<key value pairs separated by comma, only applicable to /event HEC>",
"value.converter": "<converter class used to convert between Kafka Connect format and the serialized form that is written to Kafka>",
"value.converter.schema.registry.url": "<Schema-Registry-URL>",
"value.converter.schemas.enable": "<true|false>",
"key.converter": "<converter class used to convert between Kafka Connect format and the serialized form that is written to Kafka>",
"key.converter.schema.registry.url": "<Schema-Registry-URL>",
"key.converter.schemas.enable": "<true|false>",
"splunk.hec.ack.enabled": "<true|false>",
"splunk.hec.ack.poll.interval": "<event ack poll interval>",
"splunk.hec.ack.poll.threads": "<number of threads used to poll event acks>",
@@ -206,6 +212,16 @@ Use the below schema to configure Splunk Connect for Kafka
| `kerberos.user.principal` | The Kerberos user principal the connector may use to authenticate with Kerberos. | `""` |
| `kerberos.keytab.path` | The path to the keytab file to use for authentication with Kerberos. | `""` |

### Protobuf Parameters
| Name | Description | Default Value |
|-------- |----------------------------|-----------------------|
| `value.converter` | Converter class used to convert between Kafka Connect format and the serialized form that is written to Kafka. This controls the format of the values in messages written to or read from Kafka. To use the protobuf format, set this field to `io.confluent.connect.protobuf.ProtobufConverter`. | `org.apache.kafka.connect.storage.StringConverter` |
| `value.converter.schema.registry.url` | Schema Registry URL. | `""` |
| `value.converter.schemas.enable` | To use the protobuf format, set this field to `true`. | `false` |
| `key.converter` | Converter class used to convert between Kafka Connect format and the serialized form that is written to Kafka. This controls the format of the keys in messages written to or read from Kafka. To use the protobuf format, set this field to `io.confluent.connect.protobuf.ProtobufConverter`. | `org.apache.kafka.connect.storage.StringConverter` |
| `key.converter.schema.registry.url` | Schema Registry URL. | `""` |
| `key.converter.schemas.enable` | To use the protobuf format, set this field to `true`. | `false` |
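
For reference, a connector configuration that consumes protobuf-encoded record values might look like the following minimal sketch (the connector name, topic, HEC endpoint, and Schema Registry address are illustrative placeholders):

{
  "name": "splunk-sink-protobuf",
  "config": {
    "connector.class": "com.splunk.kafka.connect.SplunkSinkConnector",
    "topics": "prototopic",
    "splunk.hec.uri": "https://<splunk-host>:8088",
    "splunk.hec.token": "<HEC token>",
    "value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
    "value.converter.schema.registry.url": "http://<schema-registry-host>:8081",
    "value.converter.schemas.enable": "true"
  }
}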

## Load balancing

See [Splunk Docs](https://docs.splunk.com/Documentation/KafkaConnect/latest/User/LoadBalancing) for considerations when using load balancing in your deployment.
13 changes: 13 additions & 0 deletions pom.xml
@@ -152,8 +152,21 @@
<version>3.7</version>
<scope>compile</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/io.confluent/kafka-connect-protobuf-converter -->
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-connect-protobuf-converter</artifactId>
<version>7.0.1</version>
</dependency>
</dependencies>

<repositories>
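<!-- The Confluent Maven repository is needed because io.confluent artifacts such as
     kafka-connect-protobuf-converter are published there rather than to Maven Central. -->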
<repository>
<id>confluent</id>
<url>https://packages.confluent.io/maven/</url>
</repository>
</repositories>

<reporting>
<plugins>
<plugin>
5 changes: 4 additions & 1 deletion test/conftest.py
@@ -37,12 +37,13 @@ def setup(request):

def pytest_configure():
# Generate message data
topics = [config["kafka_topic"], config["kafka_topic_2"], config["kafka_header_topic"],
topics = [config["kafka_topic"], config["kafka_topic_2"], config["kafka_header_topic"], "prototopic",
"test_splunk_hec_malformed_events"]

create_kafka_topics(config, topics)
producer = KafkaProducer(bootstrap_servers=config["kafka_broker_url"],
value_serializer=lambda v: json.dumps(v).encode('utf-8'))
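# No value_serializer here: the protobuf test message sent below is already a serialized byte payload.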
protobuf_producer = KafkaProducer(bootstrap_servers=config["kafka_broker_url"])

for _ in range(3):
msg = {"timestamp": config['timestamp']}
@@ -67,7 +68,9 @@ def pytest_configure():

producer.send("test_splunk_hec_malformed_events", {})
producer.send("test_splunk_hec_malformed_events", {"&&": "null", "message": ["$$$$****////", 123, None]})
protobuf_producer.send("prototopic",value=b'\x00\x00\x00\x00\x01\x00\n\x011\x12\r10-01-04-3:45\x18\x15%\x00\x00*C*\x02No:\x12\n\x011\x12\x04this\x1a\x07New oneB\x0c\n\x011\x12\x07shampooJ\x04Many')
producer.flush()
protobuf_producer.flush()

# Launch all connectors for tests
for param in connect_params:
10 changes: 9 additions & 1 deletion test/lib/connect_params.py
@@ -192,5 +192,13 @@
{"name": "test_splunk_hec_malformed_events",
"topics": "test_splunk_hec_malformed_events",
"splunk_hec_raw": False,
"splunk_hec_json_event_enrichment": "chars=hec_malformed_events_{}".format(config['timestamp'])}
"splunk_hec_json_event_enrichment": "chars=hec_malformed_events_{}".format(config['timestamp'])},
{"name": "test_protobuf_events",
"splunk_sourcetypes": "protobuf",
"splunk_hec_raw": False,
"topics": "prototopic",
"value_converter":"io.confluent.connect.protobuf.ProtobufConverter",
"value_converter_schema_registry_url": os.environ["SCHEMA_REGISTRY_URL"],
"value_converter_schemas_enable":"true"
}
]
5 changes: 4 additions & 1 deletion test/lib/connector.template
@@ -44,6 +44,9 @@
{% if splunk_hec_json_event_formatted %}
"splunk.hec.json.event.formatted": "{{splunk_hec_json_event_formatted}}",
{% endif %}
"splunk.sourcetypes": "{{splunk_sourcetypes}}"
"splunk.sourcetypes": "{{splunk_sourcetypes}}",
"value.converter": "{{value_converter}}",
"value.converter.schema.registry.url": "{{value_converter_schema_registry_url}}",
"value.converter.schemas.enable": "{{value_converter_schemas_enable}}"
}
}
5 changes: 4 additions & 1 deletion test/lib/data_gen.py
@@ -32,7 +32,10 @@ def generate_connector_content(input_disc=None):
"splunk_header_sourcetype": None,
"splunk_header_host": None,
"splunk_hec_json_event_formatted": None,
"splunk_sourcetypes": "kafka"
"splunk_sourcetypes": "kafka",
"value_converter": "org.apache.kafka.connect.storage.StringConverter",
"value_converter_schema_registry_url": "",
"value_converter_schemas_enable":"false"
}

if input_disc:
16 changes: 16 additions & 0 deletions test/testcases/test_data_onboarding.py
@@ -28,3 +28,19 @@ def test_data_onboarding(self, setup, test_scenario, test_input, expected):
password=setup["splunk_password"])
logger.info("Splunk received %s events in the last hour", len(events))
assert len(events) == expected

@pytest.mark.parametrize("test_scenario, test_input, expected", [
("protobuf", "sourcetype::protobuf", 1),
])
def test_proto_data_onboarding(self, setup, test_scenario, test_input, expected):
logger.info("testing {0} input={1} expected={2} event(s)".format(test_scenario, test_input, expected))
search_query = "index={0} | search {1}".format(setup['splunk_index'],
test_input)
logger.info(search_query)
events = check_events_from_splunk(start_time="-15m@m",
url=setup["splunkd_url"],
user=setup["splunk_user"],
query=["search {}".format(search_query)],
password=setup["splunk_password"])
logger.info("Splunk received %s events in the last hour", len(events))
assert len(events) == expected