Skip to content

Commit 78d86bc

Browse files
committed
[STATE] Refactor state format to use incremental state IDs
Today there is a chance that the state version for a shard, index, or cluster state goes backwards or is reset (for example, on a full restart), depending on several factors not related to the state itself. To prevent collisions with already existing state files and to maintain write-once properties, this change introduces an incremental state ID instead of using the plain state version. This also fixes a bug where the previous legacy state had a greater version than the current state, which causes an exception on node startup or if left-over files are present. Closes #10316
1 parent ff6b605 commit 78d86bc

File tree

7 files changed

+238
-221
lines changed

7 files changed

+238
-221
lines changed

src/main/java/org/elasticsearch/gateway/MetaDataStateFormat.java

Lines changed: 65 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -56,17 +56,19 @@ public abstract class MetaDataStateFormat<T> {
5656
private static final int STATE_FILE_VERSION = 0;
5757
private static final int BUFFER_SIZE = 4096;
5858
private final XContentType format;
59-
private final boolean deleteOldFiles;
59+
private final String prefix;
60+
private final Pattern stateFilePattern;
61+
6062

6163
/**
6264
* Creates a new {@link MetaDataStateFormat} instance
6365
* @param format the format of the x-content
64-
* @param deleteOldFiles if <code>true</code> write operations will
65-
* clean up old files written with this format.
6666
*/
67-
protected MetaDataStateFormat(XContentType format, boolean deleteOldFiles) {
67+
protected MetaDataStateFormat(XContentType format, String prefix) {
6868
this.format = format;
69-
this.deleteOldFiles = deleteOldFiles;
69+
this.prefix = prefix;
70+
this.stateFilePattern = Pattern.compile(Pattern.quote(prefix) + "(\\d+)(" + MetaDataStateFormat.STATE_FILE_EXTENSION + ")?");
71+
7072
}
7173

7274
/**
@@ -83,15 +85,16 @@ public XContentType format() {
8385
* it's target filename of the pattern <tt>{prefix}{version}.st</tt>.
8486
*
8587
* @param state the state object to write
86-
* @param prefix the state names prefix used to compose the file name.
8788
* @param version the version of the state
8889
* @param locations the locations where the state should be written to.
8990
* @throws IOException if an IOException occurs
9091
*/
91-
public final void write(final T state, final String prefix, final long version, final Path... locations) throws IOException {
92+
public final void write(final T state, final long version, final Path... locations) throws IOException {
9293
Preconditions.checkArgument(locations != null, "Locations must not be null");
9394
Preconditions.checkArgument(locations.length > 0, "One or more locations required");
94-
String fileName = prefix + version + STATE_FILE_EXTENSION;
95+
final long maxStateId = findMaxStateId(prefix, locations)+1;
96+
assert maxStateId >= 0 : "maxStateId must be positive but was: [" + maxStateId + "]";
97+
final String fileName = prefix + maxStateId + STATE_FILE_EXTENSION;
9598
Path stateLocation = locations[0].resolve(STATE_DIR_NAME);
9699
Files.createDirectories(stateLocation);
97100
final Path tmpStatePath = stateLocation.resolve(fileName + ".tmp");
@@ -136,9 +139,7 @@ public void close() throws IOException {
136139
} finally {
137140
Files.deleteIfExists(tmpStatePath);
138141
}
139-
if (deleteOldFiles) {
140-
cleanupOldFiles(prefix, fileName, locations);
141-
}
142+
cleanupOldFiles(prefix, fileName, locations);
142143
}
143144

144145
protected XContentBuilder newXContentBuilder(XContentType type, OutputStream stream ) throws IOException {
@@ -161,17 +162,14 @@ protected XContentBuilder newXContentBuilder(XContentType type, OutputStream str
161162
* Reads the state from a given file and compares the expected version against the actual version of
162163
* the state.
163164
*/
164-
public final T read(Path file, long expectedVersion) throws IOException {
165+
public final T read(Path file) throws IOException {
165166
try (Directory dir = newDirectory(file.getParent())) {
166167
try (final IndexInput indexInput = dir.openInput(file.getFileName().toString(), IOContext.DEFAULT)) {
167168
// We checksum the entire file before we even go and parse it. If it's corrupted we barf right here.
168169
CodecUtil.checksumEntireFile(indexInput);
169170
CodecUtil.checkHeader(indexInput, STATE_FILE_CODEC, STATE_FILE_VERSION, STATE_FILE_VERSION);
170171
final XContentType xContentType = XContentType.values()[indexInput.readInt()];
171-
final long version = indexInput.readLong();
172-
if (version != expectedVersion) {
173-
throw new CorruptStateException("State version mismatch expected: " + expectedVersion + " but was: " + version);
174-
}
172+
indexInput.readLong(); // version currently unused
175173
long filePointer = indexInput.getFilePointer();
176174
long contentSize = indexInput.length() - CodecUtil.footerLength() - filePointer;
177175
try (IndexInput slice = indexInput.slice("state_xcontent", filePointer, contentSize)) {
@@ -210,25 +208,38 @@ public boolean accept(Path entry) throws IOException {
210208
}
211209
}
212210

211+
long findMaxStateId(final String prefix, Path... locations) throws IOException {
212+
long maxId = -1;
213+
for (Path dataLocation : locations) {
214+
final Path resolve = dataLocation.resolve(STATE_DIR_NAME);
215+
if (Files.exists(resolve)) {
216+
try (DirectoryStream<Path> stream = Files.newDirectoryStream(resolve, prefix + "*")) {
217+
for (Path stateFile : stream) {
218+
final Matcher matcher = stateFilePattern.matcher(stateFile.getFileName().toString());
219+
if (matcher.matches()) {
220+
final long id = Long.parseLong(matcher.group(1));
221+
maxId = Math.max(maxId, id);
222+
}
223+
}
224+
}
225+
}
226+
}
227+
return maxId;
228+
}
229+
213230
/**
214231
* Tries to load the latest state from the given data-locations. It tries to load the latest state determined by
215232
* the states version from one or more data directories and if none of the latest states can be loaded an exception
216233
* is thrown to prevent accidentally loading a previous state and silently omitting the latest state.
217234
*
218235
* @param logger an elasticsearch logger instance
219-
* @param format the actual metastate format to use
220-
* @param pattern the file name pattern to identify files belonging to this pattern and to read the version from.
221-
* The first capture group should return the version of the file. If the second capture group is has a
222-
* null value the files is considered a legacy file and will be treated as if the file contains a plain
223-
* x-content payload.
224-
* @param stateType the state type we are loading. used for logging contenxt only.
225236
* @param dataLocations the data-locations to try.
226237
* @return the latest state or <code>null</code> if no state was found.
227238
*/
228-
public static <T> T loadLatestState(ESLogger logger, MetaDataStateFormat<T> format, Pattern pattern, String stateType, Path... dataLocations) throws IOException {
229-
List<PathAndVersion> files = new ArrayList<>();
230-
long maxVersion = -1;
231-
boolean maxVersionIsLegacy = true;
239+
public T loadLatestState(ESLogger logger, Path... dataLocations) throws IOException {
240+
List<PathAndStateId> files = new ArrayList<>();
241+
long maxStateId = -1;
242+
boolean maxStateIdIsLegacy = true;
232243
if (dataLocations != null) { // select all eligable files first
233244
for (Path dataLocation : dataLocations) {
234245
final Path stateDir = dataLocation.resolve(STATE_DIR_NAME);
@@ -238,13 +249,13 @@ public static <T> T loadLatestState(ESLogger logger, MetaDataStateFormat<T> form
238249
// now, iterate over the current versions, and find latest one
239250
try (DirectoryStream<Path> paths = Files.newDirectoryStream(stateDir)) { // we don't pass a glob since we need the group part for parsing
240251
for (Path stateFile : paths) {
241-
final Matcher matcher = pattern.matcher(stateFile.getFileName().toString());
252+
final Matcher matcher = stateFilePattern.matcher(stateFile.getFileName().toString());
242253
if (matcher.matches()) {
243-
final long version = Long.parseLong(matcher.group(1));
244-
maxVersion = Math.max(maxVersion, version);
254+
final long stateId = Long.parseLong(matcher.group(1));
255+
maxStateId = Math.max(maxStateId, stateId);
245256
final boolean legacy = MetaDataStateFormat.STATE_FILE_EXTENSION.equals(matcher.group(2)) == false;
246-
maxVersionIsLegacy &= legacy; // on purpose, see NOTE below
247-
PathAndVersion pav = new PathAndVersion(stateFile, version, legacy);
257+
maxStateIdIsLegacy &= legacy; // on purpose, see NOTE below
258+
PathAndStateId pav = new PathAndStateId(stateFile, stateId, legacy);
248259
logger.trace("found state file: {}", pav);
249260
files.add(pav);
250261
}
@@ -259,30 +270,30 @@ public static <T> T loadLatestState(ESLogger logger, MetaDataStateFormat<T> form
259270
// new format (ie. legacy == false) then we know that the latest version state ought to use this new format.
260271
// In case the state file with the latest version does not use the new format while older state files do,
261272
// the list below will be empty and loading the state will fail
262-
for (PathAndVersion pathAndVersion : Collections2.filter(files, new VersionAndLegacyPredicate(maxVersion, maxVersionIsLegacy))) {
273+
for (PathAndStateId pathAndStateId : Collections2.filter(files, new StateIdAndLegacyPredicate(maxStateId, maxStateIdIsLegacy))) {
263274
try {
264-
final Path stateFile = pathAndVersion.file;
265-
final long version = pathAndVersion.version;
275+
final Path stateFile = pathAndStateId.file;
276+
final long id = pathAndStateId.id;
266277
final XContentParser parser;
267-
if (pathAndVersion.legacy) { // read the legacy format -- plain XContent
278+
if (pathAndStateId.legacy) { // read the legacy format -- plain XContent
268279
final byte[] data = Files.readAllBytes(stateFile);
269280
if (data.length == 0) {
270-
logger.debug("{}: no data for [{}], ignoring...", stateType, stateFile.toAbsolutePath());
281+
logger.debug("{}: no data for [{}], ignoring...", prefix, stateFile.toAbsolutePath());
271282
continue;
272283
}
273284
parser = XContentHelper.createParser(data, 0, data.length);
274-
state = format.fromXContent(parser);
285+
state = fromXContent(parser);
275286
if (state == null) {
276-
logger.debug("{}: no data for [{}], ignoring...", stateType, stateFile.toAbsolutePath());
287+
logger.debug("{}: no data for [{}], ignoring...", prefix, stateFile.toAbsolutePath());
277288
}
278289
} else {
279-
state = format.read(stateFile, version);
280-
logger.trace("state version [{}] read from [{}]", version, stateFile.getFileName());
290+
state = read(stateFile);
291+
logger.trace("state id [{}] read from [{}]", id, stateFile.getFileName());
281292
}
282293
return state;
283294
} catch (Throwable e) {
284295
exceptions.add(e);
285-
logger.debug("{}: failed to read [{}], ignoring...", e, pathAndVersion.file.toAbsolutePath(), stateType);
296+
logger.debug("{}: failed to read [{}], ignoring...", e, pathAndStateId.file.toAbsolutePath(), prefix);
286297
}
287298
}
288299
// if we reach this something went wrong
@@ -295,42 +306,42 @@ public static <T> T loadLatestState(ESLogger logger, MetaDataStateFormat<T> form
295306
}
296307

297308
/**
298-
* Filters out all {@link MetaDataStateFormat.PathAndVersion} instances with a different version than
309+
* Filters out all {@link org.elasticsearch.gateway.MetaDataStateFormat.PathAndStateId} instances with a different id than
299310
* the given one.
300311
*/
301-
private static final class VersionAndLegacyPredicate implements Predicate<PathAndVersion> {
302-
private final long version;
312+
private static final class StateIdAndLegacyPredicate implements Predicate<PathAndStateId> {
313+
private final long id;
303314
private final boolean legacy;
304315

305-
VersionAndLegacyPredicate(long version, boolean legacy) {
306-
this.version = version;
316+
StateIdAndLegacyPredicate(long id, boolean legacy) {
317+
this.id = id;
307318
this.legacy = legacy;
308319
}
309320

310321
@Override
311-
public boolean apply(PathAndVersion input) {
312-
return input.version == version && input.legacy == legacy;
322+
public boolean apply(PathAndStateId input) {
323+
return input.id == id && input.legacy == legacy;
313324
}
314325
}
315326

316327
/**
317-
* Internal struct-like class that holds the parsed state version, the file
328+
* Internal struct-like class that holds the parsed state id, the file
318329
* and a flag if the file is a legacy state ie. pre 1.5
319330
*/
320-
private static class PathAndVersion {
331+
private static class PathAndStateId {
321332
final Path file;
322-
final long version;
333+
final long id;
323334
final boolean legacy;
324335

325-
private PathAndVersion(Path file, long version, boolean legacy) {
336+
private PathAndStateId(Path file, long id, boolean legacy) {
326337
this.file = file;
327-
this.version = version;
338+
this.id = id;
328339
this.legacy = legacy;
329340
}
330341

331342
@Override
332343
public String toString() {
333-
return "[version:" + version + ", legacy:" + legacy + ", file:" + file.toAbsolutePath() + "]";
344+
return "[id:" + id + ", legacy:" + legacy + ", file:" + file.toAbsolutePath() + "]";
334345
}
335346
}
336347

src/main/java/org/elasticsearch/gateway/MetaStateService.java

Lines changed: 15 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@
3636
import java.io.IOException;
3737
import java.util.Map;
3838
import java.util.Set;
39-
import java.util.regex.Pattern;
4039

4140
/**
4241
* Handles writing and loading both {@link MetaData} and {@link IndexMetaData}
@@ -47,22 +46,20 @@ public class MetaStateService extends AbstractComponent {
4746

4847
static final String GLOBAL_STATE_FILE_PREFIX = "global-";
4948
private static final String INDEX_STATE_FILE_PREFIX = "state-";
50-
static final Pattern GLOBAL_STATE_FILE_PATTERN = Pattern.compile(GLOBAL_STATE_FILE_PREFIX + "(\\d+)(" + MetaDataStateFormat.STATE_FILE_EXTENSION + ")?");
51-
static final Pattern INDEX_STATE_FILE_PATTERN = Pattern.compile(INDEX_STATE_FILE_PREFIX + "(\\d+)(" + MetaDataStateFormat.STATE_FILE_EXTENSION + ")?");
52-
private static final String GLOBAL_STATE_LOG_TYPE = "[_global]";
5349

5450
private final NodeEnvironment nodeEnv;
5551

5652
private final XContentType format;
5753
private final ToXContent.Params formatParams;
5854
private final ToXContent.Params gatewayModeFormatParams;
55+
private final MetaDataStateFormat<IndexMetaData> indexStateFormat;
56+
private final MetaDataStateFormat<MetaData> globalStateFormat;
5957

6058
@Inject
6159
public MetaStateService(Settings settings, NodeEnvironment nodeEnv) {
6260
super(settings);
6361
this.nodeEnv = nodeEnv;
6462
this.format = XContentType.fromRestContentType(settings.get(FORMAT_SETTING, "smile"));
65-
6663
if (this.format == XContentType.SMILE) {
6764
Map<String, String> params = Maps.newHashMap();
6865
params.put("binary", "true");
@@ -77,6 +74,9 @@ public MetaStateService(Settings settings, NodeEnvironment nodeEnv) {
7774
gatewayModeParams.put(MetaData.CONTEXT_MODE_PARAM, MetaData.CONTEXT_MODE_GATEWAY);
7875
gatewayModeFormatParams = new ToXContent.MapParams(gatewayModeParams);
7976
}
77+
indexStateFormat = indexStateFormat(format, formatParams);
78+
globalStateFormat = globalStateFormat(format, gatewayModeFormatParams);
79+
8080
}
8181

8282
/**
@@ -109,26 +109,23 @@ MetaData loadFullState() throws Exception {
109109
*/
110110
@Nullable
111111
IndexMetaData loadIndexState(String index) throws IOException {
112-
return MetaDataStateFormat.loadLatestState(logger, indexStateFormat(format, formatParams, true),
113-
INDEX_STATE_FILE_PATTERN, "[" + index + "]", nodeEnv.indexPaths(new Index(index)));
112+
return indexStateFormat.loadLatestState(logger, nodeEnv.indexPaths(new Index(index)));
114113
}
115114

116115
/**
117116
* Loads the global state, *without* index state, see {@link #loadFullState()} for that.
118117
*/
119118
MetaData loadGlobalState() throws IOException {
120-
return MetaDataStateFormat.loadLatestState(logger, globalStateFormat(format, gatewayModeFormatParams, true), GLOBAL_STATE_FILE_PATTERN, GLOBAL_STATE_LOG_TYPE, nodeEnv.nodeDataPaths());
119+
return globalStateFormat.loadLatestState(logger, nodeEnv.nodeDataPaths());
121120
}
122121

123122
/**
124123
* Writes the index state.
125124
*/
126125
void writeIndex(String reason, IndexMetaData indexMetaData, @Nullable IndexMetaData previousIndexMetaData) throws Exception {
127126
logger.trace("[{}] writing state, reason [{}]", indexMetaData.index(), reason);
128-
final boolean deleteOldFiles = previousIndexMetaData != null && previousIndexMetaData.version() != indexMetaData.version();
129-
final MetaDataStateFormat<IndexMetaData> writer = indexStateFormat(format, formatParams, deleteOldFiles);
130127
try {
131-
writer.write(indexMetaData, INDEX_STATE_FILE_PREFIX, indexMetaData.version(),
128+
indexStateFormat.write(indexMetaData, indexMetaData.version(),
132129
nodeEnv.indexPaths(new Index(indexMetaData.index())));
133130
} catch (Throwable ex) {
134131
logger.warn("[{}]: failed to write index state", ex, indexMetaData.index());
@@ -140,21 +137,20 @@ void writeIndex(String reason, IndexMetaData indexMetaData, @Nullable IndexMetaD
140137
* Writes the global state, *without* the indices states.
141138
*/
142139
void writeGlobalState(String reason, MetaData metaData) throws Exception {
143-
logger.trace("{} writing state, reason [{}]", GLOBAL_STATE_LOG_TYPE, reason);
144-
final MetaDataStateFormat<MetaData> writer = globalStateFormat(format, gatewayModeFormatParams, true);
140+
logger.trace("[_global] writing state, reason [{}]", reason);
145141
try {
146-
writer.write(metaData, GLOBAL_STATE_FILE_PREFIX, metaData.version(), nodeEnv.nodeDataPaths());
142+
globalStateFormat.write(metaData, metaData.version(), nodeEnv.nodeDataPaths());
147143
} catch (Throwable ex) {
148-
logger.warn("{}: failed to write global state", ex, GLOBAL_STATE_LOG_TYPE);
144+
logger.warn("[_global]: failed to write global state", ex);
149145
throw new IOException("failed to write global state", ex);
150146
}
151147
}
152148

153149
/**
154150
* Returns a StateFormat that can read and write {@link MetaData}
155151
*/
156-
static MetaDataStateFormat<MetaData> globalStateFormat(XContentType format, final ToXContent.Params formatParams, final boolean deleteOldFiles) {
157-
return new MetaDataStateFormat<MetaData>(format, deleteOldFiles) {
152+
static MetaDataStateFormat<MetaData> globalStateFormat(XContentType format, final ToXContent.Params formatParams) {
153+
return new MetaDataStateFormat<MetaData>(format, GLOBAL_STATE_FILE_PREFIX) {
158154

159155
@Override
160156
public void toXContent(XContentBuilder builder, MetaData state) throws IOException {
@@ -171,8 +167,8 @@ public MetaData fromXContent(XContentParser parser) throws IOException {
171167
/**
172168
* Returns a StateFormat that can read and write {@link IndexMetaData}
173169
*/
174-
static MetaDataStateFormat<IndexMetaData> indexStateFormat(XContentType format, final ToXContent.Params formatParams, boolean deleteOldFiles) {
175-
return new MetaDataStateFormat<IndexMetaData>(format, deleteOldFiles) {
170+
static MetaDataStateFormat<IndexMetaData> indexStateFormat(XContentType format, final ToXContent.Params formatParams) {
171+
return new MetaDataStateFormat<IndexMetaData>(format, INDEX_STATE_FILE_PREFIX) {
176172

177173
@Override
178174
public void toXContent(XContentBuilder builder, IndexMetaData state) throws IOException {

0 commit comments

Comments
 (0)