Skip to content

Commit e4a764a

Browse files
committed
fix(plugin): include no more than skip.headers headers
Resolves: GH-36
1 parent 08b5a9d commit e4a764a

File tree

3 files changed

+90
-31
lines changed

3 files changed

+90
-31
lines changed

connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/reader/RowFileInputIterator.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ public RecordsIterable<FileRecord<TypedStruct>> next() {
143143
initializeIfNeeded();
144144
mayWaitForLinesToBeAvailable();
145145
List<FileRecord<TypedStruct>> records = new LinkedList<>();
146-
List<TextBlock> lines = reader.readLines(minNumReadRecords);
146+
List<TextBlock> lines = reader.readLines(minNumReadRecords, false);
147147
if (lines != null) {
148148
for (TextBlock line : lines) {
149149
offsetLines++;
@@ -281,7 +281,7 @@ private void mayReadHeaders() {
281281
LOG.info("Starting to read header lines ({}) from file {}", skipHeaders, fileName);
282282
try (final NonBlockingBufferReader sequentialReader =
283283
new NonBlockingBufferReader(new File(path), charset)) {
284-
headers = sequentialReader.readLines(skipHeaders);
284+
headers = sequentialReader.readLines(skipHeaders, true);
285285
headerStrings = headers
286286
.stream()
287287
.map(TextBlock::data)

connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/reader/internal/NonBlockingBufferReader.java

Lines changed: 49 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ public class NonBlockingBufferReader implements AutoCloseable {
6464
// Number of bytes read during last iteration.
6565
private int nread = -1;
6666

67-
private boolean autoFlush = true;
67+
private boolean isAutoFlushOnEOF = true;
6868

6969
/**
7070
* Creates a new {@link NonBlockingBufferReader} instance.
@@ -113,35 +113,30 @@ public long position() {
113113
* flush all remaining buffered bytes as a single line when EOF is reached.
114114
*/
115115
public void enableAutoFlush() {
116-
this.autoFlush = true;
116+
this.isAutoFlushOnEOF = true;
117117
}
118118

119119
/**
120120
* Disable auto-flush; Reader will not automatically
121121
* flush all remaining buffered bytes when EOF is reached.
122122
*/
123123
public void disableAutoFlush() {
124-
this.autoFlush = false;
124+
this.isAutoFlushOnEOF = false;
125125
}
126126

127-
public List<TextBlock> readLines(int minRecords) throws IOException {
127+
public List<TextBlock> readLines(final int minRecords, final boolean strict) throws IOException {
128128
// Unfortunately we can't just use readLine() because it blocks in an uninterruptible way.
129129
// Instead we have to manage splitting lines ourselves, using simple backoff when no new value
130130
// is available.
131131
final List<TextBlock> records = new LinkedList<>();
132132
nread = 0;
133+
134+
boolean maxNumRecordsNotReached = true;
133135
while (reader.ready() && (records.isEmpty() || records.size() < minRecords)) {
134136
nread = reader.read(buffer, bufferOffset, buffer.length - bufferOffset);
135137
if (nread > 0) {
136138
bufferOffset += nread;
137-
TextBlock line;
138-
do {
139-
line = tryToExtractLine();
140-
if (line != null) {
141-
records.add(line);
142-
}
143-
} while (line != null);
144-
139+
maxNumRecordsNotReached = fillWithBufferedLinesUntil(records, minRecords, strict);
145140
if (records.isEmpty() && bufferOffset == buffer.length) {
146141
char[] newbuf = new char[buffer.length * 2];
147142
System.arraycopy(buffer, 0, newbuf, 0, buffer.length);
@@ -150,16 +145,52 @@ public List<TextBlock> readLines(int minRecords) throws IOException {
150145
}
151146
}
152147

153-
if (!reader.ready() && remaining() && autoFlush) {
154-
LOG.info("End of file reached - flushing remaining bytes from reader buffer.");
155-
final String line = new String(buffer, 0, bufferOffset);
156-
records.add(new TextBlock(line, charset, offset, offset + bufferOffset, bufferOffset));
157-
offset+=bufferOffset;
158-
bufferOffset = 0;
148+
final boolean isEOF = !reader.ready();
149+
150+
// When strict is true, we may reach end of file and still have valid lines in buffer.
151+
if (isEOF && maxNumRecordsNotReached && strict) {
152+
maxNumRecordsNotReached = fillWithBufferedLinesUntil(records, minRecords, true);
153+
}
154+
155+
// If EOF and maximum number of records is not reached then
156+
// attempt to flush remaining bytes as a single line.
157+
if (isEOF && maxNumRecordsNotReached && remaining()) {
158+
LOG.info("End of file reached - flushing remaining bytes from reader buffer ({}).", isAutoFlushOnEOF);
159+
if (isAutoFlushOnEOF) {
160+
final String line = new String(buffer, 0, bufferOffset);
161+
records.add(new TextBlock(line, charset, offset, offset + bufferOffset, bufferOffset));
162+
offset += bufferOffset;
163+
bufferOffset = 0;
164+
}
159165
}
160166
return records;
161167
}
162168

169+
/**
170+
* Fills the given list of records with the lines already present in the read buffer.
171+
*
172+
* @param records the list of records to fill.
173+
* @param minRecords the minimum number of records to try to read from the buffer.
174+
* @param strict whether no more than {@literal minRecords} lines should be read.
175+
* @return {@code false} if {@literal strict} is {@code true} and the maximum number of records
176+
* has been reached, otherwise {@code true}.
177+
*/
178+
private boolean fillWithBufferedLinesUntil(final List<TextBlock> records,
179+
int minRecords,
180+
boolean strict) {
181+
boolean maxNumRecordsNotReached;
182+
TextBlock line;
183+
do {
184+
line = tryToExtractLine();
185+
if (line != null) {
186+
records.add(line);
187+
}
188+
// when strict is true we should not try to extract more lines than the minimum requested.
189+
maxNumRecordsNotReached = !strict || records.size() < minRecords;
190+
} while (line != null && maxNumRecordsNotReached);
191+
return maxNumRecordsNotReached;
192+
}
193+
163194
/**
164195
* Checks whether there is still remaining bytes in the internal buffer.
165196
*

connect-file-pulse-plugin/src/test/java/io/streamthoughts/kafka/connect/filepulse/reader/internal/NonBlockingBufferReaderTest.java

Lines changed: 39 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@
3333
import java.util.ArrayList;
3434
import java.util.List;
3535

36+
import static io.streamthoughts.kafka.connect.filepulse.reader.internal.ReversedInputFileReader.*;
37+
3638
public class NonBlockingBufferReaderTest {
3739

3840
private static final String LF = "\n";
@@ -56,50 +58,75 @@ public void setUp() throws IOException {
5658
@Test
5759
public void shouldReadAllLinesGivenHigherInitialCapacityThanFileSize() throws Exception {
5860
final List<TextBlock> expected = generateLines(writer, NLINES, LF);
59-
NonBlockingBufferReader reader = createReaderWithCapacity(file, ReversedInputFileReader.DEFAULT_INITIAL_CAPACITY);
60-
readAllAndAssert(expected, reader);
61+
NonBlockingBufferReader reader = createReaderWithCapacity(file, DEFAULT_INITIAL_CAPACITY);
62+
readAllAndAssert(expected, reader, false);
6163
}
6264

6365
@Test
6466
public void shouldReadAllLinesGivenHigherInitialCapacityThanFileSizeAndCRLF() throws Exception {
6567
List<TextBlock> expected = generateLines(writer, NLINES, CR + LF);
66-
NonBlockingBufferReader reader = createReaderWithCapacity(file, ReversedInputFileReader.DEFAULT_INITIAL_CAPACITY);
67-
readAllAndAssert(expected, reader);
68+
NonBlockingBufferReader reader = createReaderWithCapacity(file, DEFAULT_INITIAL_CAPACITY);
69+
readAllAndAssert(expected, reader, false);
6870
}
6971

7072
@Test
7173
public void shouldReadAllLinesGivenSmallerInitialCapacityThanFileSize() throws Exception {
7274
List<TextBlock> expected = generateLines(writer, NLINES, LF);
7375
NonBlockingBufferReader reader = createReaderWithCapacity(file, 16);
74-
readAllAndAssert(expected, reader);
76+
readAllAndAssert(expected, reader, false);
7577
}
7678

7779
@Test
7880
public void shouldReadAllLinesGivenSmallerInitialCapacityThanFileSizeCRLF() throws Exception {
7981
List<TextBlock> expected = generateLines(writer, NLINES, CR + LF);
8082
NonBlockingBufferReader reader = createReaderWithCapacity(file, 16);
81-
readAllAndAssert(expected, reader);
83+
readAllAndAssert(expected, reader, false);
8284
}
8385

8486
@Test
8587
public void shouldReadAllLinesGivenSmallerInitialCapacityThanLineSize() throws Exception {
8688
List<TextBlock> expected = generateLines(writer, NLINES, LF);
8789
NonBlockingBufferReader reader = createReaderWithCapacity(file, 4);
88-
readAllAndAssert(expected, reader);
90+
readAllAndAssert(expected, reader, false);
8991
}
9092

9193
@Test
9294
public void shouldReadAllLinesGivenSmallerInitialCapacityThanLineSizeCRLF() throws Exception {
9395
List<TextBlock> expected = generateLines(writer, NLINES, CR + LF );
9496
NonBlockingBufferReader reader = createReaderWithCapacity(file, 4);
95-
readAllAndAssert(expected, reader);
97+
readAllAndAssert(expected, reader, false);
9698
}
9799

98100
@Test
99101
public void shouldReadAllLinesGivenFileNotEndingWithNewLine() throws Exception {
100102
List<TextBlock> expected = generateLines(writer, NLINES, CR + LF, false);
101103
NonBlockingBufferReader reader = createReaderWithCapacity(file, 1024);
102-
readAllAndAssert(expected, reader);
104+
readAllAndAssert(expected, reader, false);
105+
}
106+
107+
@Test
108+
public void shouldAttemptToReadMoreLinesThanMinimumGivenStrictEqualsFalse() throws Exception {
109+
generateLines(writer, NLINES, CR + LF, false);
110+
try(NonBlockingBufferReader reader = createReaderWithCapacity(file, DEFAULT_INITIAL_CAPACITY)) {
111+
List<TextBlock> records = reader.readLines(1, false);
112+
Assert.assertTrue(records.size() > 1);
113+
}
114+
}
115+
116+
@Test
117+
public void shouldNotAttemptToReadMoreLinesThanMinimumGivenStrictEqualsTrue() throws Exception {
118+
generateLines(writer, NLINES, CR + LF, false);
119+
try(NonBlockingBufferReader reader = createReaderWithCapacity(file, DEFAULT_INITIAL_CAPACITY)) {
120+
List<TextBlock> records = reader.readLines(1, true);
121+
Assert.assertEquals(1, records.size());
122+
}
123+
}
124+
125+
@Test
126+
public void shouldReadAllLinesGivenStrictEqualsTrue()throws Exception {
127+
final List<TextBlock> expected = generateLines(writer, NLINES, LF);
128+
NonBlockingBufferReader reader = createReaderWithCapacity(file, DEFAULT_INITIAL_CAPACITY);
129+
readAllAndAssert(expected, reader, true);
103130
}
104131

105132
private static NonBlockingBufferReader createReaderWithCapacity(final File file,
@@ -111,10 +138,11 @@ private static NonBlockingBufferReader createReaderWithCapacity(final File file,
111138
}
112139

113140
private void readAllAndAssert(final List<TextBlock> expected,
114-
final NonBlockingBufferReader reader) throws Exception {
141+
final NonBlockingBufferReader reader,
142+
final boolean strict) throws Exception {
115143
List<TextBlock> results = new ArrayList<>();
116144
while (reader.hasNext()) {
117-
List<TextBlock> l = reader.readLines(1);
145+
List<TextBlock> l = reader.readLines(1, strict);
118146
results.addAll(l);
119147
}
120148
assertResult(expected, results);

0 commit comments

Comments
 (0)