Skip to content

Commit 6cc9a3c

Browse files
author
Vihas Splunk
committed
Add protobuf test cases, add schema registry to workflows.
1 parent dd8f7d3 commit 6cc9a3c

File tree

6 files changed

+50
-5
lines changed

6 files changed

+50
-5
lines changed

.github/workflows/ci_build_test.yaml

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,6 @@ jobs:
149149
CI_KAFKA_HEADER_INDEX: kafka
150150
CI_DATAGEN_IMAGE: rock1017/log-generator:latest
151151
CI_OLD_CONNECTOR_VERSION: v2.0.1
152-
SCHEMA_REGISTRY_URL: ${{ Secrets.SCHEMA_REGISTRY_URL }}
153152

154153
steps:
155154
- name: Checkout
@@ -222,6 +221,18 @@ jobs:
222221
name: splunk-kafka-connector
223222
path: /tmp
224223

224+
- name: Up the Schema Registry
225+
run: |
226+
cd /tmp && wget https://packages.confluent.io/archive/7.1/confluent-community-7.1.1.tar.gz
227+
sudo tar xzf confluent-community-7.1.1.tar.gz
228+
cd confluent-7.1.1
229+
bin/schema-registry-start ./etc/schema-registry/schema-registry.properties &
230+
231+
- name: Register the protobuf schema
232+
run: |
233+
sleep 10
234+
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data "{\"schemaType\": \"PROTOBUF\",\"schema\": \"syntax = \\\"proto3\\\";\\npackage com.mycorp.mynamespace;\\n\\nmessage MyRecord {\\n string id = 1;\\n float amount = 2;\\n string customer_id = 3;\\n}\\n\"}" http://localhost:8081/subjects/prototopic-value/versions
235+
225236
- name: Test kafka connect upgrade
226237
run: |
227238
echo "Download kafka connect "$CI_OLD_CONNECTOR_VERSION

src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -318,7 +318,7 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
318318
timestampFormat = getString(TIMESTAMP_FORMAT_CONF).trim();
319319
validateRegexForTimestamp(regex);
320320
queueCapacity = getInt(QUEUE_CAPACITY_CONF);
321-
321+
validateQueueCapacity(queueCapacity);
322322
}
323323

324324

@@ -558,6 +558,12 @@ private void validateRegexForTimestamp(String regex) {
558558
}
559559
}
560560

561+
private void validateQueueCapacity(int queueCapacity) {
562+
if (queueCapacity <= 0) {
563+
throw new ConfigException("queue capacity should be greater than " + queueCapacity);
564+
}
565+
}
566+
561567
private static boolean getNamedGroupCandidates(String regex) {
562568
Matcher m = Pattern.compile("\\(\\?<([a-zA-Z][a-zA-Z0-9]*)>").matcher(regex);
563569
while (m.find()) {

src/test/java/com/splunk/kafka/connect/SplunkSinkConnecterTest.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -261,6 +261,34 @@ public void testInvalidSplunkConfigurationsWithValidationEnabled() {
261261
Assertions.assertThrows(ConfigException.class, ()->connector.validate(configs));
262262
}
263263

264+
@Test
265+
public void testValidQueueCapacity() {
266+
final Map<String, String> configs = new HashMap<>();
267+
addNecessaryConfigs(configs);
268+
SplunkSinkConnector connector = new SplunkSinkConnector();
269+
configs.put("splunk.hec.concurrent.queue.capacity", "100");
270+
configs.put("topics", "b");
271+
configs.put("splunk.indexes", "b");
272+
MockHecClientWrapper clientInstance = new MockHecClientWrapper();
273+
clientInstance.client.setResponse(CloseableHttpClientMock.success);
274+
((SplunkSinkConnector) connector).setHecInstance(clientInstance);
275+
Assertions.assertDoesNotThrow(()->connector.validate(configs));
276+
}
277+
278+
@Test
279+
public void testInvalidQueueCapacity() {
280+
final Map<String, String> configs = new HashMap<>();
281+
addNecessaryConfigs(configs);
282+
SplunkSinkConnector connector = new SplunkSinkConnector();
283+
configs.put("splunk.hec.concurrent.queue.capacity", "-1");
284+
configs.put("topics", "b");
285+
configs.put("splunk.indexes", "b");
286+
MockHecClientWrapper clientInstance = new MockHecClientWrapper();
287+
clientInstance.client.setResponse(CloseableHttpClientMock.success);
288+
((SplunkSinkConnector) connector).setHecInstance(clientInstance);
289+
Assertions.assertThrows(ConfigException.class, ()->connector.validate(configs));
290+
}
291+
264292
private void addNecessaryConfigs(Map<String, String> configs) {
265293
configs.put(URI_CONF, TEST_URI);
266294
configs.put(TOKEN_CONF, "blah");

test/conftest.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ def pytest_configure():
7070
producer.send("test_splunk_hec_malformed_events", {})
7171
producer.send("test_splunk_hec_malformed_events", {"&&": "null", "message": ["$$$$****////", 123, None]})
7272
producer.send("record_key",{"timestamp": config['timestamp']},b"{}")
73-
protobuf_producer.send("prototopic",value=b'\x00\x00\x00\x00\x01\x00\n\x011\x12\r10-01-04-3:45\x18\x15%\x00\x00*C*\x02No:\x12\n\x011\x12\x04this\x1a\x07New oneB\x0c\n\x011\x12\x07shampooJ\x04Many')
73+
protobuf_producer.send("prototopic",value=b'\x00\x00\x00\x00\x01\x00\n\x011\x1533\xf3?\x1a\x05Hello')
7474
timestamp_producer.send("date_format",b"{\"id\": \"19\",\"host\":\"host-01\",\"source\":\"bu\",\"fields\":{\"hn\":\"hostname\",\"CLASS\":\"class\",\"cust_id\":\"000013934\",\"time\": \"Jun 13 2010 23:11:52.454 UTC\",\"category\":\"IFdata\",\"ifname\":\"LoopBack7\",\"IFdata.Bits received\":\"0\",\"IFdata.Bits sent\":\"0\"}")
7575
timestamp_producer.send("epoch_format",b"{\"id\": \"19\",\"host\":\"host-01\",\"source\":\"bu\",\"fields\":{\"hn\":\"hostname\",\"CLASS\":\"class\",\"cust_id\":\"000013934\",\"time\": \"1555209605000\",\"category\":\"IFdata\",\"ifname\":\"LoopBack7\",\"IFdata.Bits received\":\"0\",\"IFdata.Bits sent\":\"0\"}")
7676
producer.flush()

test/lib/connect_params.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,7 @@
188188
"splunk_hec_raw": False,
189189
"topics": "prototopic",
190190
"value_converter":"io.confluent.connect.protobuf.ProtobufConverter",
191-
"value_converter_schema_registry_url": os.environ["SCHEMA_REGISTRY_URL"],
191+
"value_converter_schema_registry_url": "http://localhost:8081",
192192
"value_converter_schemas_enable":"true"
193193
},
194194
{"name": "test_extracted_timestamp_dateformat",

test/requirements.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
pytest
2-
requests
2+
requests == 2.28.2
33
kafka-python
44
pyyaml == 5.4.1
55
jinja2

0 commit comments

Comments
 (0)