2727import org .apache .lucene .store .Directory ;
2828import org .apache .lucene .store .IOContext ;
2929import org .apache .lucene .store .IndexInput ;
30- import org .apache .lucene .store .OutputStreamIndexOutput ;
30+ import org .apache .lucene .store .IndexOutput ;
3131import org .apache .lucene .store .SimpleFSDirectory ;
3232import org .elasticsearch .ExceptionsHelper ;
3333import org .elasticsearch .common .logging .Loggers ;
4848import java .nio .file .Files ;
4949import java .nio .file .NoSuchFileException ;
5050import java .nio .file .Path ;
51- import java .nio .file .StandardCopyOption ;
5251import java .util .ArrayList ;
53- import java .util .Collection ;
52+ import java .util .Arrays ;
53+ import java .util .Collections ;
5454import java .util .List ;
5555import java .util .regex .Matcher ;
5656import java .util .regex .Pattern ;
@@ -69,7 +69,6 @@ public abstract class MetaDataStateFormat<T> {
6969 private static final String STATE_FILE_CODEC = "state" ;
7070 private static final int MIN_COMPATIBLE_STATE_FILE_VERSION = 1 ;
7171 private static final int STATE_FILE_VERSION = 1 ;
72- private static final int BUFFER_SIZE = 4096 ;
7372 private final String prefix ;
7473 private final Pattern stateFilePattern ;
7574
@@ -81,16 +80,75 @@ public abstract class MetaDataStateFormat<T> {
8180 protected MetaDataStateFormat (String prefix ) {
8281 this .prefix = prefix ;
8382 this .stateFilePattern = Pattern .compile (Pattern .quote (prefix ) + "(\\ d+)(" + MetaDataStateFormat .STATE_FILE_EXTENSION + ")?" );
83+ }
84+
85+ private static void deleteFileIfExists (Path stateLocation , Directory directory , String fileName ) throws IOException {
86+ try {
87+ directory .deleteFile (fileName );
88+ } catch (FileNotFoundException | NoSuchFileException ignored ) {
89+
90+ }
91+ logger .trace ("cleaned up {}" , stateLocation .resolve (fileName ));
92+ }
93+
94+ private void writeStateToFirstLocation (final T state , Path stateLocation , Directory stateDir , String fileName , String tmpFileName )
95+ throws IOException {
96+ try {
97+ deleteFileIfExists (stateLocation , stateDir , tmpFileName );
98+ try (IndexOutput out = stateDir .createOutput (tmpFileName , IOContext .DEFAULT )) {
99+ CodecUtil .writeHeader (out , STATE_FILE_CODEC , STATE_FILE_VERSION );
100+ out .writeInt (FORMAT .index ());
101+ try (XContentBuilder builder = newXContentBuilder (FORMAT , new IndexOutputOutputStream (out ) {
102+ @ Override
103+ public void close () throws IOException {
104+ // this is important since some of the XContentBuilders write bytes on close.
105+ // in order to write the footer we need to prevent closing the actual index input.
106+ }
107+ })) {
108+
109+ builder .startObject ();
110+ {
111+ toXContent (builder , state );
112+ }
113+ builder .endObject ();
114+ }
115+ CodecUtil .writeFooter (out );
116+ }
84117
118+ stateDir .sync (Collections .singleton (tmpFileName ));
119+ stateDir .rename (tmpFileName , fileName );
120+ stateDir .syncMetaData ();
121+ logger .trace ("written state to {}" , stateLocation .resolve (fileName ));
122+ } finally {
123+ deleteFileIfExists (stateLocation , stateDir , tmpFileName );
124+ }
125+ }
126+
127+ private void copyStateToExtraLocation (Directory srcStateDir , Path extraStateLocation , String fileName , String tmpFileName )
128+ throws IOException {
129+ try (Directory extraStateDir = newDirectory (extraStateLocation )) {
130+ try {
131+ deleteFileIfExists (extraStateLocation , extraStateDir , tmpFileName );
132+ extraStateDir .copyFrom (srcStateDir , fileName , tmpFileName , IOContext .DEFAULT );
133+ extraStateDir .sync (Collections .singleton (tmpFileName ));
134+ extraStateDir .rename (tmpFileName , fileName );
135+ extraStateDir .syncMetaData ();
136+ logger .trace ("copied state to {}" , extraStateLocation .resolve (fileName ));
137+ } finally {
138+ deleteFileIfExists (extraStateLocation , extraStateDir , tmpFileName );
139+ }
140+ }
85141 }
86142
87143 /**
88144 * Writes the given state to the given directories. The state is written to a
89145 * state directory ({@value #STATE_DIR_NAME}) underneath each of the given file locations and is created if it
90146 * doesn't exist. The state is serialized to a temporary file in that directory and is then atomically moved to
91147 * it's target filename of the pattern {@code {prefix}{version}.st}.
148+ * If this method returns without exception there is a guarantee that state is persisted to the disk and loadLatestState will return it.
149+ * But if this method throws an exception, loadLatestState could return this state or some previous state.
92150 *
93- * @param state the state object to write
151+ * @param state the state object to write
94152 * @param locations the locations where the state should be written to.
95153 * @throws IOException if an IOException occurs
96154 */
@@ -101,60 +159,22 @@ public final void write(final T state, final Path... locations) throws IOExcepti
101159 if (locations .length <= 0 ) {
102160 throw new IllegalArgumentException ("One or more locations required" );
103161 }
104- final long maxStateId = findMaxStateId (prefix , locations )+ 1 ;
162+ final long maxStateId = findMaxStateId (prefix , locations ) + 1 ;
105163 assert maxStateId >= 0 : "maxStateId must be positive but was: [" + maxStateId + "]" ;
164+
106165 final String fileName = prefix + maxStateId + STATE_FILE_EXTENSION ;
107- Path stateLocation = locations [0 ].resolve (STATE_DIR_NAME );
108- Files .createDirectories (stateLocation );
109- final Path tmpStatePath = stateLocation .resolve (fileName + ".tmp" );
110- final Path finalStatePath = stateLocation .resolve (fileName );
111- try {
112- final String resourceDesc = "MetaDataStateFormat.write(path=\" " + tmpStatePath + "\" )" ;
113- try (OutputStreamIndexOutput out =
114- new OutputStreamIndexOutput (resourceDesc , fileName , Files .newOutputStream (tmpStatePath ), BUFFER_SIZE )) {
115- CodecUtil .writeHeader (out , STATE_FILE_CODEC , STATE_FILE_VERSION );
116- out .writeInt (FORMAT .index ());
117- try (XContentBuilder builder = newXContentBuilder (FORMAT , new IndexOutputOutputStream (out ) {
118- @ Override
119- public void close () throws IOException {
120- // this is important since some of the XContentBuilders write bytes on close.
121- // in order to write the footer we need to prevent closing the actual index input.
122- } })) {
166+ final String tmpFileName = fileName + ".tmp" ;
167+ final Path firstStateLocation = locations [0 ].resolve (STATE_DIR_NAME );
168+ try (Directory stateDir = newDirectory (firstStateLocation )) {
169+ writeStateToFirstLocation (state , firstStateLocation , stateDir , fileName , tmpFileName );
123170
124- builder .startObject ();
125- {
126- toXContent (builder , state );
127- }
128- builder .endObject ();
129- }
130- CodecUtil .writeFooter (out );
131- }
132- IOUtils .fsync (tmpStatePath , false ); // fsync the state file
133- Files .move (tmpStatePath , finalStatePath , StandardCopyOption .ATOMIC_MOVE );
134- IOUtils .fsync (stateLocation , true );
135- logger .trace ("written state to {}" , finalStatePath );
136171 for (int i = 1 ; i < locations .length ; i ++) {
137- stateLocation = locations [i ].resolve (STATE_DIR_NAME );
138- Files .createDirectories (stateLocation );
139- Path tmpPath = stateLocation .resolve (fileName + ".tmp" );
140- Path finalPath = stateLocation .resolve (fileName );
141- try {
142- Files .copy (finalStatePath , tmpPath );
143- IOUtils .fsync (tmpPath , false ); // fsync the state file
144- // we are on the same FileSystem / Partition here we can do an atomic move
145- Files .move (tmpPath , finalPath , StandardCopyOption .ATOMIC_MOVE );
146- IOUtils .fsync (stateLocation , true );
147- logger .trace ("copied state to {}" , finalPath );
148- } finally {
149- Files .deleteIfExists (tmpPath );
150- logger .trace ("cleaned up {}" , tmpPath );
151- }
172+ final Path extraStateLocation = locations [i ].resolve (STATE_DIR_NAME );
173+ copyStateToExtraLocation (stateDir , extraStateLocation , fileName , tmpFileName );
152174 }
153- } finally {
154- Files .deleteIfExists (tmpStatePath );
155- logger .trace ("cleaned up {}" , tmpStatePath );
156175 }
157- cleanupOldFiles (prefix , fileName , locations );
176+
177+ cleanupOldFiles (fileName , locations );
158178 }
159179
160180 protected XContentBuilder newXContentBuilder (XContentType type , OutputStream stream ) throws IOException {
@@ -207,26 +227,29 @@ protected Directory newDirectory(Path dir) throws IOException {
207227 return new SimpleFSDirectory (dir );
208228 }
209229
210- private void cleanupOldFiles (final String prefix , final String currentStateFile , Path [] locations ) throws IOException {
211- final DirectoryStream .Filter <Path > filter = entry -> {
212- final String entryFileName = entry .getFileName ().toString ();
213- return Files .isRegularFile (entry )
214- && entryFileName .startsWith (prefix ) // only state files
215- && currentStateFile .equals (entryFileName ) == false ; // keep the current state file around
216- };
217- // now clean up the old files
218- for (Path dataLocation : locations ) {
219- logger .trace ("cleanupOldFiles: cleaning up {}" , dataLocation );
220- try (DirectoryStream <Path > stream = Files .newDirectoryStream (dataLocation .resolve (STATE_DIR_NAME ), filter )) {
221- for (Path stateFile : stream ) {
222- Files .deleteIfExists (stateFile );
223- logger .trace ("cleanupOldFiles: cleaned up {}" , stateFile );
230+ private void cleanupOldFiles (final String currentStateFile , Path [] locations ) throws IOException {
231+ for (Path location : locations ) {
232+ logger .trace ("cleanupOldFiles: cleaning up {}" , location );
233+ Path stateLocation = location .resolve (STATE_DIR_NAME );
234+ try (Directory stateDir = newDirectory (stateLocation )) {
235+ for (String file : stateDir .listAll ()) {
236+ if (file .startsWith (prefix ) && file .equals (currentStateFile ) == false ) {
237+ deleteFileIfExists (stateLocation , stateDir , file );
238+ }
224239 }
225240 }
226241 }
227242 }
228243
229- long findMaxStateId (final String prefix , Path ... locations ) throws IOException {
244+ /**
245+ * Finds state file with maximum id.
246+ *
247+ * @param prefix - filename prefix
248+ * @param locations - paths to directories with state folder
249+ * @return maximum id of state file or -1 if no such files are found
250+ * @throws IOException if IOException occurs
251+ */
252+ private long findMaxStateId (final String prefix , Path ... locations ) throws IOException {
230253 long maxId = -1 ;
231254 for (Path dataLocation : locations ) {
232255 final Path resolve = dataLocation .resolve (STATE_DIR_NAME );
@@ -245,6 +268,24 @@ long findMaxStateId(final String prefix, Path... locations) throws IOException {
245268 return maxId ;
246269 }
247270
271+ private List <Path > findStateFilesByGeneration (final long generation , Path ... locations ) {
272+ List <Path > files = new ArrayList <>();
273+ if (generation == -1 ) {
274+ return files ;
275+ }
276+
277+ final String fileName = prefix + generation + STATE_FILE_EXTENSION ;
278+ for (Path dataLocation : locations ) {
279+ final Path stateFilePath = dataLocation .resolve (STATE_DIR_NAME ).resolve (fileName );
280+ if (Files .exists (stateFilePath )) {
281+ logger .trace ("found state file: {}" , stateFilePath );
282+ files .add (stateFilePath );
283+ }
284+ }
285+
286+ return files ;
287+ }
288+
248289 /**
249290 * Tries to load the latest state from the given data-locations. It tries to load the latest state determined by
250291 * the states version from one or more data directories and if none of the latest states can be loaded an exception
@@ -255,78 +296,39 @@ long findMaxStateId(final String prefix, Path... locations) throws IOException {
255296 * @return the latest state or <code>null</code> if no state was found.
256297 */
257298 public T loadLatestState (Logger logger , NamedXContentRegistry namedXContentRegistry , Path ... dataLocations ) throws IOException {
258- List <PathAndStateId > files = new ArrayList <>();
259- long maxStateId = -1 ;
260- if (dataLocations != null ) { // select all eligible files first
261- for (Path dataLocation : dataLocations ) {
262- final Path stateDir = dataLocation .resolve (STATE_DIR_NAME );
263- // now, iterate over the current versions, and find latest one
264- // we don't check if the stateDir is present since it could be deleted
265- // after the check. Also if there is a _state file and it's not a dir something is really wrong
266- // we don't pass a glob since we need the group part for parsing
267- try (DirectoryStream <Path > paths = Files .newDirectoryStream (stateDir )) {
268- for (Path stateFile : paths ) {
269- final Matcher matcher = stateFilePattern .matcher (stateFile .getFileName ().toString ());
270- if (matcher .matches ()) {
271- final long stateId = Long .parseLong (matcher .group (1 ));
272- maxStateId = Math .max (maxStateId , stateId );
273- PathAndStateId pav = new PathAndStateId (stateFile , stateId );
274- logger .trace ("found state file: {}" , pav );
275- files .add (pav );
276- }
277- }
278- } catch (NoSuchFileException | FileNotFoundException ex ) {
279- // no _state directory -- move on
280- }
281- }
299+ long maxStateId = findMaxStateId (prefix , dataLocations );
300+ List <Path > stateFiles = findStateFilesByGeneration (maxStateId , dataLocations );
301+
302+ if (maxStateId > -1 && stateFiles .isEmpty ()) {
303+ throw new IllegalStateException ("unable to find state files with state id " + maxStateId +
304+ " returned by findMaxStateId function, in data folders [" +
305+ Arrays .stream (dataLocations ).map (Path ::toAbsolutePath ).
306+ map (Object ::toString ).collect (Collectors .joining (", " )) +
307+ "], concurrent writes?" );
282308 }
283- // NOTE: we might have multiple version of the latest state if there are multiple data dirs.. for this case
284- // we iterate only over the ones with the max version.
285- long finalMaxStateId = maxStateId ;
286- Collection <PathAndStateId > pathAndStateIds = files
287- .stream ()
288- .filter (pathAndStateId -> pathAndStateId .id == finalMaxStateId )
289- .collect (Collectors .toCollection (ArrayList ::new ));
290309
291310 final List <Throwable > exceptions = new ArrayList <>();
292- for (PathAndStateId pathAndStateId : pathAndStateIds ) {
311+ for (Path stateFile : stateFiles ) {
293312 try {
294- T state = read (namedXContentRegistry , pathAndStateId . file );
295- logger .trace ("state id [{}] read from [{}]" , pathAndStateId . id , pathAndStateId . file .getFileName ());
313+ T state = read (namedXContentRegistry , stateFile );
314+ logger .trace ("state id [{}] read from [{}]" , maxStateId , stateFile .getFileName ());
296315 return state ;
297316 } catch (Exception e ) {
298- exceptions .add (new IOException ("failed to read " + pathAndStateId . toString (), e ));
317+ exceptions .add (new IOException ("failed to read " + stateFile . toAbsolutePath (), e ));
299318 logger .debug (() -> new ParameterizedMessage (
300- "{}: failed to read [{}], ignoring..." , pathAndStateId . file .toAbsolutePath (), prefix ), e );
319+ "{}: failed to read [{}], ignoring..." , stateFile .toAbsolutePath (), prefix ), e );
301320 }
302321 }
303322 // if we reach this something went wrong
304323 ExceptionsHelper .maybeThrowRuntimeAndSuppress (exceptions );
305- if (files .size () > 0 ) {
324+ if (stateFiles .size () > 0 ) {
306325 // We have some state files but none of them gave us a usable state
307- throw new IllegalStateException ("Could not find a state file to recover from among " + files );
326+ throw new IllegalStateException ("Could not find a state file to recover from among " +
327+ stateFiles .stream ().map (Path ::toAbsolutePath ).map (Object ::toString ).collect (Collectors .joining (", " )));
308328 }
309329 return null ;
310330 }
311331
312- /**
313- * Internal struct-like class that holds the parsed state id and the file
314- */
315- private static class PathAndStateId {
316- final Path file ;
317- final long id ;
318-
319- private PathAndStateId (Path file , long id ) {
320- this .file = file ;
321- this .id = id ;
322- }
323-
324- @ Override
325- public String toString () {
326- return "[id:" + id + ", file:" + file .toAbsolutePath () + "]" ;
327- }
328- }
329-
330332 /**
331333 * Deletes all meta state directories recursively for the given data locations
332334 * @param dataLocations the data location to delete
0 commit comments