Duplicate Watcher events firing #38482

@robinjoseph08

Elasticsearch version (bin/elasticsearch --version):

v6.3.1

Plugins installed:

x-pack, discovery-ec2, repository-s3

JVM version (java -version):

java version "1.8.0_131"
Java(TM) SE Runtime Environment (build 1.8.0_131-b11)
Java HotSpot(TM) 64-Bit Server VM (build 25.131-b11, mixed mode)

OS version (uname -a if on a Unix-like system):

Linux ip-10-200-130-216 4.14.62-65.117.amzn1.x86_64 #1 SMP Fri Aug 10 20:03:52 UTC 2018 x86_64 x86_64 x86_64 GNU/Linux

Description of the problem including expected versus actual behavior:

Occasionally, one of our configured watchers fires twice, resulting in duplicate notifications. We use Slack as an output, and we see two messages posted in the channel. But when we check the actual index, only one copy of the documents exists. Following a recommendation I saw in the discussion forum, I also included the watcher ID in the message to see whether another watcher that we aren't aware of is triggering it, but every duplicate has the same watcher ID.

I suspect this happens because our cluster is somewhat overloaded at times: a minute or so before a duplicate event fires, we see some thread pool rejections. Despite the overload, the cluster works fine for our use cases, so ideally we would not have to add capacity just because of this. It would also be helpful to know why the overload would cause a problem for the watcher.
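
One rough way to check whether rejections really do precede the duplicates is sketched below in Python. The helper names (`parse_log_timestamp`, `rejections_before`) and the two-minute window are assumptions made for illustration; the only thing taken from this report is the timestamp format used in the Elasticsearch log lines.

```python
from datetime import datetime, timedelta

# Timestamp layout used in the Elasticsearch log lines, e.g. "2019-02-04T21:01:17,197".
LOG_TS_FORMAT = "%Y-%m-%dT%H:%M:%S,%f"


def parse_log_timestamp(text):
    """Parse the bracketed timestamp from an Elasticsearch log line."""
    return datetime.strptime(text, LOG_TS_FORMAT)


def rejections_before(duplicate_time, rejection_times, window=timedelta(minutes=2)):
    """Return the rejection timestamps that fall within `window` before `duplicate_time`.

    The default window is an arbitrary choice for this sketch, not a documented value.
    """
    return [
        t for t in rejection_times
        if timedelta(0) <= duplicate_time - t <= window
    ]
```

Feeding it the times of the duplicate Slack messages and the timestamps of all rejection log lines would show how often a duplicate is actually preceded by a rejection, which is only a correlation and not a confirmed cause.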

Steps to reproduce:

Unfortunately, since this is non-deterministic, I don't know of a way to reliably reproduce the issue.

Provide logs (if relevant):

A log line of the thread pool rejection:

[2019-02-04T21:01:17,197][ERROR][o.e.a.b.TransportBulkAction] [ip-10-200-130-216] failed to execute pipeline for a bulk request
org.elasticsearch.common.util.concurrent.EsRejectedExecutionException: rejected execution of org.elasticsearch.ingest.PipelineExecutionService$1@4b0074fe on EsThreadPoolExecutor[name = ip-10-200-130-216/write, queue capacity = 200, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@67cf75da[Running, pool size = 16, active threads = 16, queued tasks = 201, completed tasks = 1076429360]]
        at org.elasticsearch.common.util.concurrent.EsAbortPolicy.rejectedExecution(EsAbortPolicy.java:48) ~[elasticsearch-6.3.1.jar:6.3.1]
        at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823) ~[?:1.8.0_131]
        at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369) ~[?:1.8.0_131]
        at org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor.doExecute(EsThreadPoolExecutor.java:98) ~[elasticsearch-6.3.1.jar:6.3.1]
        at org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor.execute(EsThreadPoolExecutor.java:93) ~[elasticsearch-6.3.1.jar:6.3.1]
        at org.elasticsearch.ingest.PipelineExecutionService.executeBulkRequest(PipelineExecutionService.java:59) ~[elasticsearch-6.3.1.jar:6.3.1]
        at org.elasticsearch.action.bulk.TransportBulkAction.processBulkIndexIngestRequest(TransportBulkAction.java:495) ~[elasticsearch-6.3.1.jar:6.3.1]
        at org.elasticsearch.action.bulk.TransportBulkAction.doExecute(TransportBulkAction.java:134) ~[elasticsearch-6.3.1.jar:6.3.1]
        at org.elasticsearch.action.bulk.TransportBulkAction.doExecute(TransportBulkAction.java:85) ~[elasticsearch-6.3.1.jar:6.3.1]
        at org.elasticsearch.action.support.TransportAction$RequestFilterChain.proceed(TransportAction.java:167) ~[elasticsearch-6.3.1.jar:6.3.1]
        at org.elasticsearch.xpack.security.action.filter.SecurityActionFilter.lambda$apply$0(SecurityActionFilter.java:92) ~[?:?]
        at org.elasticsearch.action.ActionListener$1.onResponse(ActionListener.java:60) ~[elasticsearch-6.3.1.jar:6.3.1]
        at org.elasticsearch.xpack.security.action.filter.SecurityActionFilter.lambda$authorizeRequest$4(SecurityActionFilter.java:181) ~[?:?]
        at org.elasticsearch.xpack.security.authz.AuthorizationUtils$AsyncAuthorizer.maybeRun(AuthorizationUtils.java:173) ~[?:?]
        at org.elasticsearch.xpack.security.authz.AuthorizationUtils$AsyncAuthorizer.setRunAsRoles(AuthorizationUtils.java:167) ~[?:?]
        at org.elasticsearch.xpack.security.authz.AuthorizationUtils$AsyncAuthorizer.authorize(AuthorizationUtils.java:155) ~[?:?]
        at org.elasticsearch.xpack.security.action.filter.SecurityActionFilter.authorizeRequest(SecurityActionFilter.java:183) ~[?:?]
        at org.elasticsearch.xpack.security.action.filter.SecurityActionFilter.lambda$applyInternal$3(SecurityActionFilter.java:161) ~[?:?]
        at org.elasticsearch.action.ActionListener$1.onResponse(ActionListener.java:60) ~[elasticsearch-6.3.1.jar:6.3.1]
        at org.elasticsearch.xpack.security.authc.AuthenticationService$Authenticator.lambda$authenticateAsync$2(AuthenticationService.java:172) ~[?:?]
        at org.elasticsearch.xpack.security.authc.AuthenticationService$Authenticator.lambda$lookForExistingAuthentication$4(AuthenticationService.java:205) ~[?:?]
        at org.elasticsearch.xpack.security.authc.AuthenticationService$Authenticator.lookForExistingAuthentication(AuthenticationService.java:216) ~[?:?]
        at org.elasticsearch.xpack.security.authc.AuthenticationService$Authenticator.authenticateAsync(AuthenticationService.java:170) ~[?:?]
        at org.elasticsearch.xpack.security.authc.AuthenticationService$Authenticator.access$000(AuthenticationService.java:131) ~[?:?]
        at org.elasticsearch.xpack.security.authc.AuthenticationService.authenticate(AuthenticationService.java:101) ~[?:?]
        at org.elasticsearch.xpack.security.action.filter.SecurityActionFilter.applyInternal(SecurityActionFilter.java:160) ~[?:?]
        at org.elasticsearch.xpack.security.action.filter.SecurityActionFilter.lambda$apply$2(SecurityActionFilter.java:106) ~[?:?]
        at org.elasticsearch.xpack.core.security.SecurityContext.executeAsUser(SecurityContext.java:95) ~[?:?]
        at org.elasticsearch.xpack.security.authz.AuthorizationUtils.switchUserBasedOnActionOriginAndExecute(AuthorizationUtils.java:114) ~[?:?]
        at org.elasticsearch.xpack.security.action.filter.SecurityActionFilter.apply(SecurityActionFilter.java:104) ~[?:?]
        at org.elasticsearch.action.support.TransportAction$RequestFilterChain.proceed(TransportAction.java:165) ~[elasticsearch-6.3.1.jar:6.3.1]
        at org.elasticsearch.action.support.TransportAction.execute(TransportAction.java:139) ~[elasticsearch-6.3.1.jar:6.3.1]
        at org.elasticsearch.action.support.TransportAction.execute(TransportAction.java:81) ~[elasticsearch-6.3.1.jar:6.3.1]
        at org.elasticsearch.client.node.NodeClient.executeLocally(NodeClient.java:87) ~[elasticsearch-6.3.1.jar:6.3.1]
        at org.elasticsearch.client.node.NodeClient.doExecute(NodeClient.java:76) ~[elasticsearch-6.3.1.jar:6.3.1]
        at org.elasticsearch.client.support.AbstractClient.execute(AbstractClient.java:405) ~[elasticsearch-6.3.1.jar:6.3.1]
        at org.elasticsearch.client.support.AbstractClient.bulk(AbstractClient.java:482) ~[elasticsearch-6.3.1.jar:6.3.1]
        at org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin(ClientHelper.java:62) ~[x-pack-core-6.3.1.jar:6.3.1]
        at org.elasticsearch.xpack.monitoring.exporter.local.LocalBulk.doFlush(LocalBulk.java:108) ~[?:?]
        at org.elasticsearch.xpack.monitoring.exporter.ExportBulk.flush(ExportBulk.java:60) ~[?:?]
        at org.elasticsearch.xpack.monitoring.exporter.ExportBulk$Compound.lambda$doFlush$1(ExportBulk.java:154) ~[?:?]
        at org.elasticsearch.xpack.core.common.IteratingActionListener.run(IteratingActionListener.java:81) [x-pack-core-6.3.1.jar:6.3.1]
        at org.elasticsearch.xpack.monitoring.exporter.ExportBulk$Compound.doFlush(ExportBulk.java:170) [x-pack-monitoring-6.3.1.jar:6.3.1]
        at org.elasticsearch.xpack.monitoring.exporter.ExportBulk.flushAndClose(ExportBulk.java:84) [x-pack-monitoring-6.3.1.jar:6.3.1]
        at org.elasticsearch.xpack.monitoring.exporter.ExportBulk.close(ExportBulk.java:74) [x-pack-monitoring-6.3.1.jar:6.3.1]
        at org.elasticsearch.xpack.monitoring.exporter.Exporters.export(Exporters.java:195) [x-pack-monitoring-6.3.1.jar:6.3.1]
        at org.elasticsearch.xpack.monitoring.action.TransportMonitoringBulkAction$AsyncAction$1.doRun(TransportMonitoringBulkAction.java:147) [x-pack-monitoring-6.3.1.jar:6.3.1]
        at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingAbstractRunnable.doRun(ThreadContext.java:725) [elasticsearch-6.3.1.jar:6.3.1]
        at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37) [elasticsearch-6.3.1.jar:6.3.1]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_131]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_131]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_131]
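
To make lines like the one above easier to scan for, here is a minimal Python sketch that pulls the executor state out of the rejection message. The regular expression is derived only from the format of this one log line, so it may not match other Elasticsearch versions; `REJECTION_RE` and `parse_rejection` are hypothetical names.

```python
import re

# Matches the executor state printed in the EsRejectedExecutionException message.
# Derived from the single log line in this report (Elasticsearch 6.3.1).
REJECTION_RE = re.compile(
    r"EsThreadPoolExecutor\[name = (?P<name>[^,]+), "
    r"queue capacity = (?P<queue_capacity>\d+), "
    r".*?pool size = (?P<pool_size>\d+), "
    r"active threads = (?P<active>\d+), "
    r"queued tasks = (?P<queued>\d+), "
    r"completed tasks = (?P<completed>\d+)"
)


def parse_rejection(line):
    """Return the thread pool state from a rejection line, or None if it does not match."""
    match = REJECTION_RE.search(line)
    if match is None:
        return None
    # The executor name looks like "<node name>/<pool name>"; keep only the pool name.
    info = {"pool": match.group("name").split("/")[-1]}
    for key in ("queue_capacity", "pool_size", "active", "queued", "completed"):
        info[key] = int(match.group(key))
    return info


# The exception message from the log above, split across lines for readability.
SAMPLE_LINE = (
    "org.elasticsearch.common.util.concurrent.EsRejectedExecutionException: "
    "rejected execution of org.elasticsearch.ingest.PipelineExecutionService$1@4b0074fe "
    "on EsThreadPoolExecutor[name = ip-10-200-130-216/write, queue capacity = 200, "
    "org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@67cf75da[Running, "
    "pool size = 16, active threads = 16, queued tasks = 201, "
    "completed tasks = 1076429360]]"
)

info = parse_rejection(SAMPLE_LINE)
print(info["pool"], info["queued"], info["queue_capacity"])  # prints: write 201 200
```

For this line, the rejection comes from the `write` pool with all 16 threads active and 201 queued tasks against a queue capacity of 200.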
