2020
2121import org .apache .commons .logging .Log ;
2222import org .apache .commons .logging .LogFactory ;
23+ import org .apache .http .ConnectionClosedException ;
2324import org .apache .http .Header ;
2425import org .apache .http .HttpEntity ;
2526import org .apache .http .HttpHost ;
3839import org .apache .http .client .protocol .HttpClientContext ;
3940import org .apache .http .client .utils .URIBuilder ;
4041import org .apache .http .concurrent .FutureCallback ;
42+ import org .apache .http .conn .ConnectTimeoutException ;
4143import org .apache .http .impl .auth .BasicScheme ;
4244import org .apache .http .impl .client .BasicAuthCache ;
4345import org .apache .http .impl .nio .client .CloseableHttpAsyncClient ;
4749
4850import java .io .Closeable ;
4951import java .io .IOException ;
52+ import java .net .SocketTimeoutException ;
5053import java .net .URI ;
5154import java .net .URISyntaxException ;
5255import java .util .ArrayList ;
@@ -201,6 +204,14 @@ public Response performRequest(String method, String endpoint, Map<String, Strin
201204 * they previously failed (the more failures, the later they will be retried). In case of failures all of the alive nodes (or dead
202205 * nodes that deserve a retry) are retried until one responds or none of them does, in which case an {@link IOException} will be thrown.
203206 *
207+ * This method works by performing an asynchronous call and waiting
208+ * for the result. If the asynchronous call throws an exception we wrap
209+ * it and rethrow it so that the stack trace attached to the exception
210+ * contains the call site. While we attempt to preserve the original
211+ * exception this isn't always possible and likely haven't covered all of
212+ * the cases. You can get the original exception from
213+ * {@link Exception#getCause()}.
214+ *
204215 * @param method the http method
205216 * @param endpoint the path of the request (without host and port)
206217 * @param params the query_string parameters
@@ -218,7 +229,8 @@ public Response performRequest(String method, String endpoint, Map<String, Strin
218229 HttpEntity entity , HttpAsyncResponseConsumerFactory httpAsyncResponseConsumerFactory ,
219230 Header ... headers ) throws IOException {
220231 SyncResponseListener listener = new SyncResponseListener (maxRetryTimeoutMillis );
221- performRequestAsync (method , endpoint , params , entity , httpAsyncResponseConsumerFactory , listener , headers );
232+ performRequestAsyncNoCatch (method , endpoint , params , entity , httpAsyncResponseConsumerFactory ,
233+ listener , headers );
222234 return listener .get ();
223235 }
224236
@@ -293,43 +305,50 @@ public void performRequestAsync(String method, String endpoint, Map<String, Stri
293305 HttpEntity entity , HttpAsyncResponseConsumerFactory httpAsyncResponseConsumerFactory ,
294306 ResponseListener responseListener , Header ... headers ) {
295307 try {
296- Objects .requireNonNull (params , "params must not be null" );
297- Map <String , String > requestParams = new HashMap <>(params );
298- //ignore is a special parameter supported by the clients, shouldn't be sent to es
299- String ignoreString = requestParams .remove ("ignore" );
300- Set <Integer > ignoreErrorCodes ;
301- if (ignoreString == null ) {
302- if (HttpHead .METHOD_NAME .equals (method )) {
303- //404 never causes error if returned for a HEAD request
304- ignoreErrorCodes = Collections .singleton (404 );
305- } else {
306- ignoreErrorCodes = Collections .emptySet ();
307- }
308+ performRequestAsyncNoCatch (method , endpoint , params , entity , httpAsyncResponseConsumerFactory ,
309+ responseListener , headers );
310+ } catch (Exception e ) {
311+ responseListener .onFailure (e );
312+ }
313+ }
314+
315+ void performRequestAsyncNoCatch (String method , String endpoint , Map <String , String > params ,
316+ HttpEntity entity , HttpAsyncResponseConsumerFactory httpAsyncResponseConsumerFactory ,
317+ ResponseListener responseListener , Header ... headers ) {
318+ Objects .requireNonNull (params , "params must not be null" );
319+ Map <String , String > requestParams = new HashMap <>(params );
320+ //ignore is a special parameter supported by the clients, shouldn't be sent to es
321+ String ignoreString = requestParams .remove ("ignore" );
322+ Set <Integer > ignoreErrorCodes ;
323+ if (ignoreString == null ) {
324+ if (HttpHead .METHOD_NAME .equals (method )) {
325+ //404 never causes error if returned for a HEAD request
326+ ignoreErrorCodes = Collections .singleton (404 );
308327 } else {
309- String [] ignoresArray = ignoreString .split ("," );
310- ignoreErrorCodes = new HashSet <>();
311- if (HttpHead .METHOD_NAME .equals (method )) {
312- //404 never causes error if returned for a HEAD request
313- ignoreErrorCodes .add (404 );
314- }
315- for (String ignoreCode : ignoresArray ) {
316- try {
317- ignoreErrorCodes .add (Integer .valueOf (ignoreCode ));
318- } catch (NumberFormatException e ) {
319- throw new IllegalArgumentException ("ignore value should be a number, found [" + ignoreString + "] instead" , e );
320- }
328+ ignoreErrorCodes = Collections .emptySet ();
329+ }
330+ } else {
331+ String [] ignoresArray = ignoreString .split ("," );
332+ ignoreErrorCodes = new HashSet <>();
333+ if (HttpHead .METHOD_NAME .equals (method )) {
334+ //404 never causes error if returned for a HEAD request
335+ ignoreErrorCodes .add (404 );
336+ }
337+ for (String ignoreCode : ignoresArray ) {
338+ try {
339+ ignoreErrorCodes .add (Integer .valueOf (ignoreCode ));
340+ } catch (NumberFormatException e ) {
341+ throw new IllegalArgumentException ("ignore value should be a number, found [" + ignoreString + "] instead" , e );
321342 }
322343 }
323- URI uri = buildUri (pathPrefix , endpoint , requestParams );
324- HttpRequestBase request = createHttpRequest (method , uri , entity );
325- setHeaders (request , headers );
326- FailureTrackingResponseListener failureTrackingResponseListener = new FailureTrackingResponseListener (responseListener );
327- long startTime = System .nanoTime ();
328- performRequestAsync (startTime , nextHost (), request , ignoreErrorCodes , httpAsyncResponseConsumerFactory ,
329- failureTrackingResponseListener );
330- } catch (Exception e ) {
331- responseListener .onFailure (e );
332344 }
345+ URI uri = buildUri (pathPrefix , endpoint , requestParams );
346+ HttpRequestBase request = createHttpRequest (method , uri , entity );
347+ setHeaders (request , headers );
348+ FailureTrackingResponseListener failureTrackingResponseListener = new FailureTrackingResponseListener (responseListener );
349+ long startTime = System .nanoTime ();
350+ performRequestAsync (startTime , nextHost (), request , ignoreErrorCodes , httpAsyncResponseConsumerFactory ,
351+ failureTrackingResponseListener );
333352 }
334353
335354 private void performRequestAsync (final long startTime , final HostTuple <Iterator <HttpHost >> hostTuple , final HttpRequestBase request ,
@@ -674,12 +693,35 @@ Response get() throws IOException {
674693 e .addSuppressed (exception );
675694 throw e ;
676695 }
677- //try and leave the exception untouched as much as possible but we don't want to just add throws Exception clause everywhere
696+ /*
697+ * Wrap and rethrow whatever exception we received, copying the type
698+ * where possible so the synchronous API looks as much as possible
699+ * like the asynchronous API. We wrap the exception so that the caller's
700+ * signature shows up in any exception we throw.
701+ */
702+ if (exception instanceof ResponseException ) {
703+ throw new ResponseException ((ResponseException ) exception );
704+ }
705+ if (exception instanceof ConnectTimeoutException ) {
706+ ConnectTimeoutException e = new ConnectTimeoutException (exception .getMessage ());
707+ e .initCause (exception );
708+ throw e ;
709+ }
710+ if (exception instanceof SocketTimeoutException ) {
711+ SocketTimeoutException e = new SocketTimeoutException (exception .getMessage ());
712+ e .initCause (exception );
713+ throw e ;
714+ }
715+ if (exception instanceof ConnectionClosedException ) {
716+ ConnectionClosedException e = new ConnectionClosedException (exception .getMessage ());
717+ e .initCause (exception );
718+ throw e ;
719+ }
678720 if (exception instanceof IOException ) {
679- throw ( IOException ) exception ;
721+ throw new IOException ( exception . getMessage (), exception ) ;
680722 }
681723 if (exception instanceof RuntimeException ){
682- throw ( RuntimeException ) exception ;
724+ throw new RuntimeException ( exception . getMessage (), exception ) ;
683725 }
684726 throw new RuntimeException ("error while performing request" , exception );
685727 }
0 commit comments