1212import org .elasticsearch .client .Request ;
1313import org .elasticsearch .client .Response ;
1414import org .elasticsearch .client .ResponseException ;
15+ import org .elasticsearch .common .Strings ;
1516import org .elasticsearch .common .network .NetworkAddress ;
1617import org .elasticsearch .common .time .DateFormatter ;
1718import org .elasticsearch .common .time .FormatNames ;
1819import org .elasticsearch .common .xcontent .XContentHelper ;
1920import org .elasticsearch .test .rest .ObjectPath ;
21+ import org .elasticsearch .xcontent .XContentBuilder ;
22+ import org .elasticsearch .xcontent .XContentFactory ;
2023import org .elasticsearch .xcontent .XContentType ;
2124
2225import java .io .IOException ;
2629import java .util .List ;
2730import java .util .Locale ;
2831import java .util .Map ;
32+ import java .util .TreeMap ;
2933
3034import static org .elasticsearch .upgrades .StandardToLogsDbIndexModeRollingUpgradeIT .enableLogsdbByDefault ;
3135import static org .elasticsearch .upgrades .StandardToLogsDbIndexModeRollingUpgradeIT .getWriteBackingIndex ;
@@ -58,15 +62,7 @@ public abstract class AbstractStringTypeRollingUpgradeIT extends AbstractRolling
5862 "method": {
5963 "type": "keyword"
6064 },
61- "message": {
62- "type": "$STRING_TYPE",
63- "fields": {
64- "keyword": {
65- "ignore_above": $IGNORE_ABOVE,
66- "type": "keyword"
67- }
68- }
69- },
65+ "message": $MESSAGE_MAPPING,
7066 "ip": {
7167 "type": "ip"
7268 },
@@ -81,7 +77,7 @@ public abstract class AbstractStringTypeRollingUpgradeIT extends AbstractRolling
8177 }""" ;
8278
8379 // when sorted, this message will appear at the top and hence can be used to validate query results
84- private static String smallestMessage ;
80+ private static Map < String , String > smallestMessageMap = new TreeMap <>() ;
8581
8682 public AbstractStringTypeRollingUpgradeIT (@ Name ("upgradedNodes" ) int upgradedNodes ) {
8783 super (upgradedNodes );
@@ -90,68 +86,87 @@ public AbstractStringTypeRollingUpgradeIT(@Name("upgradedNodes") int upgradedNod
9086 abstract String stringType ();
9187
9288 public void testIndexing () throws Exception {
89+ testIndexing (false );
90+ }
91+
92+ public void testIndexingWithMultifield () throws Exception {
93+ testIndexing (true );
94+ }
95+
96+ protected void testIndexing (boolean shouldIncludeKeywordMultifield ) throws Exception {
97+ String dataStreamName = DATA_STREAM + (shouldIncludeKeywordMultifield ? "-multifield" : "" );
9398 if (isOldCluster ()) {
99+ smallestMessageMap .remove (dataStreamName );
100+
94101 // given - enable logsdb and create a template
95102 startTrial ();
96103 enableLogsdbByDefault ();
97- String templateId = getClass ().getSimpleName ().toLowerCase (Locale .ROOT );
98- createTemplate (DATA_STREAM , templateId , prepareTemplate ());
104+ String templateId = getClass ().getSimpleName ().toLowerCase (Locale .ROOT ) + ( shouldIncludeKeywordMultifield ? "-multifield" : "" ) ;
105+ createTemplate (dataStreamName , templateId , prepareTemplate (shouldIncludeKeywordMultifield ));
99106
100107 // when - index some documents
101- bulkIndex (NUM_REQUESTS , NUM_DOCS_PER_REQUEST );
108+ bulkIndex (dataStreamName , NUM_REQUESTS , NUM_DOCS_PER_REQUEST );
102109
103110 // then - verify that logsdb and synthetic source are both enabled
104- String firstBackingIndex = getWriteBackingIndex (client (), DATA_STREAM , 0 );
111+ String firstBackingIndex = getWriteBackingIndex (client (), dataStreamName , 0 );
105112 var settings = (Map <?, ?>) getIndexSettingsWithDefaults (firstBackingIndex ).get (firstBackingIndex );
106113 assertThat (((Map <?, ?>) settings .get ("settings" )).get ("index.mode" ), equalTo ("logsdb" ));
107114 assertThat (((Map <?, ?>) settings .get ("defaults" )).get ("index.mapping.source.mode" ), equalTo ("SYNTHETIC" ));
108115
109116 // then continued - verify that the created data stream using the created template
110- LogsdbIndexingRollingUpgradeIT .assertDataStream (DATA_STREAM , templateId );
117+ LogsdbIndexingRollingUpgradeIT .assertDataStream (dataStreamName , templateId );
111118
112119 // when/then - run some queries and verify results
113- ensureGreen (DATA_STREAM );
114- search (DATA_STREAM );
115- phraseSearch (DATA_STREAM );
116- query (DATA_STREAM );
120+ ensureGreen (dataStreamName );
121+ search (dataStreamName );
122+ phraseSearch (dataStreamName );
123+ query (dataStreamName );
117124 } else if (isMixedCluster ()) {
118125 // when
119- bulkIndex (NUM_REQUESTS , NUM_DOCS_PER_REQUEST );
126+ bulkIndex (dataStreamName , NUM_REQUESTS , NUM_DOCS_PER_REQUEST );
120127
121128 // when/then
122- ensureGreen (DATA_STREAM );
123- search (DATA_STREAM );
124- phraseSearch (DATA_STREAM );
125- query (DATA_STREAM );
129+ ensureGreen (dataStreamName );
130+ search (dataStreamName );
131+ phraseSearch (dataStreamName );
132+ query (dataStreamName );
126133 } else if (isUpgradedCluster ()) {
127134 // when/then
128- ensureGreen (DATA_STREAM );
129- bulkIndex (NUM_REQUESTS , NUM_DOCS_PER_REQUEST );
130- search (DATA_STREAM );
131- phraseSearch (DATA_STREAM );
132- query (DATA_STREAM );
135+ ensureGreen (dataStreamName );
136+ bulkIndex (dataStreamName , NUM_REQUESTS , NUM_DOCS_PER_REQUEST );
137+ search (dataStreamName );
138+ phraseSearch (dataStreamName );
139+ query (dataStreamName );
133140
134141 // when/then continued - force merge all shard segments into one
135- var forceMergeRequest = new Request ("POST" , "/" + DATA_STREAM + "/_forcemerge" );
142+ var forceMergeRequest = new Request ("POST" , "/" + dataStreamName + "/_forcemerge" );
136143 forceMergeRequest .addParameter ("max_num_segments" , "1" );
137144 assertOK (client ().performRequest (forceMergeRequest ));
138145
139146 // then continued
140- ensureGreen (DATA_STREAM );
141- search (DATA_STREAM );
142- query (DATA_STREAM );
147+ ensureGreen (dataStreamName );
148+ search (dataStreamName );
149+ query (dataStreamName );
143150 }
144151 }
145152
146- private String prepareTemplate () {
147- boolean shouldSetIgnoreAbove = randomBoolean ();
148- String templateWithType = TEMPLATE .replace ("$STRING_TYPE" , stringType ());
149- if (shouldSetIgnoreAbove ) {
150- return templateWithType .replace ("$IGNORE_ABOVE" , String .valueOf (randomInt (IGNORE_ABOVE_MAX )));
153+ private String prepareTemplate (boolean shouldIncludeKeywordMultifield ) throws IOException {
154+ XContentBuilder b = XContentFactory .jsonBuilder ();
155+ b .startObject ();
156+ b .field ("type" , stringType ());
157+
158+ if (shouldIncludeKeywordMultifield ) {
159+ b .startObject ("fields" ).startObject ("keyword" );
160+ b .field ("type" , "keyword" );
161+ boolean shouldSetIgnoreAbove = randomBoolean ();
162+ if (shouldSetIgnoreAbove ) {
163+ b .field ("ignore_above" , randomInt (IGNORE_ABOVE_MAX ));
164+ }
165+ b .endObject ().endObject ();
151166 }
167+ b .endObject ();
152168
153- // removes the entire line that defines ignore_above
154- return templateWithType .replaceAll ("(?m)^\\ s*\" ignore_above\" :\\ s*\\ $IGNORE_ABOVE\\ s*,?\\ s*\\ n?" , "" );
169+ return TEMPLATE .replace ("$MESSAGE_MAPPING" , Strings .toString (b ));
155170 }
156171
157172 static void createTemplate (String dataStreamName , String id , String template ) throws IOException {
@@ -168,13 +183,13 @@ static void createTemplate(String dataStreamName, String id, String template) th
168183 assertOK (client ().performRequest (putIndexTemplateRequest ));
169184 }
170185
171- private void bulkIndex (int numRequest , int numDocs ) throws Exception {
186+ private void bulkIndex (String dataStreamName , int numRequest , int numDocs ) throws Exception {
172187 String firstIndex = null ;
173188 Instant startTime = Instant .now ().minusSeconds (60 * 60 );
174189
175190 for (int i = 0 ; i < numRequest ; i ++) {
176- var bulkRequest = new Request ("POST" , "/" + DATA_STREAM + "/_bulk" );
177- bulkRequest .setJsonEntity (bulkIndexRequestBody (numDocs , startTime ));
191+ var bulkRequest = new Request ("POST" , "/" + dataStreamName + "/_bulk" );
192+ bulkRequest .setJsonEntity (bulkIndexRequestBody (dataStreamName , numDocs , startTime ));
178193 bulkRequest .addParameter ("refresh" , "true" );
179194
180195 var response = client ().performRequest (bulkRequest );
@@ -188,15 +203,15 @@ private void bulkIndex(int numRequest, int numDocs) throws Exception {
188203 }
189204 }
190205
191- private String bulkIndexRequestBody (int numDocs , Instant startTime ) {
206+ private String bulkIndexRequestBody (String dataStreamName , int numDocs , Instant startTime ) {
192207 StringBuilder requestBody = new StringBuilder ();
193208
194209 for (int j = 0 ; j < numDocs ; j ++) {
195210 String hostName = "host" + j % 50 ; // Not realistic, but makes asserting search / query response easier.
196211 String methodName = "method" + j % 5 ;
197212 String ip = NetworkAddress .format (randomIp (true ));
198213 String message = randomAlphasDelimitedBySpace (10 , 1 , 15 );
199- recordSmallestMessage (message );
214+ recordSmallestMessage (dataStreamName , message );
200215 long length = randomLong ();
201216 double factor = randomDouble ();
202217
@@ -229,9 +244,9 @@ public static String randomAlphasDelimitedBySpace(int maxAlphas, int minCodeUnit
229244 return String .join (" " , alphas );
230245 }
231246
232- private void recordSmallestMessage (final String message ) {
233- if (smallestMessage == null || message .compareTo (smallestMessage ) < 0 ) {
234- smallestMessage = message ;
247+ private void recordSmallestMessage (final String dataStreamName , final String message ) {
248+ if (smallestMessageMap . containsKey ( dataStreamName ) == false || message .compareTo (smallestMessageMap . get ( dataStreamName ) ) < 0 ) {
249+ smallestMessageMap . put ( dataStreamName , message ) ;
235250 }
236251 }
237252
@@ -263,7 +278,7 @@ private void phraseSearch(String dataStreamName) throws Exception {
263278 }
264279 }
265280 }
266- """ .replace ("$smallestMessage" , smallestMessage ));
281+ """ .replace ("$smallestMessage" , smallestMessageMap . get ( dataStreamName ) ));
267282 var response = client ().performRequest (searchRequest );
268283 assertOK (response );
269284 var responseBody = entityAsMap (response );
@@ -296,7 +311,7 @@ private void query(String dataStreamName) throws Exception {
296311 Double maxTx = ObjectPath .evaluate (responseBody , "values.0.1" );
297312 assertThat (maxTx , notNullValue ());
298313 String key = ObjectPath .evaluate (responseBody , "values.0.2" );
299- assertThat (key , equalTo (smallestMessage ));
314+ assertThat (key , equalTo (smallestMessageMap . get ( dataStreamName ) ));
300315 }
301316
302317 protected static void startTrial () throws IOException {
0 commit comments