|
4 | 4 | import os |
5 | 5 | import sys |
6 | 6 | import time |
| 7 | +from threading import Thread |
7 | 8 | import unittest |
8 | 9 | from concurrent.futures.interpreter import BrokenInterpreterPool |
9 | 10 | from concurrent import interpreters |
@@ -359,45 +360,85 @@ def test_blocking(self): |
359 | 360 |
|
360 | 361 | def run(taskid, ready, blocker): |
361 | 362 | ready.put_nowait(taskid) |
362 | | - raise Exception(taskid) |
363 | 363 | blocker.get(timeout=10) # blocking |
| 364 | +# blocker.get() # blocking |
364 | 365 |
|
365 | 366 | numtasks = 10 |
366 | 367 | futures = [] |
367 | 368 | executor = self.executor_type() |
368 | 369 | try: |
| 370 | + # Request the jobs. |
369 | 371 | for i in range(numtasks): |
370 | 372 | fut = executor.submit(run, i, ready, blocker) |
371 | 373 | futures.append(fut) |
372 | 374 | # assert len(executor._threads) == numtasks, len(executor._threads) |
373 | | - exceptions1 = [] |
374 | | - for i, fut in enumerate(futures, 1): |
375 | | - try: |
376 | | - fut.result(timeout=10) |
377 | | - except Exception as exc: |
378 | | - exceptions1.append(exc) |
379 | | - exceptions2 = [] |
| 375 | + |
380 | 376 | try: |
381 | 377 | # Wait for them all to be ready. |
382 | | - for i in range(numtasks): |
| 378 | + pending = numtasks |
| 379 | + def wait_for_ready(): |
| 380 | + nonlocal pending |
383 | 381 | try: |
384 | 382 | ready.get(timeout=10) # blocking |
385 | | - except interpreters.QueueEmpty as exc: |
386 | | - exceptions2.append(exc) |
| 383 | + except interpreters.QueueEmpty: |
| 384 | + pass |
| 385 | + else: |
| 386 | + pending -= 1 |
| 387 | + threads = [Thread(target=wait_for_ready) |
| 388 | + for _ in range(pending)] |
| 389 | + for t in threads: |
| 390 | + t.start() |
| 391 | + for t in threads: |
| 392 | + t.join() |
| 393 | + if pending: |
| 394 | + if pending < numtasks: |
| 395 | + # At least one was ready, so wait longer. |
| 396 | + for _ in range(pending): |
| 397 | + ready.get() # blocking |
| 398 | + else: |
| 399 | + # Something is probably wrong. Bail out. |
| 400 | + group = [] |
| 401 | + for fut in futures: |
| 402 | + try: |
| 403 | + fut.result(timeout=0) |
| 404 | + except TimeoutError: |
| 405 | + # Still running. |
| 406 | + try: |
| 407 | + ready.get_nowait() |
| 408 | + except interpreters.QueueEmpty as exc: |
| 409 | + # It's hung. |
| 410 | + group.append(exc) |
| 411 | + else: |
| 412 | + pending -= 1 |
| 413 | + except Exception as exc: |
| 414 | + group.append(exc) |
| 415 | + if group: |
| 416 | + raise ExceptionGroup('futures', group) |
| 417 | + assert not pending, pending |
| 418 | +# for _ in range(numtasks): |
| 419 | +# ready.get() # blocking |
387 | 420 | finally: |
388 | 421 | # Unblock the workers. |
389 | 422 | for i in range(numtasks): |
390 | 423 | blocker.put_nowait(None) |
391 | | - group1 = ExceptionGroup('futures', exceptions1) if exceptions1 else None |
392 | | - group2 = ExceptionGroup('ready', exceptions2) if exceptions2 else None |
393 | | - if group2: |
394 | | - group2.__cause__ = group1 |
395 | | - raise group2 |
396 | | - elif group1: |
397 | | - raise group1 |
398 | | - raise group |
| 424 | + |
| 425 | + # Make sure they finished. |
| 426 | + group = [] |
| 427 | + def wait_for_done(fut): |
| 428 | + try: |
| 429 | + fut.result(timeout=10) |
| 430 | + except Exception as exc: |
| 431 | + group.append(exc) |
| 432 | + threads = [Thread(target=wait_for_done, args=(fut,)) |
| 433 | + for fut in futures] |
| 434 | + for t in threads: |
| 435 | + t.start() |
| 436 | + for t in threads: |
| 437 | + t.join() |
| 438 | + if group: |
| 439 | + raise ExceptionGroup('futures', group) |
399 | 440 | finally: |
400 | | - executor.shutdown(wait=True) |
| 441 | + executor.shutdown(wait=False) |
401 | 442 |
|
402 | 443 | @support.requires_gil_enabled("gh-117344: test is flaky without the GIL") |
403 | 444 | def test_idle_thread_reuse(self): |
|
0 commit comments