Skip to content

Commit 99b0606

Browse files
author
Vihas Splunk
committed
Add test cases
1 parent 8c71590 commit 99b0606

File tree

2 files changed

+35
-1
lines changed

2 files changed

+35
-1
lines changed

src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -318,7 +318,7 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
318318
timestampFormat = getString(TIMESTAMP_FORMAT_CONF).trim();
319319
validateRegexForTimestamp(regex);
320320
queueCapacity = getInt(QUEUE_CAPACITY_CONF);
321-
321+
validateQueueCapacity(queueCapacity);
322322
}
323323

324324

@@ -558,6 +558,12 @@ private void validateRegexForTimestamp(String regex) {
558558
}
559559
}
560560

561+
private void validateQueueCapacity(int queueCapacity) {
562+
if (queueCapacity <= 0) {
563+
throw new ConfigException("queue capacity should be greater than " + queueCapacity);
564+
}
565+
}
566+
561567
private static boolean getNamedGroupCandidates(String regex) {
562568
Matcher m = Pattern.compile("\\(\\?<([a-zA-Z][a-zA-Z0-9]*)>").matcher(regex);
563569
while (m.find()) {

src/test/java/com/splunk/kafka/connect/SplunkSinkConnecterTest.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -261,6 +261,34 @@ public void testInvalidSplunkConfigurationsWithValidationEnabled() {
261261
Assertions.assertThrows(ConfigException.class, ()->connector.validate(configs));
262262
}
263263

264+
@Test
265+
public void testValidQueueCapacity() {
266+
final Map<String, String> configs = new HashMap<>();
267+
addNecessaryConfigs(configs);
268+
SplunkSinkConnector connector = new SplunkSinkConnector();
269+
configs.put("splunk.hec.concurrent.queue.capacity", "100");
270+
configs.put("topics", "b");
271+
configs.put("splunk.indexes", "b");
272+
MockHecClientWrapper clientInstance = new MockHecClientWrapper();
273+
clientInstance.client.setResponse(CloseableHttpClientMock.success);
274+
((SplunkSinkConnector) connector).setHecInstance(clientInstance);
275+
Assertions.assertDoesNotThrow(()->connector.validate(configs));
276+
}
277+
278+
@Test
279+
public void testInvalidQueueCapacity() {
280+
final Map<String, String> configs = new HashMap<>();
281+
addNecessaryConfigs(configs);
282+
SplunkSinkConnector connector = new SplunkSinkConnector();
283+
configs.put("splunk.hec.concurrent.queue.capacity", "-1");
284+
configs.put("topics", "b");
285+
configs.put("splunk.indexes", "b");
286+
MockHecClientWrapper clientInstance = new MockHecClientWrapper();
287+
clientInstance.client.setResponse(CloseableHttpClientMock.success);
288+
((SplunkSinkConnector) connector).setHecInstance(clientInstance);
289+
Assertions.assertThrows(ConfigException.class, ()->connector.validate(configs));
290+
}
291+
264292
private void addNecessaryConfigs(Map<String, String> configs) {
265293
configs.put(URI_CONF, TEST_URI);
266294
configs.put(TOKEN_CONF, "blah");

0 commit comments

Comments
 (0)