1919
2020package org .elasticsearch .threadpool ;
2121
22+ import org .elasticsearch .common .settings .Settings ;
23+ import org .elasticsearch .common .unit .TimeValue ;
24+ import org .elasticsearch .common .util .concurrent .AbstractRunnable ;
25+ import org .elasticsearch .common .util .concurrent .EsExecutors ;
26+ import org .elasticsearch .common .util .concurrent .EsThreadPoolExecutor ;
27+ import org .elasticsearch .common .util .concurrent .PrioritizedEsThreadPoolExecutor ;
2228import org .elasticsearch .test .ESTestCase ;
2329import org .junit .After ;
2430import org .junit .Before ;
2531
2632import java .util .Optional ;
2733import java .util .concurrent .CountDownLatch ;
34+ import java .util .concurrent .ExecutorService ;
35+ import java .util .concurrent .ScheduledThreadPoolExecutor ;
36+ import java .util .concurrent .TimeUnit ;
2837import java .util .concurrent .atomic .AtomicReference ;
2938import java .util .function .Consumer ;
3039
@@ -42,30 +51,279 @@ public void setUpThreadPool() {
4251 }
4352
4453 @ After
45- public void tearDownThreadPool () throws InterruptedException {
54+ public void tearDownThreadPool () {
4655 terminate (threadPool );
4756 }
4857
49- public void testExecutionException () throws InterruptedException {
50- runExecutionExceptionTest (
51- () -> {
58+ public void testExecutionErrorOnDefaultThreadPoolTypes () throws InterruptedException {
59+ for (String executor : ThreadPool .THREAD_POOL_TYPES .keySet ()) {
60+ checkExecutionError (getExecuteRunner (threadPool .executor (executor )));
61+ checkExecutionError (getSubmitRunner (threadPool .executor (executor )));
62+ checkExecutionError (getScheduleRunner (executor ));
63+ }
64+ }
65+
66+ public void testExecutionErrorOnDirectExecutorService () throws InterruptedException {
67+ final ExecutorService directExecutorService = EsExecutors .newDirectExecutorService ();
68+ checkExecutionError (getExecuteRunner (directExecutorService ));
69+ checkExecutionError (getSubmitRunner (directExecutorService ));
70+ }
71+
72+ public void testExecutionErrorOnFixedESThreadPoolExecutor () throws InterruptedException {
73+ final EsThreadPoolExecutor fixedExecutor = EsExecutors .newFixed ("test" , 1 , 1 ,
74+ EsExecutors .daemonThreadFactory ("test" ), threadPool .getThreadContext ());
75+ try {
76+ checkExecutionError (getExecuteRunner (fixedExecutor ));
77+ checkExecutionError (getSubmitRunner (fixedExecutor ));
78+ } finally {
79+ ThreadPool .terminate (fixedExecutor , 10 , TimeUnit .SECONDS );
80+ }
81+ }
82+
83+ public void testExecutionErrorOnScalingESThreadPoolExecutor () throws InterruptedException {
84+ final EsThreadPoolExecutor scalingExecutor = EsExecutors .newScaling ("test" , 1 , 1 ,
85+ 10 , TimeUnit .SECONDS , EsExecutors .daemonThreadFactory ("test" ), threadPool .getThreadContext ());
86+ try {
87+ checkExecutionError (getExecuteRunner (scalingExecutor ));
88+ checkExecutionError (getSubmitRunner (scalingExecutor ));
89+ } finally {
90+ ThreadPool .terminate (scalingExecutor , 10 , TimeUnit .SECONDS );
91+ }
92+ }
93+
94+ public void testExecutionErrorOnAutoQueueFixedESThreadPoolExecutor () throws InterruptedException {
95+ final EsThreadPoolExecutor autoQueueFixedExecutor = EsExecutors .newAutoQueueFixed ("test" , 1 , 1 ,
96+ 1 , 1 , 1 , TimeValue .timeValueSeconds (10 ), EsExecutors .daemonThreadFactory ("test" ), threadPool .getThreadContext ());
97+ try {
98+ checkExecutionError (getExecuteRunner (autoQueueFixedExecutor ));
99+ checkExecutionError (getSubmitRunner (autoQueueFixedExecutor ));
100+ } finally {
101+ ThreadPool .terminate (autoQueueFixedExecutor , 10 , TimeUnit .SECONDS );
102+ }
103+ }
104+
105+ public void testExecutionErrorOnSinglePrioritizingThreadPoolExecutor () throws InterruptedException {
106+ final PrioritizedEsThreadPoolExecutor prioritizedExecutor = EsExecutors .newSinglePrioritizing ("test" ,
107+ EsExecutors .daemonThreadFactory ("test" ), threadPool .getThreadContext (), threadPool .scheduler ());
108+ try {
109+ checkExecutionError (getExecuteRunner (prioritizedExecutor ));
110+ checkExecutionError (getSubmitRunner (prioritizedExecutor ));
111+ checkExecutionError (r -> prioritizedExecutor .execute (r , TimeValue .ZERO , r ));
112+ } finally {
113+ ThreadPool .terminate (prioritizedExecutor , 10 , TimeUnit .SECONDS );
114+ }
115+ }
116+
117+ public void testExecutionErrorOnScheduler () throws InterruptedException {
118+ final ScheduledThreadPoolExecutor scheduler = Scheduler .initScheduler (Settings .EMPTY );
119+ try {
120+ checkExecutionError (getExecuteRunner (scheduler ));
121+ checkExecutionError (getSubmitRunner (scheduler ));
122+ checkExecutionError (r -> scheduler .schedule (r , randomFrom (0 , 1 ), TimeUnit .MILLISECONDS ));
123+ } finally {
124+ Scheduler .terminate (scheduler , 10 , TimeUnit .SECONDS );
125+ }
126+ }
127+
128+ private void checkExecutionError (Consumer <Runnable > runner ) throws InterruptedException {
129+ logger .info ("checking error for {}" , runner );
130+ final Runnable runnable ;
131+ if (randomBoolean ()) {
132+ runnable = () -> {
133+ throw new Error ("future error" );
134+ };
135+ } else {
136+ runnable = new AbstractRunnable () {
137+ @ Override
138+ public void onFailure (Exception e ) {
139+
140+ }
141+
142+ @ Override
143+ protected void doRun () {
52144 throw new Error ("future error" );
53- },
54- true ,
55- o -> {
56- assertTrue (o .isPresent ());
57- assertThat (o .get (), instanceOf (Error .class ));
58- assertThat (o .get (), hasToString (containsString ("future error" )));
59- });
60- runExecutionExceptionTest (
61- () -> {
145+ }
146+ };
147+ }
148+ runExecutionTest (
149+ runner ,
150+ runnable ,
151+ true ,
152+ o -> {
153+ assertTrue (o .isPresent ());
154+ assertThat (o .get (), instanceOf (Error .class ));
155+ assertThat (o .get (), hasToString (containsString ("future error" )));
156+ });
157+ }
158+
159+ public void testExecutionExceptionOnDefaultThreadPoolTypes () throws InterruptedException {
160+ for (String executor : ThreadPool .THREAD_POOL_TYPES .keySet ()) {
161+ final boolean expectExceptionOnExecute =
162+ // fixed_auto_queue_size wraps stuff into TimedRunnable, which is an AbstractRunnable
163+ // TODO: this is dangerous as it will silently swallow exceptions, and possibly miss calling a response listener
164+ ThreadPool .THREAD_POOL_TYPES .get (executor ) != ThreadPool .ThreadPoolType .FIXED_AUTO_QUEUE_SIZE ;
165+ checkExecutionException (getExecuteRunner (threadPool .executor (executor )), expectExceptionOnExecute );
166+
167+ // here, it's ok for the exception not to bubble up. Accessing the future will yield the exception
168+ checkExecutionException (getSubmitRunner (threadPool .executor (executor )), false );
169+
170+ final boolean expectExceptionOnSchedule =
171+ // fixed_auto_queue_size wraps stuff into TimedRunnable, which is an AbstractRunnable
172+ // TODO: this is dangerous as it will silently swallow exceptions, and possibly miss calling a response listener
173+ ThreadPool .THREAD_POOL_TYPES .get (executor ) != ThreadPool .ThreadPoolType .FIXED_AUTO_QUEUE_SIZE
174+ // scheduler just swallows the exception here
175+ // TODO: bubble these exceptions up
176+ && ThreadPool .THREAD_POOL_TYPES .get (executor ) != ThreadPool .ThreadPoolType .DIRECT ;
177+ checkExecutionException (getScheduleRunner (executor ), expectExceptionOnSchedule );
178+ }
179+ }
180+
181+ public void testExecutionExceptionOnDirectExecutorService () throws InterruptedException {
182+ final ExecutorService directExecutorService = EsExecutors .newDirectExecutorService ();
183+ checkExecutionException (getExecuteRunner (directExecutorService ), true );
184+ checkExecutionException (getSubmitRunner (directExecutorService ), false );
185+ }
186+
187+ public void testExecutionExceptionOnFixedESThreadPoolExecutor () throws InterruptedException {
188+ final EsThreadPoolExecutor fixedExecutor = EsExecutors .newFixed ("test" , 1 , 1 ,
189+ EsExecutors .daemonThreadFactory ("test" ), threadPool .getThreadContext ());
190+ try {
191+ checkExecutionException (getExecuteRunner (fixedExecutor ), true );
192+ checkExecutionException (getSubmitRunner (fixedExecutor ), false );
193+ } finally {
194+ ThreadPool .terminate (fixedExecutor , 10 , TimeUnit .SECONDS );
195+ }
196+ }
197+
198+ public void testExecutionExceptionOnScalingESThreadPoolExecutor () throws InterruptedException {
199+ final EsThreadPoolExecutor scalingExecutor = EsExecutors .newScaling ("test" , 1 , 1 ,
200+ 10 , TimeUnit .SECONDS , EsExecutors .daemonThreadFactory ("test" ), threadPool .getThreadContext ());
201+ try {
202+ checkExecutionException (getExecuteRunner (scalingExecutor ), true );
203+ checkExecutionException (getSubmitRunner (scalingExecutor ), false );
204+ } finally {
205+ ThreadPool .terminate (scalingExecutor , 10 , TimeUnit .SECONDS );
206+ }
207+ }
208+
209+ public void testExecutionExceptionOnAutoQueueFixedESThreadPoolExecutor () throws InterruptedException {
210+ final EsThreadPoolExecutor autoQueueFixedExecutor = EsExecutors .newAutoQueueFixed ("test" , 1 , 1 ,
211+ 1 , 1 , 1 , TimeValue .timeValueSeconds (10 ), EsExecutors .daemonThreadFactory ("test" ), threadPool .getThreadContext ());
212+ try {
213+ // fixed_auto_queue_size wraps stuff into TimedRunnable, which is an AbstractRunnable
214+ // TODO: this is dangerous as it will silently swallow exceptions, and possibly miss calling a response listener
215+ checkExecutionException (getExecuteRunner (autoQueueFixedExecutor ), false );
216+ checkExecutionException (getSubmitRunner (autoQueueFixedExecutor ), false );
217+ } finally {
218+ ThreadPool .terminate (autoQueueFixedExecutor , 10 , TimeUnit .SECONDS );
219+ }
220+ }
221+
222+ public void testExecutionExceptionOnSinglePrioritizingThreadPoolExecutor () throws InterruptedException {
223+ final PrioritizedEsThreadPoolExecutor prioritizedExecutor = EsExecutors .newSinglePrioritizing ("test" ,
224+ EsExecutors .daemonThreadFactory ("test" ), threadPool .getThreadContext (), threadPool .scheduler ());
225+ try {
226+ checkExecutionException (getExecuteRunner (prioritizedExecutor ), true );
227+ checkExecutionException (getSubmitRunner (prioritizedExecutor ), false );
228+ checkExecutionException (r -> prioritizedExecutor .execute (r , TimeValue .ZERO , r ), true );
229+ } finally {
230+ ThreadPool .terminate (prioritizedExecutor , 10 , TimeUnit .SECONDS );
231+ }
232+ }
233+
234+ public void testExecutionExceptionOnScheduler () throws InterruptedException {
235+ final ScheduledThreadPoolExecutor scheduler = Scheduler .initScheduler (Settings .EMPTY );
236+ try {
237+ // scheduler just swallows the exceptions
238+ // TODO: bubble these exceptions up
239+ checkExecutionException (getExecuteRunner (scheduler ), false );
240+ checkExecutionException (getSubmitRunner (scheduler ), false );
241+ checkExecutionException (r -> scheduler .schedule (r , randomFrom (0 , 1 ), TimeUnit .MILLISECONDS ), false );
242+ } finally {
243+ Scheduler .terminate (scheduler , 10 , TimeUnit .SECONDS );
244+ }
245+ }
246+
247+ private void checkExecutionException (Consumer <Runnable > runner , boolean expectException ) throws InterruptedException {
248+ logger .info ("checking exception for {}" , runner );
249+ final Runnable runnable ;
250+ final boolean willThrow ;
251+ if (randomBoolean ()) {
252+ runnable = () -> {
253+ throw new IllegalStateException ("future exception" );
254+ };
255+ willThrow = expectException ;
256+ } else {
257+ runnable = new AbstractRunnable () {
258+ @ Override
259+ public void onFailure (Exception e ) {
260+
261+ }
262+
263+ @ Override
264+ protected void doRun () {
62265 throw new IllegalStateException ("future exception" );
63- },
64- false ,
65- o -> assertFalse (o .isPresent ()));
266+ }
267+ };
268+ willThrow = false ;
269+ }
270+ runExecutionTest (
271+ runner ,
272+ runnable ,
273+ willThrow ,
274+ o -> {
275+ assertEquals (willThrow , o .isPresent ());
276+ if (willThrow ) {
277+ assertThat (o .get (), instanceOf (IllegalStateException .class ));
278+ assertThat (o .get (), hasToString (containsString ("future exception" )));
279+ }
280+ });
281+ }
282+
283+ Consumer <Runnable > getExecuteRunner (ExecutorService executor ) {
284+ return new Consumer <Runnable >() {
285+ @ Override
286+ public void accept (Runnable runnable ) {
287+ executor .execute (runnable );
288+ }
289+
290+ @ Override
291+ public String toString () {
292+ return "executor(" + executor + ").execute()" ;
293+ }
294+ };
295+ }
296+
297+ Consumer <Runnable > getSubmitRunner (ExecutorService executor ) {
298+ return new Consumer <Runnable >() {
299+ @ Override
300+ public void accept (Runnable runnable ) {
301+ executor .submit (runnable );
302+ }
303+
304+ @ Override
305+ public String toString () {
306+ return "executor(" + executor + ").submit()" ;
307+ }
308+ };
309+ }
310+
311+ Consumer <Runnable > getScheduleRunner (String executor ) {
312+ return new Consumer <Runnable >() {
313+ @ Override
314+ public void accept (Runnable runnable ) {
315+ threadPool .schedule (randomFrom (TimeValue .ZERO , TimeValue .timeValueMillis (1 )), executor , runnable );
316+ }
317+
318+ @ Override
319+ public String toString () {
320+ return "schedule(" + executor + ")" ;
321+ }
322+ };
66323 }
67324
68- private void runExecutionExceptionTest (
325+ private void runExecutionTest (
326+ final Consumer <Runnable > runner ,
69327 final Runnable runnable ,
70328 final boolean expectThrowable ,
71329 final Consumer <Optional <Throwable >> consumer ) throws InterruptedException {
@@ -82,13 +340,18 @@ private void runExecutionExceptionTest(
82340
83341 final CountDownLatch supplierLatch = new CountDownLatch (1 );
84342
85- threadPool .generic ().submit (() -> {
86- try {
87- runnable .run ();
88- } finally {
89- supplierLatch .countDown ();
90- }
91- });
343+ try {
344+ runner .accept (() -> {
345+ try {
346+ runnable .run ();
347+ } finally {
348+ supplierLatch .countDown ();
349+ }
350+ });
351+ } catch (Throwable t ) {
352+ consumer .accept (Optional .of (t ));
353+ return ;
354+ }
92355
93356 supplierLatch .await ();
94357
0 commit comments