diff --git a/src/main/java/com/splunk/hecclient/HecAckPoller.java b/src/main/java/com/splunk/hecclient/HecAckPoller.java index 3f05bfd0..0b43d3ba 100644 --- a/src/main/java/com/splunk/hecclient/HecAckPoller.java +++ b/src/main/java/com/splunk/hecclient/HecAckPoller.java @@ -33,7 +33,7 @@ public final class HecAckPoller implements Poller { private static final Logger log = LoggerFactory.getLogger(HecAckPoller.class); private static final ObjectMapper jsonMapper = new ObjectMapper(); - private static final String ackEndpoint = "/services/collector/ack"; + private static final String ACK_ENDPOINT = "/services/collector/ack"; private ConcurrentHashMap> outstandingEventBatches; private AtomicLong totalOutstandingEventBatches; @@ -382,7 +382,7 @@ private static HttpUriRequest createAckPollHttpRequest(HecChannel ch, Set entity.setContentType("application/json; profile=urn:splunk:event:1.0; charset=utf-8"); - String url = ch.getIndexer().getBaseUrl() + ackEndpoint; + String url = ch.getIndexer().getBaseUrl() + ACK_ENDPOINT; final HttpPost httpPost = new HttpPost(url); httpPost.setHeaders(ch.getIndexer().getHeaders()); httpPost.setEntity(entity); diff --git a/src/main/java/com/splunk/hecclient/ResponsePoller.java b/src/main/java/com/splunk/hecclient/ResponsePoller.java index 1f5a5467..8fb69e03 100644 --- a/src/main/java/com/splunk/hecclient/ResponsePoller.java +++ b/src/main/java/com/splunk/hecclient/ResponsePoller.java @@ -32,14 +32,18 @@ public ResponsePoller(PollerCallback callback) { } @Override - public void stickySessionHandler(HecChannel channel) {} + public void stickySessionHandler(HecChannel channel) { + // Only required while acknowledgement=true + } @Override public void start() { + // Only required while acknowledgement=true } @Override public void stop() { + // Only required while acknowledgement=true } @Override @@ -68,7 +72,7 @@ public void add(HecChannel channel, EventBatch batch, String resp) { fail(channel, batch, new HecException(response.getText())); return; } - if (response.getText() == "Invalid data format") { + if (response.getText().equals("Invalid data format")) { log.warn("Invalid Splunk HEC data format. Ignoring events. channel={} index={} events={}", channel, channel.getIndexer(), batch.toString()); } } catch (Exception ex) { diff --git a/src/main/java/com/splunk/kafka/connect/SplunkSinkConnector.java b/src/main/java/com/splunk/kafka/connect/SplunkSinkConnector.java index 9b4fdcbc..3fb58e07 100644 --- a/src/main/java/com/splunk/kafka/connect/SplunkSinkConnector.java +++ b/src/main/java/com/splunk/kafka/connect/SplunkSinkConnector.java @@ -217,7 +217,7 @@ private void executeHttpRequest(final HttpUriRequest req, CloseableHttpClient ht resp.close(); } } catch (Exception ex) { - throw new ConfigException("failed to close http response",ex); + throw new ConfigException("failed to close http response",ex); // NOSONAR } } } diff --git a/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java b/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java index b84dd277..f90db541 100644 --- a/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java +++ b/src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java @@ -360,10 +360,10 @@ public static ConfigDef conf() { .define(MAX_BATCH_SIZE_CONF, ConfigDef.Type.INT, 500, ConfigDef.Importance.MEDIUM, MAX_BATCH_SIZE_DOC) .define(HEADER_SUPPORT_CONF, ConfigDef.Type.BOOLEAN, false, ConfigDef.Importance.MEDIUM, HEADER_SUPPORT_DOC) .define(HEADER_CUSTOM_CONF, ConfigDef.Type.STRING, "", ConfigDef.Importance.MEDIUM, HEADER_CUSTOM_DOC) - .define(HEADER_INDEX_CONF, ConfigDef.Type.STRING, "splunk.header.index", ConfigDef.Importance.MEDIUM, HEADER_INDEX_DOC) - .define(HEADER_SOURCE_CONF, ConfigDef.Type.STRING, "splunk.header.source", ConfigDef.Importance.MEDIUM, HEADER_SOURCE_DOC) - .define(HEADER_SOURCETYPE_CONF, ConfigDef.Type.STRING, "splunk.header.sourcetype", ConfigDef.Importance.MEDIUM, HEADER_SOURCETYPE_DOC) - .define(HEADER_HOST_CONF, ConfigDef.Type.STRING, "splunk.header.host", ConfigDef.Importance.MEDIUM, HEADER_HOST_DOC) + .define(HEADER_INDEX_CONF, ConfigDef.Type.STRING, HEADER_INDEX_CONF, ConfigDef.Importance.MEDIUM, HEADER_INDEX_DOC) + .define(HEADER_SOURCE_CONF, ConfigDef.Type.STRING, HEADER_SOURCE_CONF, ConfigDef.Importance.MEDIUM, HEADER_SOURCE_DOC) + .define(HEADER_SOURCETYPE_CONF, ConfigDef.Type.STRING, HEADER_SOURCETYPE_CONF, ConfigDef.Importance.MEDIUM, HEADER_SOURCETYPE_DOC) + .define(HEADER_HOST_CONF, ConfigDef.Type.STRING, HEADER_HOST_CONF, ConfigDef.Importance.MEDIUM, HEADER_HOST_DOC) .define(LB_POLL_INTERVAL_CONF, ConfigDef.Type.INT, 120, ConfigDef.Importance.LOW, LB_POLL_INTERVAL_DOC) .define(ENABLE_COMPRESSSION_CONF, ConfigDef.Type.BOOLEAN, false, ConfigDef.Importance.MEDIUM, ENABLE_COMPRESSSION_DOC) .define(DISABLE_VALIDATION, ConfigDef.Type.BOOLEAN, false, ConfigDef.Importance.MEDIUM, DISABLE_VALIDATION_DOC) diff --git a/src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java b/src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java index c14f70b5..c6ac25f1 100644 --- a/src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java +++ b/src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java @@ -224,40 +224,32 @@ public String headerId(SinkRecord sinkRecord) { if(indexHeader != null) { headerString.append(indexHeader.value().toString()); - } else { - if(metas != null) { - headerString.append(metas.get("index")); - } + } else if (metas != null) { + headerString.append(metas.get("index")); } headerString.append(insertHeaderToken()); if(hostHeader != null) { headerString.append(hostHeader.value().toString()); - } else { - if(metas != null) { - headerString.append("default-host"); - } + } else if (metas != null) { + headerString.append("default-host"); } headerString.append(insertHeaderToken()); if(sourceHeader != null) { headerString.append(sourceHeader.value().toString()); - } else { - if(metas != null) { - headerString.append(metas.get("source")); - } + } else if (metas != null) { + headerString.append(metas.get("source")); } headerString.append(insertHeaderToken()); if(sourcetypeHeader != null) { headerString.append(sourcetypeHeader.value().toString()); - } else { - if(metas != null) { - headerString.append(metas.get("sourcetype")); - } + } else if (metas != null) { + headerString.append(metas.get("sourcetype")); } headerString.append(insertHeaderToken()); @@ -441,7 +433,7 @@ private Event createHecEventFrom(final SinkRecord record) { private Event addHeaders(Event event, SinkRecord record) { Headers headers = record.headers(); - if(headers.isEmpty() && connectorConfig.headerCustom.isEmpty()) { + if (headers.isEmpty() && connectorConfig.headerCustom.isEmpty()) { return event; } @@ -469,13 +461,10 @@ private Event addHeaders(Event event, SinkRecord record) { String[] customHeaders = connectorConfig.headerCustom.split(",\\s?"); Map headerMap = new HashMap<>(); for (String header : customHeaders) { - Header customHeader = headers.lastWithName(header); + Header customHeader = headers.lastWithName(header); if (customHeader != null) { - if (customHeader.value() == null) { - headerMap.put(header, null); - } else { - headerMap.put(header, customHeader.value().toString()); - } + String value = customHeader.value() == null ? null : customHeader.value().toString(); + headerMap.put(header, value); } } event.addFields(headerMap); diff --git a/src/test/java/com/splunk/hecclient/PollerMock.java b/src/test/java/com/splunk/hecclient/PollerMock.java index 2d040070..1845c279 100644 --- a/src/test/java/com/splunk/hecclient/PollerMock.java +++ b/src/test/java/com/splunk/hecclient/PollerMock.java @@ -63,7 +63,9 @@ public void add(HecChannel channel, EventBatch batch, String resp) { } @Override - public void stickySessionHandler(HecChannel channel) {} + public void stickySessionHandler(HecChannel channel) { + // Not required for mock + } public boolean isStarted() { return started; diff --git a/target/site/jacoco/jacoco.csv b/target/site/jacoco/jacoco.csv index 64a38123..5c536750 100644 --- a/target/site/jacoco/jacoco.csv +++ b/target/site/jacoco/jacoco.csv @@ -7,7 +7,7 @@ splunk-kafka-connect,com.splunk.hecclient,Hec,38,298,4,14,10,74,4,20,0,15 splunk-kafka-connect,com.splunk.hecclient,ConcurrentHec,23,196,1,13,6,50,1,16,0,10 splunk-kafka-connect,com.splunk.hecclient,HecException,0,9,0,0,0,4,0,2,0,2 splunk-kafka-connect,com.splunk.hecclient,LoadBalancer,48,362,8,24,12,87,7,23,1,13 -splunk-kafka-connect,com.splunk.hecclient,HecAckPoller,122,711,11,45,34,160,9,44,0,25 +splunk-kafka-connect,com.splunk.hecclient,HecAckPoller,122,711,12,44,34,160,10,43,0,25 splunk-kafka-connect,com.splunk.hecclient,HecAckPoller.RunAckQuery,21,26,0,0,3,8,0,2,0,2 splunk-kafka-connect,com.splunk.hecclient,HttpClientBuilder,69,132,0,4,16,46,2,13,2,11 splunk-kafka-connect,com.splunk.hecclient,RawEvent,11,63,0,8,3,17,0,8,0,4 @@ -38,4 +38,4 @@ splunk-kafka-connect,com.splunk.kafka.connect,SplunkSinkRecord,74,188,14,14,22,4 splunk-kafka-connect,com.splunk.kafka.connect,SplunkSinkConnectorConfig,5,1255,6,72,1,195,6,47,0,14 splunk-kafka-connect,com.splunk.kafka.connect,SplunkSinkConnector,57,338,3,25,11,79,4,26,1,15 splunk-kafka-connect,com.splunk.kafka.connect,AbstractClientWrapper,0,3,0,0,0,1,0,1,0,1 -splunk-kafka-connect,com.splunk.kafka.connect,SplunkSinkTask,670,858,66,60,135,194,49,47,8,25 +splunk-kafka-connect,com.splunk.kafka.connect,SplunkSinkTask,668,858,66,60,134,194,49,47,8,25 diff --git a/target/site/jacoco/jacoco.xml b/target/site/jacoco/jacoco.xml index f2ff3122..33b69a67 100644 --- a/target/site/jacoco/jacoco.xml +++ b/target/site/jacoco/jacoco.xml @@ -1 +1 @@ - \ No newline at end of file + \ No newline at end of file