Sniffer.sniffOnFailure performs blocking request on AsyncClient callback, freezing all subsequent requests until it fails #27984

@reembs

Elasticsearch version: 5.5.3

Plugins installed: []

JVM version: Java HotSpot(TM) 64-Bit Server VM (build 25.65-b01, mixed mode)

OS version: Darwin MacBook-Pro-2.local 17.3.0 Darwin Kernel Version 17.3.0: Thu Nov 9 18:09:22 PST 2017; root:xnu-4570.31.3~1/RELEASE_X86_64 x86_64

Description of the problem including expected versus actual behavior:

Initializing the RestClient in the following manner (using SniffOnFailureListener):

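// Listener that triggers a sniffing round whenever a node fails.
SniffOnFailureListener sniffOnFailureListener = new SniffOnFailureListener();

// hosts, timeoutInMillis and SNIFFING_INTERVAL are defined elsewhere in the application.
RestClientBuilder builder = RestClient.builder(Iterables.toArray(hosts, HttpHost.class))
    .setMaxRetryTimeoutMillis(timeoutInMillis)
    .setFailureListener(sniffOnFailureListener);

RestClient restClient = builder.build();

Sniffer sniffer = Sniffer.builder(restClient)
    .setSniffIntervalMillis(SNIFFING_INTERVAL)
    .build();

// Wire the sniffer into the listener so that request failures trigger sniffing.
sniffOnFailureListener.setSniffer(sniffer);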

This setup causes a thread race and a freeze when cluster nodes terminate, due to a lock in the Apache async HTTP client.

The issue seems to be in org.elasticsearch.client.sniff.Sniffer#sniffOnFailure. This method runs the sniffing request synchronously in a call stack that originates in the async event loop's failed() callback. Blocking on the async HTTP client thread in this way blocks all other requests to that HTTP client, with no timeout of their own, until the sniffing request itself times out.
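
To illustrate the mechanism described above, here is a minimal model that uses only java.util.concurrent, not the Elasticsearch or Apache classes. A single-threaded executor stands in for the async client's event loop, and a latch that is never counted down stands in for a sniff request to a dead node. All names (eventLoop, sniffPool, otherRequestDelayMillis) are hypothetical. It compares blocking inside the callback with handing the sniff off to a separate thread, which is one possible way to avoid the stall, not necessarily how the client should fix it.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class BlockingCallbackDemo {

    /**
     * Runs a failure callback on a single-threaded "event loop", then submits an
     * unrelated request to the same loop. Returns roughly how long that request
     * waited before it ran, in milliseconds.
     */
    static long otherRequestDelayMillis(boolean blockInsideCallback, long sniffTimeoutMillis) {
        ExecutorService eventLoop = Executors.newSingleThreadExecutor(); // stand-in for the reactor thread
        ExecutorService sniffPool = Executors.newSingleThreadExecutor(); // hypothetical hand-off thread
        CountDownLatch sniffResponse = new CountDownLatch(1);            // never counted down: node is gone

        // The "sniff": a synchronous wait that only ends when its own timeout expires.
        Runnable sniff = () -> {
            try {
                sniffResponse.await(sniffTimeoutMillis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };

        // The failure callback, always executed on the event loop thread.
        Runnable failureCallback = () -> {
            if (blockInsideCallback) {
                sniff.run();              // blocks the loop thread until the sniff times out
            } else {
                sniffPool.execute(sniff); // returns immediately; the sniff waits elsewhere
            }
        };

        Callable<Long> otherRequest = System::nanoTime; // records when it actually ran
        try {
            eventLoop.execute(failureCallback);
            long submittedAt = System.nanoTime();
            Future<Long> ranAt = eventLoop.submit(otherRequest);
            return TimeUnit.NANOSECONDS.toMillis(ranAt.get() - submittedAt);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            eventLoop.shutdownNow();
            sniffPool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("blocking callback, other request waited (ms): "
                + otherRequestDelayMillis(true, 500));
        System.out.println("hand-off callback, other request waited (ms): "
                + otherRequestDelayMillis(false, 500));
    }
}
```

In the blocking variant the unrelated request cannot start until the sniff wait expires, which matches the behavior reported here. In the hand-off variant it runs almost immediately.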

Thread dump in blocked state:

"pool-7-thread-1" #68 prio=5 os_prio=31 tid=0x00007fbfd2b22000 nid=0x14703 waiting on condition [0x0000700010ee6000]
   java.lang.Thread.State: TIMED_WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for  <0x00000007bfb08000> (a java.util.concurrent.CountDownLatch$Sync)
	at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedNanos(AbstractQueuedSynchronizer.java:1037)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1328)
	at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:277)
	at org.elasticsearch.client.RestClient$SyncResponseListener.get(RestClient.java:659)
	at org.elasticsearch.client.RestClient.performRequest(RestClient.java:219)
	at org.elasticsearch.client.RestClient.performRequest(RestClient.java:191)
	at org.elasticsearch.client.RestClient.performRequest(RestClient.java:170)
	at org.elasticsearch.client.sniff.ElasticsearchHostsSniffer.sniffHosts(ElasticsearchHostsSniffer.java:93)
	at org.elasticsearch.client.sniff.Sniffer$Task.sniff(Sniffer.java:113)
	at org.elasticsearch.client.sniff.Sniffer$Task.sniffOnFailure(Sniffer.java:107)
	at org.elasticsearch.client.sniff.Sniffer.sniffOnFailure(Sniffer.java:59)
	at org.elasticsearch.client.sniff.SniffOnFailureListener.onFailure(SniffOnFailureListener.java:62)
	at org.elasticsearch.client.RestClient.onFailure(RestClient.java:491)
	at org.elasticsearch.client.RestClient.access$400(RestClient.java:89)
	at org.elasticsearch.client.RestClient$1.failed(RestClient.java:374)
	at org.apache.http.concurrent.BasicFuture.failed(BasicFuture.java:134)
	at org.apache.http.impl.nio.client.AbstractClientExchangeHandler.failed(AbstractClientExchangeHandler.java:419)
	at org.apache.http.impl.nio.client.AbstractClientExchangeHandler.connectionRequestFailed(AbstractClientExchangeHandler.java:335)
	at org.apache.http.impl.nio.client.AbstractClientExchangeHandler.access$100(AbstractClientExchangeHandler.java:62)
	at org.apache.http.impl.nio.client.AbstractClientExchangeHandler$1.failed(AbstractClientExchangeHandler.java:378)
	at org.apache.http.concurrent.BasicFuture.failed(BasicFuture.java:134)
	at org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager$InternalPoolEntryCallback.failed(PoolingNHttpClientConnectionManager.java:504)
	at org.apache.http.concurrent.BasicFuture.failed(BasicFuture.java:134)
	at org.apache.http.nio.pool.RouteSpecificPool.timeout(RouteSpecificPool.java:168)
	at org.apache.http.nio.pool.AbstractNIOConnPool.requestTimeout(AbstractNIOConnPool.java:561)
	at org.apache.http.nio.pool.AbstractNIOConnPool$InternalSessionRequestCallback.timeout(AbstractNIOConnPool.java:822)
	at org.apache.http.impl.nio.reactor.SessionRequestImpl.timeout(SessionRequestImpl.java:183)
	- locked <0x00000007bfb06e50> (a org.apache.http.impl.nio.reactor.SessionRequestImpl)
	at org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor.processTimeouts(DefaultConnectingIOReactor.java:210)
	at org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor.processEvents(DefaultConnectingIOReactor.java:155)
	at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor.execute(AbstractMultiworkerIOReactor.java:348)
	at org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager.execute(PoolingNHttpClientConnectionManager.java:192)
	at org.apache.http.impl.nio.client.CloseableHttpAsyncClientBase$1.run(CloseableHttpAsyncClientBase.java:64)
	at java.lang.Thread.run(Thread.java:745)
"KumulusThread-18" #33 daemon prio=5 os_prio=31 tid=0x00007fbfd1157000 nid=0x9203 waiting on condition [0x000070000eb7d000]
   java.lang.Thread.State: WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for  <0x00000007606c1450> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
	at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:209)
	at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:285)
	at org.apache.http.nio.pool.AbstractNIOConnPool.lease(AbstractNIOConnPool.java:272)
	at org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager.requestConnection(PoolingNHttpClientConnectionManager.java:266)
	at org.apache.http.impl.nio.client.AbstractClientExchangeHandler.requestConnection(AbstractClientExchangeHandler.java:363)
	at org.apache.http.impl.nio.client.DefaultClientExchangeHandlerImpl.start(DefaultClientExchangeHandlerImpl.java:125)
	at org.apache.http.impl.nio.client.InternalHttpAsyncClient.execute(InternalHttpAsyncClient.java:141)
	at org.elasticsearch.client.RestClient.performRequestAsync(RestClient.java:343)
	at org.elasticsearch.client.RestClient.performRequestAsync(RestClient.java:325)
	at org.elasticsearch.client.RestClient.performRequestAsync(RestClient.java:268)
	at com.forter.utils.storm.bolts.elasticsearch.AbstractElasticsearchRestBolt.performRequest(AbstractElasticsearchRestBolt.java:115)
	at com.forter.utils.storm.bolts.elasticsearch.AbstractElasticsearchRestBolt.execute(AbstractElasticsearchRestBolt.java:142)
	at com.forter.utils.storm.builder.wrappers.impl.BaseBasicBoltWrapperFactory$ForterBasicBoltExecutor.execute(BaseBasicBoltWrapperFactory.java:64)
	at com.forter.utils.storm.bolts.decorators.RichBoltDecorator.execute(RichBoltDecorator.java:28)
	at com.forter.utils.storm.bolts.decorators.RichBoltDecorator.execute(RichBoltDecorator.java:28)
	at com.forter.utils.storm.builder.wrappers.ReadyStreamWrapperFactory$ReadyStreamWrapper.execute(ReadyStreamWrapperFactory.java:169)
	at com.forter.utils.storm.bolts.decorators.RichBoltDecorator.execute(RichBoltDecorator.java:28)
	at com.forter.utils.storm.bolts.decorators.RichBoltDecorator.execute(RichBoltDecorator.java:28)
	at com.forter.utils.storm.builder.wrappers.DebugBoltWrapperFactory$DebugBoltWrapper.execute(DebugBoltWrapperFactory.java:50)
	at com.forter.utils.storm.bolts.decorators.RichBoltDecorator.execute(RichBoltDecorator.java:28)
	at com.forter.monitoring.MonitoredBolt.execute(MonitoredBolt.java:96)
	at com.forter.utils.storm.builder.wrappers.impl.MonitorBoltWrapperFactory$BrainTopologyMonitorBolt.execute(MonitorBoltWrapperFactory.java:253)
	at com.forter.utils.storm.bolts.decorators.RichBoltDecorator.execute(RichBoltDecorator.java:28)
	at org.xyro.kumulus.component.KumulusBolt.execute(KumulusBolt.kt:38)
	at org.xyro.kumulus.KumulusTopology.handleQueueItem(KumulusTopology.kt:229)
	at org.xyro.kumulus.KumulusTopology.access$handleQueueItem(KumulusTopology.kt:13)
	at org.xyro.kumulus.KumulusTopology$boltExecutionPool$1.invoke(KumulusTopology.kt:19)
	at org.xyro.kumulus.KumulusTopology$boltExecutionPool$1.invoke(KumulusTopology.kt:13)
	at org.xyro.kumulus.ExecutionPool.threadMain(ExecutionPool.kt:36)
	at org.xyro.kumulus.ExecutionPool.access$threadMain(ExecutionPool.kt:7)
	at org.xyro.kumulus.ExecutionPool$1.invoke(ExecutionPool.kt:18)
	at org.xyro.kumulus.ExecutionPool$1.invoke(ExecutionPool.kt:7)
	at org.xyro.kumulus.ExecutionPoolKt$sam$Runnable$8b513235.run(ExecutionPool.kt)
	at java.lang.Thread.run(Thread.java:745)
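
The second thread in the dump is parked on a ReentrantLock inside AbstractNIOConnPool.lease. The dump does not list owned synchronizers, so the following is an inference rather than a confirmed fact: if the reactor thread holds that pool lock while it runs the blocking failure callback, every caller that needs the lock parks until the callback returns. A minimal stdlib-only sketch of that pattern, with hypothetical names (poolLock, callerGetsLock), might look like this. The caller uses tryLock with a timeout only so that the demo terminates; the real caller in the dump uses lock(), which has no timeout.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class LockHeldAcrossCallbackDemo {

    /**
     * A "reactor" thread takes the pool lock and then runs a callback that blocks
     * for callbackBlockMillis. Returns true if a caller thread manages to acquire
     * the same lock within callerWaitMillis.
     */
    static boolean callerGetsLock(long callbackBlockMillis, long callerWaitMillis) {
        ReentrantLock poolLock = new ReentrantLock();           // stand-in for the pool's lock
        CountDownLatch lockTaken = new CountDownLatch(1);       // signals that the reactor holds the lock
        CountDownLatch sniffResponse = new CountDownLatch(1);   // never counted down: node is gone

        Thread reactor = new Thread(() -> {
            poolLock.lock();
            try {
                lockTaken.countDown();
                // The blocking callback runs while the lock is still held.
                sniffResponse.await(callbackBlockMillis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                poolLock.unlock();
            }
        }, "reactor");
        reactor.setDaemon(true);
        reactor.start();

        try {
            lockTaken.await();
            // Bounded wait so the demo ends; lock() would park until the callback returns.
            boolean acquired = poolLock.tryLock(callerWaitMillis, TimeUnit.MILLISECONDS);
            if (acquired) {
                poolLock.unlock();
            }
            return acquired;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            reactor.interrupt(); // release the reactor early so no thread lingers
        }
    }

    public static void main(String[] args) {
        System.out.println("caller with 100 ms budget, callback blocks 1000 ms: "
                + callerGetsLock(1000, 100));
        System.out.println("caller with 2000 ms budget, callback blocks 50 ms: "
                + callerGetsLock(50, 2000));
    }
}
```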

Steps to reproduce:

  • Create a cluster of any size
  • Run a Java application that uses the REST client, initialized as above, to index into or search the cluster repeatedly with a 1 s timeout.
  • Terminate a cluster node

Expected: the request execution using performRequest should not exceed ~1s

Actual: once the onFailure sniffing method begins execution, all calls to the REST client are blocked until the sniffing request triggered by the earlier request failure times out.

Provide logs (if relevant):
