Skip to content

Commit a5a2e4e

Browse files
author
David Roberts
authored
[ML] Use CSV ingest processor in find_file_structure ingest pipeline (#51492)
Changes the find_file_structure response to include a CSV ingest processor in the ingest pipeline it suggests. Previously the Kibana file upload functionality parsed CSV in the browser, but by parsing CSV in the ingest pipeline it makes the Kibana file upload functionality more easily interchangable with Filebeat such that the configurations it creates can more easily be used to import data with the same structure repeatedly in production.
1 parent 7f37ac8 commit a5a2e4e

File tree

8 files changed

+403
-41
lines changed

8 files changed

+403
-41
lines changed

docs/reference/ml/anomaly-detection/apis/find-file-structure.asciidoc

Lines changed: 120 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -145,8 +145,8 @@ to request analysis of 100000 lines to achieve some variety.
145145
value is `true`. Otherwise, the default value is `false`.
146146

147147
`timeout`::
148-
(Optional, <<time-units,time units>>) Sets the maximum amount of time that the
149-
structure analysis make take. If the analysis is still running when the
148+
(Optional, <<time-units,time units>>) Sets the maximum amount of time that the
149+
structure analysis make take. If the analysis is still running when the
150150
timeout expires then it will be aborted. The default value is 25 seconds.
151151

152152
`timestamp_field`::
@@ -163,8 +163,8 @@ also specified.
163163
For structured file formats, if you specify this parameter, the field must exist
164164
within the file.
165165

166-
If this parameter is not specified, the structure finder makes a decision about
167-
which field (if any) is the primary timestamp field. For structured file
166+
If this parameter is not specified, the structure finder makes a decision about
167+
which field (if any) is the primary timestamp field. For structured file
168168
formats, it is not compulsory to have a timestamp in the file.
169169
--
170170

@@ -213,14 +213,14 @@ format from a built-in set.
213213
The following table provides the appropriate `timeformat` values for some example timestamps:
214214

215215
|===
216-
| Timeformat | Presentation
216+
| Timeformat | Presentation
217217

218218
| yyyy-MM-dd HH:mm:ssZ | 2019-04-20 13:15:22+0000
219-
| EEE, d MMM yyyy HH:mm:ss Z | Sat, 20 Apr 2019 13:15:22 +0000
219+
| EEE, d MMM yyyy HH:mm:ss Z | Sat, 20 Apr 2019 13:15:22 +0000
220220
| dd.MM.yy HH:mm:ss.SSS | 20.04.19 13:15:22.285
221221
|===
222222

223-
See
223+
See
224224
https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html[the Java date/time format documentation]
225225
for more information about date and time format syntax.
226226

@@ -675,6 +675,30 @@ If the request does not encounter errors, you receive the following result:
675675
"ingest_pipeline" : {
676676
"description" : "Ingest pipeline created by file structure finder",
677677
"processors" : [
678+
{
679+
"csv" : {
680+
"field" : "message",
681+
"target_fields" : [
682+
"VendorID",
683+
"tpep_pickup_datetime",
684+
"tpep_dropoff_datetime",
685+
"passenger_count",
686+
"trip_distance",
687+
"RatecodeID",
688+
"store_and_fwd_flag",
689+
"PULocationID",
690+
"DOLocationID",
691+
"payment_type",
692+
"fare_amount",
693+
"extra",
694+
"mta_tax",
695+
"tip_amount",
696+
"tolls_amount",
697+
"improvement_surcharge",
698+
"total_amount"
699+
]
700+
}
701+
},
678702
{
679703
"date" : {
680704
"field" : "tpep_pickup_datetime",
@@ -683,6 +707,95 @@ If the request does not encounter errors, you receive the following result:
683707
"yyyy-MM-dd HH:mm:ss"
684708
]
685709
}
710+
},
711+
{
712+
"convert" : {
713+
"field" : "DOLocationID",
714+
"type" : "long"
715+
}
716+
},
717+
{
718+
"convert" : {
719+
"field" : "PULocationID",
720+
"type" : "long"
721+
}
722+
},
723+
{
724+
"convert" : {
725+
"field" : "RatecodeID",
726+
"type" : "long"
727+
}
728+
},
729+
{
730+
"convert" : {
731+
"field" : "VendorID",
732+
"type" : "long"
733+
}
734+
},
735+
{
736+
"convert" : {
737+
"field" : "extra",
738+
"type" : "double"
739+
}
740+
},
741+
{
742+
"convert" : {
743+
"field" : "fare_amount",
744+
"type" : "double"
745+
}
746+
},
747+
{
748+
"convert" : {
749+
"field" : "improvement_surcharge",
750+
"type" : "double"
751+
}
752+
},
753+
{
754+
"convert" : {
755+
"field" : "mta_tax",
756+
"type" : "double"
757+
}
758+
},
759+
{
760+
"convert" : {
761+
"field" : "passenger_count",
762+
"type" : "long"
763+
}
764+
},
765+
{
766+
"convert" : {
767+
"field" : "payment_type",
768+
"type" : "long"
769+
}
770+
},
771+
{
772+
"convert" : {
773+
"field" : "tip_amount",
774+
"type" : "double"
775+
}
776+
},
777+
{
778+
"convert" : {
779+
"field" : "tolls_amount",
780+
"type" : "double"
781+
}
782+
},
783+
{
784+
"convert" : {
785+
"field" : "total_amount",
786+
"type" : "double"
787+
}
788+
},
789+
{
790+
"convert" : {
791+
"field" : "trip_distance",
792+
"type" : "double"
793+
}
794+
},
795+
{
796+
"remove" : {
797+
"field" : "message"
798+
}
686799
}
687800
]
688801
},

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/filestructurefinder/DelimitedFileStructureFinder.java

