Elasticsearch version (bin/elasticsearch --version):
version >= 6.3.1
Plugins installed: [ defaults ]
JVM version (java -version):
Java(TM) SE Runtime Environment (build 1.8.0_102-b14)
OS version (uname -a if on a Unix-like system):
CentOS 6
Description of the problem including expected versus actual behavior:
The issue occurs when using the Java API `BulkProcessor` together with `RestHighLevelClient` in client-side applications. The BulkProcessor threads get deadlocked, and the Java application using the BulkProcessor hangs without flushing any data. A simplified sketch of the kind of setup involved is shown after the related issues below.
Similar issues have been discussed here:
#26533 Java application using BulkProcessing hangs if elasticsearch hangs
#42528 BulkProcessor hangs instead of timeout
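For reference, here is a simplified sketch of the setup involved. The host, index/type names, document contents, and the specific thresholds are placeholders rather than our exact configuration; the `client::bulkAsync` method reference matches the 6.3.x high-level client (later 6.x minors pass `RequestOptions.DEFAULT` instead).

```java
import java.util.concurrent.TimeUnit;

import org.apache.http.HttpHost;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;

public class BulkProcessorSetupSketch {
    public static void main(String[] args) throws Exception {
        // Placeholder host; the real job talks to an ES >= 6.3.1 cluster.
        RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(new HttpHost("localhost", 9200, "http")));

        BulkProcessor.Listener listener = new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId, BulkRequest request) { }

            @Override
            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { }

            @Override
            public void afterBulk(long executionId, BulkRequest request, Throwable failure) { }
        };

        // The flush interval drives the scheduled Flush task; the backoff policy is what
        // routes failed bulk items through Retry.RetryHandler on the same scheduler.
        BulkProcessor bulkProcessor = BulkProcessor.builder(client::bulkAsync, listener)
                .setBulkActions(1000)
                .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))
                .setFlushInterval(TimeValue.timeValueSeconds(10))
                // With 0 concurrent requests, BulkRequestHandler blocks on a CountDownLatch
                // until the bulk completes -- the latch.await() visible in the thread dump.
                .setConcurrentRequests(0)
                .setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
                .build();

        // Placeholder index/type and document.
        bulkProcessor.add(new IndexRequest("my-index", "_doc")
                .source(XContentType.JSON, "field", "value"));

        bulkProcessor.awaitClose(30, TimeUnit.SECONDS);
        client.close();
    }
}
```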
Cause of the deadlock:
1. User thread: the sink `ruleEngineEsSink_tc_bifurion_2c_bak` uses `BulkRequestHandler` to flush data to ES asynchronously. The user thread locks the `BulkProcessor` object, and `BulkRequestHandler` then blocks that thread with `latch.await()`.
2. ES client scheduler thread: `elasticsearch[scheduler][T#1]` executes the Flush task when `BulkProcessor.flushInterval` elapses, but the scheduler thread is blocked because the `BulkProcessor` object is already locked by the user thread from step 1.
3. The `CountDownLatch` from step 1 can only be released by `latch.countDown()` in the ActionListener callbacks `onResponse()` or `onFailure()`.
4. In the `Retry.RetryHandler` class, when `onResponse()` parses the bulkItemResponses and finds any failures, it retries those failed `BulkRequest`s using the same scheduler as in step 2. That scheduler is a `ScheduledThreadPoolExecutor` with a corePoolSize of one, and its only thread, `elasticsearch[scheduler]`, is already BLOCKED. Hence the retry logic never executes and the `CountDownLatch` from step 3 is never released.
5. As a result, our application hangs because these threads are deadlocked. A minimal sketch of this lock/latch interaction follows below.
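The following is a minimal, self-contained sketch of that lock/latch shape using plain JDK primitives rather than the actual Elasticsearch classes. It only illustrates why a single-threaded scheduler cannot both wait for the `BulkProcessor` monitor and run the retry task that would release the latch; names and timings are made up for the illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BulkProcessorDeadlockSketch {

    // Stands in for the BulkProcessor object monitor.
    private final Object processorLock = new Object();

    // Single-threaded scheduler, like elasticsearch[scheduler][T#1] (corePoolSize == 1).
    private final ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1);

    // Scheduler thread: the periodic Flush task needs the processor monitor
    // (BulkProcessor$Flush.run in the thread dump).
    void startFlushTask() {
        scheduler.scheduleWithFixedDelay(() -> {
            synchronized (processorLock) {
                // flush work would happen here
            }
        }, 0, 10, TimeUnit.MILLISECONDS);
    }

    // User thread: holds the processor monitor while waiting on the latch
    // (BulkProcessor.internalAdd -> BulkRequestHandler.execute -> latch.await()).
    void addAndFlush() throws InterruptedException {
        synchronized (processorLock) {
            CountDownLatch latch = new CountDownLatch(1);
            // Models the retry that would eventually call latch.countDown() via
            // onResponse()/onFailure(): it is queued on the SAME single scheduler thread...
            scheduler.schedule(latch::countDown, 200, TimeUnit.MILLISECONDS);
            // ...but by then that thread is blocked on processorLock by the flush task,
            // so the countDown never runs and this await never returns.
            latch.await();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BulkProcessorDeadlockSketch sketch = new BulkProcessorDeadlockSketch();
        sketch.startFlushTask();   // flush fires every 10 ms on the only scheduler thread
        Thread.sleep(50);          // let a few flush cycles run
        sketch.addAndFlush();      // user thread takes the lock, then hangs forever
    }
}
```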
Thread dump:
"elasticsearch[scheduler][T#1]" #170 daemon prio=5 os_prio=0 tid=0x00000000021fc800 nid=0x3b5772 waiting for monitor entry [0x00007ffa839ba000]
java.lang.Thread.State: BLOCKED (on object monitor)
at org.elasticsearch.action.bulk.BulkProcessor$Flush.run(BulkProcessor.java:367)
- waiting to lock <0x00000007b20a24d8> (a org.elasticsearch.action.bulk.BulkProcessor)
at org.elasticsearch.threadpool.Scheduler$ReschedulingRunnable.doRun(Scheduler.java:182)
at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
"Sink: ruleEngineEsSink_tc_bifurion_2c_bak (47/48)" #81 prio=5 os_prio=0 tid=0x00007ffad4267800 nid=0x3b56b0 waiting on condition [0x00007ffaa5787000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000007b2a0c738> (a java.util.concurrent.CountDownLatch$Sync)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
at org.elasticsearch.action.bulk.BulkRequestHandler.execute(BulkRequestHandler.java:86)
at org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:339)
at org.elasticsearch.action.bulk.BulkProcessor.executeIfNeeded(BulkProcessor.java:330)
at org.elasticsearch.action.bulk.BulkProcessor.internalAdd(BulkProcessor.java:288)
- locked <0x00000007b20a24d8> (a org.elasticsearch.action.bulk.BulkProcessor)
at org.elasticsearch.action.bulk.BulkProcessor.add(BulkProcessor.java:271)
at org.elasticsearch.action.bulk.BulkProcessor.add(BulkProcessor.java:267)
at org.elasticsearch.action.bulk.BulkProcessor.add(BulkProcessor.java:253)
at com.ly.dc.furion.rule.engine.sink.Elasticsearch6BulkProcessorIndexer.add(Elasticsearch6BulkProcessorIndexer.java:71)
at com.ly.dc.furion.rule.engine.service.EsResponseService.lambda$createEsSink$16699246$1(EsResponseService.java:91)
at com.ly.dc.furion.rule.engine.service.EsResponseService$$Lambda$56/1098068091.process(Unknown Source)
at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.invoke(ElasticsearchSinkBase.java:306)
at org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
at com.ly.dc.furion.rule.engine.sink.FurionResponseElasticsearchSink.invoke(FurionResponseElasticsearchSink.java:95)
at com.ly.dc.furion.rule.engine.sink.FurionResponseElasticsearchSink.invoke(FurionResponseElasticsearchSink.java:40)
at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
- locked <0x00000007b2305690> (a java.lang.Object)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:745)