5959import org .elasticsearch .tasks .Task ;
6060import org .elasticsearch .threadpool .ThreadPool ;
6161import org .elasticsearch .transport .ConnectTransportException ;
62- import org .elasticsearch .transport .FutureTransportResponseHandler ;
6362import org .elasticsearch .transport .TransportChannel ;
63+ import org .elasticsearch .transport .TransportException ;
6464import org .elasticsearch .transport .TransportRequestHandler ;
6565import org .elasticsearch .transport .TransportResponse ;
66+ import org .elasticsearch .transport .TransportResponseHandler ;
6667import org .elasticsearch .transport .TransportService ;
6768
6869import java .io .IOException ;
6970import java .util .List ;
7071import java .util .StringJoiner ;
7172import java .util .concurrent .atomic .AtomicLong ;
72- import java .util .concurrent . atomic . AtomicReference ;
73+ import java .util .function . Consumer ;
7374
7475import static org .elasticsearch .common .unit .TimeValue .timeValueMillis ;
7576
@@ -142,6 +143,8 @@ public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexSh
142143 public void startRecovery (final IndexShard indexShard , final DiscoveryNode sourceNode , final RecoveryListener listener ) {
143144 // create a new recovery status, and process...
144145 final long recoveryId = onGoingRecoveries .startRecovery (indexShard , sourceNode , listener , recoverySettings .activityTimeout ());
146+ // we fork off quickly here and go async but this is called from the cluster state applier thread too and that can cause
147+ // assertions to trip if we executed it on the same thread hence we fork off to the generic threadpool.
145148 threadPool .generic ().execute (new RecoveryRunner (recoveryId ));
146149 }
147150
@@ -165,17 +168,16 @@ private void retryRecovery(final long recoveryId, final TimeValue retryAfter, fi
165168
166169 private void doRecovery (final long recoveryId ) {
167170 final StartRecoveryRequest request ;
168- final CancellableThreads cancellableThreads ;
169171 final RecoveryState .Timer timer ;
170-
172+ CancellableThreads cancellableThreads ;
171173 try (RecoveryRef recoveryRef = onGoingRecoveries .getRecovery (recoveryId )) {
172174 if (recoveryRef == null ) {
173175 logger .trace ("not running recovery with id [{}] - can not find it (probably finished)" , recoveryId );
174176 return ;
175177 }
176178 final RecoveryTarget recoveryTarget = recoveryRef .target ();
177- cancellableThreads = recoveryTarget .cancellableThreads ();
178179 timer = recoveryTarget .state ().getTimer ();
180+ cancellableThreads = recoveryTarget .cancellableThreads ();
179181 try {
180182 assert recoveryTarget .sourceNode () != null : "can not do a recovery without a source node" ;
181183 request = getStartRecoveryRequest (recoveryTarget );
@@ -189,51 +191,11 @@ private void doRecovery(final long recoveryId) {
189191 return ;
190192 }
191193 }
192-
193- try {
194- logger .trace ("{} starting recovery from {}" , request .shardId (), request .sourceNode ());
195- final AtomicReference <RecoveryResponse > responseHolder = new AtomicReference <>();
196- cancellableThreads .execute (() -> responseHolder .set (
197- transportService .submitRequest (request .sourceNode (), PeerRecoverySourceService .Actions .START_RECOVERY , request ,
198- new FutureTransportResponseHandler <RecoveryResponse >() {
199- @ Override
200- public RecoveryResponse read (StreamInput in ) throws IOException {
201- RecoveryResponse recoveryResponse = new RecoveryResponse ();
202- recoveryResponse .readFrom (in );
203- return recoveryResponse ;
204- }
205- }).txGet ()));
206- final RecoveryResponse recoveryResponse = responseHolder .get ();
207- final TimeValue recoveryTime = new TimeValue (timer .time ());
208- // do this through ongoing recoveries to remove it from the collection
209- onGoingRecoveries .markRecoveryAsDone (recoveryId );
210- if (logger .isTraceEnabled ()) {
211- StringBuilder sb = new StringBuilder ();
212- sb .append ('[' ).append (request .shardId ().getIndex ().getName ()).append (']' ).append ('[' ).append (request .shardId ().id ())
213- .append ("] " );
214- sb .append ("recovery completed from " ).append (request .sourceNode ()).append (", took[" ).append (recoveryTime ).append ("]\n " );
215- sb .append (" phase1: recovered_files [" ).append (recoveryResponse .phase1FileNames .size ()).append ("]" ).append (" with " +
216- "total_size of [" ).append (new ByteSizeValue (recoveryResponse .phase1TotalSize )).append ("]" )
217- .append (", took [" ).append (timeValueMillis (recoveryResponse .phase1Time )).append ("], throttling_wait [" ).append
218- (timeValueMillis (recoveryResponse .phase1ThrottlingWaitTime )).append (']' )
219- .append ("\n " );
220- sb .append (" : reusing_files [" ).append (recoveryResponse .phase1ExistingFileNames .size ()).append ("] with " +
221- "total_size of [" ).append (new ByteSizeValue (recoveryResponse .phase1ExistingTotalSize )).append ("]\n " );
222- sb .append (" phase2: start took [" ).append (timeValueMillis (recoveryResponse .startTime )).append ("]\n " );
223- sb .append (" : recovered [" ).append (recoveryResponse .phase2Operations ).append ("]" ).append (" transaction log " +
224- "operations" )
225- .append (", took [" ).append (timeValueMillis (recoveryResponse .phase2Time )).append ("]" )
226- .append ("\n " );
227- logger .trace ("{}" , sb );
228- } else {
229- logger .debug ("{} recovery done from [{}], took [{}]" , request .shardId (), request .sourceNode (), recoveryTime );
230- }
231- } catch (CancellableThreads .ExecutionCancelledException e ) {
232- logger .trace ("recovery cancelled" , e );
233- } catch (Exception e ) {
194+ Consumer <Exception > handleException = e -> {
234195 if (logger .isTraceEnabled ()) {
235196 logger .trace (() -> new ParameterizedMessage (
236- "[{}][{}] Got exception on recovery" , request .shardId ().getIndex ().getName (), request .shardId ().id ()), e );
197+ "[{}][{}] Got exception on recovery" , request .shardId ().getIndex ().getName (),
198+ request .shardId ().id ()), e );
237199 }
238200 Throwable cause = ExceptionsHelper .unwrapCause (e );
239201 if (cause instanceof CancellableThreads .ExecutionCancelledException ) {
@@ -267,14 +229,16 @@ public RecoveryResponse read(StreamInput in) throws IOException {
267229 }
268230
269231 if (cause instanceof DelayRecoveryException ) {
270- retryRecovery (recoveryId , cause , recoverySettings .retryDelayStateSync (), recoverySettings .activityTimeout ());
232+ retryRecovery (recoveryId , cause , recoverySettings .retryDelayStateSync (),
233+ recoverySettings .activityTimeout ());
271234 return ;
272235 }
273236
274237 if (cause instanceof ConnectTransportException ) {
275238 logger .debug ("delaying recovery of {} for [{}] due to networking error [{}]" , request .shardId (),
276239 recoverySettings .retryDelayNetwork (), cause .getMessage ());
277- retryRecovery (recoveryId , cause .getMessage (), recoverySettings .retryDelayNetwork (), recoverySettings .activityTimeout ());
240+ retryRecovery (recoveryId , cause .getMessage (), recoverySettings .retryDelayNetwork (),
241+ recoverySettings .activityTimeout ());
278242 return ;
279243 }
280244
@@ -285,6 +249,71 @@ public RecoveryResponse read(StreamInput in) throws IOException {
285249 }
286250
287251 onGoingRecoveries .failRecovery (recoveryId , new RecoveryFailedException (request , e ), true );
252+ };
253+
254+ try {
255+ logger .trace ("{} starting recovery from {}" , request .shardId (), request .sourceNode ());
256+ cancellableThreads .executeIO (() ->
257+ // we still execute under cancelableThreads here to ensure we interrupt any blocking call to the network if any
258+ // on the underlying transport. It's unclear if we need this here at all after moving to async execution but
259+ // the issues that a missing call to this could cause are sneaky and hard to debug. If we don't need it on this
260+ // call we can potentially remove it altogether which we should do it in a major release only with enough
261+ // time to test. This shoudl be done for 7.0 if possible
262+ transportService .submitRequest (request .sourceNode (), PeerRecoverySourceService .Actions .START_RECOVERY , request ,
263+ new TransportResponseHandler <RecoveryResponse >() {
264+ @ Override
265+ public void handleResponse (RecoveryResponse recoveryResponse ) {
266+ final TimeValue recoveryTime = new TimeValue (timer .time ());
267+ // do this through ongoing recoveries to remove it from the collection
268+ onGoingRecoveries .markRecoveryAsDone (recoveryId );
269+ if (logger .isTraceEnabled ()) {
270+ StringBuilder sb = new StringBuilder ();
271+ sb .append ('[' ).append (request .shardId ().getIndex ().getName ()).append (']' )
272+ .append ('[' ).append (request .shardId ().id ()).append ("] " );
273+ sb .append ("recovery completed from " ).append (request .sourceNode ()).append (", took[" ).append (recoveryTime )
274+ .append ("]\n " );
275+ sb .append (" phase1: recovered_files [" ).append (recoveryResponse .phase1FileNames .size ()).append ("]" )
276+ .append (" with total_size of [" ).append (new ByteSizeValue (recoveryResponse .phase1TotalSize )).append ("]" )
277+ .append (", took [" ).append (timeValueMillis (recoveryResponse .phase1Time )).append ("], throttling_wait [" )
278+ .append (timeValueMillis (recoveryResponse .phase1ThrottlingWaitTime )).append (']' ).append ("\n " );
279+ sb .append (" : reusing_files [" ).append (recoveryResponse .phase1ExistingFileNames .size ())
280+ .append ("] with total_size of [" ).append (new ByteSizeValue (recoveryResponse .phase1ExistingTotalSize ))
281+ .append ("]\n " );
282+ sb .append (" phase2: start took [" ).append (timeValueMillis (recoveryResponse .startTime )).append ("]\n " );
283+ sb .append (" : recovered [" ).append (recoveryResponse .phase2Operations ).append ("]" )
284+ .append (" transaction log operations" )
285+ .append (", took [" ).append (timeValueMillis (recoveryResponse .phase2Time )).append ("]" )
286+ .append ("\n " );
287+ logger .trace ("{}" , sb );
288+ } else {
289+ logger .debug ("{} recovery done from [{}], took [{}]" , request .shardId (), request .sourceNode (),
290+ recoveryTime );
291+ }
292+ }
293+
294+ @ Override
295+ public void handleException (TransportException e ) {
296+ handleException .accept (e );
297+ }
298+
299+ @ Override
300+ public String executor () {
301+ // we do some heavy work like refreshes in the response so fork off to the generic threadpool
302+ return ThreadPool .Names .GENERIC ;
303+ }
304+
305+ @ Override
306+ public RecoveryResponse read (StreamInput in ) throws IOException {
307+ RecoveryResponse recoveryResponse = new RecoveryResponse ();
308+ recoveryResponse .readFrom (in );
309+ return recoveryResponse ;
310+ }
311+ })
312+ );
313+ } catch (CancellableThreads .ExecutionCancelledException e ) {
314+ logger .trace ("recovery cancelled" , e );
315+ } catch (Exception e ) {
316+ handleException .accept (e );
288317 }
289318 }
290319
@@ -632,5 +661,4 @@ public void doRun() {
632661 doRecovery (recoveryId );
633662 }
634663 }
635-
636664}
0 commit comments