Description
If the watcher thread pool and its associated queue are full, the write thread pool can fill up with blocked index requests. These eventually back up the write thread pool and its associated queue, resulting in the rejection of all index requests.
This is most likely to happen in monitoring clusters that monitor many remote clusters and thus have many watches. However, it can happen on any type of cluster if the Watcher thread pool and its associated queue are full.
The culprit is here
When the watcher queue is full, the Watch is rejected. Once the Watch is rejected, Watcher attempts to write a history record so we know that the Watch was rejected. Writing the history document needs a spot in the write queue, and because of the way Watcher is implemented, that write is blocking and completely synchronous: it must complete before another history document can be written. Specifically, it blocks in a synchronized method here. This means that only one thread that uses this BulkProcessor can write at a time; the rest of the threads will be "Blocked" by the synchronized method [1].

Normally this means that Watcher only blocks Watcher, since the blocked threads all come from the watcher thread pool. So far, we are OK... Watcher's ability to write the history is throttled to one write at a time, and pending writes end up in the watcher queue and get processed one at a time.

The problems start when the watcher queue is full: it starts to reject, and the rejection path tries to write the history document from the current thread... which can be a thread from the write thread pool. This can happen because the normal history write happens on the response of the .triggered-watch write. That response is not forked onto another thread, so the rejection exception is handled by the write thread.
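The rejection behavior above can be illustrated with plain `java.util.concurrent`. This is a minimal sketch, not Elasticsearch's code: a hypothetical bounded "watcher" pool (one worker, queue of one) rejects a task, and the `catch` block that would record the rejection runs on whichever thread called `execute()`, here a thread named like a write-pool thread.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class RejectionOnCallerThread {
    // Submits tasks from a thread named like a write-pool thread until the bounded
    // pool rejects one, and returns the name of the thread that handled the rejection.
    static String submitUntilRejected() throws InterruptedException {
        // Hypothetical stand-in for the watcher pool: one worker and a queue of size one.
        ThreadPoolExecutor watcherPool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.AbortPolicy());
        CountDownLatch release = new CountDownLatch(1);
        AtomicReference<String> handledOn = new AtomicReference<>();

        Runnable slowWatch = () -> {
            try {
                release.await(); // keeps the single worker busy
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };

        // Plays the role of the write thread that delivers the bulk response.
        Thread writeThread = new Thread(() -> {
            watcherPool.execute(slowWatch); // occupies the only worker
            watcherPool.execute(slowWatch); // fills the queue
            try {
                watcherPool.execute(slowWatch); // pool and queue are full
            } catch (RejectedExecutionException e) {
                // The "write a history record for the rejection" step runs here, on the caller.
                handledOn.set(Thread.currentThread().getName());
            }
        }, "write[T#2]");
        writeThread.start();
        writeThread.join();

        release.countDown();
        watcherPool.shutdown();
        watcherPool.awaitTermination(5, TimeUnit.SECONDS);
        return handledOn.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(submitUntilRejected()); // prints write[T#2]
    }
}
```

The point of the sketch is only that rejection handling is not forked: whatever work the handler does is charged to the submitting thread's pool.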
...so this is where things go bad. Now we have one watcher thread waiting for a response from the index call [3], and all the other watcher threads are "Blocked" by that one watcher thread. If the watcher queue fills up, rejections start happening, and threads from the write pool start trying to write the history document (on the write thread) and also get "Blocked" by the same synchronized method [2]. This is because the exception handler uses the same bulkProcessor instance to write the document on exception. So the threads from the write pool get caught in the synchronized method that backed up the watcher queue, and we can end up with threads in the write thread pool that are blocked by the same synchronous Watcher writes. This backs up the write queue, which eventually fills up and starts rejecting. Now the watcher writes can't happen because the write queue is rejecting, and the other writes can't happen because watcher threads are blocking write threads. 💥 Deadlock. I am sure there are some cases where it doesn't completely deadlock, but rather heavily throttles the ability to write documents.
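The blocking step can be reproduced in isolation. This is a minimal sketch under stated assumptions: `SharedHistoryProcessor` is a hypothetical stand-in for the shared bulk processor, whose `add()` is synchronized and, when a flush is due, waits on a `CountDownLatch` for the bulk response while still holding the monitor (mirroring the `CountDownLatch.await` frame in [3]). A second thread named like a write-pool thread then calls `add()` and ends up `BLOCKED`, as in [2].

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class WriteThreadBlockedOnMonitor {
    // Hypothetical stand-in for the shared bulk processor used by the history store.
    static final class SharedHistoryProcessor {
        final CountDownLatch holdingMonitor = new CountDownLatch(1);
        final CountDownLatch bulkResponse = new CountDownLatch(1);

        synchronized void add(boolean flushNow) throws InterruptedException {
            if (flushNow) {
                holdingMonitor.countDown();
                bulkResponse.await(); // waits for the response WITHOUT releasing the monitor
            }
        }
    }

    // Returns the state of a "write" thread that calls add() while a "watcher"
    // thread is inside add() waiting for its bulk response.
    static Thread.State writeThreadState() throws InterruptedException {
        SharedHistoryProcessor processor = new SharedHistoryProcessor();

        Thread watcher = new Thread(() -> callAdd(processor, true), "watcher[T#6]");
        watcher.start();
        processor.holdingMonitor.await(); // the watcher thread now owns the monitor

        Thread write = new Thread(() -> callAdd(processor, false), "write[T#2]");
        write.start();

        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
        while (write.getState() != Thread.State.BLOCKED && System.nanoTime() < deadline) {
            Thread.sleep(1);
        }
        Thread.State observed = write.getState();

        processor.bulkResponse.countDown(); // simulate the response arriving; both threads finish
        watcher.join();
        write.join();
        return observed;
    }

    private static void callAdd(SharedHistoryProcessor processor, boolean flushNow) {
        try {
            processor.add(flushNow);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(writeThreadState()); // prints BLOCKED
    }
}
```

In the sketch the test itself counts the latch down. In the scenario described above, the response to the history bulk needs the write pool to make progress, so if enough write threads are parked on this monitor, nothing may ever release it.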
You can reproduce this behavior by adding Thread.sleep(1000) in the BulkRequestHandler just after the semaphore acquisition here and running a lot of watches on a fast schedule [4].
There are more concurrency issues here: the synchronized method trumps the semaphore lock downstream, negating the ability to use the bulk processor concurrently, and the deletion of triggered watches is synchronous with the write of the history document... But I will address those separately.
This issue only applies to 6.5+, as of #32490. Increasing xpack.watcher.bulk.actions can help but won't prevent this issue entirely.
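Why a synchronized method "trumps" a semaphore can be shown generically. This is a hedged sketch, not `BulkProcessor`/`BulkRequestHandler` code: the hypothetical `Handler` owns a `Semaphore` sized for several concurrent requests, but acquires it inside a synchronized method that also does the blocking work, so at most one caller is ever in flight no matter how many permits exist.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class MonitorNegatesSemaphore {
    // Hypothetical handler: the semaphore allows N concurrent requests, but the
    // monitor is held for the whole blocking call, so only one caller gets in.
    static final class Handler {
        final Semaphore permits;
        final AtomicInteger inFlight = new AtomicInteger();
        final AtomicInteger maxInFlight = new AtomicInteger();

        Handler(int concurrentRequests) {
            permits = new Semaphore(concurrentRequests);
        }

        synchronized void addAndFlush() throws InterruptedException {
            permits.acquire();
            try {
                int now = inFlight.incrementAndGet();
                maxInFlight.accumulateAndGet(now, Math::max);
                Thread.sleep(20); // simulate waiting for the bulk response
            } finally {
                inFlight.decrementAndGet();
                permits.release();
            }
        }
    }

    // Runs `callers` concurrent callers and returns the highest concurrency observed.
    static int maxObservedConcurrency(int permits, int callers) throws InterruptedException {
        Handler handler = new Handler(permits);
        ExecutorService pool = Executors.newFixedThreadPool(callers);
        for (int i = 0; i < callers; i++) {
            pool.submit(() -> {
                handler.addAndFlush();
                return null;
            });
        }
        pool.shutdown();
        pool.awaitTermination(30, TimeUnit.SECONDS);
        return handler.maxInFlight.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(maxObservedConcurrency(4, 8)); // prints 1
    }
}
```

With four permits and eight callers the observed concurrency is still one; dropping `synchronized` from `addAndFlush()` would let up to four callers hold permits at once.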
[1]
"elasticsearch[instance-0000000012][watcher][T#9]" daemon prio=5 tid=861 BLOCKED
at org.elasticsearch.action.bulk.BulkProcessor.internalAdd(BulkProcessor.java:320)
local variable: org.elasticsearch.action.index.IndexRequest#22
at org.elasticsearch.action.bulk.BulkProcessor.add(BulkProcessor.java:305)
at org.elasticsearch.action.bulk.BulkProcessor.add(BulkProcessor.java:301)
at org.elasticsearch.action.bulk.BulkProcessor.add(BulkProcessor.java:287)
at org.elasticsearch.xpack.watcher.history.HistoryStore.put(HistoryStore.java:49)
local variable: org.elasticsearch.common.xcontent.XContentBuilder#204
at org.elasticsearch.xpack.watcher.execution.ExecutionService.execute(ExecutionService.java:314)
local variable: java.lang.String#354386
local variable: org.elasticsearch.xpack.watcher.execution.TriggeredExecutionContext#47
local variable: org.elasticsearch.xpack.core.watcher.history.WatchRecord$ExceptionWatchRecord#2
at org.elasticsearch.xpack.watcher.execution.ExecutionService.lambda$executeAsync$5(ExecutionService.java:394)
at org.elasticsearch.xpack.watcher.execution.ExecutionService$$Lambda$3807.run(<unknown string>)
at org.elasticsearch.xpack.watcher.execution.ExecutionService$WatchExecutionTask.run(ExecutionService.java:543)
at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingRunnable.run(ThreadContext.java:660)
local variable: org.elasticsearch.common.util.concurrent.ThreadContext$$Lambda$2006#851
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
local variable: org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingRunnable#460
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
local variable: java.util.concurrent.ThreadPoolExecutor$Worker#35
at java.lang.Thread.run(Thread.java:748)
[2]
"elasticsearch[instance-0000000012][write][T#2]" daemon prio=5 tid=193 BLOCKED
at org.elasticsearch.action.bulk.BulkProcessor.internalAdd(BulkProcessor.java:320)
local variable: org.elasticsearch.action.index.IndexRequest#428
at org.elasticsearch.action.bulk.BulkProcessor.add(BulkProcessor.java:305)
at org.elasticsearch.action.bulk.BulkProcessor.add(BulkProcessor.java:301)
at org.elasticsearch.action.bulk.BulkProcessor.add(BulkProcessor.java:287)
at org.elasticsearch.xpack.watcher.history.HistoryStore.put(HistoryStore.java:49)
local variable: org.elasticsearch.common.xcontent.XContentBuilder#212
at org.elasticsearch.xpack.watcher.execution.ExecutionService.executeAsync(ExecutionService.java:402)
local variable: org.elasticsearch.xpack.watcher.execution.TriggeredExecutionContext#959
local variable: org.elasticsearch.common.util.concurrent.EsRejectedExecutionException#20
local variable: org.elasticsearch.xpack.watcher.execution.TriggeredWatch#96
local variable: org.elasticsearch.xpack.core.watcher.history.WatchRecord$MessageWatchRecord#1
local variable: java.lang.String#2849348
at org.elasticsearch.xpack.watcher.execution.ExecutionService.executeTriggeredWatches(ExecutionService.java:260)
local variable: org.elasticsearch.action.bulk.BulkItemResponse#7
local variable: org.elasticsearch.action.bulk.BulkResponse#7
local variable: org.elasticsearch.common.collect.Tuple#77849
at org.elasticsearch.xpack.watcher.execution.ExecutionService.lambda$processEventsAsync$0(ExecutionService.java:199)
at org.elasticsearch.xpack.watcher.execution.ExecutionService$$Lambda$3805.accept(<unknown string>)
at org.elasticsearch.action.ActionListener$1.onResponse(ActionListener.java:60)
local variable: org.elasticsearch.action.ActionListener$1#310814
at org.elasticsearch.action.support.ContextPreservingActionListener.onResponse(ContextPreservingActionListener.java:43)
local variable: org.elasticsearch.common.util.concurrent.ThreadContext$$Lambda$1870#93643
at org.elasticsearch.action.support.TransportAction$1.onResponse(TransportAction.java:85)
at org.elasticsearch.action.support.TransportAction$1.onResponse(TransportAction.java:81)
at org.elasticsearch.action.support.ContextPreservingActionListener.onResponse(ContextPreservingActionListener.java:43)
local variable: org.elasticsearch.common.util.concurrent.ThreadContext$$Lambda$1870#93644
at org.elasticsearch.action.bulk.TransportBulkAction$BulkOperation$1.finishHim(TransportBulkAction.java:450)
at org.elasticsearch.action.bulk.TransportBulkAction$BulkOperation$1.onResponse(TransportBulkAction.java:431)
at org.elasticsearch.action.bulk.TransportBulkAction$BulkOperation$1.onResponse(TransportBulkAction.java:420)
at org.elasticsearch.action.support.TransportAction$1.onResponse(TransportAction.java:85)
at org.elasticsearch.action.support.TransportAction$1.onResponse(TransportAction.java:81)
at org.elasticsearch.action.support.ContextPreservingActionListener.onResponse(ContextPreservingActionListener.java:43)
local variable: org.elasticsearch.common.util.concurrent.ThreadContext$$Lambda$1870#93645
at org.elasticsearch.action.support.replication.TransportReplicationAction$ReroutePhase.finishOnSuccess(TransportReplicationAction.java:970)
at org.elasticsearch.action.support.replication.TransportReplicationAction$ReroutePhase$1.handleResponse(TransportReplicationAction.java:889)
at org.elasticsearch.action.support.replication.TransportReplicationAction$ReroutePhase$1.handleResponse(TransportReplicationAction.java:873)
at org.elasticsearch.transport.TransportService$ContextRestoreResponseHandler.handleResponse(TransportService.java:1093)
local variable: org.elasticsearch.common.util.concurrent.ThreadContext$$Lambda$1870#93646
at org.elasticsearch.transport.TransportService$DirectResponseChannel.processResponse(TransportService.java:1174)
local variable: org.elasticsearch.transport.TransportService$DirectResponseChannel#48
local variable: org.elasticsearch.transport.TransportService$ContextRestoreResponseHandler#50
at org.elasticsearch.transport.TransportService$DirectResponseChannel.sendResponse(TransportService.java:1154)
at org.elasticsearch.transport.TaskTransportChannel.sendResponse(TaskTransportChannel.java:54)
at org.elasticsearch.action.support.replication.TransportReplicationAction$AsyncPrimaryAction$2.onResponse(TransportReplicationAction.java:467)
local variable: org.elasticsearch.action.support.replication.TransportReplicationAction$AsyncPrimaryAction$2#1
at org.elasticsearch.action.support.replication.TransportReplicationAction$AsyncPrimaryAction$2.onResponse(TransportReplicationAction.java:445)
at org.elasticsearch.action.support.replication.TransportReplicationAction$PrimaryResult.respond(TransportReplicationAction.java:532)
at org.elasticsearch.action.support.replication.TransportWriteAction$WritePrimaryResult.respondIfPossible(TransportWriteAction.java:169)
at org.elasticsearch.action.support.replication.TransportWriteAction$WritePrimaryResult.respond(TransportWriteAction.java:159)
at org.elasticsearch.action.support.replication.TransportReplicationAction$AsyncPrimaryAction.lambda$runWithPrimaryShardReference$2(TransportReplicationAction.java:423)
at org.elasticsearch.action.support.replication.TransportReplicationAction$AsyncPrimaryAction$$Lambda$3721.accept(<unknown string>)
at org.elasticsearch.action.ActionListener$1.onResponse(ActionListener.java:60)
local variable: org.elasticsearch.action.ActionListener$1#357575
at org.elasticsearch.action.support.replication.ReplicationOperation.finish(ReplicationOperation.java:257)
at org.elasticsearch.action.support.replication.ReplicationOperation.decPendingAndFinishIfNeeded(ReplicationOperation.java:238)
at org.elasticsearch.action.support.replication.ReplicationOperation.execute(ReplicationOperation.java:128)
at org.elasticsearch.action.support.replication.TransportReplicationAction$AsyncPrimaryAction.runWithPrimaryShardReference(TransportReplicationAction.java:425)
local variable: org.elasticsearch.action.support.replication.TransportReplicationAction$PrimaryShardReference#1
at org.elasticsearch.action.support.replication.TransportReplicationAction$AsyncPrimaryAction.lambda$doRun$0(TransportReplicationAction.java:371)
at org.elasticsearch.action.support.replication.TransportReplicationAction$AsyncPrimaryAction$$Lambda$3719.accept(<unknown string>)
at org.elasticsearch.action.ActionListener$1.onResponse(ActionListener.java:60)
local variable: org.elasticsearch.action.ActionListener$1#357576
at org.elasticsearch.index.shard.IndexShardOperationPermits.acquire(IndexShardOperationPermits.java:273)
at org.elasticsearch.index.shard.IndexShardOperationPermits.acquire(IndexShardOperationPermits.java:240)
at org.elasticsearch.index.shard.IndexShard.acquirePrimaryOperationPermit(IndexShard.java:2352)
at org.elasticsearch.action.support.replication.TransportReplicationAction.acquirePrimaryOperationPermit(TransportReplicationAction.java:988)
at org.elasticsearch.action.support.replication.TransportReplicationAction$AsyncPrimaryAction.doRun(TransportReplicationAction.java:370)
at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37)
local variable: org.elasticsearch.action.support.replication.TransportReplicationAction$AsyncPrimaryAction#1
at org.elasticsearch.action.support.replication.TransportReplicationAction$PrimaryOperationTransportHandler.messageReceived(TransportReplicationAction.java:325)
at org.elasticsearch.action.support.replication.TransportReplicationAction$PrimaryOperationTransportHandler.messageReceived(TransportReplicationAction.java:312)
at org.elasticsearch.xpack.security.transport.SecurityServerTransportInterceptor$ProfileSecuredRequestHandler$1.doRun(SecurityServerTransportInterceptor.java:250)
at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37)
at org.elasticsearch.common.util.concurrent.EsExecutors$1.execute(EsExecutors.java:140)
at org.elasticsearch.xpack.security.transport.SecurityServerTransportInterceptor$ProfileSecuredRequestHandler.lambda$messageReceived$0(SecurityServerTransportInterceptor.java:299)
local variable: org.elasticsearch.xpack.security.transport.SecurityServerTransportInterceptor$ProfileSecuredRequestHandler$1#41
at org.elasticsearch.xpack.security.transport.SecurityServerTransportInterceptor$ProfileSecuredRequestHandler$$Lambda$2911.accept(<unknown string>)
at org.elasticsearch.action.ActionListener$1.onResponse(ActionListener.java:60)
local variable: org.elasticsearch.action.ActionListener$1#357577
at org.elasticsearch.xpack.security.transport.ServerTransportFilter$NodeProfile.lambda$inbound$2(ServerTransportFilter.java:155)
at org.elasticsearch.xpack.security.transport.ServerTransportFilter$NodeProfile$$Lambda$2917.accept(<unknown string>)
at org.elasticsearch.xpack.security.authz.AuthorizationUtils$AsyncAuthorizer.maybeRun(AuthorizationUtils.java:177)
local variable: org.elasticsearch.xpack.security.authz.AuthorizationUtils$AsyncAuthorizer#216
at org.elasticsearch.xpack.security.authz.AuthorizationUtils$AsyncAuthorizer.setRunAsRoles(AuthorizationUtils.java:171)
at org.elasticsearch.xpack.security.authz.AuthorizationUtils$AsyncAuthorizer.authorize(AuthorizationUtils.java:159)
at org.elasticsearch.xpack.security.transport.ServerTransportFilter$NodeProfile.lambda$inbound$3(ServerTransportFilter.java:157)
at org.elasticsearch.xpack.security.transport.ServerTransportFilter$NodeProfile$$Lambda$2913.accept(<unknown string>)
at org.elasticsearch.action.ActionListener$1.onResponse(ActionListener.java:60)
local variable: org.elasticsearch.action.ActionListener$1#357578
at org.elasticsearch.xpack.security.authc.AuthenticationService$Authenticator.lambda$authenticateAsync$2(AuthenticationService.java:177)
at org.elasticsearch.xpack.security.authc.AuthenticationService$Authenticator$$Lambda$2915.accept(<unknown string>)
at org.elasticsearch.xpack.security.authc.AuthenticationService$Authenticator.lambda$lookForExistingAuthentication$4(AuthenticationService.java:210)
at org.elasticsearch.xpack.security.authc.AuthenticationService$Authenticator$$Lambda$2916.run(<unknown string>)
at org.elasticsearch.xpack.security.authc.AuthenticationService$Authenticator.lookForExistingAuthentication(AuthenticationService.java:221)
at org.elasticsearch.xpack.security.authc.AuthenticationService$Authenticator.authenticateAsync(AuthenticationService.java:175)
at org.elasticsearch.xpack.security.authc.AuthenticationService$Authenticator.access$000(AuthenticationService.java:134)
at org.elasticsearch.xpack.security.authc.AuthenticationService.authenticate(AuthenticationService.java:104)
at org.elasticsearch.xpack.security.transport.ServerTransportFilter$NodeProfile.inbound(ServerTransportFilter.java:131)
at org.elasticsearch.xpack.security.transport.SecurityServerTransportInterceptor$ProfileSecuredRequestHandler.messageReceived(SecurityServerTransportInterceptor.java:306)
local variable: org.elasticsearch.common.util.concurrent.ThreadContext$$Lambda$1870#93647
at org.elasticsearch.transport.RequestHandlerRegistry.processMessageReceived(RequestHandlerRegistry.java:66)
local variable: org.elasticsearch.action.support.replication.ReplicationTask#115
at org.elasticsearch.transport.TransportService$7.doRun(TransportService.java:687)
at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingAbstractRunnable.doRun(ThreadContext.java:759)
at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
local variable: org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingAbstractRunnable#236
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
local variable: java.util.concurrent.ThreadPoolExecutor$Worker#28
at java.lang.Thread.run(Thread.java:748)
[3]
"elasticsearch[instance-0000000012][watcher][T#6]" daemon prio=5 tid=858 WAITING
at sun.misc.Unsafe.park(Native Method)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
local variable: java.util.concurrent.CountDownLatch$Sync#176
local variable: java.util.concurrent.locks.AbstractQueuedSynchronizer$Node#1653
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
at org.elasticsearch.action.bulk.BulkRequestHandler.execute(BulkRequestHandler.java:85)
local variable: org.elasticsearch.action.bulk.BulkRequestHandler#1
local variable: org.elasticsearch.action.bulk.BulkRequest#22
local variable: org.elasticsearch.action.bulk.BulkRequestHandler$$Lambda$3862#1
at org.elasticsearch.action.bulk.BulkProcessor.execute(BulkProcessor.java:374)
at org.elasticsearch.action.bulk.BulkProcessor.executeIfNeeded(BulkProcessor.java:365)
at org.elasticsearch.action.bulk.BulkProcessor.internalAdd(BulkProcessor.java:322)
at org.elasticsearch.action.bulk.BulkProcessor.add(BulkProcessor.java:305)
at org.elasticsearch.action.bulk.BulkProcessor.add(BulkProcessor.java:301)
at org.elasticsearch.action.bulk.BulkProcessor.add(BulkProcessor.java:287)
at org.elasticsearch.xpack.watcher.history.HistoryStore.put(HistoryStore.java:49)
local variable: org.elasticsearch.common.xcontent.XContentBuilder#206
at org.elasticsearch.xpack.watcher.execution.ExecutionService.execute(ExecutionService.java:314)
local variable: java.lang.String#354418
local variable: org.elasticsearch.xpack.watcher.execution.TriggeredExecutionContext#41
local variable: org.elasticsearch.xpack.core.watcher.history.WatchRecord$ExceptionWatchRecord#5
at org.elasticsearch.xpack.watcher.execution.ExecutionService.lambda$executeAsync$5(ExecutionService.java:394)
at org.elasticsearch.xpack.watcher.execution.ExecutionService$$Lambda$3807.run(<unknown string>)
at org.elasticsearch.xpack.watcher.execution.ExecutionService$WatchExecutionTask.run(ExecutionService.java:543)
at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingRunnable.run(ThreadContext.java:660)
local variable: org.elasticsearch.common.util.concurrent.ThreadContext$$Lambda$2006#854
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
local variable: org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingRunnable#463
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
local variable: java.util.concurrent.ThreadPoolExecutor$Worker#41
at java.lang.Thread.run(Thread.java:748)
[4]
for i in {1..20}; do
  curl -XPUT "http://localhost:9200/_xpack/watcher/watch/foo$i" -H 'Content-Type: application/json' -d'
  {
    "trigger": { "schedule": { "interval": "1s" } },
    "input": { "simple": { "name": "Hello there" } },
    "actions": {
      "logit": {
        "logging": { "level": "info", "text": "The payload is: {{ctx.payload}}" }
      }
    }
  }'
done