Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
<jackson.version>2.14.2</jackson.version>
<kafka.version>3.4.0</kafka.version>
<slf4j.version>2.0.7</slf4j.version>
<sonar.coverage.jacoco.xmlReportPaths>${project.build.directory}/coverage-reports/jacoco-ut/jacoco.xml</sonar.coverage.jacoco.xmlReportPaths>
</properties>

<dependencies>
Expand Down Expand Up @@ -194,6 +195,11 @@
<type>jar</type>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.jacoco</groupId>
<artifactId>jacoco-maven-plugin</artifactId>
<version>0.8.9</version>
</dependency>

</dependencies>

Expand Down Expand Up @@ -321,6 +327,12 @@
</executions>
</plugin>

<plugin>
<groupId>org.sonarsource.scanner.maven</groupId>
<artifactId>sonar-maven-plugin</artifactId>
<version>3.10.0.2594</version>
</plugin>

<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>3.0.0</version>
Expand Down
2 changes: 2 additions & 0 deletions sonar-project.properties
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,5 @@ sonar.projectKey=github-mirrors.kafka-connect-splunk
sonar.sources=src/main/java/
sonar.language=java
sonar.java.binaries=.
sonar.exclusions=src/java/test/**,**/examples/**
sonar.java.coveragePlugin=jacoco
1 change: 0 additions & 1 deletion src/main/java/com/splunk/hecclient/ConcurrentHec.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ public final void close() {
}

stopped = true;
// executorService.shutdownNow();
executorService.shutdown();
}

Expand Down
3 changes: 2 additions & 1 deletion src/main/java/com/splunk/hecclient/DoubleSerializer.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import java.io.IOException;
import java.math.BigDecimal;
import java.math.RoundingMode;

/*
* Copyright 2017 Splunk, Inc..
Expand All @@ -26,7 +27,7 @@
public class DoubleSerializer extends JsonSerializer<Double> {
@Override
public void serialize(Double value, JsonGenerator jgen, SerializerProvider provider) throws IOException {
String d = new BigDecimal(value).setScale(6, BigDecimal.ROUND_HALF_UP).toPlainString();
String d = BigDecimal.valueOf(value).setScale(6, RoundingMode.HALF_UP).toPlainString();
jgen.writeNumber(d);
}
}
2 changes: 1 addition & 1 deletion src/main/java/com/splunk/hecclient/Hec.java
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ public static SSLContext loadTrustManagerFactory(KeyStore keyStore) {
TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
tmf.init(keyStore);

SSLContext sslContext = SSLContext.getInstance("SSL");
SSLContext sslContext = SSLContext.getInstance("TLSv1.2");
sslContext.init(null, tmf.getTrustManagers(), new SecureRandom());

return sslContext;
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/com/splunk/hecclient/HecAckPoller.java
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ public void add(HecChannel channel, EventBatch batch, String response) {
return;
}

if (resp.getText() == "Invalid data format") {
if (resp.getText().equals("Invalid data format")) {
log.warn("Invalid Splunk HEC data format. Ignoring events. channel={} index={} events={}", channel, channel.getIndexer(), batch.toString());
batch.commit();
List<EventBatch> committedBatches = new ArrayList<>();
Expand Down
5 changes: 2 additions & 3 deletions src/main/java/com/splunk/hecclient/Indexer.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,12 @@ final class Indexer implements IndexerInf {
private Poller poller;
private long backPressure;
private long lastBackPressure;
private long backPressureThreshold = 60 * 1000; // 1 min
private long backPressureThreshold = Long.valueOf(60) * 1000; // 1 min

// Indexer doesn't own client, ack poller
public Indexer(String baseUrl,CloseableHttpClient client,Poller poller,HecConfig config) {
this.httpClient = client;
this.baseUrl = baseUrl;
this.hecToken = hecToken;
this.hecConfig = config;
this.hecToken = config.getToken();
this.poller = poller;
Expand Down Expand Up @@ -273,7 +272,7 @@ private String readAndCloseResponse(CloseableHttpResponse resp) {

String respText = (jsonNode.has("text")) ? jsonNode.get("text").asText() : null;

if (respText == "Invalid data format") {
if (respText.equals("Invalid data format")) {
ObjectNode objNode = jsonMapper.createObjectNode();
objNode.put("text", "Invalid data format");
objNode.put("code", 0); // Mark it as success
Expand Down
12 changes: 6 additions & 6 deletions src/main/java/com/splunk/hecclient/JsonEventBatch.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
import org.apache.commons.lang3.builder.HashCodeBuilder;

public final class JsonEventBatch extends EventBatch {
public static final String endpoint = "/services/collector/event";
public static final String contentType = "application/json; profile=urn:splunk:event:1.0; charset=utf-8";
public static final String ENDPOINT = "/services/collector/event";
public static final String CONTENT_TYPE = "application/json; profile=urn:splunk:event:1.0; charset=utf-8";

@Override
public void add(Event event) {
Expand All @@ -33,12 +33,12 @@ public void add(Event event) {

@Override
public final String getRestEndpoint() {
return endpoint;
return ENDPOINT;
}

@Override
public String getContentType() {
return contentType;
return CONTENT_TYPE;
}

@Override
Expand All @@ -49,15 +49,15 @@ public EventBatch createFromThis() {
@Override
public int hashCode() {
return new HashCodeBuilder()
.append(endpoint)
.append(ENDPOINT)
.toHashCode();
}

@Override
public boolean equals(Object obj) {
if (obj instanceof JsonEventBatch) {
final JsonEventBatch other = (JsonEventBatch) obj;
return endpoint.equals(endpoint);
return obj.equals(other);
}
return false;
}
Expand Down
8 changes: 4 additions & 4 deletions src/main/java/com/splunk/hecclient/RawEventBatch.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
import org.apache.http.client.utils.URIBuilder;

public final class RawEventBatch extends EventBatch {
public static final String endpoint = "/services/collector/raw";
public static final String contentType = "text/plain; profile=urn:splunk:event:1.0; charset=utf-8";
public static final String ENDPOINT = "/services/collector/raw";
public static final String CONTENT_TYPE = "text/plain; profile=urn:splunk:event:1.0; charset=utf-8";

private String index;
private String source;
Expand Down Expand Up @@ -111,12 +111,12 @@ public void add(Event event) throws HecException {

@Override
public final String getRestEndpoint() {
return endpoint + getMetadataParams();
return ENDPOINT + getMetadataParams();
}

@Override
public String getContentType() {
return contentType;
return CONTENT_TYPE;
}

@Override
Expand Down
11 changes: 5 additions & 6 deletions src/main/java/com/splunk/kafka/connect/SplunkSinkTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@

public final class SplunkSinkTask extends SinkTask implements PollerCallback {
private static final Logger log = LoggerFactory.getLogger(SplunkSinkTask.class);
private static long flushWindow = 30 * 1000; // 30 seconds
private static long flushWindow = Long.valueOf(30) * 1000; // 30 seconds
private static final String HEADERTOKEN = "$$$";

private HecInf hec;
Expand Down Expand Up @@ -71,7 +71,7 @@ public void start(Map<String, String> taskConfig) {
tracker = new KafkaRecordTracker();
bufferedRecords = new ArrayList<>();
if(connectorConfig.flushWindow > 0) {
flushWindow = connectorConfig.flushWindow * 1000; // Flush window set to user configured value (Multiply by 1000 as all the calculations are done in milliseconds)
flushWindow = connectorConfig.flushWindow * Long.valueOf(1000); // Flush window set to user configured value (Multiply by 1000 as all the calculations are done in milliseconds)
}

log.info("kafka-connect-splunk task starts with config={}", connectorConfig);
Expand Down Expand Up @@ -569,10 +569,9 @@ private void timestampExtraction(Event event) {

if (connectorConfig.timestampFormat.equalsIgnoreCase("epoch")) {
try {
double epoch;
epoch = ((Double.parseDouble(timestamp)));
long long_epoch = (new Double(epoch)).longValue();
event.setTime(epoch / (Math.pow(10, Long.toString(long_epoch).length()-10)));
double epoch = (Double.parseDouble(timestamp));
long long_epoch = Double.valueOf(epoch).longValue();
event.setTime(epoch / (Math.pow(10, Long.toString(long_epoch).length()-10.00)));

} catch (Exception e) {
log.warn("Could not set the time", e);
Expand Down
28 changes: 14 additions & 14 deletions src/test/java/com/splunk/hecclient/CloseableHttpClientMock.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,36 +27,36 @@

@SuppressWarnings( "deprecation")
public class CloseableHttpClientMock extends CloseableHttpClient {
public static final String success = "{\"text\":\"Success\",\"code\":0,\"ackId\":2}";
public static final String serverBusy = "{\"text\":\"Server busy\",\"code\":1}";
public static final String noDataError = "{\"text\":\"No data\",\"code\":5}";
public static final String invalidDataFormat = "{\"text\":\"Invalid data format\",\"code\":6}";
public static final String inValidToken = "{\"text\":\"Invalid token\",\"code\":4}";
public static final String inValidIndex = "{\"text\":\"Incorrect index\",\"code\":4,\"invalid-event-number\":1}";
public static final String exception = "excpetion";
public static final String SUCCESS = "{\"text\":\"Success\",\"code\":0,\"ackId\":2}";
public static final String SERVER_BUSY = "{\"text\":\"Server busy\",\"code\":1}";
public static final String NO_DATA_ERROR = "{\"text\":\"No data\",\"code\":5}";
public static final String INVALID_DATA_FORMAT = "{\"text\":\"Invalid data format\",\"code\":6}";
public static final String INVALID_TOKEN = "{\"text\":\"Invalid token\",\"code\":4}";
public static final String INVALID_INDEX = "{\"text\":\"Incorrect index\",\"code\":4,\"invalid-event-number\":1}";
public static final String EXCEPTION = "excpetion";

private String resp = "";
private boolean throwOnClose = false;
private boolean throwOnGetContent = false;

protected CloseableHttpResponse doExecute(HttpHost target, HttpRequest request,
HttpContext context) throws IOException {
if (resp == exception) {
if (resp == EXCEPTION) {
throw new IOException("mocked up");
}

if (resp.equals(success)) {
if (resp.equals(SUCCESS)) {
return createResponse(resp, 200);
} else if (resp.equals(serverBusy)) {
} else if (resp.equals(SERVER_BUSY)) {
return createResponse(resp, 503);
} else if (resp.equals(noDataError)) {
} else if (resp.equals(NO_DATA_ERROR)) {
return createResponse(resp, 400);
}else if (resp.equals(inValidToken)) {
}else if (resp.equals(INVALID_TOKEN)) {
return createResponse(resp, 400);
}else if (resp.equals(inValidIndex)) {
}else if (resp.equals(INVALID_INDEX)) {
return createResponse(resp, 400);
} else {
return createResponse(success, 201);
return createResponse(SUCCESS, 201);
}
}

Expand Down
26 changes: 13 additions & 13 deletions src/test/java/com/splunk/hecclient/IndexerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public void sendWithSuccess() {
for (int i = 0; i < 2; i++) {
CloseableHttpClientMock client = new CloseableHttpClientMock();
if (i == 0) {
client.setResponse(CloseableHttpClientMock.success);
client.setResponse(CloseableHttpClientMock.SUCCESS);
}
PollerMock poller = new PollerMock();

Expand All @@ -112,14 +112,14 @@ public void sendWithSuccess() {
Assert.assertNull(poller.getFailedBatch());
Assert.assertNull(poller.getException());
Assert.assertEquals(indexer.getChannel(), poller.getChannel());
Assert.assertEquals(CloseableHttpClientMock.success, poller.getResponse());
Assert.assertEquals(CloseableHttpClientMock.SUCCESS, poller.getResponse());
}
}

@Test
public void sendWithInvalidData() {
CloseableHttpClientMock client = new CloseableHttpClientMock();
client.setResponse(CloseableHttpClientMock.invalidDataFormat);
client.setResponse(CloseableHttpClientMock.INVALID_DATA_FORMAT);
PollerMock poller = new PollerMock();

Indexer indexer = new Indexer(baseUrl, client, poller, hecConfig);
Expand All @@ -130,14 +130,14 @@ public void sendWithInvalidData() {
Assert.assertNull(poller.getFailedBatch());
Assert.assertNull(poller.getException());
Assert.assertEquals(indexer.getChannel(), poller.getChannel());
Assert.assertEquals(CloseableHttpClientMock.success, poller.getResponse());
Assert.assertEquals(CloseableHttpClientMock.SUCCESS, poller.getResponse());
}


@Test
public void sendWithServerBusy() {
CloseableHttpClientMock client = new CloseableHttpClientMock();
client.setResponse(CloseableHttpClientMock.serverBusy);
client.setResponse(CloseableHttpClientMock.SERVER_BUSY);

Indexer indexer = assertFailure(client);
Assert.assertTrue(indexer.hasBackPressure());
Expand All @@ -151,7 +151,7 @@ public void sendWithServerBusy() {
@Test
public void ConfirmShortBackPressureConfig() {
CloseableHttpClientMock client = new CloseableHttpClientMock();
client.setResponse(CloseableHttpClientMock.serverBusy);
client.setResponse(CloseableHttpClientMock.SERVER_BUSY);

Indexer indexer = assertFailure(client);
Assert.assertTrue(indexer.hasBackPressure());
Expand All @@ -165,22 +165,22 @@ public void ConfirmShortBackPressureConfig() {
@Test
public void sendWithIOError() {
CloseableHttpClientMock client = new CloseableHttpClientMock();
client.setResponse(CloseableHttpClientMock.exception);
client.setResponse(CloseableHttpClientMock.EXCEPTION);
assertFailure(client);
}

@Test
public void sendWithCloseError() {
CloseableHttpClientMock client = new CloseableHttpClientMock();
client.setResponse(CloseableHttpClientMock.success);
client.setResponse(CloseableHttpClientMock.SUCCESS);
client.setThrowOnClose(true);
assertFailure(client);
}

@Test
public void sendWithReadError() {
CloseableHttpClientMock client = new CloseableHttpClientMock();
client.setResponse(CloseableHttpClientMock.success);
client.setResponse(CloseableHttpClientMock.SUCCESS);
client.setThrowOnGetContent(true);
assertFailure(client);
}
Expand All @@ -204,7 +204,7 @@ public void sendCompressedBatchWithSuccess() {
for (int i = 0; i < 2; i++) {
CloseableHttpClientMock client = new CloseableHttpClientMock();
if (i == 0) {
client.setResponse(CloseableHttpClientMock.success);
client.setResponse(CloseableHttpClientMock.SUCCESS);
}
PollerMock poller = new PollerMock();

Expand All @@ -217,7 +217,7 @@ public void sendCompressedBatchWithSuccess() {
Assert.assertNull(poller.getFailedBatch());
Assert.assertNull(poller.getException());
Assert.assertEquals(indexer.getChannel(), poller.getChannel());
Assert.assertEquals(CloseableHttpClientMock.success, poller.getResponse());
Assert.assertEquals(CloseableHttpClientMock.SUCCESS, poller.getResponse());
}
}

Expand All @@ -226,7 +226,7 @@ public void sendCompressedRawBatchWithSuccess() {
for (int i = 0; i < 2; i++) {
CloseableHttpClientMock client = new CloseableHttpClientMock();
if (i == 0) {
client.setResponse(CloseableHttpClientMock.success);
client.setResponse(CloseableHttpClientMock.SUCCESS);
}
PollerMock poller = new PollerMock();

Expand All @@ -239,7 +239,7 @@ public void sendCompressedRawBatchWithSuccess() {
Assert.assertNull(poller.getFailedBatch());
Assert.assertNull(poller.getException());
Assert.assertEquals(indexer.getChannel(), poller.getChannel());
Assert.assertEquals(CloseableHttpClientMock.success, poller.getResponse());
Assert.assertEquals(CloseableHttpClientMock.SUCCESS, poller.getResponse());
}
}
}
4 changes: 2 additions & 2 deletions src/test/java/com/splunk/hecclient/JsonEvenBatchTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,13 @@ public void addWithFailure() {
@Test
public void getRestEndpoint() {
EventBatch batch = new JsonEventBatch();
Assert.assertEquals(batch.getRestEndpoint(), JsonEventBatch.endpoint);
Assert.assertEquals(batch.getRestEndpoint(), JsonEventBatch.ENDPOINT);
}

@Test
public void getContentType() {
EventBatch batch = new JsonEventBatch();
Assert.assertEquals(batch.getContentType(), JsonEventBatch.contentType);
Assert.assertEquals(batch.getContentType(), JsonEventBatch.CONTENT_TYPE);
}

@Test
Expand Down
Loading