Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
</properties>

<dependencies>

<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-core -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package com.splunk.kafka.connect;

import org.apache.http.impl.client.CloseableHttpClient;

import com.splunk.hecclient.HecConfig;

public abstract class AbstractClientWrapper {
abstract CloseableHttpClient getClient(HecConfig config);
}
17 changes: 17 additions & 0 deletions src/main/java/com/splunk/kafka/connect/HecClientWrapper.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package com.splunk.kafka.connect;

import org.apache.http.impl.client.CloseableHttpClient;

import com.splunk.hecclient.Hec;
import com.splunk.hecclient.HecConfig;

public class HecClientWrapper extends AbstractClientWrapper {

@Override
CloseableHttpClient getClient(HecConfig config) {
return Hec.createHttpClient(config);

}


}
107 changes: 102 additions & 5 deletions src/main/java/com/splunk/kafka/connect/SplunkSinkConnector.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,25 +20,49 @@

import java.util.function.Function;
import java.util.stream.Collectors;

import org.apache.commons.lang3.StringUtils;
import org.apache.http.Header;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.message.BasicHeader;
import org.apache.http.protocol.HttpContext;
import org.apache.http.util.EntityUtils;
import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.sink.SinkConnector;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.splunk.hecclient.Event;
import com.splunk.hecclient.EventBatch;
import com.splunk.hecclient.JsonEvent;
import com.splunk.hecclient.JsonEventBatch;

