|
41 | 41 | import org.elasticsearch.index.IndexNotFoundException; |
42 | 42 | import org.elasticsearch.index.engine.RecoveryEngineException; |
43 | 43 | import org.elasticsearch.index.mapper.MapperException; |
44 | | -import org.elasticsearch.index.seqno.SequenceNumbers; |
45 | 44 | import org.elasticsearch.index.seqno.SequenceNumbersService; |
46 | 45 | import org.elasticsearch.index.shard.IllegalIndexShardStateException; |
47 | 46 | import org.elasticsearch.index.shard.IndexEventListener; |
|
62 | 61 | import org.elasticsearch.transport.TransportService; |
63 | 62 |
|
64 | 63 | import java.io.IOException; |
65 | | -import java.util.List; |
66 | 64 | import java.util.Optional; |
67 | 65 | import java.util.concurrent.atomic.AtomicLong; |
68 | 66 | import java.util.concurrent.atomic.AtomicReference; |
69 | | -import java.util.function.Function; |
70 | 67 |
|
71 | 68 | import static org.elasticsearch.common.unit.TimeValue.timeValueMillis; |
72 | 69 |
|
@@ -172,21 +169,24 @@ private void doRecovery(final long recoveryId) { |
172 | 169 |
|
173 | 170 | try (RecoveryRef recoveryRef = onGoingRecoveries.getRecovery(recoveryId)) { |
174 | 171 | if (recoveryRef == null) { |
175 | | - logger.trace("not running recovery with id [{}] - can't find it (probably finished)", recoveryId); |
| 172 | + logger.trace("not running recovery with id [{}] - can not find it (probably finished)", recoveryId); |
176 | 173 | return; |
177 | 174 | } |
178 | 175 | final RecoveryTarget recoveryTarget = recoveryRef.target(); |
179 | | - assert recoveryTarget.sourceNode() != null : "can't do a recovery without a source node"; |
180 | | - |
181 | | - final Optional<StartRecoveryRequest> maybeRequest = getStartRecoveryRequest(recoveryTarget); |
182 | | - if (!maybeRequest.isPresent()) return; |
183 | | - else request = maybeRequest.get(); |
184 | | - |
185 | 176 | cancellableThreads = recoveryTarget.cancellableThreads(); |
186 | 177 | timer = recoveryTarget.state().getTimer(); |
187 | | - |
188 | | - logger.trace("{} preparing shard for peer recovery", recoveryTarget.shardId()); |
189 | | - recoveryTarget.indexShard().prepareForIndexRecovery(); |
| 178 | + try { |
| 179 | + assert recoveryTarget.sourceNode() != null : "can not do a recovery without a source node"; |
| 180 | + request = getStartRecoveryRequest(recoveryTarget); |
| 181 | + logger.trace("{} preparing shard for peer recovery", recoveryTarget.shardId()); |
| 182 | + recoveryTarget.indexShard().prepareForIndexRecovery(); |
| 183 | + } catch (final Exception e) { |
| 184 | + // this will be logged as warning later on... |
| 185 | + logger.trace("unexpected error while preparing shard for peer recovery, failing recovery", e); |
| 186 | + onGoingRecoveries.failRecovery(recoveryId, |
| 187 | + new RecoveryFailedException(recoveryTarget.state(), "failed to prepare shard for recovery", e), true); |
| 188 | + return; |
| 189 | + } |
190 | 190 | } |
191 | 191 |
|
192 | 192 | try { |
@@ -289,90 +289,70 @@ public RecoveryResponse newInstance() { |
289 | 289 | } |
290 | 290 | } |
291 | 291 |
|
292 | | - @SuppressWarnings("OptionalUsedAsFieldOrParameterType") |
293 | | - private static Optional<Store.MetadataSnapshot> EMPTY_METADATA_SNAPSHOT = Optional.of(Store.MetadataSnapshot.EMPTY); |
294 | | - |
295 | 292 | /** |
296 | | - * Obtains a snapshot of the store metadata for the recovery target, or an empty {@link Optional} if obtaining the store metadata |
297 | | - * failed. |
| 293 | + * Obtains a snapshot of the store metadata for the recovery target. |
298 | 294 | * |
299 | 295 | * @param recoveryTarget the target of the recovery |
300 | | - * @return a snapshot of the store metdata, or an empty {@link Optional} |
| 296 | + * @return a snapshot of the store metdata |
301 | 297 | */ |
302 | | - private Optional<Store.MetadataSnapshot> getStoreMetadataSnapshot(final RecoveryTarget recoveryTarget) { |
| 298 | + private Store.MetadataSnapshot getStoreMetadataSnapshot(final RecoveryTarget recoveryTarget) { |
303 | 299 | try { |
304 | 300 | if (recoveryTarget.indexShard().indexSettings().isOnSharedFilesystem()) { |
305 | 301 | // we are not going to copy any files, so don't bother listing files, potentially running into concurrency issues with the |
306 | 302 | // primary changing files underneath us |
307 | | - return EMPTY_METADATA_SNAPSHOT; |
| 303 | + return Store.MetadataSnapshot.EMPTY; |
308 | 304 | } else { |
309 | | - return Optional.of(recoveryTarget.indexShard().snapshotStoreMetadata()); |
| 305 | + return recoveryTarget.indexShard().snapshotStoreMetadata(); |
310 | 306 | } |
311 | | - } catch (org.apache.lucene.index.IndexNotFoundException e) { |
| 307 | + } catch (final org.apache.lucene.index.IndexNotFoundException e) { |
312 | 308 | // happens on an empty folder. no need to log |
313 | | - logger.trace("{} shard folder empty, recover all files", recoveryTarget); |
314 | | - return EMPTY_METADATA_SNAPSHOT; |
| 309 | + logger.trace("{} shard folder empty, recovering all files", recoveryTarget); |
| 310 | + return Store.MetadataSnapshot.EMPTY; |
315 | 311 | } catch (final IOException e) { |
316 | | - logger.warn("error while listing local files, recover as if there are none", e); |
317 | | - return EMPTY_METADATA_SNAPSHOT; |
318 | | - } catch (final Exception e) { |
319 | | - // this will be logged as warning later on... |
320 | | - logger.trace("unexpected error while listing local files, failing recovery", e); |
321 | | - onGoingRecoveries.failRecovery(recoveryTarget.recoveryId(), |
322 | | - new RecoveryFailedException(recoveryTarget.state(), "failed to list local files", e), true); |
323 | | - return Optional.empty(); |
| 312 | + logger.warn("error while listing local files, recovering as if there are none", e); |
| 313 | + return Store.MetadataSnapshot.EMPTY; |
324 | 314 | } |
325 | 315 | } |
326 | 316 |
|
327 | 317 | /** |
328 | | - * Prepare the start recovery request, returning an empty {@link Optional} instance if preparing the start request failed. |
| 318 | + * Prepare the start recovery request. |
329 | 319 | * |
330 | 320 | * @param recoveryTarget the target of the recovery |
331 | | - * @return a start recovery request, or an empty {@link Optional} |
| 321 | + * @return a start recovery request |
332 | 322 | */ |
333 | | - private Optional<StartRecoveryRequest> getStartRecoveryRequest(final RecoveryTarget recoveryTarget) { |
| 323 | + private StartRecoveryRequest getStartRecoveryRequest(final RecoveryTarget recoveryTarget) { |
334 | 324 | final StartRecoveryRequest request; |
335 | 325 | logger.trace("{} collecting local files for [{}]", recoveryTarget.shardId(), recoveryTarget.sourceNode()); |
336 | 326 |
|
337 | | - final Optional<Store.MetadataSnapshot> metadataSnapshot = getStoreMetadataSnapshot(recoveryTarget); |
338 | | - if (!metadataSnapshot.isPresent()) return Optional.empty(); |
339 | | - logger.trace("{} local file count [{}]", recoveryTarget.shardId(), metadataSnapshot.get().size()); |
| 327 | + final Store.MetadataSnapshot metadataSnapshot = getStoreMetadataSnapshot(recoveryTarget); |
| 328 | + logger.trace("{} local file count [{}]", recoveryTarget.shardId(), metadataSnapshot.size()); |
340 | 329 |
|
341 | | - try { |
342 | | - final long startingSeqNo; |
343 | | - if (metadataSnapshot.get().size() > 0) { |
344 | | - startingSeqNo = getStartingSeqNo(recoveryTarget); |
345 | | - } else { |
346 | | - startingSeqNo = SequenceNumbersService.UNASSIGNED_SEQ_NO; |
347 | | - } |
348 | | - |
349 | | - if (startingSeqNo == SequenceNumbersService.UNASSIGNED_SEQ_NO) { |
350 | | - logger.trace("{} preparing for file-based recovery from [{}]", recoveryTarget.shardId(), recoveryTarget.sourceNode()); |
351 | | - } else { |
352 | | - logger.trace( |
353 | | - "{} preparing for sequence number-based recovery starting at local checkpoint [{}] from [{}]", |
354 | | - recoveryTarget.shardId(), |
355 | | - startingSeqNo, |
356 | | - recoveryTarget.sourceNode()); |
357 | | - } |
| 330 | + final long startingSeqNo; |
| 331 | + if (metadataSnapshot.size() > 0) { |
| 332 | + startingSeqNo = getStartingSeqNo(recoveryTarget); |
| 333 | + } else { |
| 334 | + startingSeqNo = SequenceNumbersService.UNASSIGNED_SEQ_NO; |
| 335 | + } |
358 | 336 |
|
359 | | - request = new StartRecoveryRequest( |
| 337 | + if (startingSeqNo == SequenceNumbersService.UNASSIGNED_SEQ_NO) { |
| 338 | + logger.trace("{} preparing for file-based recovery from [{}]", recoveryTarget.shardId(), recoveryTarget.sourceNode()); |
| 339 | + } else { |
| 340 | + logger.trace( |
| 341 | + "{} preparing for sequence number-based recovery starting at local checkpoint [{}] from [{}]", |
360 | 342 | recoveryTarget.shardId(), |
361 | | - recoveryTarget.sourceNode(), |
362 | | - clusterService.localNode(), |
363 | | - metadataSnapshot.get(), |
364 | | - recoveryTarget.state().getPrimary(), |
365 | | - recoveryTarget.recoveryId(), |
366 | | - startingSeqNo); |
367 | | - |
368 | | - } catch (final Exception e) { |
369 | | - // this will be logged as warning later on... |
370 | | - logger.trace("unexpected error while preparing shard for peer recovery, failing recovery", e); |
371 | | - onGoingRecoveries.failRecovery(recoveryTarget.recoveryId(), |
372 | | - new RecoveryFailedException(recoveryTarget.state(), "failed to prepare shard for recovery", e), true); |
373 | | - return Optional.empty(); |
| 343 | + startingSeqNo, |
| 344 | + recoveryTarget.sourceNode()); |
374 | 345 | } |
375 | | - return Optional.of(request); |
| 346 | + |
| 347 | + request = new StartRecoveryRequest( |
| 348 | + recoveryTarget.shardId(), |
| 349 | + recoveryTarget.sourceNode(), |
| 350 | + clusterService.localNode(), |
| 351 | + metadataSnapshot, |
| 352 | + recoveryTarget.state().getPrimary(), |
| 353 | + recoveryTarget.recoveryId(), |
| 354 | + startingSeqNo); |
| 355 | + return request; |
376 | 356 | } |
377 | 357 |
|
378 | 358 | /** |
|
0 commit comments