From 9f53363cb5e0ef2ea0ea6088d93b7a90fb2cc3e0 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E2=80=9Cdeep-splunk=E2=80=9D?= <“dkapadia@splunk.com”>
Date: Fri, 9 Dec 2022 11:11:07 +0530
Subject: [PATCH 1/6] Hec-Functionality

---
 .vscode/settings.json                         |   4 +
 pom.xml                                       |   9 +
 .../kafka/connect/HecClosableClient.java      |   9 +
 .../splunk/kafka/connect/HecCreateClient.java |  17 ++
 .../kafka/connect/SplunkSinkConnector.java    | 159 ++++++++++++++++++
 .../connect/SplunkSinkConnectorConfig.java    | 132 +++++++++++++++
 .../splunk/kafka/connect/HecInstanceMock.java |  19 +++
 .../connect/SplunkSinkConnecterTest.java      |   8 +
 8 files changed, 357 insertions(+)
 create mode 100644 .vscode/settings.json
 create mode 100644 src/main/java/com/splunk/kafka/connect/HecClosableClient.java
 create mode 100644 src/main/java/com/splunk/kafka/connect/HecCreateClient.java
 create mode 100644 src/test/java/com/splunk/kafka/connect/HecInstanceMock.java

diff --git a/.vscode/settings.json b/.vscode/settings.json
new file mode 100644
index 00000000..b84f89c3
--- /dev/null
+++ b/.vscode/settings.json
@@ -0,0 +1,4 @@
+{
+    "java.configuration.updateBuildConfiguration": "interactive",
+    "java.compile.nullAnalysis.mode": "automatic"
+}
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index d600dae3..b030dd9b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -21,6 +21,7 @@
+
         <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>
@@ -41,6 +42,14 @@
             <version>2.8.1</version>
             <scope>compile</scope>
         </dependency>
+
+        <dependency>
+            <groupId>com.github.tomakehurst</groupId>
+            <artifactId>wiremock-jre8</artifactId>
+            <version>2.35.0</version>
+            <scope>test</scope>
+        </dependency>
+
         <dependency>
             <groupId>org.junit.jupiter</groupId>
             <artifactId>junit-jupiter-api</artifactId>
diff --git a/src/main/java/com/splunk/kafka/connect/HecClosableClient.java b/src/main/java/com/splunk/kafka/connect/HecClosableClient.java
new file mode 100644
index 00000000..66c015d7
--- /dev/null
+++ b/src/main/java/com/splunk/kafka/connect/HecClosableClient.java
@@ -0,0 +1,9 @@
+package com.splunk.kafka.connect;
+
+import org.apache.http.impl.client.CloseableHttpClient;
+
+import com.splunk.hecclient.HecConfig;
+
+public abstract class HecClosableClient {
+    abstract CloseableHttpClient getClient(HecConfig config);
+}
diff --git a/src/main/java/com/splunk/kafka/connect/HecCreateClient.java b/src/main/java/com/splunk/kafka/connect/HecCreateClient.java
new file mode 100644
index 00000000..e2d8dfb0
--- /dev/null
+++ b/src/main/java/com/splunk/kafka/connect/HecCreateClient.java
@@ -0,0 +1,17 @@
+package com.splunk.kafka.connect;
+
+import org.apache.http.impl.client.CloseableHttpClient;
+
+import com.splunk.hecclient.Hec;
+import com.splunk.hecclient.HecConfig;
+
+public class HecCreateClient extends HecClosableClient {
+
+    @Override
+    CloseableHttpClient getClient(HecConfig config) {
+        return Hec.createHttpClient(config);
+
+    }
+
+
+}
diff --git a/src/main/java/com/splunk/kafka/connect/SplunkSinkConnector.java b/src/main/java/com/splunk/kafka/connect/SplunkSinkConnector.java
index 89750e02..82ed1667 100644
--- a/src/main/java/com/splunk/kafka/connect/SplunkSinkConnector.java
+++ b/src/main/java/com/splunk/kafka/connect/SplunkSinkConnector.java
@@ -17,6 +17,51 @@
 import static com.splunk.kafka.connect.SplunkSinkConnectorConfig.KERBEROS_KEYTAB_PATH_CONF;
 import static com.splunk.kafka.connect.SplunkSinkConnectorConfig.KERBEROS_USER_PRINCIPAL_CONF;
+import static com.splunk.kafka.connect.SplunkSinkConnectorConfig.URI_CONF;
+import static com.splunk.kafka.connect.SplunkSinkConnectorConfig.TOKEN_CONF;
+import static com.splunk.kafka.connect.SplunkSinkConnectorConfig.INDEX_CONF;
+// import static com.splunk.kafka.connect.SplunkSinkConnectorConfig.SSL_VALIDATE_CERTIFICATES_CONF;
+// import static com.splunk.kafka.connect.SplunkSinkConnectorConfig.SOCKET_TIMEOUT_CONF;;
+// import static com.splunk.kafka.connect.SplunkSinkConnectorConfig.MAX_HTTP_CONNECTION_PER_CHANNEL_CONF;;
+// import static com.splunk.kafka.connect.SplunkSinkConnectorConfig.TOTAL_HEC_CHANNEL_CONF;;
+// import static com.splunk.kafka.connect.SplunkSinkConnectorConfig.EVENT_TIMEOUT_CONF;
+// import static com.splunk.kafka.connect.SplunkSinkConnectorConfig.INDEX_CONF;
+// import static com.splunk.kafka.connect.SplunkSinkConnectorConfig.INDEX_CONF;
+// import static com.splunk.kafka.connect.SplunkSinkConnectorConfig.INDEX_CONF;
+// import static com.splunk.kafka.connect.SplunkSinkConnectorConfig.INDEX_CONF;
+// import static com.splunk.kafka.connect.SplunkSinkConnectorConfig.INDEX_CONF;
+
+import com.splunk.hecclient.Event;
+import com.splunk.hecclient.EventBatch;
+import com.splunk.hecclient.Hec;
+import com.splunk.hecclient.HecConfig;
+import com.splunk.hecclient.JsonEvent;
+import com.splunk.hecclient.JsonEventBatch;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.http.Header;
+import org.apache.http.HttpEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.client.protocol.HttpClientContext;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.message.BasicHeader;
+import org.apache.http.protocol.HttpContext;
+import org.apache.http.util.EntityUtils;
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.sink.SinkConnector;
+import org.apache.kafka.connect.sink.SinkTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.URL;
+import java.net.UnknownHostException;
 
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -28,6 +73,7 @@
 import org.apache.kafka.connect.sink.SinkConnector;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
@@ -39,6 +85,12 @@ public final class SplunkSinkConnector extends SinkConnector {
     private Map<String, String> taskConfig;
     private Map<String, ConfigValue> values;
     private List<ConfigValue> validations;
+    private HecClosableClient hecAb = new HecCreateClient();
+
+
+    public void setHecInstance(HecClosableClient hecAb){
+        this.hecAb=hecAb;
+    }
 
     @Override
     public void start(Map<String, String> taskConfig) {
@@ -76,6 +128,7 @@ public ConfigDef config() {
         return SplunkSinkConnectorConfig.conf();
     }
 
+
     @Override
     public Config validate(final Map<String, String> connectorConfigs) {
@@ -84,6 +137,7 @@ public Config validate(final Map<String, String> connectorConfigs) {
         values = validations.stream().collect(Collectors.toMap(ConfigValue::name, Function.identity()));
 
         validateKerberosConfigs(connectorConfigs);
+        validateHealthCheckForSplunkIndexes(connectorConfigs);
         return new Config(validations);
     }
@@ -111,4 +165,109 @@ void validateKerberosConfigs(final Map<String, String> configs) {
     private void addErrorMessage(String property, String error) {
         values.get(property).addErrorMessage(error);
     }
+
+    private static String[] split(String data, String sep) {
+        if (data != null && !data.trim().isEmpty()) {
+            return data.trim().split(sep);
+        }
+        return null;
+    }
+
+
+    private void validateHealthCheckForSplunkIndexes(final Map<String, String> configs) {
+
+        throw new ConfigException("check");
+        // String splunkURI = configs.getOrDefault(URI_CONF,"");
+        // String indexes = configs.getOrDefault(INDEX_CONF,"");
+        // String splunkToken = configs.getOrDefault(TOKEN_CONF,"");
+        // if (indexes!=""){
+        //     log.info("started", splunkToken);
+        //     System.out.println("started");
+        //     String[] topicIndexes = split(indexes, ",");
+        //     for (String index: topicIndexes){
+        //         healthCheckForSplunkHEC(splunkURI,index,splunkToken,hecAb,configs);
+        //         // throw new ConfigException("encountered exception when post data");
+        //     }
+        // }
+    }
+
+    private void healthCheckForSplunkHEC(String splunkURI,String index,String splunkToken,HecClosableClient clientInstance,final Map<String, String> configs) {
+        log.info("healthCheckForSplunkHEC", splunkToken, clientInstance);
+        Header[] headers;
+        headers = new Header[1];
+        headers[0] = new BasicHeader("Authorization", String.format("Splunk %s", splunkToken));
+        String endpoint = "/services/collector";
+        String url = splunkURI + endpoint;
+        final HttpPost httpPost = new HttpPost(url);
+        httpPost.setHeaders(headers);
+        EventBatch batch = new JsonEventBatch();
+        Event event = new JsonEvent("a:a", null);
+        event.setIndex(index);
+        batch.add(event);
+        httpPost.setEntity(batch.getHttpEntity());
+        SplunkSinkConnectorConfig connectorConfig = new SplunkSinkConnectorConfig(configs);
+        CloseableHttpClient httpClient = clientInstance.getClient(connectorConfig.getHecConfig());
+        // if (taskConfig!=null){
+        //     SplunkSinkConnectorConfig connectorConfig = new SplunkSinkConnectorConfig(taskConfig, hecAb);
+        //     httpClient = clientInstance.getClient(connectorConfig.getHecConfig());
+        // }
+        // else {
+        //     httpClient = clientInstance.getClient(null);
+        // }
+
+        try {
+            executeHttpRequest(httpPost,httpClient);
+        } catch (ConfigException e){
+
+        }
+
+        // if (s!=null){
+        //     addErrorMessage(splunkURI, s);
+        // }
+        return;
+    }
+
+
+
+    private String executeHttpRequest(final HttpUriRequest req,CloseableHttpClient httpClient){
+        CloseableHttpResponse resp = null;
+        HttpContext context;
+        context = HttpClientContext.create();
+        try {
+            resp = httpClient.execute(req, context);
+        }
+        catch (IOException ex) {
+            throw new ConfigException("Invalid", ex);
+            // ex.printStackTrace();
+            // throw new ConfigException("encountered exception when post data", ex);
+        }
+        String respPayload;
+        if (resp !=null){
+            HttpEntity entity = resp.getEntity();
+            try {
+                respPayload = EntityUtils.toString(entity, "utf-8");
+            } catch (Exception ex) {
+                // throw new ConfigException("failed to process http response", ex);
+            } finally {
+                try {
+                    resp.close();
+                } catch (Exception ex) {
+                    // throw new ConfigException("failed to close http response", ex);
+                }
+            }
+            int status = resp.getStatusLine().getStatusCode();
+            log.info(status+"");
+            if (status==201) {
+                return null;
+                // throw new ConfigException(String.format("Bad splunk configurations with status code:%s response:%s",status,respPayload));
+            }
+        }else{
+            return "erorrrrr";
+        }
+
+        return "erorrrrr";
+    }
+
+
 }
diff --git a/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java b/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java
index dddd3291..dd48590d 100644
--- a/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java
+++ b/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java
@@ -15,14 +15,37 @@
  */
 package com.splunk.kafka.connect;
 
+import com.splunk.hecclient.Event;
+import com.splunk.hecclient.EventBatch;
+import com.splunk.hecclient.Hec;
 import com.splunk.hecclient.HecConfig;
+import com.splunk.hecclient.JsonEvent;
+import com.splunk.hecclient.JsonEventBatch;
+
 import org.apache.commons.lang3.StringUtils;
+import org.apache.http.Header;
+import org.apache.http.HttpEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.client.protocol.HttpClientContext;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.message.BasicHeader;
+import org.apache.http.protocol.HttpContext;
+import org.apache.http.util.EntityUtils;
 import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.connect.sink.SinkConnector;
 import org.apache.kafka.connect.sink.SinkTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.URL;
+import java.net.UnknownHostException;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
@@ -34,6 +57,7 @@
 import java.util.regex.PatternSyntaxException;
 
 public final class SplunkSinkConnectorConfig extends AbstractConfig {
+    private static final Logger log = LoggerFactory.getLogger(SplunkSinkConnectorConfig.class);
     // General
     static final String INDEX = "index";
     static final String SOURCE = "source";
@@ -308,8 +332,12 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
         regex = getString(REGEX_CONF);
         timestampFormat = getString(TIMESTAMP_FORMAT_CONF).trim();
         validateRegexForTimestamp(regex);
+        // healthCheckForSplunkIndexes(splunkURI, indexes, splunkToken);
+
+
     }
 
+
     public static ConfigDef conf() {
         return new ConfigDef()
             .define(TOKEN_CONF, ConfigDef.Type.PASSWORD, ConfigDef.Importance.HIGH, TOKEN_DOC)
@@ -551,4 +579,108 @@ private static boolean getNamedGroupCandidates(String regex) {
         }
         return false;
     }
+
+
+    // private void healthCheckForSplunkIndexes(String splunkURI, String indexes,String splunkToken) {
+    //     if (indexes!=""){
+    //         String[] topicIndexes = split(indexes, ",");
+    //         for (String index: topicIndexes){
+    //             healthCheckForSplunkHEC(splunkURI,index,splunkToken);
+    //         }
+    //     }
+
+
+    // }
+
+
+    // // private void healthCheckForSplunkHEC(String splunkURI,String index,String splunkToken) {
+    // //     String endpoint = "/services/collector";
+    // //     URL urlHealthCheck;
+    // //     try {
+    // //         urlHealthCheck = new URL(splunkURI+endpoint);
+    // //         HttpURLConnection httpConn = (HttpURLConnection) urlHealthCheck.openConnection();
+    // //         httpConn.setRequestMethod("POST");
+
+    // //         httpConn.setRequestProperty("Authorization", "Splunk " + splunkToken);
+    // //         httpConn.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");
+
+    // //         httpConn.setDoOutput(true);
+    // //         OutputStreamWriter writer = new OutputStreamWriter(httpConn.getOutputStream());
+    // //         writer.write(String.format("{\"index\":\"%s\"}",index));
+    // //         writer.flush();
+    // //         writer.close();
+    // //         httpConn.getOutputStream().close();
+    // //         InputStream responseStream = httpConn.getResponseCode() / 100 == 2
+    // //                 ? httpConn.getInputStream()
+    // //                 : httpConn.getErrorStream();
+    // //         Scanner s = new Scanner(responseStream).useDelimiter("\\A");
+    // //         String response = s.hasNext() ? s.next() : "";
+    // //         System.out.println(response);
+    // //         if (httpConn.getResponseCode()!=12){
+    // //             throw new ConfigException(String.format("Bad splunk configurations with status code:%s response:%s",httpConn.getResponseCode(),response));
+    // //         }
+    // //     } catch (MalformedURLException e) {
+    // //         // TODO Auto-generated catch block
+    // //         throw new ConfigException("Invalid Splunk configuration");
+    // //     } catch (ProtocolException e) {
+    // //         throw new ConfigException("Invalid Splunk configuration");
+    // //         // TODO Auto-generated catch block
+
+    // //     } catch (IOException e) {
+    // //         throw new ConfigException("Invalid Splunk configuration");
+    // //         // TODO Auto-generated catch block
+
+    // //     }
+
+    // private void healthCheckForSplunkHEC(String splunkURI,String index,String splunkToken) {
+    //     Header[] headers;
+    //     headers = new Header[1];
+    //     headers[0] = new BasicHeader("Authorization", String.format("Splunk %s", splunkToken));
+    //     String endpoint = "/services/collector";
+    //     String url = splunkURI + endpoint;
+    //     final HttpPost httpPost = new HttpPost(url);
+    //     httpPost.setHeaders(headers);
+    //     EventBatch batch = new JsonEventBatch();
+    //     Event event = new JsonEvent("a:a", null);
+    //     event.setIndex(index);
+    //     batch.add(event);
+    //     httpPost.setEntity(batch.getHttpEntity());
+
+    //     executeHttpRequest(httpPost);
+    // }
+
+    // public void executeHttpRequest(final HttpUriRequest req){
+    //     CloseableHttpResponse resp;
+    //     HttpContext context;
+    //     context = HttpClientContext.create();
+    //     CloseableHttpClient httpClient = Hec.createHttpClient(getHecConfig());
+    //     try {
+    //         resp = httpClient.execute(req, context);
+    //     }
+    //     catch (IOException ex) {
+    //         ex.printStackTrace();
+    //         throw new ConfigException("encountered exception when post data", ex);
+    //     }
+
+    //     String respPayload;
+    //     HttpEntity entity = resp.getEntity();
+    //     try {
+    //         respPayload = EntityUtils.toString(entity, "utf-8");
+    //     } catch (Exception ex) {
+    //         throw new ConfigException("failed to process http response", ex);
+    //     } finally {
+    //         try {
+    //             resp.close();
+    //         } catch (IOException ex) {
+    //             throw new ConfigException("failed to close http response", ex);
+    //         }
+    //     }
+
+    //     int status = resp.getStatusLine().getStatusCode();
+    //     if (status!= 12) {
+    //         throw new ConfigException(String.format("Bad splunk configurations with status code:%s response:%s",status,respPayload));
+
+    //     }
+    // }
 }
diff --git a/src/test/java/com/splunk/kafka/connect/HecInstanceMock.java b/src/test/java/com/splunk/kafka/connect/HecInstanceMock.java
new file mode 100644
index 00000000..6c8d7eee
--- /dev/null
+++ b/src/test/java/com/splunk/kafka/connect/HecInstanceMock.java
@@ -0,0 +1,19 @@
+package com.splunk.kafka.connect;
+
+import org.apache.http.impl.client.CloseableHttpClient;
+
+import com.splunk.hecclient.CloseableHttpClientMock;
+import com.splunk.hecclient.Hec;
+import com.splunk.hecclient.HecConfig;
+
+public class HecInstanceMock extends HecClosableClient{
+
+    @Override
+    CloseableHttpClient getClient(HecConfig config) {
+        // TODO Auto-generated method stub
+        if (config==null){}
+        CloseableHttpClientMock client = new CloseableHttpClientMock();
+        return client;
+    }
+
+}
diff --git a/src/test/java/com/splunk/kafka/connect/SplunkSinkConnecterTest.java b/src/test/java/com/splunk/kafka/connect/SplunkSinkConnecterTest.java
index 045e04ea..d6834d99 100644
--- a/src/test/java/com/splunk/kafka/connect/SplunkSinkConnecterTest.java
+++ b/src/test/java/com/splunk/kafka/connect/SplunkSinkConnecterTest.java
@@ -74,6 +74,8 @@ public void testValidKerberosBothEmpty() {
         final Map<String, String> configs = new HashMap<>();
         addNecessaryConfigs(configs);
         SinkConnector connector = new SplunkSinkConnector();
+        HecInstanceMock clientInstance = new HecInstanceMock();
+        ((SplunkSinkConnector) connector).setHecInstance(clientInstance);
         Config result = connector.validate(configs);
         assertNoErrors(result);
     }
@@ -85,6 +87,8 @@ public void testValidKerberosBothSet() {
         configs.put(KERBEROS_USER_PRINCIPAL_CONF, TEST_KERB_PRINCIPAL);
         configs.put(KERBEROS_KEYTAB_PATH_CONF, TEST_KERB_KEYTAB_PATH);
         SinkConnector connector = new SplunkSinkConnector();
+        HecInstanceMock clientInstance = new HecInstanceMock();
+        ((SplunkSinkConnector) connector).setHecInstance(clientInstance);
         Config result = connector.validate(configs);
         assertNoErrors(result);
     }
@@ -95,6 +99,8 @@ public void testInvalidKerberosOnlyPrincipalSet() {
         addNecessaryConfigs(configs);
         configs.put(KERBEROS_USER_PRINCIPAL_CONF, TEST_KERB_PRINCIPAL);
         SplunkSinkConnector connector = new SplunkSinkConnector();
+        HecInstanceMock clientInstance = new HecInstanceMock();
+        ((SplunkSinkConnector) connector).setHecInstance(clientInstance);
         Config result = connector.validate(configs);
         assertHasErrorMessage(result, KERBEROS_USER_PRINCIPAL_CONF, "must be set");
         assertHasErrorMessage(result, KERBEROS_KEYTAB_PATH_CONF, "must be set");
@@ -106,6 +112,8 @@ public void testInvalidKerberosOnlyKeytabSet() {
         addNecessaryConfigs(configs);
         configs.put(KERBEROS_KEYTAB_PATH_CONF, TEST_KERB_KEYTAB_PATH);
         SplunkSinkConnector connector = new SplunkSinkConnector();
+        HecInstanceMock clientInstance = new HecInstanceMock();
+        ((SplunkSinkConnector) connector).setHecInstance(clientInstance);
         Config result = connector.validate(configs);
         assertHasErrorMessage(result, KERBEROS_USER_PRINCIPAL_CONF, "must be set");
         assertHasErrorMessage(result, KERBEROS_KEYTAB_PATH_CONF, "must be set");

From 7e1dc392706105f8f8b2979be0696f6fe270f09f Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E2=80=9Cdeep-splunk=E2=80=9D?= <“dkapadia@splunk.com”>
Date: Fri, 9 Dec 2022 15:23:24 +0530
Subject: [PATCH 2/6] UpdatedFunctionality-SplunkHealthCheck

---
 ...Client.java => AbstractClientWrapper.java} |   2 +-
 ...reateClient.java => HecClientWrapper.java} |   2 +-
 .../kafka/connect/SplunkSinkConnector.java    | 172 ++++++------------
 .../connect/SplunkSinkConnectorConfig.java    | 132 +------------
 .../hecclient/CloseableHttpClientMock.java    |   6 +
 ...nceMock.java => MockHecClientWrapper.java} |   5 +-
 .../connect/SplunkSinkConnecterTest.java      |  59 +++++-
 7 files changed, 124 insertions(+), 254 deletions(-)
 rename src/main/java/com/splunk/kafka/connect/{HecClosableClient.java => AbstractClientWrapper.java} (80%)
 rename src/main/java/com/splunk/kafka/connect/{HecCreateClient.java => HecClientWrapper.java} (82%)
 rename src/test/java/com/splunk/kafka/connect/{HecInstanceMock.java => MockHecClientWrapper.java} (72%)

diff --git a/src/main/java/com/splunk/kafka/connect/HecClosableClient.java b/src/main/java/com/splunk/kafka/connect/AbstractClientWrapper.java
similarity index 80%
rename from src/main/java/com/splunk/kafka/connect/HecClosableClient.java
rename to src/main/java/com/splunk/kafka/connect/AbstractClientWrapper.java
index 66c015d7..d73630de 100644
--- a/src/main/java/com/splunk/kafka/connect/HecClosableClient.java
+++ b/src/main/java/com/splunk/kafka/connect/AbstractClientWrapper.java
@@ -4,6 +4,6 @@
 
 import com.splunk.hecclient.HecConfig;
 
-public abstract class HecClosableClient {
+public abstract class AbstractClientWrapper {
     abstract CloseableHttpClient getClient(HecConfig config);
 }
diff --git a/src/main/java/com/splunk/kafka/connect/HecCreateClient.java b/src/main/java/com/splunk/kafka/connect/HecClientWrapper.java
similarity index 82%
rename from src/main/java/com/splunk/kafka/connect/HecCreateClient.java
rename to src/main/java/com/splunk/kafka/connect/HecClientWrapper.java
index e2d8dfb0..d4358d18 100644
--- a/src/main/java/com/splunk/kafka/connect/HecCreateClient.java
+++ b/src/main/java/com/splunk/kafka/connect/HecClientWrapper.java
@@ -5,7 +5,7 @@
 import com.splunk.hecclient.Hec;
 import com.splunk.hecclient.HecConfig;
 
-public class HecCreateClient extends HecClosableClient {
+public class HecClientWrapper extends AbstractClientWrapper {
 
     @Override
     CloseableHttpClient getClient(HecConfig config) {
diff --git a/src/main/java/com/splunk/kafka/connect/SplunkSinkConnector.java b/src/main/java/com/splunk/kafka/connect/SplunkSinkConnector.java
index 82ed1667..449f7fe2 100644
--- a/src/main/java/com/splunk/kafka/connect/SplunkSinkConnector.java
+++ b/src/main/java/com/splunk/kafka/connect/SplunkSinkConnector.java
@@ -17,79 +17,52 @@
 import static com.splunk.kafka.connect.SplunkSinkConnectorConfig.KERBEROS_KEYTAB_PATH_CONF;
 import static com.splunk.kafka.connect.SplunkSinkConnectorConfig.KERBEROS_USER_PRINCIPAL_CONF;
-import static com.splunk.kafka.connect.SplunkSinkConnectorConfig.URI_CONF;
-import static com.splunk.kafka.connect.SplunkSinkConnectorConfig.TOKEN_CONF;
-import static com.splunk.kafka.connect.SplunkSinkConnectorConfig.INDEX_CONF;
-// import static com.splunk.kafka.connect.SplunkSinkConnectorConfig.SSL_VALIDATE_CERTIFICATES_CONF;
-// import static com.splunk.kafka.connect.SplunkSinkConnectorConfig.SOCKET_TIMEOUT_CONF;;
-// import static com.splunk.kafka.connect.SplunkSinkConnectorConfig.MAX_HTTP_CONNECTION_PER_CHANNEL_CONF;;
-// import static com.splunk.kafka.connect.SplunkSinkConnectorConfig.TOTAL_HEC_CHANNEL_CONF;;
-// import static com.splunk.kafka.connect.SplunkSinkConnectorConfig.EVENT_TIMEOUT_CONF;
-// import static com.splunk.kafka.connect.SplunkSinkConnectorConfig.INDEX_CONF;
-// import static com.splunk.kafka.connect.SplunkSinkConnectorConfig.INDEX_CONF;
-// import static com.splunk.kafka.connect.SplunkSinkConnectorConfig.INDEX_CONF;
-// import static com.splunk.kafka.connect.SplunkSinkConnectorConfig.INDEX_CONF;
-// import static com.splunk.kafka.connect.SplunkSinkConnectorConfig.INDEX_CONF;
 
-import com.splunk.hecclient.Event;
-import com.splunk.hecclient.EventBatch;
-import com.splunk.hecclient.Hec;
-import com.splunk.hecclient.HecConfig;
-import com.splunk.hecclient.JsonEvent;
-import com.splunk.hecclient.JsonEventBatch;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
+import org.apache.avro.Protocol;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.http.Header;
-import org.apache.http.HttpEntity;
+import org.apache.http.client.ClientProtocolException;
 import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.http.client.methods.HttpPost;
 import org.apache.http.client.methods.HttpUriRequest;
 import org.apache.http.client.protocol.HttpClientContext;
 import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.impl.client.HttpClients;
 import org.apache.http.message.BasicHeader;
 import org.apache.http.protocol.HttpContext;
 import org.apache.http.util.EntityUtils;
-import org.apache.kafka.common.config.AbstractConfig;
-import org.apache.kafka.common.config.ConfigDef;
-import org.apache.kafka.common.config.ConfigException;
-import org.apache.kafka.connect.sink.SinkConnector;
-import org.apache.kafka.connect.sink.SinkTask;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.URL;
-import java.net.UnknownHostException;
-
-import java.util.function.Function;
-import java.util.stream.Collectors;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.kafka.common.config.Config;
 import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.config.ConfigValue;
 import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.sink.SinkConnector;
 
+import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.splunk.hecclient.Event;
+import com.splunk.hecclient.EventBatch;
+import com.splunk.hecclient.JsonEvent;
+import com.splunk.hecclient.JsonEventBatch;
+
 public final class SplunkSinkConnector extends SinkConnector {
     private static final Logger log = LoggerFactory.getLogger(SplunkSinkConnector.class);
     private Map<String, String> taskConfig;
     private Map<String, ConfigValue> values;
     private List<ConfigValue> validations;
-    private HecClosableClient hecAb = new HecCreateClient();
+    private AbstractClientWrapper hecAb = new HecClientWrapper();
 
 
-    public void setHecInstance(HecClosableClient hecAb){
-        this.hecAb=hecAb;
+    public void setHecInstance(AbstractClientWrapper hecAb) {
+        this.hecAb = hecAb;
     }
 
     @Override
@@ -129,7 +102,6 @@ public ConfigDef config() {
     }
 
-
     @Override
     public Config validate(final Map<String, String> connectorConfigs) {
         Config config = super.validate(connectorConfigs);
@@ -154,9 +126,9 @@ void validateKerberosConfigs(final Map<String, String> configs) {
         }
 
         String errorMessage = String.format(
-            "Either both or neither '%s' and '%s' must be set for Kerberos authentication. ",
-            KERBEROS_KEYTAB_PATH_CONF,
-            KERBEROS_USER_PRINCIPAL_CONF
+                "Either both or neither '%s' and '%s' must be set for Kerberos authentication. ",
+                KERBEROS_KEYTAB_PATH_CONF,
+                KERBEROS_USER_PRINCIPAL_CONF
         );
         addErrorMessage(KERBEROS_KEYTAB_PATH_CONF, errorMessage);
         addErrorMessage(KERBEROS_USER_PRINCIPAL_CONF, errorMessage);
@@ -174,30 +146,24 @@ private static String[] split(String data, String sep) {
     }
 
-    private void validateHealthCheckForSplunkIndexes(final Map<String, String> configs) {
-
-        throw new ConfigException("check");
-        // String splunkURI = configs.getOrDefault(URI_CONF,"");
-        // String indexes = configs.getOrDefault(INDEX_CONF,"");
-        // String splunkToken = configs.getOrDefault(TOKEN_CONF,"");
-        // if (indexes!=""){
-        //     log.info("started", splunkToken);
-        //     System.out.println("started");
-        //     String[] topicIndexes = split(indexes, ",");
-        //     for (String index: topicIndexes){
-        //         healthCheckForSplunkHEC(splunkURI,index,splunkToken,hecAb,configs);
-        //         // throw new ConfigException("encountered exception when post data");
-        //     }
-        // }
+    private void validateHealthCheckForSplunkIndexes(final Map<String, String> configs) throws ConfigException {
+        SplunkSinkConnectorConfig connectorConfig = new SplunkSinkConnectorConfig(configs);
+        String[] indexes = split(connectorConfig.indexes, ",");
+        if(indexes.length == 0) {
+            healthCheckForSplunkHEC(connectorConfig, "");
+        } else {
+            for (String index : indexes) {
+                healthCheckForSplunkHEC(connectorConfig, index);
+            }
+        }
     }
 
-    private void healthCheckForSplunkHEC(String splunkURI,String index,String splunkToken,HecClosableClient clientInstance,final Map<String, String> configs) {
-        log.info("healthCheckForSplunkHEC", splunkToken, clientInstance);
+    private void healthCheckForSplunkHEC(SplunkSinkConnectorConfig connectorConfig, String index) throws ConfigException {
         Header[] headers;
         headers = new Header[1];
-        headers[0] = new BasicHeader("Authorization", String.format("Splunk %s", splunkToken));
+        headers[0] = new BasicHeader("Authorization", String.format("Splunk %s", connectorConfig.splunkToken));
         String endpoint = "/services/collector";
-        String url = splunkURI + endpoint;
+        String url = connectorConfig.splunkURI + endpoint;
         final HttpPost httpPost = new HttpPost(url);
         httpPost.setHeaders(headers);
         EventBatch batch = new JsonEventBatch();
@@ -205,69 +171,45 @@ private void healthCheckForSplunkHEC(String splunkURI,String index,String splunk
         event.setIndex(index);
         batch.add(event);
         httpPost.setEntity(batch.getHttpEntity());
-        SplunkSinkConnectorConfig connectorConfig = new SplunkSinkConnectorConfig(configs);
-        CloseableHttpClient httpClient = clientInstance.getClient(connectorConfig.getHecConfig());
-        // if (taskConfig!=null){
-        //     SplunkSinkConnectorConfig connectorConfig = new SplunkSinkConnectorConfig(taskConfig, hecAb);
-        //     httpClient = clientInstance.getClient(connectorConfig.getHecConfig());
-        // }
-        // else {
-        //     httpClient = clientInstance.getClient(null);
-        // }
-
-        try {
-            executeHttpRequest(httpPost,httpClient);
-        } catch (ConfigException e){
-
-        }
-
-        // if (s!=null){
-        //     addErrorMessage(splunkURI, s);
-        // }
-        return;
+
+        CloseableHttpClient httpClient = hecAb.getClient(connectorConfig.getHecConfig());
+        executeHttpRequest(httpPost, httpClient);
     }
 
-
-
-    private String executeHttpRequest(final HttpUriRequest req,CloseableHttpClient httpClient){
+
+    private void executeHttpRequest(final HttpUriRequest req, CloseableHttpClient httpClient) throws ConfigException {
         CloseableHttpResponse resp = null;
         HttpContext context;
         context = HttpClientContext.create();
-        try {
-            resp = httpClient.execute(req, context);
-        }
-        catch (IOException ex) {
-            throw new ConfigException("Invalid", ex);
-            // ex.printStackTrace();
-            // throw new ConfigException("encountered exception when post data", ex);
+        try {
+            resp = httpClient.execute(req, context);
+        } catch (ClientProtocolException ex) {
+            throw new ConfigException("Invalid splunk SSL configuration detected while validating configuration",ex);
+        } catch (IOException ex) {
+            throw new ConfigException("Invalid Splunk Configurations",ex);
+        }
+        try {
+            String respPayload = EntityUtils.toString(resp.getEntity(), "utf-8");
+            if (respPayload.contains("Incorrect index")){
+                throw new ConfigException("Incorrect Index detected while validating configuration");
             }
-        String respPayload;
-        if (resp !=null){
-            HttpEntity entity = resp.getEntity();
+            else if (respPayload.contains("Invalid token")){
+                throw new ConfigException("Incorrect HEC token detected while validating configuration");
+            }
+        } catch(ConfigException ex){
+            throw ex;
+        }
+        catch (Exception ex) {
+            throw new ConfigException("failed to process http payload",ex);
+        } finally {
             try {
-                respPayload = EntityUtils.toString(entity, "utf-8");
+                resp.close();
             } catch (Exception ex) {
-                // throw new ConfigException("failed to process http response", ex);
-            } finally {
-                try {
-                    resp.close();
-                } catch (Exception ex) {
-                    // throw new ConfigException("failed to close http response", ex);
-                }
+                throw new ConfigException("failed to close http response",ex);
             }
-            int status = resp.getStatusLine().getStatusCode();
-            log.info(status+"");
-            if (status==201) {
-                return null;
-                // throw new ConfigException(String.format("Bad splunk configurations with status code:%s response:%s",status,respPayload));
-            }
-        }else{
-            return "erorrrrr";
+
         }
-
-        return "erorrrrr";
     }
 
-
-
-}
+}
\ No newline at end of file
diff --git a/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java b/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java
index dd48590d..23cec5a2 100644
--- a/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java
+++ b/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java
@@ -15,37 +15,14 @@
  */
 package com.splunk.kafka.connect;
 
-import com.splunk.hecclient.Event;
-import com.splunk.hecclient.EventBatch;
-import com.splunk.hecclient.Hec;
 import com.splunk.hecclient.HecConfig;
-import com.splunk.hecclient.JsonEvent;
-import com.splunk.hecclient.JsonEventBatch;
-
 import org.apache.commons.lang3.StringUtils;
-import org.apache.http.Header;
-import org.apache.http.HttpEntity;
-import org.apache.http.client.methods.CloseableHttpResponse;
-import org.apache.http.client.methods.HttpPost;
-import org.apache.http.client.methods.HttpUriRequest;
-import org.apache.http.client.protocol.HttpClientContext;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.impl.client.HttpClients;
-import org.apache.http.message.BasicHeader;
-import org.apache.http.protocol.HttpContext;
-import org.apache.http.util.EntityUtils;
 import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.connect.sink.SinkConnector;
 import org.apache.kafka.connect.sink.SinkTask;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.URL;
-import java.net.UnknownHostException;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
@@ -57,7 +34,6 @@
 import java.util.regex.PatternSyntaxException;
 
 public final class SplunkSinkConnectorConfig extends AbstractConfig {
-    private static final Logger log = LoggerFactory.getLogger(SplunkSinkConnectorConfig.class);
     // General
     static final String INDEX = "index";
     static final String SOURCE = "source";
@@ -332,9 +308,7 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
         regex = getString(REGEX_CONF);
         timestampFormat = getString(TIMESTAMP_FORMAT_CONF).trim();
         validateRegexForTimestamp(regex);
-        // healthCheckForSplunkIndexes(splunkURI, indexes, splunkToken);
-
-
+
     }
 
@@ -579,108 +553,4 @@ private static boolean getNamedGroupCandidates(String regex) {
         }
         return false;
     }
-
-
-    // private void healthCheckForSplunkIndexes(String splunkURI, String indexes,String splunkToken) {
-    //     if (indexes!=""){
-    //         String[] topicIndexes = split(indexes, ",");
-    //         for (String index: topicIndexes){
-    //             healthCheckForSplunkHEC(splunkURI,index,splunkToken);
-    //         }
-    //     }
-
-
-    // }
-
-
-    // // private void healthCheckForSplunkHEC(String splunkURI,String index,String splunkToken) {
-    // //     String endpoint = "/services/collector";
-    // //     URL urlHealthCheck;
-    // //     try {
-    // //         urlHealthCheck = new URL(splunkURI+endpoint);
-    // //         HttpURLConnection httpConn = (HttpURLConnection) urlHealthCheck.openConnection();
-    // //         httpConn.setRequestMethod("POST");
-
-    // //         httpConn.setRequestProperty("Authorization", "Splunk " + splunkToken);
-    // //         httpConn.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");
-
-    // //         httpConn.setDoOutput(true);
-    // //         OutputStreamWriter writer = new OutputStreamWriter(httpConn.getOutputStream());
-    // //         writer.write(String.format("{\"index\":\"%s\"}",index));
-    // //         writer.flush();
-    // //         writer.close();
-    // //         httpConn.getOutputStream().close();
-    // //         InputStream responseStream = httpConn.getResponseCode() / 100 == 2
-    // //                 ? httpConn.getInputStream()
-    // //                 : httpConn.getErrorStream();
-    // //         Scanner s = new Scanner(responseStream).useDelimiter("\\A");
-    // //         String response = s.hasNext() ? s.next() : "";
-    // //         System.out.println(response);
-    // //         if (httpConn.getResponseCode()!=12){
-    // //             throw new ConfigException(String.format("Bad splunk configurations with status code:%s response:%s",httpConn.getResponseCode(),response));
-    // //         }
-    // //     } catch (MalformedURLException e) {
-    // //         // TODO Auto-generated catch block
-    // //         throw new ConfigException("Invalid Splunk configuration");
-    // //     } catch (ProtocolException e) {
-    // //         throw new ConfigException("Invalid Splunk configuration");
-    // //         // TODO Auto-generated catch block
-
-    // //     } catch (IOException e) {
-    // //         throw new ConfigException("Invalid Splunk configuration");
-    // //         // TODO Auto-generated catch block
-
-    // //     }
-
-    // private void healthCheckForSplunkHEC(String splunkURI,String index,String splunkToken) {
-    //     Header[] headers;
-    //     headers = new Header[1];
-    //     headers[0] = new BasicHeader("Authorization", String.format("Splunk %s", splunkToken));
-    //     String endpoint = "/services/collector";
-    //     String url = splunkURI + endpoint;
-    //     final HttpPost httpPost = new HttpPost(url);
-    //     httpPost.setHeaders(headers);
-    //     EventBatch batch = new JsonEventBatch();
-    //     Event event = new JsonEvent("a:a", null);
-    //     event.setIndex(index);
-    //     batch.add(event);
-    //     httpPost.setEntity(batch.getHttpEntity());
-
-    //     executeHttpRequest(httpPost);
-    // }
-
-    // public void executeHttpRequest(final HttpUriRequest req){
-    //     CloseableHttpResponse resp;
-    //     HttpContext context;
-    //     context = HttpClientContext.create();
-    //     CloseableHttpClient httpClient = Hec.createHttpClient(getHecConfig());
-    //     try {
-    //         resp = httpClient.execute(req, context);
-    //     }
-    //     catch (IOException ex) {
-    //         ex.printStackTrace();
-    //         throw new ConfigException("encountered exception when post data", ex);
-    //     }
-
-    //     String respPayload;
-    //     HttpEntity entity = resp.getEntity();
-    //     try {
-    //         respPayload = EntityUtils.toString(entity, "utf-8");
-    //     } catch (Exception ex) {
-    //         throw new ConfigException("failed to process http response", ex);
-    //     } finally {
-    //         try {
-    //             resp.close();
-    //         } catch (IOException ex) {
-    //             throw new ConfigException("failed to close http response", ex);
-    //         }
-    //     }
-
-    //     int status = resp.getStatusLine().getStatusCode();
-    //     if (status!= 12) {
-    //         throw new ConfigException(String.format("Bad splunk configurations with status code:%s response:%s",status,respPayload));
-
-    //     }
-    // }
 }
diff --git a/src/test/java/com/splunk/hecclient/CloseableHttpClientMock.java b/src/test/java/com/splunk/hecclient/CloseableHttpClientMock.java
index 6d180256..d573abc7 100644
--- a/src/test/java/com/splunk/hecclient/CloseableHttpClientMock.java
+++ b/src/test/java/com/splunk/hecclient/CloseableHttpClientMock.java
@@ -31,6 +31,8 @@ public class CloseableHttpClientMock extends CloseableHttpClient {
     public static final String serverBusy = "{\"text\":\"Server busy\",\"code\":1}";
     public static final String noDataError = "{\"text\":\"No data\",\"code\":5}";
     public static final String invalidDataFormat = "{\"text\":\"Invalid data format\",\"code\":6}";
+    public static final String inValidToken = "{\"text\":\"Invalid token\",\"code\":4}";
+    public static final String inValidIndex = "{\"text\":\"Incorrect index\",\"code\":4,\"invalid-event-number\":1}";
     public static final String exception = "excpetion";
 
     private String resp = "";
@@ -49,6 +51,10 @@ protected CloseableHttpResponse doExecute(HttpHost target, HttpRequest request,
             return createResponse(resp, 503);
         } else if (resp.equals(noDataError)) {
             return createResponse(resp, 400);
+        }else if (resp.equals(inValidToken)) {
+            return createResponse(resp, 400);
+        }else if (resp.equals(inValidIndex)) {
+            return createResponse(resp, 400);
         } else {
             return createResponse(success, 201);
         }
diff --git a/src/test/java/com/splunk/kafka/connect/HecInstanceMock.java b/src/test/java/com/splunk/kafka/connect/MockHecClientWrapper.java
similarity index 72%
rename from src/test/java/com/splunk/kafka/connect/HecInstanceMock.java
rename to src/test/java/com/splunk/kafka/connect/MockHecClientWrapper.java
index 6c8d7eee..2666c800 100644
--- a/src/test/java/com/splunk/kafka/connect/HecInstanceMock.java
+++ b/src/test/java/com/splunk/kafka/connect/MockHecClientWrapper.java
@@ -6,13 +6,14 @@
 import com.splunk.hecclient.Hec;
 import com.splunk.hecclient.HecConfig;
 
-public class HecInstanceMock extends HecClosableClient{
+public class MockHecClientWrapper extends AbstractClientWrapper{
+    public CloseableHttpClientMock client = new CloseableHttpClientMock();
 
     @Override
     CloseableHttpClient getClient(HecConfig config) {
         // TODO Auto-generated method stub
         if (config==null){}
-        CloseableHttpClientMock client = new CloseableHttpClientMock();
+
         return client;
     }
 
diff --git a/src/test/java/com/splunk/kafka/connect/SplunkSinkConnecterTest.java b/src/test/java/com/splunk/kafka/connect/SplunkSinkConnecterTest.java
index d6834d99..58308348 100644
--- a/src/test/java/com/splunk/kafka/connect/SplunkSinkConnecterTest.java
+++ b/src/test/java/com/splunk/kafka/connect/SplunkSinkConnecterTest.java
@@ -22,14 +22,18 @@
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.kafka.common.config.Config;
 import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.config.ConfigValue;
 import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.sink.SinkConnector;
 import org.junit.Assert;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
+import com.splunk.hecclient.CloseableHttpClientMock;
 
 import java.util.*;
 
@@ -74,7 +78,9 @@ public void testValidKerberosBothEmpty() {
         final Map<String, String> configs = new HashMap<>();
         addNecessaryConfigs(configs);
         SinkConnector connector = new SplunkSinkConnector();
-        HecInstanceMock clientInstance = new HecInstanceMock();
+        configs.put("topics", "b");
+        configs.put("splunk.indexes", "b");
+        MockHecClientWrapper clientInstance = new MockHecClientWrapper();
         ((SplunkSinkConnector) connector).setHecInstance(clientInstance);
         Config result = connector.validate(configs);
         assertNoErrors(result);
@@ -85,7 +91,9 @@ public void testValidKerberosBothSet() {
         configs.put(KERBEROS_USER_PRINCIPAL_CONF, TEST_KERB_PRINCIPAL);
         configs.put(KERBEROS_KEYTAB_PATH_CONF, TEST_KERB_KEYTAB_PATH);
         SinkConnector connector = new SplunkSinkConnector();
-        HecInstanceMock clientInstance = new HecInstanceMock();
+        configs.put("topics", "b");
+        configs.put("splunk.indexes", "b");
+        MockHecClientWrapper clientInstance = new MockHecClientWrapper();
         ((SplunkSinkConnector) connector).setHecInstance(clientInstance);
         Config result = connector.validate(configs);
         assertNoErrors(result);
@@ -99,7 +107,9 @@ public void testInvalidKerberosOnlyPrincipalSet() {
         addNecessaryConfigs(configs);
         configs.put(KERBEROS_USER_PRINCIPAL_CONF, TEST_KERB_PRINCIPAL);
         SplunkSinkConnector connector = new SplunkSinkConnector();
-        HecInstanceMock clientInstance = new HecInstanceMock();
+        configs.put("topics", "b");
+        configs.put("splunk.indexes", "b");
+        MockHecClientWrapper clientInstance = new MockHecClientWrapper();
         ((SplunkSinkConnector) connector).setHecInstance(clientInstance);
         Config result = connector.validate(configs);
         assertHasErrorMessage(result, KERBEROS_USER_PRINCIPAL_CONF, "must be set");
         assertHasErrorMessage(result, KERBEROS_KEYTAB_PATH_CONF, "must be set");
@@ -112,13 +122,54 @@ public void testInvalidKerberosOnlyKeytabSet() {
         addNecessaryConfigs(configs);
         configs.put(KERBEROS_KEYTAB_PATH_CONF, TEST_KERB_KEYTAB_PATH);
         SplunkSinkConnector connector = new SplunkSinkConnector();
-        HecInstanceMock clientInstance = new HecInstanceMock();
+        configs.put("topics", "b");
+        configs.put("splunk.indexes", "b");
+        MockHecClientWrapper clientInstance = new MockHecClientWrapper();
         ((SplunkSinkConnector) connector).setHecInstance(clientInstance);
         Config result = connector.validate(configs);
         assertHasErrorMessage(result, KERBEROS_USER_PRINCIPAL_CONF, "must be set");
         assertHasErrorMessage(result, KERBEROS_KEYTAB_PATH_CONF, "must be set");
     }
 
+    @Test
+    public void testInvalidToken() {
+        final Map<String, String> configs = new HashMap<>();
+        addNecessaryConfigs(configs);
+        SplunkSinkConnector connector = new SplunkSinkConnector();
+        configs.put("topics", "b");
+        configs.put("splunk.indexes", "b");
+        MockHecClientWrapper clientInstance = new MockHecClientWrapper();
+        clientInstance.client.setResponse(CloseableHttpClientMock.inValidToken);
+        ((SplunkSinkConnector) connector).setHecInstance(clientInstance);
+        Assertions.assertThrows(ConfigException.class, ()->connector.validate(configs));
+    }
+
+    @Test
+    public void testInvalidIndex() {
+        final Map<String, String> configs = new HashMap<>();
+        addNecessaryConfigs(configs);
+        SplunkSinkConnector connector = new SplunkSinkConnector();
+        configs.put("topics", "b");
+        configs.put("splunk.indexes", "b");
+        MockHecClientWrapper clientInstance = new MockHecClientWrapper();
+        clientInstance.client.setResponse(CloseableHttpClientMock.inValidIndex);
+        ((SplunkSinkConnector) connector).setHecInstance(clientInstance);
+        Assertions.assertThrows(ConfigException.class, ()->connector.validate(configs));
+    }
+
+    @Test
+    public void testValidSplunkConfigurations() {
+        final Map<String, String> configs = new HashMap<>();
+        addNecessaryConfigs(configs);
+        SplunkSinkConnector connector = new SplunkSinkConnector();
+        configs.put("topics", "b");
+        configs.put("splunk.indexes", "b");
+        MockHecClientWrapper clientInstance = new MockHecClientWrapper();
+        clientInstance.client.setResponse(CloseableHttpClientMock.success);
+        ((SplunkSinkConnector) connector).setHecInstance(clientInstance);
+        Assertions.assertDoesNotThrow(()->connector.validate(configs));
+    }
+
     private void addNecessaryConfigs(Map<String, String> configs) {
         configs.put(URI_CONF, TEST_URI);
         configs.put(TOKEN_CONF, "blah");

From 57a362eb6eea4f9205c18bdf4de0dd9df80a3370 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E2=80=9Cdeep-splunk=E2=80=9D?= <“dkapadia@splunk.com”>
Date: Fri, 9 Dec 2022 15:27:53 +0530
Subject: [PATCH 3/6] Removal of WireMock Dependency

---
 pom.xml | 8 --------
 1 file changed, 8 deletions(-)

diff --git a/pom.xml b/pom.xml
index b030dd9b..759a45c9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -42,14 +42,6 @@
             <version>2.8.1</version>
             <scope>compile</scope>
         </dependency>
-
-        <dependency>
-            <groupId>com.github.tomakehurst</groupId>
-            <artifactId>wiremock-jre8</artifactId>
-            <version>2.35.0</version>
-            <scope>test</scope>
-        </dependency>
-
         <dependency>
             <groupId>org.junit.jupiter</groupId>
             <artifactId>junit-jupiter-api</artifactId>

From bbe27ef9dc88a3b574e112808f5ee309e0a78c31 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E2=80=9Cdeep-splunk=E2=80=9D?= <“dkapadia@splunk.com”>
Date: Mon, 12 Dec 2022 15:57:17 +0530
Subject: [PATCH 4/6] review-changes-executehttprequest,rename variables

---
 .vscode/settings.json                         |  4 --
 .../kafka/connect/SplunkSinkConnector.java    | 52 +++++++++----------
 2 files changed, 25 insertions(+), 31 deletions(-)
 delete mode 100644 .vscode/settings.json

diff --git a/.vscode/settings.json b/.vscode/settings.json
deleted file mode 100644
index b84f89c3..00000000
--- a/.vscode/settings.json
+++ /dev/null
@@ -1,4 +0,0 @@
-{
-    "java.configuration.updateBuildConfiguration": "interactive",
-    "java.compile.nullAnalysis.mode": "automatic"
-}
\ No newline at end of file
diff --git a/src/main/java/com/splunk/kafka/connect/SplunkSinkConnector.java b/src/main/java/com/splunk/kafka/connect/SplunkSinkConnector.java
index 449f7fe2..bbd64c27 100644
--- a/src/main/java/com/splunk/kafka/connect/SplunkSinkConnector.java
+++ b/src/main/java/com/splunk/kafka/connect/SplunkSinkConnector.java
@@ -21,7 +21,6 @@
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
-import org.apache.avro.Protocol;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.http.Header;
 import org.apache.http.client.ClientProtocolException;
@@ -58,11 +57,11 @@ public final class SplunkSinkConnector extends SinkConnector {
     private Map<String, String> taskConfig;
     private Map<String, ConfigValue> values;
     private List<ConfigValue> validations;
-    private AbstractClientWrapper hecAb = new HecClientWrapper();
+    private AbstractClientWrapper abstractClientWrapper = new HecClientWrapper();
 
 
-    public void setHecInstance(AbstractClientWrapper hecAb) {
-        this.hecAb = hecAb;
+    public void setHecInstance(AbstractClientWrapper abstractClientWrapper) {
+        this.abstractClientWrapper = abstractClientWrapper;
     }
 
     @Override
@@ -109,7 +108,7 @@ public Config validate(final Map<String, String> connectorConfigs) {
         values = validations.stream().collect(Collectors.toMap(ConfigValue::name, Function.identity()));
 
         validateKerberosConfigs(connectorConfigs);
-        validateHealthCheckForSplunkIndexes(connectorConfigs);
+        validateSplunkConfigurations(connectorConfigs);
         return new Config(validations);
     }
@@ -146,19 +145,19 @@ private static String[] split(String data, String sep) {
     }
 
-    private void validateHealthCheckForSplunkIndexes(final Map<String, String> configs) throws ConfigException {
+    private void validateSplunkConfigurations(final Map<String, String> configs) throws ConfigException {
         SplunkSinkConnectorConfig connectorConfig = new SplunkSinkConnectorConfig(configs);
         String[] indexes = split(connectorConfig.indexes, ",");
-        if(indexes.length == 0) {
-            healthCheckForSplunkHEC(connectorConfig, "");
+        if(indexes == null || indexes.length == 0) {
+            preparePayloadAndExecuteRequest(connectorConfig, "");
         } else {
             for (String index : indexes) {
-                healthCheckForSplunkHEC(connectorConfig, index);
+                preparePayloadAndExecuteRequest(connectorConfig, index);
             }
         }
     }
 
-    private void healthCheckForSplunkHEC(SplunkSinkConnectorConfig connectorConfig, String index) throws ConfigException {
+    private void preparePayloadAndExecuteRequest(SplunkSinkConnectorConfig connectorConfig, String index) throws ConfigException {
         Header[] headers;
         headers = new Header[1];
         headers[0] = new BasicHeader("Authorization", String.format("Splunk %s", connectorConfig.splunkToken));
@@ -167,47 +166,46 @@ private void healthCheckForSplunkHEC(SplunkSinkConnectorConfig connectorConfig,
         String endpoint = "/services/collector";
         String url = connectorConfig.splunkURI + endpoint;
         final HttpPost httpPost = new HttpPost(url);
         httpPost.setHeaders(headers);
         EventBatch batch = new JsonEventBatch();
-        Event event = new JsonEvent("a:a", null);
+        Event event = new JsonEvent("Splunk HEC Configuration Check", null);
         event.setIndex(index);
+        event.setSource("kafka-connect");
+        event.setSourcetype("kafka-connect");
         batch.add(event);
         httpPost.setEntity(batch.getHttpEntity());
-
-        CloseableHttpClient httpClient = hecAb.getClient(connectorConfig.getHecConfig());
+        CloseableHttpClient httpClient = abstractClientWrapper.getClient(connectorConfig.getHecConfig());
         executeHttpRequest(httpPost, httpClient);
     }
 
+
     private void executeHttpRequest(final HttpUriRequest req, CloseableHttpClient httpClient) throws ConfigException {
         CloseableHttpResponse resp = null;
         HttpContext context;
         context = HttpClientContext.create();
         try {
             resp = httpClient.execute(req, context);
+            int status = resp.getStatusLine().getStatusCode();
+
+            String respPayload = EntityUtils.toString(resp.getEntity(), "utf-8");
+            if (status > 299){
+                throw new ConfigException(String.format("Bad splunk configurations with status code:%s response:%s",status,respPayload));
+            }
         } catch (ClientProtocolException ex) {
             throw new ConfigException("Invalid splunk SSL configuration detected while validating configuration",ex);
         } catch (IOException ex) {
             throw new ConfigException("Invalid Splunk Configurations",ex);
-        }
-        try {
-            String respPayload = EntityUtils.toString(resp.getEntity(), "utf-8");
-            if (respPayload.contains("Incorrect index")){
-                throw new ConfigException("Incorrect Index detected while validating configuration");
-            }
-            else if (respPayload.contains("Invalid token")){
-                throw new ConfigException("Incorrect HEC token detected while validating configuration");
-            }
-        } catch(ConfigException ex){
+        } catch (ConfigException ex) {
             throw ex;
-        }
-        catch (Exception ex) {
+        } catch (Exception ex) {
             throw new ConfigException("failed to process http payload",ex);
         } finally {
             try {
-                resp.close();
+                if (resp!= null) {
+                    resp.close();
+                }
             } catch (Exception ex) {
                 throw new ConfigException("failed to close http response",ex);
             }
         }
     }

From ce919d264f93818d34126be5f1329027f7962411 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E2=80=9Cdeep-splunk=E2=80=9D?= <“dkapadia@splunk.com”>
Date: Tue, 13 Dec 2022 11:03:32 +0530
Subject: [PATCH 5/6] Inline-array for header

---
 .../java/com/splunk/kafka/connect/SplunkSinkConnector.java | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/src/main/java/com/splunk/kafka/connect/SplunkSinkConnector.java b/src/main/java/com/splunk/kafka/connect/SplunkSinkConnector.java
index bbd64c27..99f248d5 100644
--- a/src/main/java/com/splunk/kafka/connect/SplunkSinkConnector.java
+++ b/src/main/java/com/splunk/kafka/connect/SplunkSinkConnector.java
@@ -159,8 +159,7 @@ private void validateSplunkConfigurations(final Map<String, String> configs) thr
 
     private void preparePayloadAndExecuteRequest(SplunkSinkConnectorConfig connectorConfig, String index) throws ConfigException {
         Header[] headers;
-        headers = new Header[1];
-        headers[0] = new BasicHeader("Authorization", String.format("Splunk %s", connectorConfig.splunkToken));
+        headers = new Header[]{new BasicHeader("Authorization", String.format("Splunk %s", connectorConfig.splunkToken))};
         String endpoint = "/services/collector";
         String url = connectorConfig.splunkURI + endpoint;
         final HttpPost httpPost = new HttpPost(url);

From 4872510f9b4a1a2e8c1c8fe33d44ac84ca6a0348 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E2=80=9Cdeep-splunk=E2=80=9D?= <“dkapadia@splunk.com”>
Date: Tue, 13 Dec 2022 11:54:38 +0530
Subject: [PATCH 6/6] removal of header declaration

---
 .../java/com/splunk/kafka/connect/SplunkSinkConnector.java | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/src/main/java/com/splunk/kafka/connect/SplunkSinkConnector.java b/src/main/java/com/splunk/kafka/connect/SplunkSinkConnector.java
index 99f248d5..ee0b28f8 100644
--- a/src/main/java/com/splunk/kafka/connect/SplunkSinkConnector.java
+++ b/src/main/java/com/splunk/kafka/connect/SplunkSinkConnector.java
@@ -158,8 +158,7 @@ private void validateSplunkConfigurations(final Map<String, String> configs) thr
 
     private void preparePayloadAndExecuteRequest(SplunkSinkConnectorConfig connectorConfig, String index) throws ConfigException {
-        Header[] headers;
-        headers = new Header[]{new BasicHeader("Authorization", String.format("Splunk %s", connectorConfig.splunkToken))};
+        Header[] headers = new Header[]{new BasicHeader("Authorization", String.format("Splunk %s", connectorConfig.splunkToken))};
         String endpoint = "/services/collector";
         String url = connectorConfig.splunkURI + endpoint;
         final HttpPost httpPost = new HttpPost(url);
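
Usage sketch: the tests added in PATCH 2 show the intended way to drive the health check introduced by this series — inject a client wrapper, then call validate(). Below is a minimal JUnit 5 sketch along those lines. Class names (SplunkSinkConnector, MockHecClientWrapper, CloseableHttpClientMock) come from the diffs above; the literal config keys "splunk.hec.uri" and "splunk.hec.token" are assumed values for the URI_CONF and TOKEN_CONF constants referenced in the patches, and the package declaration is assumed so the connector's test helpers are reachable.

package com.splunk.kafka.connect;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.config.ConfigException;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import com.splunk.hecclient.CloseableHttpClientMock;

public class SplunkHealthCheckSketchTest {

    @Test
    public void invalidTokenSurfacesAsConfigException() {
        Map<String, String> configs = new HashMap<>();
        // Minimal connector settings, mirroring addNecessaryConfigs() in the
        // existing test class (key strings are assumed, values are placeholders).
        configs.put("splunk.hec.uri", "https://localhost:8088");
        configs.put("splunk.hec.token", "blah");
        configs.put("topics", "b");
        configs.put("splunk.indexes", "b");

        SplunkSinkConnector connector = new SplunkSinkConnector();

        // Swap in the mock wrapper so validate() never contacts a real Splunk
        // endpoint; the canned "Invalid token" body yields an HTTP 400.
        MockHecClientWrapper clientInstance = new MockHecClientWrapper();
        clientInstance.client.setResponse(CloseableHttpClientMock.inValidToken);
        connector.setHecInstance(clientInstance);

        // PATCH 4's executeHttpRequest() converts the non-2xx response into a
        // ConfigException, which is what Connect surfaces at validation time.
        Assertions.assertThrows(ConfigException.class, () -> connector.validate(configs));
    }
}

The wrapper indirection (AbstractClientWrapper / HecClientWrapper) exists precisely so the CloseableHttpClient can be substituted in tests like this one, instead of calling Hec.createHttpClient() directly inside the connector.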