Lines changed: 49 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,18 @@ static DelimitedFileStructureFinder makeDelimitedFileStructureFinder(List<String
9393
// null to allow GC before timestamp search
9494
sampleLines = null;
9595

96+
Tuple<SortedMap<String, Object>, SortedMap<String, FieldStats>> mappingsAndFieldStats =
97+
FileStructureUtils.guessMappingsAndCalculateFieldStats(explanation, sampleRecords, timeoutChecker);
98+
99+
SortedMap<String, Object> mappings = mappingsAndFieldStats.v1();
100+
101+
List<String> columnNamesList = Arrays.asList(columnNames);
96102
char delimiter = (char) csvPreference.getDelimiterChar();
103+
char quoteChar = csvPreference.getQuoteChar();
104+
105+
Map<String, Object> csvProcessorSettings = makeCsvProcessorSettings("message", columnNamesList, delimiter, quoteChar,
106+
trimFields);
107+
97108
FileStructure.Builder structureBuilder = new FileStructure.Builder(FileStructure.Format.DELIMITED)
98109
.setCharset(charsetName)
99110
.setHasByteOrderMarker(hasByteOrderMarker)
@@ -102,8 +113,19 @@ static DelimitedFileStructureFinder makeDelimitedFileStructureFinder(List<String
102113
.setNumMessagesAnalyzed(sampleRecords.size())
103114
.setHasHeaderRow(isHeaderInFile)
104115
.setDelimiter(delimiter)
105-
.setQuote(csvPreference.getQuoteChar())
106-
.setColumnNames(Arrays.stream(columnNames).collect(Collectors.toList()));
116+
.setQuote(quoteChar)
117+
.setColumnNames(columnNamesList);
118+
119+
if (isHeaderInFile) {
120+
String quote = String.valueOf(quoteChar);
121+
String twoQuotes = quote + quote;
122+
String optQuote = quote.replaceAll(REGEX_NEEDS_ESCAPE_PATTERN, "\\\\$1") + "?";
123+
String delimiterMatcher =
124+
(delimiter == '\t') ? "\\t" : String.valueOf(delimiter).replaceAll(REGEX_NEEDS_ESCAPE_PATTERN, "\\\\$1");
125+
structureBuilder.setExcludeLinesPattern("^" + Arrays.stream(header)
126+
.map(column -> optQuote + column.replace(quote, twoQuotes).replaceAll(REGEX_NEEDS_ESCAPE_PATTERN, "\\\\$1") + optQuote)
127+
.collect(Collectors.joining(delimiterMatcher)));
128+
}
107129

108130
if (trimFields) {
109131
structureBuilder.setShouldTrimFields(true);
@@ -135,32 +157,20 @@ static DelimitedFileStructureFinder makeDelimitedFileStructureFinder(List<String
135157
}
136158
}
137159

138-
if (isHeaderInFile) {
139-
String quote = String.valueOf(csvPreference.getQuoteChar());
140-
String twoQuotes = quote + quote;
141-
String optQuote = quote.replaceAll(REGEX_NEEDS_ESCAPE_PATTERN, "\\\\$1") + "?";
142-
String delimiterMatcher =
143-
(delimiter == '\t') ? "\\t" : String.valueOf(delimiter).replaceAll(REGEX_NEEDS_ESCAPE_PATTERN, "\\\\$1");
144-
structureBuilder.setExcludeLinesPattern("^" + Arrays.stream(header)
145-
.map(column -> optQuote + column.replace(quote, twoQuotes).replaceAll(REGEX_NEEDS_ESCAPE_PATTERN, "\\\\$1") + optQuote)
146-
.collect(Collectors.joining(delimiterMatcher)));
147-
}
148-
149160
boolean needClientTimeZone = timeField.v2().hasTimezoneDependentParsing();
150161

151162
structureBuilder.setTimestampField(timeField.v1())
152163
.setJodaTimestampFormats(timeField.v2().getJodaTimestampFormats())
153164
.setJavaTimestampFormats(timeField.v2().getJavaTimestampFormats())
154165
.setNeedClientTimezone(needClientTimeZone)
155-
.setIngestPipeline(FileStructureUtils.makeIngestPipelineDefinition(null, Collections.emptyMap(), timeField.v1(),
156-
timeField.v2().getJavaTimestampFormats(), needClientTimeZone))
166+
.setIngestPipeline(FileStructureUtils.makeIngestPipelineDefinition(null, Collections.emptyMap(), csvProcessorSettings,
167+
mappings, timeField.v1(), timeField.v2().getJavaTimestampFormats(), needClientTimeZone))
157168
.setMultilineStartPattern(timeLineRegex);
169+
} else {
170+
structureBuilder.setIngestPipeline(FileStructureUtils.makeIngestPipelineDefinition(null, Collections.emptyMap(),
171+
csvProcessorSettings, mappings, null, null, false));
158172
}
159173

160-
Tuple<SortedMap<String, Object>, SortedMap<String, FieldStats>> mappingsAndFieldStats =
161-
FileStructureUtils.guessMappingsAndCalculateFieldStats(explanation, sampleRecords, timeoutChecker);
162-
163-
SortedMap<String, Object> mappings = mappingsAndFieldStats.v1();
164174
if (timeField != null) {
165175
mappings.put(FileStructureUtils.DEFAULT_TIMESTAMP_FIELD, FileStructureUtils.DATE_MAPPING_WITHOUT_FORMAT);
166176
}
@@ -579,4 +589,24 @@ static boolean canCreateFromSample(List<String> explanation, String sample, int
579589
private static boolean notUnexpectedEndOfFile(SuperCsvException e) {
580590
return e.getMessage().startsWith("unexpected end of file while reading quoted column") == false;
581591
}
592+
593+
static Map<String, Object> makeCsvProcessorSettings(String field, List<String> targetFields, char separator, char quote, boolean trim) {
594+
595+
Map<String, Object> csvProcessorSettings = new LinkedHashMap<>();
596+
csvProcessorSettings.put("field", field);
597+
csvProcessorSettings.put("target_fields", Collections.unmodifiableList(targetFields));
598+
if (separator != ',') {
599+
// The value must be String, not Character, as XContent only works with String
600+
csvProcessorSettings.put("separator", String.valueOf(separator));
601+
}
602+
if (quote != '"') {
603+
// The value must be String, not Character, as XContent only works with String
604+
csvProcessorSettings.put("quote", String.valueOf(quote));
605+
}
606+
csvProcessorSettings.put("ignore_missing", false);
607+
if (trim) {
608+
csvProcessorSettings.put("trim", true);
609+
}
610+
return Collections.unmodifiableMap(csvProcessorSettings);
611+
}
582612
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/filestructurefinder/FileStructureUtils.java

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
package org.elasticsearch.xpack.ml.filestructurefinder;
77

88
import org.elasticsearch.common.collect.Tuple;
9+
import org.elasticsearch.common.util.set.Sets;
910
import org.elasticsearch.grok.Grok;
1011
import org.elasticsearch.ingest.Pipeline;
1112
import org.elasticsearch.xpack.core.ml.filestructurefinder.FieldStats;
@@ -31,6 +32,8 @@ public final class FileStructureUtils {
3132
public static final String MAPPING_PROPERTIES_SETTING = "properties";
3233
public static final Map<String, String> DATE_MAPPING_WITHOUT_FORMAT =
3334
Collections.singletonMap(MAPPING_TYPE_SETTING, "date");
35+
public static final Set<String> CONVERTIBLE_TYPES =
36+
Collections.unmodifiableSet(Sets.newHashSet("integer", "long", "float", "double", "boolean"));
3437

3538
private static final int NUM_TOP_HITS = 10;
3639
// NUMBER Grok pattern doesn't support scientific notation, so we extend it
@@ -352,6 +355,9 @@ static boolean isMoreLikelyTextThanKeyword(String str) {
352355
* @param grokPattern The Grok pattern used for parsing semi-structured text formats. <code>null</code> for
353356
* fully structured formats.
354357
* @param customGrokPatternDefinitions The definitions for any custom patterns that {@code grokPattern} uses.
358+
* @param csvProcessorSettings The CSV processor settings for delimited formats. <code>null</code> for
359+
* non-delimited formats.
360+
* @param mappingsForConversions Mappings (or partial mappings) that will be considered for field type conversions.
355361
* @param timestampField The input field containing the timestamp to be parsed into <code>@timestamp</code>.
356362
* <code>null</code> if there is no timestamp.
357363
* @param timestampFormats Timestamp formats to be used for parsing {@code timestampField}.
@@ -360,10 +366,12 @@ static boolean isMoreLikelyTextThanKeyword(String str) {
360366
* @return The ingest pipeline definition, or <code>null</code> if none is required.
361367
*/
362368
public static Map<String, Object> makeIngestPipelineDefinition(String grokPattern, Map<String, String> customGrokPatternDefinitions,
369+
Map<String, Object> csvProcessorSettings,
370+
Map<String, Object> mappingsForConversions,
363371
String timestampField, List<String> timestampFormats,
364372
boolean needClientTimezone) {
365373

366-
if (grokPattern == null && timestampField == null) {
374+
if (grokPattern == null && csvProcessorSettings == null && timestampField == null) {
367375
return null;
368376
}
369377

@@ -384,6 +392,10 @@ public static Map<String, Object> makeIngestPipelineDefinition(String grokPatter
384392
assert customGrokPatternDefinitions.isEmpty();
385393
}
386394

395+
if (csvProcessorSettings != null) {
396+
processors.add(Collections.singletonMap("csv", csvProcessorSettings));
397+
}
398+
387399
if (timestampField != null) {
388400
Map<String, Object> dateProcessorSettings = new LinkedHashMap<>();
389401
dateProcessorSettings.put("field", timestampField);
@@ -394,6 +406,32 @@ public static Map<String, Object> makeIngestPipelineDefinition(String grokPatter
394406
processors.add(Collections.singletonMap("date", dateProcessorSettings));
395407
}
396408

409+
for (Map.Entry<String, Object> mapping : mappingsForConversions.entrySet()) {
410+
String fieldName = mapping.getKey();
411+
Object values = mapping.getValue();
412+
if (values instanceof Map) {
413+
Object type = ((Map<?, ?>) values).get(MAPPING_TYPE_SETTING);
414+
if (CONVERTIBLE_TYPES.contains(type)) {
415+
Map<String, Object> convertProcessorSettings = new LinkedHashMap<>();
416+
convertProcessorSettings.put("field", fieldName);
417+
convertProcessorSettings.put("type", type);
418+
convertProcessorSettings.put("ignore_missing", true);
419+
processors.add(Collections.singletonMap("convert", convertProcessorSettings));
420+
}
421+
}
422+
}
423+
424+
// This removes the unparsed message field for delimited formats (unless the same field name is used for one of the columns)
425+
if (csvProcessorSettings != null) {
426+
Object field = csvProcessorSettings.get("field");
427+
assert field != null;
428+
Object targetFields = csvProcessorSettings.get("target_fields");
429+
assert targetFields instanceof List;
430+
if (((List<?>) targetFields).contains(field) == false) {
431+
processors.add(Collections.singletonMap("remove", Collections.singletonMap("field", field)));
432+
}
433+
}
434+
397435
// This removes the interim timestamp field used for semi-structured text formats
398436
if (grokPattern != null && timestampField != null) {
399437
processors.add(Collections.singletonMap("remove", Collections.singletonMap("field", timestampField)));

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/filestructurefinder/NdJsonFileStructureFinder.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,8 +61,10 @@ static NdJsonFileStructureFinder makeNdJsonFileStructureFinder(List<String> expl
6161
.setJodaTimestampFormats(timeField.v2().getJodaTimestampFormats())
6262
.setJavaTimestampFormats(timeField.v2().getJavaTimestampFormats())
6363
.setNeedClientTimezone(needClientTimeZone)
64-
.setIngestPipeline(FileStructureUtils.makeIngestPipelineDefinition(null, Collections.emptyMap(), timeField.v1(),
65-
timeField.v2().getJavaTimestampFormats(), needClientTimeZone));
64+
.setIngestPipeline(FileStructureUtils.makeIngestPipelineDefinition(null, Collections.emptyMap(), null,
65+
// Note: no convert processors are added based on mappings for NDJSON input
66+
// because it's reasonable that _source matches the supplied JSON precisely
67+
Collections.emptyMap(), timeField.v1(), timeField.v2().getJavaTimestampFormats(), needClientTimeZone));
6668
}
6769

6870
Tuple<SortedMap<String, Object>, SortedMap<String, FieldStats>> mappingsAndFieldStats =

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/filestructurefinder/TextLogFileStructureFinder.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -150,9 +150,8 @@ static TextLogFileStructureFinder makeTextLogFileStructureFinder(List<String> ex
150150
.setJavaTimestampFormats(timestampFormatFinder.getJavaTimestampFormats())
151151
.setNeedClientTimezone(needClientTimeZone)
152152
.setGrokPattern(grokPattern)
153-
.setIngestPipeline(FileStructureUtils.makeIngestPipelineDefinition(grokPattern,
154-
customGrokPatternDefinitions, interimTimestampField,
155-
timestampFormatFinder.getJavaTimestampFormats(), needClientTimeZone))
153+
.setIngestPipeline(FileStructureUtils.makeIngestPipelineDefinition(grokPattern, customGrokPatternDefinitions, null, mappings,
154+
interimTimestampField, timestampFormatFinder.getJavaTimestampFormats(), needClientTimeZone))
156155
.setMappings(mappings)
157156
.setFieldStats(fieldStats)
158157
.setExplanation(explanation)

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/filestructurefinder/XmlFileStructureFinder.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,8 +102,9 @@ static XmlFileStructureFinder makeXmlFileStructureFinder(List<String> explanatio
102102
.setJodaTimestampFormats(timeField.v2().getJodaTimestampFormats())
103103
.setJavaTimestampFormats(timeField.v2().getJavaTimestampFormats())
104104
.setNeedClientTimezone(needClientTimeZone)
105-
.setIngestPipeline(FileStructureUtils.makeIngestPipelineDefinition(null, Collections.emptyMap(),
106-
topLevelTag + "." + timeField.v1(), timeField.v2().getJavaTimestampFormats(), needClientTimeZone));
105+
.setIngestPipeline(FileStructureUtils.makeIngestPipelineDefinition(null, Collections.emptyMap(), null,
106+
Collections.emptyMap(), topLevelTag + "." + timeField.v1(), timeField.v2().getJavaTimestampFormats(),
107+
needClientTimeZone));
107108
}
108109

109110
Tuple<SortedMap<String, Object>, SortedMap<String, FieldStats>> mappingsAndFieldStats =

0 commit comments

Comments
 (0)