Skip to content

Commit 2aef7e0

Browse files
authored
Introduce mapping version to index metadata (#33147)
This commit introduces mapping version to index metadata. This value is monotonically increasing and is updated on mapping updates. This will be useful in cross-cluster replication so that we can request mapping updates from the leader only when there is a mapping update as opposed to the strategy we employ today which is to request a mapping update any time there is an index metadata update. As index metadata updates can occur for many reasons other than mapping updates, this leads to some unnecessary requests and work in cross-cluster replication.
1 parent 3d9ca4b commit 2aef7e0

File tree

12 files changed

+186
-15
lines changed

12 files changed

+186
-15
lines changed

server/src/main/java/org/elasticsearch/cluster/ClusterState.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -284,7 +284,7 @@ public String toString() {
284284
final String TAB = " ";
285285
for (IndexMetaData indexMetaData : metaData) {
286286
sb.append(TAB).append(indexMetaData.getIndex());
287-
sb.append(": v[").append(indexMetaData.getVersion()).append("]\n");
287+
sb.append(": v[").append(indexMetaData.getVersion()).append("], mv[").append(indexMetaData.getMappingVersion()).append("]\n");
288288
for (int shard = 0; shard < indexMetaData.getNumberOfShards(); shard++) {
289289
sb.append(TAB).append(TAB).append(shard).append(": ");
290290
sb.append("p_term [").append(indexMetaData.primaryTerm(shard)).append("], ");

server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java

Lines changed: 50 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.carrotsearch.hppc.cursors.ObjectCursor;
2525
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
2626

27+
import org.elasticsearch.Assertions;
2728
import org.elasticsearch.Version;
2829
import org.elasticsearch.action.admin.indices.rollover.RolloverInfo;
2930
import org.elasticsearch.action.support.ActiveShardCount;
@@ -291,6 +292,7 @@ public Iterator<Setting<Integer>> settings() {
291292

292293
public static final String KEY_IN_SYNC_ALLOCATIONS = "in_sync_allocations";
293294
static final String KEY_VERSION = "version";
295+
static final String KEY_MAPPING_VERSION = "mapping_version";
294296
static final String KEY_ROUTING_NUM_SHARDS = "routing_num_shards";
295297
static final String KEY_SETTINGS = "settings";
296298
static final String KEY_STATE = "state";
@@ -309,6 +311,9 @@ public Iterator<Setting<Integer>> settings() {
309311

310312
private final Index index;
311313
private final long version;
314+
315+
private final long mappingVersion;
316+
312317
private final long[] primaryTerms;
313318

314319
private final State state;
@@ -336,7 +341,7 @@ public Iterator<Setting<Integer>> settings() {
336341
private final ActiveShardCount waitForActiveShards;
337342
private final ImmutableOpenMap<String, RolloverInfo> rolloverInfos;
338343

339-
private IndexMetaData(Index index, long version, long[] primaryTerms, State state, int numberOfShards, int numberOfReplicas, Settings settings,
344+
private IndexMetaData(Index index, long version, long mappingVersion, long[] primaryTerms, State state, int numberOfShards, int numberOfReplicas, Settings settings,
340345
ImmutableOpenMap<String, MappingMetaData> mappings, ImmutableOpenMap<String, AliasMetaData> aliases,
341346
ImmutableOpenMap<String, Custom> customs, ImmutableOpenIntMap<Set<String>> inSyncAllocationIds,
342347
DiscoveryNodeFilters requireFilters, DiscoveryNodeFilters initialRecoveryFilters, DiscoveryNodeFilters includeFilters, DiscoveryNodeFilters excludeFilters,
@@ -345,6 +350,8 @@ private IndexMetaData(Index index, long version, long[] primaryTerms, State stat
345350

346351
this.index = index;
347352
this.version = version;
353+
assert mappingVersion >= 0 : mappingVersion;
354+
this.mappingVersion = mappingVersion;
348355
this.primaryTerms = primaryTerms;
349356
assert primaryTerms.length == numberOfShards;
350357
this.state = state;
@@ -394,6 +401,9 @@ public long getVersion() {
394401
return this.version;
395402
}
396403

404+
public long getMappingVersion() {
405+
return mappingVersion;
406+
}
397407

398408
/**
399409
* The term of the current selected primary. This is a non-negative number incremented when
@@ -644,6 +654,7 @@ private static class IndexMetaDataDiff implements Diff<IndexMetaData> {
644654
private final String index;
645655
private final int routingNumShards;
646656
private final long version;
657+
private final long mappingVersion;
647658
private final long[] primaryTerms;
648659
private final State state;
649660
private final Settings settings;
@@ -656,6 +667,7 @@ private static class IndexMetaDataDiff implements Diff<IndexMetaData> {
656667
IndexMetaDataDiff(IndexMetaData before, IndexMetaData after) {
657668
index = after.index.getName();
658669
version = after.version;
670+
mappingVersion = after.mappingVersion;
659671
routingNumShards = after.routingNumShards;
660672
state = after.state;
661673
settings = after.settings;
@@ -672,6 +684,11 @@ private static class IndexMetaDataDiff implements Diff<IndexMetaData> {
672684
index = in.readString();
673685
routingNumShards = in.readInt();
674686
version = in.readLong();
687+
if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
688+
mappingVersion = in.readVLong();
689+
} else {
690+
mappingVersion = 1;
691+
}
675692
state = State.fromId(in.readByte());
676693
settings = Settings.readSettingsFromStream(in);
677694
primaryTerms = in.readVLongArray();
@@ -707,6 +724,9 @@ public void writeTo(StreamOutput out) throws IOException {
707724
out.writeString(index);
708725
out.writeInt(routingNumShards);
709726
out.writeLong(version);
727+
if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
728+
out.writeVLong(mappingVersion);
729+
}
710730
out.writeByte(state.id);
711731
Settings.writeSettingsToStream(settings, out);
712732
out.writeVLongArray(primaryTerms);
@@ -723,6 +743,7 @@ public void writeTo(StreamOutput out) throws IOException {
723743
public IndexMetaData apply(IndexMetaData part) {
724744
Builder builder = builder(index);
725745
builder.version(version);
746+
builder.mappingVersion(mappingVersion);
726747
builder.setRoutingNumShards(routingNumShards);
727748
builder.state(state);
728749
builder.settings(settings);
@@ -739,6 +760,11 @@ public IndexMetaData apply(IndexMetaData part) {
739760
public static IndexMetaData readFrom(StreamInput in) throws IOException {
740761
Builder builder = new Builder(in.readString());
741762
builder.version(in.readLong());
763+
if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
764+
builder.mappingVersion(in.readVLong());
765+
} else {
766+
builder.mappingVersion(1);
767+
}
742768
builder.setRoutingNumShards(in.readInt());
743769
builder.state(State.fromId(in.readByte()));
744770
builder.settings(readSettingsFromStream(in));
@@ -778,6 +804,9 @@ public static IndexMetaData readFrom(StreamInput in) throws IOException {
778804
public void writeTo(StreamOutput out) throws IOException {
779805
out.writeString(index.getName()); // uuid will come as part of settings
780806
out.writeLong(version);
807+
if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
808+
out.writeVLong(mappingVersion);
809+
}
781810
out.writeInt(routingNumShards);
782811
out.writeByte(state.id());
783812
writeSettingsToStream(settings, out);
@@ -821,6 +850,7 @@ public static class Builder {
821850
private String index;
822851
private State state = State.OPEN;
823852
private long version = 1;
853+
private long mappingVersion = 1;
824854
private long[] primaryTerms = null;
825855
private Settings settings = Settings.Builder.EMPTY_SETTINGS;
826856
private final ImmutableOpenMap.Builder<String, MappingMetaData> mappings;
@@ -843,6 +873,7 @@ public Builder(IndexMetaData indexMetaData) {
843873
this.index = indexMetaData.getIndex().getName();
844874
this.state = indexMetaData.state;
845875
this.version = indexMetaData.version;
876+
this.mappingVersion = indexMetaData.mappingVersion;
846877
this.settings = indexMetaData.getSettings();
847878
this.primaryTerms = indexMetaData.primaryTerms.clone();
848879
this.mappings = ImmutableOpenMap.builder(indexMetaData.mappings);
@@ -1009,6 +1040,15 @@ public Builder version(long version) {
10091040
return this;
10101041
}
10111042

1043+
public long mappingVersion() {
1044+
return mappingVersion;
1045+
}
1046+
1047+
public Builder mappingVersion(final long mappingVersion) {
1048+
this.mappingVersion = mappingVersion;
1049+
return this;
1050+
}
1051+
10121052
/**
10131053
* returns the primary term for the given shard.
10141054
* See {@link IndexMetaData#primaryTerm(int)} for more information.
@@ -1136,7 +1176,7 @@ public IndexMetaData build() {
11361176

11371177
final String uuid = settings.get(SETTING_INDEX_UUID, INDEX_UUID_NA_VALUE);
11381178

1139-
return new IndexMetaData(new Index(index, uuid), version, primaryTerms, state, numberOfShards, numberOfReplicas, tmpSettings, mappings.build(),
1179+
return new IndexMetaData(new Index(index, uuid), version, mappingVersion, primaryTerms, state, numberOfShards, numberOfReplicas, tmpSettings, mappings.build(),
11401180
tmpAliases.build(), customs.build(), filledInSyncAllocationIds.build(), requireFilters, initialRecoveryFilters, includeFilters, excludeFilters,
11411181
indexCreatedVersion, indexUpgradedVersion, getRoutingNumShards(), routingPartitionSize, waitForActiveShards, rolloverInfos.build());
11421182
}
@@ -1145,6 +1185,7 @@ public static void toXContent(IndexMetaData indexMetaData, XContentBuilder build
11451185
builder.startObject(indexMetaData.getIndex().getName());
11461186

11471187
builder.field(KEY_VERSION, indexMetaData.getVersion());
1188+
builder.field(KEY_MAPPING_VERSION, indexMetaData.getMappingVersion());
11481189
builder.field(KEY_ROUTING_NUM_SHARDS, indexMetaData.getRoutingNumShards());
11491190
builder.field(KEY_STATE, indexMetaData.getState().toString().toLowerCase(Locale.ENGLISH));
11501191

@@ -1218,6 +1259,7 @@ public static IndexMetaData fromXContent(XContentParser parser) throws IOExcepti
12181259
if (token != XContentParser.Token.START_OBJECT) {
12191260
throw new IllegalArgumentException("expected object but got a " + token);
12201261
}
1262+
boolean mappingVersion = false;
12211263
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
12221264
if (token == XContentParser.Token.FIELD_NAME) {
12231265
currentFieldName = parser.currentName();
@@ -1316,6 +1358,9 @@ public static IndexMetaData fromXContent(XContentParser parser) throws IOExcepti
13161358
builder.state(State.fromString(parser.text()));
13171359
} else if (KEY_VERSION.equals(currentFieldName)) {
13181360
builder.version(parser.longValue());
1361+
} else if (KEY_MAPPING_VERSION.equals(currentFieldName)) {
1362+
mappingVersion = true;
1363+
builder.mappingVersion(parser.longValue());
13191364
} else if (KEY_ROUTING_NUM_SHARDS.equals(currentFieldName)) {
13201365
builder.setRoutingNumShards(parser.intValue());
13211366
} else {
@@ -1325,6 +1370,9 @@ public static IndexMetaData fromXContent(XContentParser parser) throws IOExcepti
13251370
throw new IllegalArgumentException("Unexpected token " + token);
13261371
}
13271372
}
1373+
if (Assertions.ENABLED && Version.indexCreated(builder.settings).onOrAfter(Version.V_7_0_0_alpha1)) {
1374+
assert mappingVersion : "mapping version should be present for indices created on or after 7.0.0";
1375+
}
13281376
return builder.build();
13291377
}
13301378
}

server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -287,6 +287,7 @@ private ClusterState applyRequest(ClusterState currentState, PutMappingClusterSt
287287
MetaData.Builder builder = MetaData.builder(metaData);
288288
boolean updated = false;
289289
for (IndexMetaData indexMetaData : updateList) {
290+
boolean updatedMapping = false;
290291
// do the actual merge here on the master, and update the mapping source
291292
// we use the exact same indexService and metadata we used to validate above here to actually apply the update
292293
final Index index = indexMetaData.getIndex();
@@ -303,7 +304,7 @@ private ClusterState applyRequest(ClusterState currentState, PutMappingClusterSt
303304
if (existingSource.equals(updatedSource)) {
304305
// same source, no changes, ignore it
305306
} else {
306-
updated = true;
307+
updatedMapping = true;
307308
// use the merged mapping source
308309
if (logger.isDebugEnabled()) {
309310
logger.debug("{} update_mapping [{}] with source [{}]", index, mergedMapper.type(), updatedSource);
@@ -313,7 +314,7 @@ private ClusterState applyRequest(ClusterState currentState, PutMappingClusterSt
313314

314315
}
315316
} else {
316-
updated = true;
317+
updatedMapping = true;
317318
if (logger.isDebugEnabled()) {
318319
logger.debug("{} create_mapping [{}] with source [{}]", index, mappingType, updatedSource);
319320
} else if (logger.isInfoEnabled()) {
@@ -329,7 +330,16 @@ private ClusterState applyRequest(ClusterState currentState, PutMappingClusterSt
329330
indexMetaDataBuilder.putMapping(new MappingMetaData(mapper.mappingSource()));
330331
}
331332
}
333+
if (updatedMapping) {
334+
indexMetaDataBuilder.mappingVersion(1 + indexMetaDataBuilder.mappingVersion());
335+
}
336+
/*
337+
* This implicitly increments the index metadata version and builds the index metadata. This means that we need to have
338+
* already incremented the mapping version if necessary. Therefore, the mapping version increment must remain before this
339+
* statement.
340+
*/
332341
builder.put(indexMetaDataBuilder);
342+
updated |= updatedMapping;
333343
}
334344
if (updated) {
335345
return ClusterState.builder(currentState).metaData(builder).build();

server/src/main/java/org/elasticsearch/index/IndexService.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -522,8 +522,8 @@ List<SearchOperationListener> getSearchOperationListener() { // pkg private for
522522
}
523523

524524
@Override
525-
public boolean updateMapping(IndexMetaData indexMetaData) throws IOException {
526-
return mapperService().updateMapping(indexMetaData);
525+
public boolean updateMapping(final IndexMetaData currentIndexMetaData, final IndexMetaData newIndexMetaData) throws IOException {
526+
return mapperService().updateMapping(currentIndexMetaData, newIndexMetaData);
527527
}
528528

529529
private class StoreCloseListener implements Store.OnClose {

server/src/main/java/org/elasticsearch/index/mapper/MapperService.java

Lines changed: 46 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.lucene.analysis.Analyzer;
2626
import org.apache.lucene.analysis.DelegatingAnalyzerWrapper;
2727
import org.apache.lucene.index.Term;
28+
import org.elasticsearch.Assertions;
2829
import org.elasticsearch.ElasticsearchGenerationException;
2930
import org.elasticsearch.Version;
3031
import org.elasticsearch.cluster.metadata.IndexMetaData;
@@ -192,8 +193,8 @@ public static Map<String, Object> parseMapping(NamedXContentRegistry xContentReg
192193
/**
193194
* Update mapping by only merging the metadata that is different between received and stored entries
194195
*/
195-
public boolean updateMapping(IndexMetaData indexMetaData) throws IOException {
196-
assert indexMetaData.getIndex().equals(index()) : "index mismatch: expected " + index() + " but was " + indexMetaData.getIndex();
196+
public boolean updateMapping(final IndexMetaData currentIndexMetaData, final IndexMetaData newIndexMetaData) throws IOException {
197+
assert newIndexMetaData.getIndex().equals(index()) : "index mismatch: expected " + index() + " but was " + newIndexMetaData.getIndex();
197198
// go over and add the relevant mappings (or update them)
198199
Set<String> existingMappers = new HashSet<>();
199200
if (mapper != null) {
@@ -205,17 +206,19 @@ public boolean updateMapping(IndexMetaData indexMetaData) throws IOException {
205206
final Map<String, DocumentMapper> updatedEntries;
206207
try {
207208
// only update entries if needed
208-
updatedEntries = internalMerge(indexMetaData, MergeReason.MAPPING_RECOVERY, true);
209+
updatedEntries = internalMerge(newIndexMetaData, MergeReason.MAPPING_RECOVERY, true);
209210
} catch (Exception e) {
210211
logger.warn(() -> new ParameterizedMessage("[{}] failed to apply mappings", index()), e);
211212
throw e;
212213
}
213214

214215
boolean requireRefresh = false;
215216

217+
assertMappingVersion(currentIndexMetaData, newIndexMetaData, updatedEntries);
218+
216219
for (DocumentMapper documentMapper : updatedEntries.values()) {
217220
String mappingType = documentMapper.type();
218-
CompressedXContent incomingMappingSource = indexMetaData.mapping(mappingType).source();
221+
CompressedXContent incomingMappingSource = newIndexMetaData.mapping(mappingType).source();
219222

220223
String op = existingMappers.contains(mappingType) ? "updated" : "added";
221224
if (logger.isDebugEnabled() && incomingMappingSource.compressed().length < 512) {
@@ -240,6 +243,45 @@ public boolean updateMapping(IndexMetaData indexMetaData) throws IOException {
240243
return requireRefresh;
241244
}
242245

246+
private void assertMappingVersion(
247+
final IndexMetaData currentIndexMetaData,
248+
final IndexMetaData newIndexMetaData,
249+
final Map<String, DocumentMapper> updatedEntries) {
250+
if (Assertions.ENABLED
251+
&& currentIndexMetaData != null
252+
&& currentIndexMetaData.getCreationVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
253+
if (currentIndexMetaData.getMappingVersion() == newIndexMetaData.getMappingVersion()) {
254+
// if the mapping version is unchanged, then there should not be any updates and all mappings should be the same
255+
assert updatedEntries.isEmpty() : updatedEntries;
256+
for (final ObjectCursor<MappingMetaData> mapping : newIndexMetaData.getMappings().values()) {
257+
final CompressedXContent currentSource = currentIndexMetaData.mapping(mapping.value.type()).source();
258+
final CompressedXContent newSource = mapping.value.source();
259+
assert currentSource.equals(newSource) :
260+
"expected current mapping [" + currentSource + "] for type [" + mapping.value.type() + "] "
261+
+ "to be the same as new mapping [" + newSource + "]";
262+
}
263+
} else {
264+
// if the mapping version is changed, it should increase, there should be updates, and the mapping should be different
265+
final long currentMappingVersion = currentIndexMetaData.getMappingVersion();
266+
final long newMappingVersion = newIndexMetaData.getMappingVersion();
267+
assert currentMappingVersion < newMappingVersion :
268+
"expected current mapping version [" + currentMappingVersion + "] "
269+
+ "to be less than new mapping version [" + newMappingVersion + "]";
270+
assert updatedEntries.isEmpty() == false;
271+
for (final DocumentMapper documentMapper : updatedEntries.values()) {
272+
final MappingMetaData currentMapping = currentIndexMetaData.mapping(documentMapper.type());
273+
if (currentMapping != null) {
274+
final CompressedXContent currentSource = currentMapping.source();
275+
final CompressedXContent newSource = documentMapper.mappingSource();
276+
assert currentSource.equals(newSource) == false :
277+
"expected current mapping [" + currentSource + "] for type [" + documentMapper.type() + "] " +
278+
"to be different than new mapping";
279+
}
280+
}
281+
}
282+
}
283+
}
284+
243285
public void merge(Map<String, Map<String, Object>> mappings, MergeReason reason) {
244286
Map<String, CompressedXContent> mappingSourcesCompressed = new LinkedHashMap<>(mappings.size());
245287
for (Map.Entry<String, Map<String, Object>> entry : mappings.entrySet()) {

0 commit comments

Comments
 (0)