diff --git a/pom.xml b/pom.xml
index d600dae3..759a45c9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -21,6 +21,7 @@
+            com.fasterxml.jackson.core
diff --git a/src/main/java/com/splunk/kafka/connect/AbstractClientWrapper.java b/src/main/java/com/splunk/kafka/connect/AbstractClientWrapper.java
new file mode 100644
index 00000000..d73630de
--- /dev/null
+++ b/src/main/java/com/splunk/kafka/connect/AbstractClientWrapper.java
@@ -0,0 +1,9 @@
+package com.splunk.kafka.connect;
+
+import org.apache.http.impl.client.CloseableHttpClient;
+
+import com.splunk.hecclient.HecConfig;
+
+public abstract class AbstractClientWrapper {
+    abstract CloseableHttpClient getClient(HecConfig config);
+}
diff --git a/src/main/java/com/splunk/kafka/connect/HecClientWrapper.java b/src/main/java/com/splunk/kafka/connect/HecClientWrapper.java
new file mode 100644
index 00000000..d4358d18
--- /dev/null
+++ b/src/main/java/com/splunk/kafka/connect/HecClientWrapper.java
@@ -0,0 +1,17 @@
+package com.splunk.kafka.connect;
+
+import org.apache.http.impl.client.CloseableHttpClient;
+
+import com.splunk.hecclient.Hec;
+import com.splunk.hecclient.HecConfig;
+
+public class HecClientWrapper extends AbstractClientWrapper {
+
+    @Override
+    CloseableHttpClient getClient(HecConfig config) {
+        return Hec.createHttpClient(config);
+
+    }
+
+
+}
diff --git a/src/main/java/com/splunk/kafka/connect/SplunkSinkConnector.java b/src/main/java/com/splunk/kafka/connect/SplunkSinkConnector.java
index 89750e02..ee0b28f8 100644
--- a/src/main/java/com/splunk/kafka/connect/SplunkSinkConnector.java
+++ b/src/main/java/com/splunk/kafka/connect/SplunkSinkConnector.java
@@ -20,13 +20,26 @@
 import java.util.function.Function;
 import java.util.stream.Collectors;
+
 import org.apache.commons.lang3.StringUtils;
+import org.apache.http.Header;
+import org.apache.http.client.ClientProtocolException;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.client.protocol.HttpClientContext;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.message.BasicHeader;
+import org.apache.http.protocol.HttpContext;
+import org.apache.http.util.EntityUtils;
 import org.apache.kafka.common.config.Config;
 import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.config.ConfigValue;
 import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.sink.SinkConnector;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -34,11 +47,22 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.splunk.hecclient.Event;
+import com.splunk.hecclient.EventBatch;
+import com.splunk.hecclient.JsonEvent;
+import com.splunk.hecclient.JsonEventBatch;
+
 public final class SplunkSinkConnector extends SinkConnector {
     private static final Logger log = LoggerFactory.getLogger(SplunkSinkConnector.class);
     private Map<String, String> taskConfig;
     private Map<String, ConfigValue> values;
     private List<ConfigValue> validations;
+    private AbstractClientWrapper abstractClientWrapper = new HecClientWrapper();
+
+
+    public void setHecInstance(AbstractClientWrapper abstractClientWrapper) {
+        this.abstractClientWrapper = abstractClientWrapper;
+    }
 
     @Override
     public void start(Map<String, String> taskConfig) {
@@ -76,7 +100,7 @@ public ConfigDef config() {
         return SplunkSinkConnectorConfig.conf();
     }
 
-
+    @Override
     public Config validate(final Map<String, String> connectorConfigs) {
         Config config = super.validate(connectorConfigs);
@@ -84,6 +108,7 @@ public Config validate(final Map<String, String> connectorConfigs) {
         values = validations.stream().collect(Collectors.toMap(ConfigValue::name, Function.identity()));
 
         validateKerberosConfigs(connectorConfigs);
+        validateSplunkConfigurations(connectorConfigs);
         return new Config(validations);
     }
@@ -100,9 +125,9 @@ void validateKerberosConfigs(final Map<String, String> configs) {
         }
         String errorMessage = String.format(
-                "Either both or neither '%s' and '%s' must be set for Kerberos authentication. ",
-                KERBEROS_KEYTAB_PATH_CONF,
-                KERBEROS_USER_PRINCIPAL_CONF
+            "Either both or neither '%s' and '%s' must be set for Kerberos authentication. ",
+            KERBEROS_KEYTAB_PATH_CONF,
+            KERBEROS_USER_PRINCIPAL_CONF
         );
         addErrorMessage(KERBEROS_KEYTAB_PATH_CONF, errorMessage);
         addErrorMessage(KERBEROS_USER_PRINCIPAL_CONF, errorMessage);
@@ -111,4 +136,76 @@ void validateKerberosConfigs(final Map<String, String> configs) {
     private void addErrorMessage(String property, String error) {
         values.get(property).addErrorMessage(error);
     }
-}
+
+    private static String[] split(String data, String sep) {
+        if (data != null && !data.trim().isEmpty()) {
+            return data.trim().split(sep);
+        }
+        return null;
+    }
+
+
+    private void validateSplunkConfigurations(final Map<String, String> configs) throws ConfigException {
+        SplunkSinkConnectorConfig connectorConfig = new SplunkSinkConnectorConfig(configs);
+        String[] indexes = split(connectorConfig.indexes, ",");
+        if (indexes == null || indexes.length == 0) {
+            preparePayloadAndExecuteRequest(connectorConfig, "");
+        } else {
+            for (String index : indexes) {
+                preparePayloadAndExecuteRequest(connectorConfig, index);
+            }
+        }
+    }
+
+    private void preparePayloadAndExecuteRequest(SplunkSinkConnectorConfig connectorConfig, String index) throws ConfigException {
+        Header[] headers = new Header[]{new BasicHeader("Authorization", String.format("Splunk %s", connectorConfig.splunkToken))};
+        String endpoint = "/services/collector";
+        String url = connectorConfig.splunkURI + endpoint;
+        final HttpPost httpPost = new HttpPost(url);
+        httpPost.setHeaders(headers);
+        EventBatch batch = new JsonEventBatch();
+        Event event = new JsonEvent("Splunk HEC Configuration Check", null);
+        event.setIndex(index);
+        event.setSource("kafka-connect");
+        event.setSourcetype("kafka-connect");
+        batch.add(event);
+        httpPost.setEntity(batch.getHttpEntity());
+        CloseableHttpClient httpClient = abstractClientWrapper.getClient(connectorConfig.getHecConfig());
+        executeHttpRequest(httpPost, httpClient);
+    }
+
+
+
+    private void executeHttpRequest(final HttpUriRequest req, CloseableHttpClient httpClient) throws ConfigException {
+        CloseableHttpResponse resp = null;
+        HttpContext context;
+        context = HttpClientContext.create();
+        try {
+            resp = httpClient.execute(req, context);
+            int status = resp.getStatusLine().getStatusCode();
+
+            String respPayload = EntityUtils.toString(resp.getEntity(), "utf-8");
+            if (status > 299) {
+                throw new ConfigException(String.format("Bad splunk configurations with status code:%s response:%s", status, respPayload));
+            }
+        } catch (ClientProtocolException ex) {
+            throw new ConfigException("Invalid splunk SSL configuration detected while validating configuration", ex);
+        } catch (IOException ex) {
+            throw new ConfigException("Invalid Splunk Configurations", ex);
+        } catch (ConfigException ex) {
+            throw ex;
+        } catch (Exception ex) {
+            throw new ConfigException("failed to process http payload", ex);
+        } finally {
+            try {
+                if (resp != null) {
+                    resp.close();
+                }
+            } catch (Exception ex) {
+                throw new ConfigException("failed to close http response", ex);
+            }
+        }
+    }
+
+
+}
\ No newline at end of file
diff --git a/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java b/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java
index dddd3291..23cec5a2 100644
--- a/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java
+++ b/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java
@@ -308,8 +308,10 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
         regex = getString(REGEX_CONF);
         timestampFormat = getString(TIMESTAMP_FORMAT_CONF).trim();
         validateRegexForTimestamp(regex);
+
     }
 
+
     public static ConfigDef conf() {
         return new ConfigDef()
             .define(TOKEN_CONF, ConfigDef.Type.PASSWORD, ConfigDef.Importance.HIGH, TOKEN_DOC)
diff --git a/src/test/java/com/splunk/hecclient/CloseableHttpClientMock.java b/src/test/java/com/splunk/hecclient/CloseableHttpClientMock.java
index 6d180256..d573abc7 100644
--- a/src/test/java/com/splunk/hecclient/CloseableHttpClientMock.java
+++ b/src/test/java/com/splunk/hecclient/CloseableHttpClientMock.java
@@ -31,6 +31,8 @@ public class CloseableHttpClientMock extends CloseableHttpClient {
     public static final String serverBusy = "{\"text\":\"Server busy\",\"code\":1}";
     public static final String noDataError = "{\"text\":\"No data\",\"code\":5}";
     public static final String invalidDataFormat = "{\"text\":\"Invalid data format\",\"code\":6}";
+    public static final String inValidToken = "{\"text\":\"Invalid token\",\"code\":4}";
+    public static final String inValidIndex = "{\"text\":\"Incorrect index\",\"code\":4,\"invalid-event-number\":1}";
     public static final String exception = "excpetion";
 
     private String resp = "";
@@ -49,6 +51,10 @@ protected CloseableHttpResponse doExecute(HttpHost target, HttpRequest request,
             return createResponse(resp, 503);
         } else if (resp.equals(noDataError)) {
             return createResponse(resp, 400);
+        } else if (resp.equals(inValidToken)) {
+            return createResponse(resp, 400);
+        } else if (resp.equals(inValidIndex)) {
+            return createResponse(resp, 400);
         } else {
             return createResponse(success, 201);
         }
diff --git a/src/test/java/com/splunk/kafka/connect/MockHecClientWrapper.java b/src/test/java/com/splunk/kafka/connect/MockHecClientWrapper.java
new file mode 100644
index 00000000..2666c800
--- /dev/null
+++ b/src/test/java/com/splunk/kafka/connect/MockHecClientWrapper.java
@@ -0,0 +1,17 @@
+package com.splunk.kafka.connect;
+
+import org.apache.http.impl.client.CloseableHttpClient;
+
+import com.splunk.hecclient.CloseableHttpClientMock;
+import com.splunk.hecclient.HecConfig;
+
+public class MockHecClientWrapper extends AbstractClientWrapper {
+    public CloseableHttpClientMock client = new CloseableHttpClientMock();
+
+    @Override
+    CloseableHttpClient getClient(HecConfig config) {
+        // The HecConfig is ignored here; tests always receive the canned mock client.
+        return client;
+    }
+
+}
diff --git a/src/test/java/com/splunk/kafka/connect/SplunkSinkConnecterTest.java b/src/test/java/com/splunk/kafka/connect/SplunkSinkConnecterTest.java
index 045e04ea..58308348 100644
--- a/src/test/java/com/splunk/kafka/connect/SplunkSinkConnecterTest.java
+++ b/src/test/java/com/splunk/kafka/connect/SplunkSinkConnecterTest.java
@@ -22,14 +22,18 @@
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.kafka.common.config.Config;
 import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.config.ConfigValue;
 import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.sink.SinkConnector;
 import org.junit.Assert;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
+import com.splunk.hecclient.CloseableHttpClientMock;
 
 import java.util.*;
 
@@ -74,6 +78,10 @@ public void testValidKerberosBothEmpty() {
         final Map<String, String> configs = new HashMap<>();
         addNecessaryConfigs(configs);
         SinkConnector connector = new SplunkSinkConnector();
+        configs.put("topics", "b");
+        configs.put("splunk.indexes", "b");
+        MockHecClientWrapper clientInstance = new MockHecClientWrapper();
+        ((SplunkSinkConnector) connector).setHecInstance(clientInstance);
         Config result = connector.validate(configs);
         assertNoErrors(result);
     }
@@ -85,6 +93,10 @@ public void testValidKerberosBothSet() {
         configs.put(KERBEROS_USER_PRINCIPAL_CONF, TEST_KERB_PRINCIPAL);
         configs.put(KERBEROS_KEYTAB_PATH_CONF, TEST_KERB_KEYTAB_PATH);
         SinkConnector connector = new SplunkSinkConnector();
+        configs.put("topics", "b");
+        configs.put("splunk.indexes", "b");
+        MockHecClientWrapper clientInstance = new MockHecClientWrapper();
+        ((SplunkSinkConnector) connector).setHecInstance(clientInstance);
         Config result = connector.validate(configs);
         assertNoErrors(result);
     }
@@ -95,6 +107,10 @@ public void testInvalidKerberosOnlyPrincipalSet() {
         addNecessaryConfigs(configs);
         configs.put(KERBEROS_USER_PRINCIPAL_CONF, TEST_KERB_PRINCIPAL);
         SplunkSinkConnector connector = new SplunkSinkConnector();
+        configs.put("topics", "b");
+        configs.put("splunk.indexes", "b");
+        MockHecClientWrapper clientInstance = new MockHecClientWrapper();
+        ((SplunkSinkConnector) connector).setHecInstance(clientInstance);
         Config result = connector.validate(configs);
         assertHasErrorMessage(result, KERBEROS_USER_PRINCIPAL_CONF, "must be set");
         assertHasErrorMessage(result, KERBEROS_KEYTAB_PATH_CONF, "must be set");
@@ -106,11 +122,54 @@ public void testInvalidKerberosOnlyKeytabSet() {
         addNecessaryConfigs(configs);
         configs.put(KERBEROS_KEYTAB_PATH_CONF, TEST_KERB_KEYTAB_PATH);
         SplunkSinkConnector connector = new SplunkSinkConnector();
+        configs.put("topics", "b");
+        configs.put("splunk.indexes", "b");
+        MockHecClientWrapper clientInstance = new MockHecClientWrapper();
+        ((SplunkSinkConnector) connector).setHecInstance(clientInstance);
         Config result = connector.validate(configs);
         assertHasErrorMessage(result, KERBEROS_USER_PRINCIPAL_CONF, "must be set");
         assertHasErrorMessage(result, KERBEROS_KEYTAB_PATH_CONF, "must be set");
     }
 
+    @Test
+    public void testInvalidToken() {
+        final Map<String, String> configs = new HashMap<>();
+        addNecessaryConfigs(configs);
+        SplunkSinkConnector connector = new SplunkSinkConnector();
+        configs.put("topics", "b");
+        configs.put("splunk.indexes", "b");
+        MockHecClientWrapper clientInstance = new MockHecClientWrapper();
+        clientInstance.client.setResponse(CloseableHttpClientMock.inValidToken);
+        ((SplunkSinkConnector) connector).setHecInstance(clientInstance);
+        Assertions.assertThrows(ConfigException.class, () -> connector.validate(configs));
+    }
+
+    @Test
+    public void testInvalidIndex() {
+        final Map<String, String> configs = new HashMap<>();
+        addNecessaryConfigs(configs);
+        SplunkSinkConnector connector = new SplunkSinkConnector();
+        configs.put("topics", "b");
+        configs.put("splunk.indexes", "b");
+        MockHecClientWrapper clientInstance = new MockHecClientWrapper();
+        clientInstance.client.setResponse(CloseableHttpClientMock.inValidIndex);
+        ((SplunkSinkConnector) connector).setHecInstance(clientInstance);
+        Assertions.assertThrows(ConfigException.class, () -> connector.validate(configs));
+    }
+
+    @Test
+    public void testValidSplunkConfigurations() {
+        final Map<String, String> configs = new HashMap<>();
+        addNecessaryConfigs(configs);
+        SplunkSinkConnector connector = new SplunkSinkConnector();
+        configs.put("topics", "b");
+        configs.put("splunk.indexes", "b");
+        MockHecClientWrapper clientInstance = new MockHecClientWrapper();
+        clientInstance.client.setResponse(CloseableHttpClientMock.success);
+        ((SplunkSinkConnector) connector).setHecInstance(clientInstance);
+        Assertions.assertDoesNotThrow(() -> connector.validate(configs));
+    }
+
     private void addNecessaryConfigs(Map<String, String> configs) {
         configs.put(URI_CONF, TEST_URI);
         configs.put(TOKEN_CONF, "blah");
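
Reviewer note: the snippet below is an illustrative usage sketch, not part of the diff. It shows how the injectable client wrapper is meant to be used, mirroring testInvalidToken above. It assumes URI_CONF and TOKEN_CONF are the HEC URI and token keys defined in SplunkSinkConnectorConfig, and it relies on the test-scope classes MockHecClientWrapper and CloseableHttpClientMock added in this change.

package com.splunk.kafka.connect;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.config.ConfigException;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import com.splunk.hecclient.CloseableHttpClientMock;

public class HecValidationUsageSketch {

    @Test
    public void badTokenIsReportedAtValidateTime() {
        // Minimal connector config, mirroring the tests above (assumed sufficient here).
        Map<String, String> configs = new HashMap<>();
        configs.put(SplunkSinkConnectorConfig.URI_CONF, "https://localhost:8088");
        configs.put(SplunkSinkConnectorConfig.TOKEN_CONF, "not-a-real-token");
        configs.put("topics", "events");
        configs.put("splunk.indexes", "main");

        // Swap the real HEC HTTP client for the canned mock so no Splunk instance is needed.
        SplunkSinkConnector connector = new SplunkSinkConnector();
        MockHecClientWrapper wrapper = new MockHecClientWrapper();
        wrapper.client.setResponse(CloseableHttpClientMock.inValidToken);
        connector.setHecInstance(wrapper);

        // validate() now posts one test event per configured index to /services/collector
        // and turns any HEC status above 299 into a ConfigException at validation time.
        Assertions.assertThrows(ConfigException.class, () -> connector.validate(configs));
    }
}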