public final class SplunkSinkConnector extends SinkConnector {
private static final Logger log = LoggerFactory.getLogger(SplunkSinkConnector.class);
private Map<String, String> taskConfig;
private Map<String, ConfigValue> values;
private List<ConfigValue> validations;
private AbstractClientWrapper abstractClientWrapper = new HecClientWrapper();


public void setHecInstance(AbstractClientWrapper abstractClientWrapper) {
this.abstractClientWrapper = abstractClientWrapper;
}

@Override
public void start(Map<String, String> taskConfig) {
Expand Down Expand Up @@ -76,14 +100,15 @@ public ConfigDef config() {
return SplunkSinkConnectorConfig.conf();
}


@Override
public Config validate(final Map<String, String> connectorConfigs) {
Config config = super.validate(connectorConfigs);
validations = config.configValues();
values = validations.stream().collect(Collectors.toMap(ConfigValue::name, Function.identity()));

validateKerberosConfigs(connectorConfigs);
validateSplunkConfigurations(connectorConfigs);
return new Config(validations);
}

Expand All @@ -100,9 +125,9 @@ void validateKerberosConfigs(final Map<String, String> configs) {
}

String errorMessage = String.format(
"Either both or neither '%s' and '%s' must be set for Kerberos authentication. ",
KERBEROS_KEYTAB_PATH_CONF,
KERBEROS_USER_PRINCIPAL_CONF
"Either both or neither '%s' and '%s' must be set for Kerberos authentication. ",
KERBEROS_KEYTAB_PATH_CONF,
KERBEROS_USER_PRINCIPAL_CONF
);
addErrorMessage(KERBEROS_KEYTAB_PATH_CONF, errorMessage);
addErrorMessage(KERBEROS_USER_PRINCIPAL_CONF, errorMessage);
Expand All @@ -111,4 +136,76 @@ void validateKerberosConfigs(final Map<String, String> configs) {
private void addErrorMessage(String property, String error) {
values.get(property).addErrorMessage(error);
}
}

private static String[] split(String data, String sep) {
if (data != null && !data.trim().isEmpty()) {
return data.trim().split(sep);
}
return null;
}


private void validateSplunkConfigurations(final Map<String, String> configs) throws ConfigException {
SplunkSinkConnectorConfig connectorConfig = new SplunkSinkConnectorConfig(configs);
String[] indexes = split(connectorConfig.indexes, ",");
if(indexes == null || indexes.length == 0) {
preparePayloadAndExecuteRequest(connectorConfig, "");
} else {
for (String index : indexes) {
preparePayloadAndExecuteRequest(connectorConfig, index);
}
}
}

private void preparePayloadAndExecuteRequest(SplunkSinkConnectorConfig connectorConfig, String index) throws ConfigException {
Header[] headers = new Header[]{new BasicHeader("Authorization", String.format("Splunk %s", connectorConfig.splunkToken))};
String endpoint = "/services/collector";
String url = connectorConfig.splunkURI + endpoint;
final HttpPost httpPost = new HttpPost(url);
httpPost.setHeaders(headers);
EventBatch batch = new JsonEventBatch();
Event event = new JsonEvent("Splunk HEC Configuration Check", null);
event.setIndex(index);
event.setSource("kafka-connect");
event.setSourcetype("kafka-connect");
batch.add(event);
httpPost.setEntity(batch.getHttpEntity());
CloseableHttpClient httpClient = abstractClientWrapper.getClient(connectorConfig.getHecConfig());
executeHttpRequest(httpPost, httpClient);
}



private void executeHttpRequest(final HttpUriRequest req, CloseableHttpClient httpClient) throws ConfigException {
CloseableHttpResponse resp = null;
HttpContext context;
context = HttpClientContext.create();
try {
resp = httpClient.execute(req, context);
int status = resp.getStatusLine().getStatusCode();

String respPayload = EntityUtils.toString(resp.getEntity(), "utf-8");
if (status > 299){
throw new ConfigException(String.format("Bad splunk configurations with status code:%s response:%s",status,respPayload));
}
} catch (ClientProtocolException ex) {
throw new ConfigException("Invalid splunk SSL configuration detected while validating configuration",ex);
} catch (IOException ex) {
throw new ConfigException("Invalid Splunk Configurations",ex);
} catch (ConfigException ex) {
throw ex;
} catch (Exception ex) {
throw new ConfigException("failed to process http payload",ex);
} finally {
try {
if (resp!= null) {
resp.close();
}
} catch (Exception ex) {
throw new ConfigException("failed to close http response",ex);
}
}
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -308,8 +308,10 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
regex = getString(REGEX_CONF);
timestampFormat = getString(TIMESTAMP_FORMAT_CONF).trim();
validateRegexForTimestamp(regex);

}


public static ConfigDef conf() {
return new ConfigDef()
.define(TOKEN_CONF, ConfigDef.Type.PASSWORD, ConfigDef.Importance.HIGH, TOKEN_DOC)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ public class CloseableHttpClientMock extends CloseableHttpClient {
public static final String serverBusy = "{\"text\":\"Server busy\",\"code\":1}";
public static final String noDataError = "{\"text\":\"No data\",\"code\":5}";
public static final String invalidDataFormat = "{\"text\":\"Invalid data format\",\"code\":6}";
public static final String inValidToken = "{\"text\":\"Invalid token\",\"code\":4}";
public static final String inValidIndex = "{\"text\":\"Incorrect index\",\"code\":4,\"invalid-event-number\":1}";
public static final String exception = "excpetion";

private String resp = "";
Expand All @@ -49,6 +51,10 @@ protected CloseableHttpResponse doExecute(HttpHost target, HttpRequest request,
return createResponse(resp, 503);
} else if (resp.equals(noDataError)) {
return createResponse(resp, 400);
}else if (resp.equals(inValidToken)) {
return createResponse(resp, 400);
}else if (resp.equals(inValidIndex)) {
return createResponse(resp, 400);
} else {
return createResponse(success, 201);
}
Expand Down
20 changes: 20 additions & 0 deletions src/test/java/com/splunk/kafka/connect/MockHecClientWrapper.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package com.splunk.kafka.connect;

import org.apache.http.impl.client.CloseableHttpClient;

import com.splunk.hecclient.CloseableHttpClientMock;
import com.splunk.hecclient.Hec;
import com.splunk.hecclient.HecConfig;

public class MockHecClientWrapper extends AbstractClientWrapper{
public CloseableHttpClientMock client = new CloseableHttpClientMock();

@Override
CloseableHttpClient getClient(HecConfig config) {
// TODO Auto-generated method stub
if (config==null){}

return client;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,18 @@
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.sink.SinkConnector;
import org.junit.Assert;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import com.splunk.hecclient.CloseableHttpClientMock;

import java.util.*;

Expand Down Expand Up @@ -74,6 +78,10 @@ public void testValidKerberosBothEmpty() {
final Map<String, String> configs = new HashMap<>();
addNecessaryConfigs(configs);
SinkConnector connector = new SplunkSinkConnector();
configs.put("topics", "b");
configs.put("splunk.indexes", "b");
MockHecClientWrapper clientInstance = new MockHecClientWrapper();
((SplunkSinkConnector) connector).setHecInstance(clientInstance);
Config result = connector.validate(configs);
assertNoErrors(result);
}
Expand All @@ -85,6 +93,10 @@ public void testValidKerberosBothSet() {
configs.put(KERBEROS_USER_PRINCIPAL_CONF, TEST_KERB_PRINCIPAL);
configs.put(KERBEROS_KEYTAB_PATH_CONF, TEST_KERB_KEYTAB_PATH);
SinkConnector connector = new SplunkSinkConnector();
configs.put("topics", "b");
configs.put("splunk.indexes", "b");
MockHecClientWrapper clientInstance = new MockHecClientWrapper();
((SplunkSinkConnector) connector).setHecInstance(clientInstance);
Config result = connector.validate(configs);
assertNoErrors(result);
}
Expand All @@ -95,6 +107,10 @@ public void testInvalidKerberosOnlyPrincipalSet() {
addNecessaryConfigs(configs);
configs.put(KERBEROS_USER_PRINCIPAL_CONF, TEST_KERB_PRINCIPAL);
SplunkSinkConnector connector = new SplunkSinkConnector();
configs.put("topics", "b");
configs.put("splunk.indexes", "b");
MockHecClientWrapper clientInstance = new MockHecClientWrapper();
((SplunkSinkConnector) connector).setHecInstance(clientInstance);
Config result = connector.validate(configs);
assertHasErrorMessage(result, KERBEROS_USER_PRINCIPAL_CONF, "must be set");
assertHasErrorMessage(result, KERBEROS_KEYTAB_PATH_CONF, "must be set");
Expand All @@ -106,11 +122,54 @@ public void testInvalidKerberosOnlyKeytabSet() {
addNecessaryConfigs(configs);
configs.put(KERBEROS_KEYTAB_PATH_CONF, TEST_KERB_KEYTAB_PATH);
SplunkSinkConnector connector = new SplunkSinkConnector();
configs.put("topics", "b");
configs.put("splunk.indexes", "b");
MockHecClientWrapper clientInstance = new MockHecClientWrapper();
((SplunkSinkConnector) connector).setHecInstance(clientInstance);
Config result = connector.validate(configs);
assertHasErrorMessage(result, KERBEROS_USER_PRINCIPAL_CONF, "must be set");
assertHasErrorMessage(result, KERBEROS_KEYTAB_PATH_CONF, "must be set");
}

@Test
public void testInvalidToken() {
final Map<String, String> configs = new HashMap<>();
addNecessaryConfigs(configs);
SplunkSinkConnector connector = new SplunkSinkConnector();
configs.put("topics", "b");
configs.put("splunk.indexes", "b");
MockHecClientWrapper clientInstance = new MockHecClientWrapper();
clientInstance.client.setResponse(CloseableHttpClientMock.inValidToken);
((SplunkSinkConnector) connector).setHecInstance(clientInstance);
Assertions.assertThrows(ConfigException.class, ()->connector.validate(configs));
}

@Test
public void testInvalidIndex() {
final Map<String, String> configs = new HashMap<>();
addNecessaryConfigs(configs);
SplunkSinkConnector connector = new SplunkSinkConnector();
configs.put("topics", "b");
configs.put("splunk.indexes", "b");
MockHecClientWrapper clientInstance = new MockHecClientWrapper();
clientInstance.client.setResponse(CloseableHttpClientMock.inValidIndex);
((SplunkSinkConnector) connector).setHecInstance(clientInstance);
Assertions.assertThrows(ConfigException.class, ()->connector.validate(configs));
}

@Test
public void testValidSplunkConfigurations() {
final Map<String, String> configs = new HashMap<>();
addNecessaryConfigs(configs);
SplunkSinkConnector connector = new SplunkSinkConnector();
configs.put("topics", "b");
configs.put("splunk.indexes", "b");
MockHecClientWrapper clientInstance = new MockHecClientWrapper();
clientInstance.client.setResponse(CloseableHttpClientMock.success);
((SplunkSinkConnector) connector).setHecInstance(clientInstance);
Assertions.assertDoesNotThrow(()->connector.validate(configs));
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you also add tests for an invalid host, and invalid SSL configuration(i.e. HTTP request on HTTPS server and vice-versa)?

private void addNecessaryConfigs(Map<String, String> configs) {
configs.put(URI_CONF, TEST_URI);
configs.put(TOKEN_CONF, "blah");
Expand Down