Skip to content

Commit f561c03

Browse files
Merge pull request #319 from splunk/line-breaker_fix
Fix special character problem in line-breaker rule
2 parents 22cdcbb + 03bedfb commit f561c03

File tree

4 files changed

+28
-1
lines changed

4 files changed

+28
-1
lines changed

src/main/java/com/splunk/kafka/connect/SplunkSinkConnectorConfig.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -252,7 +252,11 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
252252
useRecordTimestamp = getBoolean(USE_RECORD_TIMESTAMP_CONF);
253253
maxBatchSize = getInt(MAX_BATCH_SIZE_CONF);
254254
numberOfThreads = getInt(HEC_THREDS_CONF);
255-
lineBreaker = getString(LINE_BREAKER_CONF);
255+
if (taskConfig.get(LINE_BREAKER_CONF) != null && taskConfig.get(LINE_BREAKER_CONF).length() == 1) {
256+
lineBreaker = taskConfig.get(LINE_BREAKER_CONF);
257+
} else {
258+
lineBreaker = getString(LINE_BREAKER_CONF);
259+
}
256260
maxOutstandingEvents = getInt(MAX_OUTSTANDING_EVENTS_CONF);
257261
maxRetries = getInt(MAX_RETRIES_CONF);
258262
backoffThresholdSeconds = getInt(HEC_BACKOFF_PRESSURE_THRESHOLD);

src/test/java/com/splunk/kafka/connect/ConfigProfile.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ public class ConfigProfile {
3636
private String headerSource;
3737
private String headerSourcetype;
3838
private String headerHost;
39+
private String lineBreaker;
3940

4041
public ConfigProfile() {
4142
this(0);
@@ -88,6 +89,7 @@ public ConfigProfile buildProfileDefault() {
8889
this.trackData = true;
8990
this.maxBatchSize = 1;
9091
this.numOfThreads = 1;
92+
this.lineBreaker = "\n";
9193
return this;
9294
}
9395

@@ -222,6 +224,10 @@ public ConfigProfile buildProfileFour() {
222224
return this;
223225
}
224226

227+
public String getLineBreaker() {
228+
return lineBreaker;
229+
}
230+
225231
public String getTopics() {
226232
return topics;
227233
}

src/test/java/com/splunk/kafka/connect/SplunkSinkConnectorConfigTest.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,22 @@ public void testEmptyTopicsAndTopicsRegexCombination() {
266266
SplunkSinkConnectorConfig connectorConfig = new SplunkSinkConnectorConfig(config);
267267
}
268268

269+
@Test
270+
public void testSpecialCharLineBreaker() {
271+
UnitUtil uu = new UnitUtil(0);
272+
Map<String, String> config = uu.createTaskConfig();
273+
SplunkSinkConnectorConfig connectorConfig = new SplunkSinkConnectorConfig(config);
274+
Assert.assertEquals("\n", connectorConfig.lineBreaker);
275+
276+
config.put(SplunkSinkConnectorConfig.LINE_BREAKER_CONF, "\r");
277+
connectorConfig = new SplunkSinkConnectorConfig(config);
278+
Assert.assertEquals("\r", connectorConfig.lineBreaker);
279+
280+
config.put(SplunkSinkConnectorConfig.LINE_BREAKER_CONF, "\t");
281+
connectorConfig = new SplunkSinkConnectorConfig(config);
282+
Assert.assertEquals("\t", connectorConfig.lineBreaker);
283+
}
284+
269285
@Test
270286
public void toStr() {
271287
UnitUtil uu = new UnitUtil(0);

src/test/java/com/splunk/kafka/connect/UnitUtil.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ public Map<String, String> createTaskConfig() {
5858
config.put(SplunkSinkConnectorConfig.TRACK_DATA_CONF, String.valueOf(configProfile.isTrackData()));
5959
config.put(SplunkSinkConnectorConfig.MAX_BATCH_SIZE_CONF, String.valueOf(configProfile.getMaxBatchSize()));
6060
config.put(SplunkSinkConnectorConfig.HEC_THREDS_CONF, String.valueOf(configProfile.getNumOfThreads()));
61+
config.put(SplunkSinkConnectorConfig.LINE_BREAKER_CONF, configProfile.getLineBreaker());
6162
return config;
6263
}
6364

0 commit comments

Comments
 (0)