Skip to content

Commit bda2d58

Browse files
author
David Roberts
authored
[ML] Add a limit on line merging in find_file_structure (#42501)
When analysing a semi-structured text file, the find_file_structure endpoint merges lines to form multi-line messages, using the assumption that the first line in each message contains the timestamp. However, if the timestamp is misdetected, this can lead to excessive numbers of lines being merged to form massive messages. This commit adds a line_merge_size_limit setting (default 10000 characters) that halts the analysis if a message bigger than this limit is created. This prevents significant CPU time from being spent subsequently trying to determine the internal structure of the huge bogus messages.
1 parent 481b359 commit bda2d58

File tree

21 files changed

+187
-61
lines changed

21 files changed

+187
-61
lines changed

client/rest-high-level/src/main/java/org/elasticsearch/client/ml/FindFileStructureRequest.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
public class FindFileStructureRequest implements Validatable, ToXContentFragment {
3838

3939
public static final ParseField LINES_TO_SAMPLE = new ParseField("lines_to_sample");
40+
public static final ParseField LINE_MERGE_SIZE_LIMIT = new ParseField("line_merge_size_limit");
4041
public static final ParseField TIMEOUT = new ParseField("timeout");
4142
public static final ParseField CHARSET = FileStructure.CHARSET;
4243
public static final ParseField FORMAT = FileStructure.FORMAT;
@@ -52,6 +53,7 @@ public class FindFileStructureRequest implements Validatable, ToXContentFragment
5253
public static final ParseField EXPLAIN = new ParseField("explain");
5354

5455
private Integer linesToSample;
56+
private Integer lineMergeSizeLimit;
5557
private TimeValue timeout;
5658
private String charset;
5759
private FileStructure.Format format;
@@ -77,6 +79,14 @@ public void setLinesToSample(Integer linesToSample) {
7779
this.linesToSample = linesToSample;
7880
}
7981

82+
/**
 * Returns the line merge size limit set on this request.
 *
 * @return the configured limit, or <code>null</code> if none has been set
 */
public Integer getLineMergeSizeLimit() {
83+
return lineMergeSizeLimit;
84+
}
85+
86+
/**
 * Sets the line merge size limit for this request.
 *
 * @param lineMergeSizeLimit the limit to send; when <code>null</code> the
 *                           <code>line_merge_size_limit</code> field is omitted by toXContent
 */
public void setLineMergeSizeLimit(Integer lineMergeSizeLimit) {
87+
this.lineMergeSizeLimit = lineMergeSizeLimit;
88+
}
89+
8090
public TimeValue getTimeout() {
8191
return timeout;
8292
}
@@ -228,6 +238,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
228238
if (linesToSample != null) {
229239
builder.field(LINES_TO_SAMPLE.getPreferredName(), linesToSample);
230240
}
241+
if (lineMergeSizeLimit != null) {
242+
builder.field(LINE_MERGE_SIZE_LIMIT.getPreferredName(), lineMergeSizeLimit);
243+
}
231244
if (timeout != null) {
232245
builder.field(TIMEOUT.getPreferredName(), timeout);
233246
}
@@ -270,8 +283,8 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
270283

271284
@Override
272285
public int hashCode() {
273-
return Objects.hash(linesToSample, timeout, charset, format, columnNames, hasHeaderRow, delimiter, grokPattern, timestampFormat,
274-
timestampField, explain, sample);
286+
return Objects.hash(linesToSample, lineMergeSizeLimit, timeout, charset, format, columnNames, hasHeaderRow, delimiter, grokPattern,
287+
timestampFormat, timestampField, explain, sample);
275288
}
276289

277290
@Override
@@ -287,6 +300,7 @@ public boolean equals(Object other) {
287300

288301
FindFileStructureRequest that = (FindFileStructureRequest) other;
289302
return Objects.equals(this.linesToSample, that.linesToSample) &&
303+
Objects.equals(this.lineMergeSizeLimit, that.lineMergeSizeLimit) &&
290304
Objects.equals(this.timeout, that.timeout) &&
291305
Objects.equals(this.charset, that.charset) &&
292306
Objects.equals(this.format, that.format) &&

client/rest-high-level/src/test/java/org/elasticsearch/client/ml/FindFileStructureRequestTests.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ public class FindFileStructureRequestTests extends AbstractXContentTestCase<Find
3535

3636
static {
3737
PARSER.declareInt(FindFileStructureRequest::setLinesToSample, FindFileStructureRequest.LINES_TO_SAMPLE);
38+
PARSER.declareInt(FindFileStructureRequest::setLineMergeSizeLimit, FindFileStructureRequest.LINE_MERGE_SIZE_LIMIT);
3839
PARSER.declareString((p, c) -> p.setTimeout(TimeValue.parseTimeValue(c, FindFileStructureRequest.TIMEOUT.getPreferredName())),
3940
FindFileStructureRequest.TIMEOUT);
4041
PARSER.declareString(FindFileStructureRequest::setCharset, FindFileStructureRequest.CHARSET);
@@ -72,6 +73,9 @@ public static FindFileStructureRequest createTestRequestWithoutSample() {
7273
if (randomBoolean()) {
7374
findFileStructureRequest.setLinesToSample(randomIntBetween(1000, 2000));
7475
}
76+
if (randomBoolean()) {
77+
findFileStructureRequest.setLineMergeSizeLimit(randomIntBetween(10000, 20000));
78+
}
7579
if (randomBoolean()) {
7680
findFileStructureRequest.setTimeout(TimeValue.timeValueSeconds(randomIntBetween(10, 20)));
7781
}

docs/reference/ml/apis/find-file-structure.asciidoc

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,13 @@ chosen.
9292
parameter is not specified, the structure finder guesses based on the similarity of
9393
the first row of the file to other rows.
9494

95+
`line_merge_size_limit`::
96+
(unsigned integer) The maximum number of characters in a message when lines are
97+
merged to form messages while analyzing semi-structured files. The default
98+
is 10000. If you have extremely long messages you may need to increase this value, but
99+
be aware that doing so may lead to very long processing times if the way to group
100+
lines into messages is misdetected.
101+
95102
`lines_to_sample`::
96103
(unsigned integer) The number of lines to include in the structural analysis,
97104
starting from the beginning of the file. The minimum is 2; the default

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/FindFileStructureAction.java

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
*/
66
package org.elasticsearch.xpack.core.ml.action;
77

8+
import org.elasticsearch.Version;
89
import org.elasticsearch.action.Action;
910
import org.elasticsearch.action.ActionRequest;
1011
import org.elasticsearch.action.ActionRequestBuilder;
@@ -113,6 +114,7 @@ public boolean equals(Object other) {
113114
public static class Request extends ActionRequest {
114115

115116
public static final ParseField LINES_TO_SAMPLE = new ParseField("lines_to_sample");
117+
public static final ParseField LINE_MERGE_SIZE_LIMIT = new ParseField("line_merge_size_limit");
116118
public static final ParseField TIMEOUT = new ParseField("timeout");
117119
public static final ParseField CHARSET = FileStructure.CHARSET;
118120
public static final ParseField FORMAT = FileStructure.FORMAT;
@@ -130,6 +132,7 @@ public static class Request extends ActionRequest {
130132
"[%s] may only be specified if [" + FORMAT.getPreferredName() + "] is [%s]";
131133

132134
private Integer linesToSample;
135+
private Integer lineMergeSizeLimit;
133136
private TimeValue timeout;
134137
private String charset;
135138
private FileStructure.Format format;
@@ -154,6 +157,14 @@ public void setLinesToSample(Integer linesToSample) {
154157
this.linesToSample = linesToSample;
155158
}
156159

160+
/**
 * Returns the line merge size limit carried by this request.
 *
 * @return the configured limit, or <code>null</code> if none has been set
 */
public Integer getLineMergeSizeLimit() {
161+
return lineMergeSizeLimit;
162+
}
163+
164+
/**
 * Sets the line merge size limit for this request. The value is stored as given;
 * it is {@link #validate()} that rejects a non-null value that is not positive.
 *
 * @param lineMergeSizeLimit the limit to use, or <code>null</code> to leave it unset
 */
public void setLineMergeSizeLimit(Integer lineMergeSizeLimit) {
165+
this.lineMergeSizeLimit = lineMergeSizeLimit;
166+
}
167+
157168
public TimeValue getTimeout() {
158169
return timeout;
159170
}
@@ -291,6 +302,10 @@ public ActionRequestValidationException validate() {
291302
validationException =
292303
addValidationError("[" + LINES_TO_SAMPLE.getPreferredName() + "] must be positive if specified", validationException);
293304
}
305+
if (lineMergeSizeLimit != null && lineMergeSizeLimit <= 0) {
306+
validationException = addValidationError("[" + LINE_MERGE_SIZE_LIMIT.getPreferredName() + "] must be positive if specified",
307+
validationException);
308+
}
294309
if (format != FileStructure.Format.DELIMITED) {
295310
if (columnNames != null) {
296311
validationException = addIncompatibleArgError(COLUMN_NAMES, FileStructure.Format.DELIMITED, validationException);
@@ -324,6 +339,9 @@ public ActionRequestValidationException validate() {
324339
public void readFrom(StreamInput in) throws IOException {
325340
super.readFrom(in);
326341
linesToSample = in.readOptionalVInt();
342+
if (in.getVersion().onOrAfter(Version.CURRENT)) {
343+
lineMergeSizeLimit = in.readOptionalVInt();
344+
}
327345
timeout = in.readOptionalTimeValue();
328346
charset = in.readOptionalString();
329347
format = in.readBoolean() ? in.readEnum(FileStructure.Format.class) : null;
@@ -342,6 +360,9 @@ public void readFrom(StreamInput in) throws IOException {
342360
public void writeTo(StreamOutput out) throws IOException {
343361
super.writeTo(out);
344362
out.writeOptionalVInt(linesToSample);
363+
if (out.getVersion().onOrAfter(Version.CURRENT)) {
364+
out.writeOptionalVInt(lineMergeSizeLimit);
365+
}
345366
out.writeOptionalTimeValue(timeout);
346367
out.writeOptionalString(charset);
347368
if (format == null) {
@@ -378,8 +399,8 @@ public void writeTo(StreamOutput out) throws IOException {
378399

379400
@Override
380401
public int hashCode() {
381-
return Objects.hash(linesToSample, timeout, charset, format, columnNames, hasHeaderRow, delimiter, grokPattern, timestampFormat,
382-
timestampField, sample);
402+
return Objects.hash(linesToSample, lineMergeSizeLimit, timeout, charset, format, columnNames, hasHeaderRow, delimiter,
403+
grokPattern, timestampFormat, timestampField, sample);
383404
}
384405

385406
@Override
@@ -395,6 +416,7 @@ public boolean equals(Object other) {
395416

396417
Request that = (Request) other;
397418
return Objects.equals(this.linesToSample, that.linesToSample) &&
419+
Objects.equals(this.lineMergeSizeLimit, that.lineMergeSizeLimit) &&
398420
Objects.equals(this.timeout, that.timeout) &&
399421
Objects.equals(this.charset, that.charset) &&
400422
Objects.equals(this.format, that.format) &&

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/FindFileStructureActionRequestTests.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,10 @@ protected FindFileStructureAction.Request createTestInstance() {
2626
request.setLinesToSample(randomIntBetween(10, 2000));
2727
}
2828

29+
if (randomBoolean()) {
30+
request.setLineMergeSizeLimit(randomIntBetween(1000, 20000));
31+
}
32+
2933
if (randomBoolean()) {
3034
request.setCharset(randomAlphaOfLength(10));
3135
}
@@ -85,6 +89,18 @@ public void testValidateLinesToSample() {
8589
assertThat(e.getMessage(), containsString(" [lines_to_sample] must be positive if specified"));
8690
}
8791

92+
// Checks that validate() rejects a request whose line merge size limit is not positive,
// and that the failure message names the [line_merge_size_limit] field.
public void testValidateLineMergeSizeLimit() {
93+
94+
FindFileStructureAction.Request request = new FindFileStructureAction.Request();
95+
// The limit is drawn from randomIntBetween(-1, 0), i.e. a value validate() should refuse.
request.setLineMergeSizeLimit(randomIntBetween(-1, 0));
96+
// A sample is supplied so that the limit is the setting under test.
request.setSample(new BytesArray("foo\n"));
97+
98+
ActionRequestValidationException e = request.validate();
99+
assertNotNull(e);
100+
assertThat(e.getMessage(), startsWith("Validation Failed: "));
101+
assertThat(e.getMessage(), containsString(" [line_merge_size_limit] must be positive if specified"));
102+
}
103+
88104
public void testValidateNonDelimited() {
89105

90106
FindFileStructureAction.Request request = new FindFileStructureAction.Request();

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportFindFileStructureAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ private FindFileStructureAction.Response buildFileStructureResponse(FindFileStru
4949
FileStructureFinderManager structureFinderManager = new FileStructureFinderManager(threadPool.scheduler());
5050

5151
FileStructureFinder fileStructureFinder = structureFinderManager.findFileStructure(request.getLinesToSample(),
52-
request.getSample().streamInput(), new FileStructureOverrides(request), request.getTimeout());
52+
request.getLineMergeSizeLimit(), request.getSample().streamInput(), new FileStructureOverrides(request), request.getTimeout());
5353

5454
return new FindFileStructureAction.Response(fileStructureFinder.getStructure());
5555
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/filestructurefinder/DelimitedFileStructureFinderFactory.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,8 @@ public boolean canCreateFromSample(List<String> explanation, String sample) {
6262

6363
@Override
6464
public FileStructureFinder createFromSample(List<String> explanation, String sample, String charsetName, Boolean hasByteOrderMarker,
65-
FileStructureOverrides overrides, TimeoutChecker timeoutChecker) throws IOException {
65+
int lineMergeSizeLimit, FileStructureOverrides overrides, TimeoutChecker timeoutChecker)
66+
throws IOException {
6667
return DelimitedFileStructureFinder.makeDelimitedFileStructureFinder(explanation, sample, charsetName, hasByteOrderMarker,
6768
csvPreference, trimFields, overrides, timeoutChecker);
6869
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/filestructurefinder/FileStructureFinderFactory.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,12 +37,14 @@ public interface FileStructureFinderFactory {
3737
* @param sample A sample from the file to be ingested.
3838
* @param charsetName The name of the character set in which the sample was provided.
3939
* @param hasByteOrderMarker Did the sample have a byte order marker? <code>null</code> means "not relevant".
40+
* @param lineMergeSizeLimit Maximum number of characters permitted when lines are merged to create messages.
4041
* @param overrides Stores structure decisions that have been made by the end user, and should
4142
* take precedence over anything the {@link FileStructureFinder} may decide.
4243
* @param timeoutChecker Will abort the operation if its timeout is exceeded.
4344
* @return A {@link FileStructureFinder} object suitable for determining the structure of the supplied sample.
4445
* @throws Exception if something goes wrong during creation.
4546
*/
4647
FileStructureFinder createFromSample(List<String> explanation, String sample, String charsetName, Boolean hasByteOrderMarker,
47-
FileStructureOverrides overrides, TimeoutChecker timeoutChecker) throws Exception;
48+
int lineMergeSizeLimit, FileStructureOverrides overrides,
49+
TimeoutChecker timeoutChecker) throws Exception;
4850
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/filestructurefinder/FileStructureFinderManager.java

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ public final class FileStructureFinderManager {
4242

4343
public static final int MIN_SAMPLE_LINE_COUNT = 2;
4444
public static final int DEFAULT_IDEAL_SAMPLE_LINE_COUNT = 1000;
45+
public static final int DEFAULT_LINE_MERGE_SIZE_LIMIT = 10000;
4546

4647
static final Set<String> FILEBEAT_SUPPORTED_ENCODINGS = Set.of(
4748
"866",
@@ -294,8 +295,9 @@ public FileStructureFinderManager(ScheduledExecutorService scheduler) {
294295
this.scheduler = Objects.requireNonNull(scheduler);
295296
}
296297

297-
public FileStructureFinder findFileStructure(Integer idealSampleLineCount, InputStream fromFile) throws Exception {
298-
return findFileStructure(idealSampleLineCount, fromFile, FileStructureOverrides.EMPTY_OVERRIDES, null);
298+
public FileStructureFinder findFileStructure(Integer idealSampleLineCount, Integer lineMergeSizeLimit,
299+
InputStream fromFile) throws Exception {
300+
return findFileStructure(idealSampleLineCount, lineMergeSizeLimit, fromFile, FileStructureOverrides.EMPTY_OVERRIDES, null);
299301
}
300302

301303
/**
@@ -304,6 +306,8 @@ public FileStructureFinder findFileStructure(Integer idealSampleLineCount, Input
304306
* If the stream has fewer lines then an attempt will still be made, providing at
305307
* least {@link #MIN_SAMPLE_LINE_COUNT} lines can be read. If <code>null</code>
306308
* the value of {@link #DEFAULT_IDEAL_SAMPLE_LINE_COUNT} will be used.
309+
* @param lineMergeSizeLimit Maximum number of characters permitted when lines are merged to create messages.
310+
* If <code>null</code> the value of {@link #DEFAULT_LINE_MERGE_SIZE_LIMIT} will be used.
307311
* @param fromFile A stream from which the sample will be read.
308312
* @param overrides Aspects of the file structure that are known in advance. These take precedence over
309313
* values determined by structure analysis. An exception will be thrown if the file structure
@@ -314,20 +318,21 @@ public FileStructureFinder findFileStructure(Integer idealSampleLineCount, Input
314318
* @return A {@link FileStructureFinder} object from which the structure and messages can be queried.
315319
* @throws Exception A variety of problems could occur at various stages of the structure finding process.
316320
*/
317-
public FileStructureFinder findFileStructure(Integer idealSampleLineCount, InputStream fromFile, FileStructureOverrides overrides,
318-
TimeValue timeout)
319-
throws Exception {
321+
public FileStructureFinder findFileStructure(Integer idealSampleLineCount, Integer lineMergeSizeLimit, InputStream fromFile,
322+
FileStructureOverrides overrides, TimeValue timeout) throws Exception {
320323
return findFileStructure(new ArrayList<>(), (idealSampleLineCount == null) ? DEFAULT_IDEAL_SAMPLE_LINE_COUNT : idealSampleLineCount,
321-
fromFile, overrides, timeout);
324+
(lineMergeSizeLimit == null) ? DEFAULT_LINE_MERGE_SIZE_LIMIT : lineMergeSizeLimit, fromFile, overrides, timeout);
322325
}
323326

324-
public FileStructureFinder findFileStructure(List<String> explanation, int idealSampleLineCount, InputStream fromFile)
325-
throws Exception {
326-
return findFileStructure(explanation, idealSampleLineCount, fromFile, FileStructureOverrides.EMPTY_OVERRIDES, null);
327+
public FileStructureFinder findFileStructure(List<String> explanation, int idealSampleLineCount, int lineMergeSizeLimit,
328+
InputStream fromFile) throws Exception {
329+
return findFileStructure(explanation, idealSampleLineCount, lineMergeSizeLimit, fromFile, FileStructureOverrides.EMPTY_OVERRIDES,
330+
null);
327331
}
328332

329-
public FileStructureFinder findFileStructure(List<String> explanation, int idealSampleLineCount, InputStream fromFile,
330-
FileStructureOverrides overrides, TimeValue timeout) throws Exception {
333+
public FileStructureFinder findFileStructure(List<String> explanation, int idealSampleLineCount, int lineMergeSizeLimit,
334+
InputStream fromFile, FileStructureOverrides overrides,
335+
TimeValue timeout) throws Exception {
331336

332337
try (TimeoutChecker timeoutChecker = new TimeoutChecker("structure analysis", timeout, scheduler)) {
333338

@@ -346,7 +351,8 @@ public FileStructureFinder findFileStructure(List<String> explanation, int ideal
346351
Tuple<String, Boolean> sampleInfo = sampleFile(sampleReader, charsetName, MIN_SAMPLE_LINE_COUNT,
347352
Math.max(MIN_SAMPLE_LINE_COUNT, idealSampleLineCount), timeoutChecker);
348353

349-
return makeBestStructureFinder(explanation, sampleInfo.v1(), charsetName, sampleInfo.v2(), overrides, timeoutChecker);
354+
return makeBestStructureFinder(explanation, sampleInfo.v1(), charsetName, sampleInfo.v2(), lineMergeSizeLimit, overrides,
355+
timeoutChecker);
350356
} catch (Exception e) {
351357
// Add a dummy exception containing the explanation so far - this can be invaluable for troubleshooting as incorrect
352358
// decisions made early on in the structure analysis can result in seemingly crazy decisions or timeouts later on
@@ -461,7 +467,8 @@ CharsetMatch findCharset(List<String> explanation, InputStream inputStream, Time
461467
}
462468

463469
FileStructureFinder makeBestStructureFinder(List<String> explanation, String sample, String charsetName, Boolean hasByteOrderMarker,
464-
FileStructureOverrides overrides, TimeoutChecker timeoutChecker) throws Exception {
470+
int lineMergeSizeLimit, FileStructureOverrides overrides,
471+
TimeoutChecker timeoutChecker) throws Exception {
465472

466473
Character delimiter = overrides.getDelimiter();
467474
Character quote = overrides.getQuote();
@@ -493,7 +500,8 @@ FileStructureFinder makeBestStructureFinder(List<String> explanation, String sam
493500
for (FileStructureFinderFactory factory : factories) {
494501
timeoutChecker.check("high level format detection");
495502
if (factory.canCreateFromSample(explanation, sample)) {
496-
return factory.createFromSample(explanation, sample, charsetName, hasByteOrderMarker, overrides, timeoutChecker);
503+
return factory.createFromSample(explanation, sample, charsetName, hasByteOrderMarker, lineMergeSizeLimit, overrides,
504+
timeoutChecker);
497505
}
498506
}
499507

0 commit comments

Comments
 (0)