From d20549cbfa2dea41ebf709eba20a0853e5055794 Mon Sep 17 00:00:00 2001 From: Giampaolo Rodola Date: Wed, 6 Feb 2019 17:27:37 +0100 Subject: [PATCH 01/11] multiprocessing: provide unittests for manager classes and shareable types --- Lib/test/_test_multiprocessing.py | 206 +++++++++++++++++++++++++++++- 1 file changed, 205 insertions(+), 1 deletion(-) diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index 7341131231a4f0..469eda8ebb4051 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -4706,12 +4706,216 @@ def is_alive(self): any(process.is_alive() for process in forked_processes)) - class MiscTestCase(unittest.TestCase): def test__all__(self): # Just make sure names in blacklist are excluded support.check__all__(self, multiprocessing, extra=multiprocessing.__all__, blacklist=['SUBDEBUG', 'SUBWARNING']) + + +class TestSyncManagerTypes(unittest.TestCase): + """Test all the types which can be shared between parent and child + process by using a manager which acts as an intermediary between + the two. + + In the following tests the base type is created in the parent + process. "callback" function represents how the base type is + received (and handled) from within the child process. E.g.: + + def test_list(self): + def callback(obj): + # === within the child process === + assert obj[0] == 1 + + # === within the parent process === + o = self.manager.list() + o.append(1) + self.run_test(callback, o) + """ + manager_class = multiprocessing.managers.SyncManager + + def setUp(self): + self.manager = self.manager_class() + self.manager.start() + self.proc = None + + def tearDown(self): + self.manager.shutdown() + if self.proc is not None: + self.proc.terminate() + self.proc.join() + + def run_test(self, callback, obj): + self.proc = multiprocessing.Process(target=callback, args=(obj, )) + self.proc.start() + self.proc.join() + + def test_queue(self, qname="Queue"): + def callback(obj): + assert obj.qsize() == 2 + assert obj.full() + assert not obj.empty() + assert obj.get() == 5 + assert not obj.empty() + assert obj.get() == 6 + assert obj.empty() + + o = getattr(self.manager, qname)(2) + o.put(5) + o.put(6) + self.run_test(callback, o) + assert o.empty() + assert not o.full() + + def test_joinable_queue(self): + self.test_queue("JoinableQueue") + + def test_event(self): + def callback(obj): + assert obj.is_set() + obj.wait() + obj.clear() + obj.wait(0.001) + + o = self.manager.Event() + o.set() + self.run_test(callback, o) + assert not o.is_set() + o.wait(0.001) + + def test_lock(self, lname="Lock"): + def callback(obj): + o.acquire() + + o = getattr(self.manager, lname)() + self.run_test(callback, o) + o.release() + self.assertRaises(RuntimeError, o.release) # already released + + def test_rlock(self): + self.test_lock(lname="RLock") + + def test_semaphore(self, sname="Semaphore"): + def callback(obj): + obj.acquire() + + o = getattr(self.manager, sname)() + self.run_test(callback, o) + o.release() + + def test_bounded_semaphore(self): + self.test_semaphore(sname="BoundedSemaphore") + + def test_condition(self): + def callback(obj): + obj.acquire() + + o = self.manager.Condition() + self.run_test(callback, o) + o.release() + self.assertRaises(RuntimeError, o.release) # already released + + def test_barrier(self): + def callback(obj): + assert obj.parties == 5 + obj.reset() + + o = self.manager.Barrier(5) + self.run_test(callback, o) + + def test_pool(self): + def callback(obj): + # XXX: further tests were supposed to be here but it + # seems Pool() is broken. When using apply_async() or + # or map() I get: + # .../multiprocessing/managers.py", line 802, in _callmethod + # proxytype = self._manager._registry[token.typeid][-1] + # AttributeError: 'NoneType' object has no attribute '_registry' + with obj: + pass + + o = self.manager.Pool(processes=4) + self.run_test(callback, o) + + def test_list(self): + def callback(obj): + assert obj[0] == 5 + assert obj.count(5) == 1 + assert obj.index(5) == 0 + obj.sort() + obj.reverse() + for x in obj: + pass + assert len(obj) == 1 + assert obj.pop(0) == 5 + + o = self.manager.list() + o.append(5) + self.run_test(callback, o) + assert not o + self.assertEqual(len(o), 0) + + def test_dict(self): + def callback(obj): + assert len(obj) == 1 + assert obj['foo'] == 5 + assert obj.get('foo') == 5 + assert list(obj.items()) == [('foo', 5)] + assert list(obj.keys()) == ['foo'] + assert list(obj.values()) == [5] + assert obj.copy() == {'foo': 5} + assert obj.popitem() == ('foo', 5) + + o = self.manager.dict() + o['foo'] = 5 + self.run_test(callback, o) + assert not o + self.assertEqual(len(o), 0) + + def test_value(self): + def callback(obj): + assert obj.value == 1 + assert obj.get() == 1 + obj.set(2) + + o = self.manager.Value('i', 1) + self.run_test(callback, o) + self.assertEqual(o.value, 2) + self.assertEqual(o.get(), 2) + + def test_array(self): + def callback(obj): + assert obj[0] == 0 + assert obj[1] == 1 + assert len(obj) == 2 + assert list(obj) == [0, 1] + + o = self.manager.Array('i', [0, 1]) + self.run_test(callback, o) + + def test_namespace(self): + def callback(obj): + assert obj.x == 0 + assert obj.y == 1 + + o = self.manager.Namespace() + o.x = 0 + o.y = 1 + self.run_test(callback, o) + + +try: + import multiprocessing.shared_memory +except ImportError: + @unittest.skipIf(True, "SharedMemoryManager not available") + class TestSharedMemoryManagerTypes(TestSyncManagerTypes): + pass +else: + class TestSharedMemoryManagerTypes(TestSyncManagerTypes): + """Same as above but by using SharedMemoryManager.""" + manager_class = multiprocessing.shared_memory.SharedMemoryManager + + # # Mixins # From 312b5c477f86ef457e63ec4c2542c3c9884429bf Mon Sep 17 00:00:00 2001 From: Giampaolo Rodola Date: Wed, 6 Feb 2019 18:07:10 +0100 Subject: [PATCH 02/11] add NEWS entry + rename function/arg-name --- Lib/multiprocessing/managers.py | 1 + Lib/test/_test_multiprocessing.py | 56 +++++++++---------- .../2019-02-06-18-06-16.bpo-35917.-Clv1L.rst | 3 + 3 files changed, 32 insertions(+), 28 deletions(-) create mode 100644 Misc/NEWS.d/next/Tests/2019-02-06-18-06-16.bpo-35917.-Clv1L.rst diff --git a/Lib/multiprocessing/managers.py b/Lib/multiprocessing/managers.py index dbed993a38d65a..d7c5fb9c457ae9 100644 --- a/Lib/multiprocessing/managers.py +++ b/Lib/multiprocessing/managers.py @@ -1200,3 +1200,4 @@ class SyncManager(BaseManager): # types returned by methods of PoolProxy SyncManager.register('Iterator', proxytype=IteratorProxy, create_method=False) SyncManager.register('AsyncResult', create_method=False) + diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index 469eda8ebb4051..9402af2f47f504 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -4723,14 +4723,14 @@ class TestSyncManagerTypes(unittest.TestCase): received (and handled) from within the child process. E.g.: def test_list(self): - def callback(obj): + def worker(obj): # === within the child process === assert obj[0] == 1 # === within the parent process === o = self.manager.list() o.append(1) - self.run_test(callback, o) + self.run_test(worker, o) """ manager_class = multiprocessing.managers.SyncManager @@ -4745,13 +4745,13 @@ def tearDown(self): self.proc.terminate() self.proc.join() - def run_test(self, callback, obj): - self.proc = multiprocessing.Process(target=callback, args=(obj, )) + def run_test(self, worker, obj): + self.proc = multiprocessing.Process(target=worker, args=(obj, )) self.proc.start() self.proc.join() def test_queue(self, qname="Queue"): - def callback(obj): + def worker(obj): assert obj.qsize() == 2 assert obj.full() assert not obj.empty() @@ -4763,7 +4763,7 @@ def callback(obj): o = getattr(self.manager, qname)(2) o.put(5) o.put(6) - self.run_test(callback, o) + self.run_test(worker, o) assert o.empty() assert not o.full() @@ -4771,7 +4771,7 @@ def test_joinable_queue(self): self.test_queue("JoinableQueue") def test_event(self): - def callback(obj): + def worker(obj): assert obj.is_set() obj.wait() obj.clear() @@ -4779,16 +4779,16 @@ def callback(obj): o = self.manager.Event() o.set() - self.run_test(callback, o) + self.run_test(worker, o) assert not o.is_set() o.wait(0.001) def test_lock(self, lname="Lock"): - def callback(obj): + def worker(obj): o.acquire() o = getattr(self.manager, lname)() - self.run_test(callback, o) + self.run_test(worker, o) o.release() self.assertRaises(RuntimeError, o.release) # already released @@ -4796,35 +4796,35 @@ def test_rlock(self): self.test_lock(lname="RLock") def test_semaphore(self, sname="Semaphore"): - def callback(obj): + def worker(obj): obj.acquire() o = getattr(self.manager, sname)() - self.run_test(callback, o) + self.run_test(worker, o) o.release() def test_bounded_semaphore(self): self.test_semaphore(sname="BoundedSemaphore") def test_condition(self): - def callback(obj): + def worker(obj): obj.acquire() o = self.manager.Condition() - self.run_test(callback, o) + self.run_test(worker, o) o.release() self.assertRaises(RuntimeError, o.release) # already released def test_barrier(self): - def callback(obj): + def worker(obj): assert obj.parties == 5 obj.reset() o = self.manager.Barrier(5) - self.run_test(callback, o) + self.run_test(worker, o) def test_pool(self): - def callback(obj): + def worker(obj): # XXX: further tests were supposed to be here but it # seems Pool() is broken. When using apply_async() or # or map() I get: @@ -4835,10 +4835,10 @@ def callback(obj): pass o = self.manager.Pool(processes=4) - self.run_test(callback, o) + self.run_test(worker, o) def test_list(self): - def callback(obj): + def worker(obj): assert obj[0] == 5 assert obj.count(5) == 1 assert obj.index(5) == 0 @@ -4851,12 +4851,12 @@ def callback(obj): o = self.manager.list() o.append(5) - self.run_test(callback, o) + self.run_test(worker, o) assert not o self.assertEqual(len(o), 0) def test_dict(self): - def callback(obj): + def worker(obj): assert len(obj) == 1 assert obj['foo'] == 5 assert obj.get('foo') == 5 @@ -4868,40 +4868,40 @@ def callback(obj): o = self.manager.dict() o['foo'] = 5 - self.run_test(callback, o) + self.run_test(worker, o) assert not o self.assertEqual(len(o), 0) def test_value(self): - def callback(obj): + def worker(obj): assert obj.value == 1 assert obj.get() == 1 obj.set(2) o = self.manager.Value('i', 1) - self.run_test(callback, o) + self.run_test(worker, o) self.assertEqual(o.value, 2) self.assertEqual(o.get(), 2) def test_array(self): - def callback(obj): + def worker(obj): assert obj[0] == 0 assert obj[1] == 1 assert len(obj) == 2 assert list(obj) == [0, 1] o = self.manager.Array('i', [0, 1]) - self.run_test(callback, o) + self.run_test(worker, o) def test_namespace(self): - def callback(obj): + def worker(obj): assert obj.x == 0 assert obj.y == 1 o = self.manager.Namespace() o.x = 0 o.y = 1 - self.run_test(callback, o) + self.run_test(worker, o) try: diff --git a/Misc/NEWS.d/next/Tests/2019-02-06-18-06-16.bpo-35917.-Clv1L.rst b/Misc/NEWS.d/next/Tests/2019-02-06-18-06-16.bpo-35917.-Clv1L.rst new file mode 100644 index 00000000000000..1bdd6eade2f3f2 --- /dev/null +++ b/Misc/NEWS.d/next/Tests/2019-02-06-18-06-16.bpo-35917.-Clv1L.rst @@ -0,0 +1,3 @@ +multiprocessing: provide unit tests for SyncManager and SharedMemoryManager +classes + all the shareable types which are supposed to be supported by +them. (patch by Giampaolo Rodola. (patch by Giampaolo Rodola) From 55fdf89cc94a17b7da7982b4ffdb8c31067beec4 Mon Sep 17 00:00:00 2001 From: Giampaolo Rodola Date: Wed, 6 Feb 2019 18:08:05 +0100 Subject: [PATCH 03/11] fix typo --- Misc/NEWS.d/next/Tests/2019-02-06-18-06-16.bpo-35917.-Clv1L.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Misc/NEWS.d/next/Tests/2019-02-06-18-06-16.bpo-35917.-Clv1L.rst b/Misc/NEWS.d/next/Tests/2019-02-06-18-06-16.bpo-35917.-Clv1L.rst index 1bdd6eade2f3f2..546d47e39d872e 100644 --- a/Misc/NEWS.d/next/Tests/2019-02-06-18-06-16.bpo-35917.-Clv1L.rst +++ b/Misc/NEWS.d/next/Tests/2019-02-06-18-06-16.bpo-35917.-Clv1L.rst @@ -1,3 +1,3 @@ multiprocessing: provide unit tests for SyncManager and SharedMemoryManager classes + all the shareable types which are supposed to be supported by -them. (patch by Giampaolo Rodola. (patch by Giampaolo Rodola) +them. (patch by Giampaolo Rodola) From 9eca2506e98bbc4bac860dd3d58856a324082de7 Mon Sep 17 00:00:00 2001 From: Giampaolo Rodola Date: Wed, 6 Feb 2019 19:53:04 +0100 Subject: [PATCH 04/11] move tests in a separate sub-module in order to avoid the global() related hacks performed in install_tests_in_module_dict() --- Lib/multiprocessing/managers.py | 1 - Lib/test/_test_multiprocessing.py | 206 +------------------- Lib/test/test_multiprocessing_managers.py | 217 ++++++++++++++++++++++ 3 files changed, 218 insertions(+), 206 deletions(-) create mode 100644 Lib/test/test_multiprocessing_managers.py diff --git a/Lib/multiprocessing/managers.py b/Lib/multiprocessing/managers.py index d7c5fb9c457ae9..dbed993a38d65a 100644 --- a/Lib/multiprocessing/managers.py +++ b/Lib/multiprocessing/managers.py @@ -1200,4 +1200,3 @@ class SyncManager(BaseManager): # types returned by methods of PoolProxy SyncManager.register('Iterator', proxytype=IteratorProxy, create_method=False) SyncManager.register('AsyncResult', create_method=False) - diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index 9402af2f47f504..7341131231a4f0 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -4706,216 +4706,12 @@ def is_alive(self): any(process.is_alive() for process in forked_processes)) + class MiscTestCase(unittest.TestCase): def test__all__(self): # Just make sure names in blacklist are excluded support.check__all__(self, multiprocessing, extra=multiprocessing.__all__, blacklist=['SUBDEBUG', 'SUBWARNING']) - - -class TestSyncManagerTypes(unittest.TestCase): - """Test all the types which can be shared between parent and child - process by using a manager which acts as an intermediary between - the two. - - In the following tests the base type is created in the parent - process. "callback" function represents how the base type is - received (and handled) from within the child process. E.g.: - - def test_list(self): - def worker(obj): - # === within the child process === - assert obj[0] == 1 - - # === within the parent process === - o = self.manager.list() - o.append(1) - self.run_test(worker, o) - """ - manager_class = multiprocessing.managers.SyncManager - - def setUp(self): - self.manager = self.manager_class() - self.manager.start() - self.proc = None - - def tearDown(self): - self.manager.shutdown() - if self.proc is not None: - self.proc.terminate() - self.proc.join() - - def run_test(self, worker, obj): - self.proc = multiprocessing.Process(target=worker, args=(obj, )) - self.proc.start() - self.proc.join() - - def test_queue(self, qname="Queue"): - def worker(obj): - assert obj.qsize() == 2 - assert obj.full() - assert not obj.empty() - assert obj.get() == 5 - assert not obj.empty() - assert obj.get() == 6 - assert obj.empty() - - o = getattr(self.manager, qname)(2) - o.put(5) - o.put(6) - self.run_test(worker, o) - assert o.empty() - assert not o.full() - - def test_joinable_queue(self): - self.test_queue("JoinableQueue") - - def test_event(self): - def worker(obj): - assert obj.is_set() - obj.wait() - obj.clear() - obj.wait(0.001) - - o = self.manager.Event() - o.set() - self.run_test(worker, o) - assert not o.is_set() - o.wait(0.001) - - def test_lock(self, lname="Lock"): - def worker(obj): - o.acquire() - - o = getattr(self.manager, lname)() - self.run_test(worker, o) - o.release() - self.assertRaises(RuntimeError, o.release) # already released - - def test_rlock(self): - self.test_lock(lname="RLock") - - def test_semaphore(self, sname="Semaphore"): - def worker(obj): - obj.acquire() - - o = getattr(self.manager, sname)() - self.run_test(worker, o) - o.release() - - def test_bounded_semaphore(self): - self.test_semaphore(sname="BoundedSemaphore") - - def test_condition(self): - def worker(obj): - obj.acquire() - - o = self.manager.Condition() - self.run_test(worker, o) - o.release() - self.assertRaises(RuntimeError, o.release) # already released - - def test_barrier(self): - def worker(obj): - assert obj.parties == 5 - obj.reset() - - o = self.manager.Barrier(5) - self.run_test(worker, o) - - def test_pool(self): - def worker(obj): - # XXX: further tests were supposed to be here but it - # seems Pool() is broken. When using apply_async() or - # or map() I get: - # .../multiprocessing/managers.py", line 802, in _callmethod - # proxytype = self._manager._registry[token.typeid][-1] - # AttributeError: 'NoneType' object has no attribute '_registry' - with obj: - pass - - o = self.manager.Pool(processes=4) - self.run_test(worker, o) - - def test_list(self): - def worker(obj): - assert obj[0] == 5 - assert obj.count(5) == 1 - assert obj.index(5) == 0 - obj.sort() - obj.reverse() - for x in obj: - pass - assert len(obj) == 1 - assert obj.pop(0) == 5 - - o = self.manager.list() - o.append(5) - self.run_test(worker, o) - assert not o - self.assertEqual(len(o), 0) - - def test_dict(self): - def worker(obj): - assert len(obj) == 1 - assert obj['foo'] == 5 - assert obj.get('foo') == 5 - assert list(obj.items()) == [('foo', 5)] - assert list(obj.keys()) == ['foo'] - assert list(obj.values()) == [5] - assert obj.copy() == {'foo': 5} - assert obj.popitem() == ('foo', 5) - - o = self.manager.dict() - o['foo'] = 5 - self.run_test(worker, o) - assert not o - self.assertEqual(len(o), 0) - - def test_value(self): - def worker(obj): - assert obj.value == 1 - assert obj.get() == 1 - obj.set(2) - - o = self.manager.Value('i', 1) - self.run_test(worker, o) - self.assertEqual(o.value, 2) - self.assertEqual(o.get(), 2) - - def test_array(self): - def worker(obj): - assert obj[0] == 0 - assert obj[1] == 1 - assert len(obj) == 2 - assert list(obj) == [0, 1] - - o = self.manager.Array('i', [0, 1]) - self.run_test(worker, o) - - def test_namespace(self): - def worker(obj): - assert obj.x == 0 - assert obj.y == 1 - - o = self.manager.Namespace() - o.x = 0 - o.y = 1 - self.run_test(worker, o) - - -try: - import multiprocessing.shared_memory -except ImportError: - @unittest.skipIf(True, "SharedMemoryManager not available") - class TestSharedMemoryManagerTypes(TestSyncManagerTypes): - pass -else: - class TestSharedMemoryManagerTypes(TestSyncManagerTypes): - """Same as above but by using SharedMemoryManager.""" - manager_class = multiprocessing.shared_memory.SharedMemoryManager - - # # Mixins # diff --git a/Lib/test/test_multiprocessing_managers.py b/Lib/test/test_multiprocessing_managers.py new file mode 100644 index 00000000000000..5bc891b14eeb14 --- /dev/null +++ b/Lib/test/test_multiprocessing_managers.py @@ -0,0 +1,217 @@ +import unittest +from multiprocessing import Process +from multiprocessing.managers import SyncManager +from test.support import reap_children + + +class TestSyncManagerTypes(unittest.TestCase): + """Test all the types which can be shared between parent and child + process by using a manager which acts as an intermediary between + the two. + + In the following tests the base type is created in the parent + process. "worker" function represents how the base type is + received (and handled) from within the child process. E.g.: + + def test_list(self): + def worker(obj): + # === within the child process === + assert obj[0] == 1 + + # === within the parent process === + o = self.manager.list() + o.append(1) + self.run_test(worker, o) + """ + manager_class = SyncManager + + def setUp(self): + self.manager = self.manager_class() + self.manager.start() + self.proc = None + + def tearDown(self): + self.manager.shutdown() + if self.proc is not None and self.proc.is_alive(): + self.proc.terminate() + self.proc.join() + + @classmethod + def tearDownClass(cls): + reap_children() + + def run_test(self, worker, obj): + self.proc = Process(target=worker, args=(obj, )) + self.proc.start() + self.proc.join() + self.assertEqual(self.proc.exitcode, 0) + + def test_queue(self, qname="Queue"): + def worker(obj): + assert obj.qsize() == 2 + assert obj.full() + assert not obj.empty() + assert obj.get() == 5 + assert not obj.empty() + assert obj.get() == 6 + assert obj.empty() + + o = getattr(self.manager, qname)(2) + o.put(5) + o.put(6) + self.run_test(worker, o) + assert o.empty() + assert not o.full() + + def test_joinable_queue(self): + self.test_queue("JoinableQueue") + + def test_event(self): + def worker(obj): + assert obj.is_set() + obj.wait() + obj.clear() + obj.wait(0.001) + + o = self.manager.Event() + o.set() + self.run_test(worker, o) + assert not o.is_set() + o.wait(0.001) + + def test_lock(self, lname="Lock"): + def worker(obj): + o.acquire() + + o = getattr(self.manager, lname)() + self.run_test(worker, o) + o.release() + self.assertRaises(RuntimeError, o.release) # already released + + def test_rlock(self): + self.test_lock(lname="RLock") + + def test_semaphore(self, sname="Semaphore"): + def worker(obj): + obj.acquire() + + o = getattr(self.manager, sname)() + self.run_test(worker, o) + o.release() + + def test_bounded_semaphore(self): + self.test_semaphore(sname="BoundedSemaphore") + + def test_condition(self): + def worker(obj): + obj.acquire() + + o = self.manager.Condition() + self.run_test(worker, o) + o.release() + self.assertRaises(RuntimeError, o.release) # already released + + def test_barrier(self): + def worker(obj): + assert obj.parties == 5 + obj.reset() + + o = self.manager.Barrier(5) + self.run_test(worker, o) + + def test_pool(self): + def worker(obj): + # TODO: fix https://bugs.python.org/issue35919 + with obj: + pass + + o = self.manager.Pool(processes=4) + self.run_test(worker, o) + + def test_list(self): + def worker(obj): + assert obj[0] == 5 + assert obj.count(5) == 1 + assert obj.index(5) == 0 + obj.sort() + obj.reverse() + for x in obj: + pass + assert len(obj) == 1 + assert obj.pop(0) == 5 + + o = self.manager.list() + o.append(5) + self.run_test(worker, o) + assert not o + self.assertEqual(len(o), 0) + + def test_dict(self): + def worker(obj): + assert len(obj) == 1 + assert obj['foo'] == 5 + assert obj.get('foo') == 5 + # TODO: fix https://bugs.python.org/issue35918 + # assert obj.has_key('foo') + assert list(obj.items()) == [('foo', 5)] + assert list(obj.keys()) == ['foo'] + assert list(obj.values()) == [5] + assert obj.copy() == {'foo': 5} + assert obj.popitem() == ('foo', 5) + + o = self.manager.dict() + o['foo'] = 5 + self.run_test(worker, o) + assert not o + self.assertEqual(len(o), 0) + + def test_value(self): + def worker(obj): + assert obj.value == 1 + assert obj.get() == 1 + obj.set(2) + + o = self.manager.Value('i', 1) + self.run_test(worker, o) + self.assertEqual(o.value, 2) + self.assertEqual(o.get(), 2) + + def test_array(self): + def worker(obj): + assert obj[0] == 0 + assert obj[1] == 1 + assert len(obj) == 2 + assert list(obj) == [0, 1] + + o = self.manager.Array('i', [0, 1]) + self.run_test(worker, o) + + def test_namespace(self): + def worker(obj): + assert obj.x == 0 + assert obj.y == 1 + + o = self.manager.Namespace() + o.x = 0 + o.y = 1 + self.run_test(worker, o) + + +try: + from multiprocessing.shared_memory import SharedMemoryManager +except ImportError: + @unittest.skip("SharedMemoryManager not available on this platform") + class TestSharedMemoryManagerTypes(TestSyncManagerTypes): + pass +else: + class TestSharedMemoryManagerTypes(TestSyncManagerTypes): + """Same as above but by using SharedMemoryManager.""" + manager_class = SharedMemoryManager + + +def tearDownModule(): + reap_children() + + +if __name__ == '__main__': + unittest.main() From bba51f736c32e23a0014e0211f0f45b6482b61f1 Mon Sep 17 00:00:00 2001 From: Giampaolo Rodola Date: Wed, 6 Feb 2019 20:18:01 +0100 Subject: [PATCH 05/11] try to fix global()-related discoverability issues on Windows by using install_tests_in_module_dict() --- Lib/test/test_multiprocessing_managers.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/Lib/test/test_multiprocessing_managers.py b/Lib/test/test_multiprocessing_managers.py index 5bc891b14eeb14..83b93fff171be6 100644 --- a/Lib/test/test_multiprocessing_managers.py +++ b/Lib/test/test_multiprocessing_managers.py @@ -1,7 +1,8 @@ -import unittest from multiprocessing import Process from multiprocessing.managers import SyncManager from test.support import reap_children +import test._test_multiprocessing +import unittest class TestSyncManagerTypes(unittest.TestCase): @@ -213,5 +214,8 @@ def tearDownModule(): reap_children() +test._test_multiprocessing.install_tests_in_module_dict(globals(), 'managers') + + if __name__ == '__main__': unittest.main() From 05d9c33011a0f0ce31dd61b47655898d30953181 Mon Sep 17 00:00:00 2001 From: Giampaolo Rodola Date: Wed, 6 Feb 2019 21:38:43 +0100 Subject: [PATCH 06/11] move tests back into _test_multiprocessing and use @classmethod in order to make the worker object accessible/serializable on Windows --- Lib/test/_test_multiprocessing.py | 219 +++++++++++++++++++++ Lib/test/test_multiprocessing_managers.py | 221 ---------------------- 2 files changed, 219 insertions(+), 221 deletions(-) delete mode 100644 Lib/test/test_multiprocessing_managers.py diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index 7341131231a4f0..e050aa32a51c81 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -4706,6 +4706,225 @@ def is_alive(self): any(process.is_alive() for process in forked_processes)) +class TestSyncManagerTypes(unittest.TestCase): + """Test all the types which can be shared between a parent and a + child process by using a manager which acts as an intermediary + between them. + + In the following unit-tests the base type is created in the parent + process, the @classmethod represents the worker process and the + shared object is readable and editable between the two. + + # The child. + @classmethod + def _test_list(cls, obj): + assert obj[0] == 5 + assert obj.append(6) + + # The parent. + def test_list(self): + o = self.manager.list() + o.append(5) + self.run_test(self._test_list, o) + assert o[1] == 6 + """ + manager_class = multiprocessing.managers.SyncManager + + def setUp(self): + self.manager = self.manager_class() + self.manager.start() + self.proc = None + + def tearDown(self): + self.manager.shutdown() + if self.proc is not None and self.proc.is_alive(): + self.proc.terminate() + self.proc.join() + + @classmethod + def tearDownClass(cls): + support.reap_children() + + def run_test(self, worker, obj): + self.proc = multiprocessing.Process(target=worker, args=(obj, )) + self.proc.start() + self.proc.join() + self.assertEqual(self.proc.exitcode, 0) + + @classmethod + def _test_queue(cls, obj): + assert obj.qsize() == 2 + assert obj.full() + assert not obj.empty() + assert obj.get() == 5 + assert not obj.empty() + assert obj.get() == 6 + assert obj.empty() + + def test_queue(self, qname="Queue"): + o = getattr(self.manager, qname)(2) + o.put(5) + o.put(6) + self.run_test(self._test_queue, o) + assert o.empty() + assert not o.full() + + def test_joinable_queue(self): + self.test_queue("JoinableQueue") + + @classmethod + def _test_event(cls, obj): + assert obj.is_set() + obj.wait() + obj.clear() + obj.wait(0.001) + + def test_event(self): + o = self.manager.Event() + o.set() + self.run_test(self._test_event, o) + assert not o.is_set() + o.wait(0.001) + + @classmethod + def _test_lock(cls, obj): + obj.acquire() + + def test_lock(self, lname="Lock"): + o = getattr(self.manager, lname)() + self.run_test(self._test_lock, o) + o.release() + self.assertRaises(RuntimeError, o.release) # already released + + def test_rlock(self): + self.test_lock(lname="RLock") + + @classmethod + def _test_semaphore(cls, obj): + obj.acquire() + + def test_semaphore(self, sname="Semaphore"): + o = getattr(self.manager, sname)() + self.run_test(self._test_semaphore, o) + o.release() + + def test_bounded_semaphore(self): + self.test_semaphore(sname="BoundedSemaphore") + + @classmethod + def _test_condition(cls, obj): + obj.acquire() + + def test_condition(self): + o = self.manager.Condition() + self.run_test(self._test_condition, o) + o.release() + self.assertRaises(RuntimeError, o.release) # already released + + @classmethod + def _test_barrier(cls, obj): + assert obj.parties == 5 + obj.reset() + + def test_barrier(self): + o = self.manager.Barrier(5) + self.run_test(self._test_barrier, o) + + @classmethod + def _test_pool(cls, obj): + # TODO: fix https://bugs.python.org/issue35919 + with obj: + pass + + def test_pool(self): + o = self.manager.Pool(processes=4) + self.run_test(self._test_pool, o) + + @classmethod + def _test_list(cls, obj): + assert obj[0] == 5 + assert obj.count(5) == 1 + assert obj.index(5) == 0 + obj.sort() + obj.reverse() + for x in obj: + pass + assert len(obj) == 1 + assert obj.pop(0) == 5 + + def test_list(self): + o = self.manager.list() + o.append(5) + self.run_test(self._test_list, o) + assert not o + self.assertEqual(len(o), 0) + + @classmethod + def _test_dict(cls, obj): + assert len(obj) == 1 + assert obj['foo'] == 5 + assert obj.get('foo') == 5 + # TODO: fix https://bugs.python.org/issue35918 + # assert obj.has_key('foo') + assert list(obj.items()) == [('foo', 5)] + assert list(obj.keys()) == ['foo'] + assert list(obj.values()) == [5] + assert obj.copy() == {'foo': 5} + assert obj.popitem() == ('foo', 5) + + def test_dict(self): + o = self.manager.dict() + o['foo'] = 5 + self.run_test(self._test_dict, o) + assert not o + self.assertEqual(len(o), 0) + + @classmethod + def _test_value(cls, obj): + assert obj.value == 1 + assert obj.get() == 1 + obj.set(2) + + def test_value(self): + o = self.manager.Value('i', 1) + self.run_test(self._test_value, o) + self.assertEqual(o.value, 2) + self.assertEqual(o.get(), 2) + + @classmethod + def _test_array(cls, obj): + assert obj[0] == 0 + assert obj[1] == 1 + assert len(obj) == 2 + assert list(obj) == [0, 1] + + def test_array(self): + o = self.manager.Array('i', [0, 1]) + self.run_test(self._test_array, o) + + @classmethod + def _test_namespace(cls, obj): + assert obj.x == 0 + assert obj.y == 1 + + def test_namespace(self): + o = self.manager.Namespace() + o.x = 0 + o.y = 1 + self.run_test(self._test_namespace, o) + + +try: + import multiprocessing.shared_memory +except ImportError: + @unittest.skip("SharedMemoryManager not available on this platform") + class TestSharedMemoryManagerTypes(TestSyncManagerTypes): + pass +else: + class TestSharedMemoryManagerTypes(TestSyncManagerTypes): + """Same as above but by using SharedMemoryManager.""" + manager_class = multiprocessing.shared_memory.SharedMemoryManager + class MiscTestCase(unittest.TestCase): def test__all__(self): diff --git a/Lib/test/test_multiprocessing_managers.py b/Lib/test/test_multiprocessing_managers.py deleted file mode 100644 index 83b93fff171be6..00000000000000 --- a/Lib/test/test_multiprocessing_managers.py +++ /dev/null @@ -1,221 +0,0 @@ -from multiprocessing import Process -from multiprocessing.managers import SyncManager -from test.support import reap_children -import test._test_multiprocessing -import unittest - - -class TestSyncManagerTypes(unittest.TestCase): - """Test all the types which can be shared between parent and child - process by using a manager which acts as an intermediary between - the two. - - In the following tests the base type is created in the parent - process. "worker" function represents how the base type is - received (and handled) from within the child process. E.g.: - - def test_list(self): - def worker(obj): - # === within the child process === - assert obj[0] == 1 - - # === within the parent process === - o = self.manager.list() - o.append(1) - self.run_test(worker, o) - """ - manager_class = SyncManager - - def setUp(self): - self.manager = self.manager_class() - self.manager.start() - self.proc = None - - def tearDown(self): - self.manager.shutdown() - if self.proc is not None and self.proc.is_alive(): - self.proc.terminate() - self.proc.join() - - @classmethod - def tearDownClass(cls): - reap_children() - - def run_test(self, worker, obj): - self.proc = Process(target=worker, args=(obj, )) - self.proc.start() - self.proc.join() - self.assertEqual(self.proc.exitcode, 0) - - def test_queue(self, qname="Queue"): - def worker(obj): - assert obj.qsize() == 2 - assert obj.full() - assert not obj.empty() - assert obj.get() == 5 - assert not obj.empty() - assert obj.get() == 6 - assert obj.empty() - - o = getattr(self.manager, qname)(2) - o.put(5) - o.put(6) - self.run_test(worker, o) - assert o.empty() - assert not o.full() - - def test_joinable_queue(self): - self.test_queue("JoinableQueue") - - def test_event(self): - def worker(obj): - assert obj.is_set() - obj.wait() - obj.clear() - obj.wait(0.001) - - o = self.manager.Event() - o.set() - self.run_test(worker, o) - assert not o.is_set() - o.wait(0.001) - - def test_lock(self, lname="Lock"): - def worker(obj): - o.acquire() - - o = getattr(self.manager, lname)() - self.run_test(worker, o) - o.release() - self.assertRaises(RuntimeError, o.release) # already released - - def test_rlock(self): - self.test_lock(lname="RLock") - - def test_semaphore(self, sname="Semaphore"): - def worker(obj): - obj.acquire() - - o = getattr(self.manager, sname)() - self.run_test(worker, o) - o.release() - - def test_bounded_semaphore(self): - self.test_semaphore(sname="BoundedSemaphore") - - def test_condition(self): - def worker(obj): - obj.acquire() - - o = self.manager.Condition() - self.run_test(worker, o) - o.release() - self.assertRaises(RuntimeError, o.release) # already released - - def test_barrier(self): - def worker(obj): - assert obj.parties == 5 - obj.reset() - - o = self.manager.Barrier(5) - self.run_test(worker, o) - - def test_pool(self): - def worker(obj): - # TODO: fix https://bugs.python.org/issue35919 - with obj: - pass - - o = self.manager.Pool(processes=4) - self.run_test(worker, o) - - def test_list(self): - def worker(obj): - assert obj[0] == 5 - assert obj.count(5) == 1 - assert obj.index(5) == 0 - obj.sort() - obj.reverse() - for x in obj: - pass - assert len(obj) == 1 - assert obj.pop(0) == 5 - - o = self.manager.list() - o.append(5) - self.run_test(worker, o) - assert not o - self.assertEqual(len(o), 0) - - def test_dict(self): - def worker(obj): - assert len(obj) == 1 - assert obj['foo'] == 5 - assert obj.get('foo') == 5 - # TODO: fix https://bugs.python.org/issue35918 - # assert obj.has_key('foo') - assert list(obj.items()) == [('foo', 5)] - assert list(obj.keys()) == ['foo'] - assert list(obj.values()) == [5] - assert obj.copy() == {'foo': 5} - assert obj.popitem() == ('foo', 5) - - o = self.manager.dict() - o['foo'] = 5 - self.run_test(worker, o) - assert not o - self.assertEqual(len(o), 0) - - def test_value(self): - def worker(obj): - assert obj.value == 1 - assert obj.get() == 1 - obj.set(2) - - o = self.manager.Value('i', 1) - self.run_test(worker, o) - self.assertEqual(o.value, 2) - self.assertEqual(o.get(), 2) - - def test_array(self): - def worker(obj): - assert obj[0] == 0 - assert obj[1] == 1 - assert len(obj) == 2 - assert list(obj) == [0, 1] - - o = self.manager.Array('i', [0, 1]) - self.run_test(worker, o) - - def test_namespace(self): - def worker(obj): - assert obj.x == 0 - assert obj.y == 1 - - o = self.manager.Namespace() - o.x = 0 - o.y = 1 - self.run_test(worker, o) - - -try: - from multiprocessing.shared_memory import SharedMemoryManager -except ImportError: - @unittest.skip("SharedMemoryManager not available on this platform") - class TestSharedMemoryManagerTypes(TestSyncManagerTypes): - pass -else: - class TestSharedMemoryManagerTypes(TestSyncManagerTypes): - """Same as above but by using SharedMemoryManager.""" - manager_class = SharedMemoryManager - - -def tearDownModule(): - reap_children() - - -test._test_multiprocessing.install_tests_in_module_dict(globals(), 'managers') - - -if __name__ == '__main__': - unittest.main() From a4ac0201f7bf90888be72b280733d200c8fb413a Mon Sep 17 00:00:00 2001 From: Giampaolo Rodola Date: Wed, 6 Feb 2019 23:58:03 +0100 Subject: [PATCH 07/11] try to fix win/azure failure by using join_process() utility fun --- Lib/test/_test_multiprocessing.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index e050aa32a51c81..3f6723965643bd 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -4748,7 +4748,7 @@ def tearDownClass(cls): def run_test(self, worker, obj): self.proc = multiprocessing.Process(target=worker, args=(obj, )) self.proc.start() - self.proc.join() + join_process(self.proc) self.assertEqual(self.proc.exitcode, 0) @classmethod From 6b8836a4dcbc9945d4d05f38472d6bed00132d6e Mon Sep 17 00:00:00 2001 From: Giampaolo Rodola Date: Thu, 7 Feb 2019 00:16:21 +0100 Subject: [PATCH 08/11] rename method --- Lib/test/_test_multiprocessing.py | 31 ++++++++++++++++--------------- 1 file changed, 16 insertions(+), 15 deletions(-) diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index 3f6723965643bd..7c36038e3583cf 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -4725,7 +4725,7 @@ def _test_list(cls, obj): def test_list(self): o = self.manager.list() o.append(5) - self.run_test(self._test_list, o) + self.run_worker(self._test_list, o) assert o[1] == 6 """ manager_class = multiprocessing.managers.SyncManager @@ -4736,17 +4736,18 @@ def setUp(self): self.proc = None def tearDown(self): - self.manager.shutdown() if self.proc is not None and self.proc.is_alive(): self.proc.terminate() self.proc.join() + self.manager.shutdown() @classmethod def tearDownClass(cls): support.reap_children() - def run_test(self, worker, obj): + def run_worker(self, worker, obj): self.proc = multiprocessing.Process(target=worker, args=(obj, )) + self.proc.daemon = True self.proc.start() join_process(self.proc) self.assertEqual(self.proc.exitcode, 0) @@ -4765,7 +4766,7 @@ def test_queue(self, qname="Queue"): o = getattr(self.manager, qname)(2) o.put(5) o.put(6) - self.run_test(self._test_queue, o) + self.run_worker(self._test_queue, o) assert o.empty() assert not o.full() @@ -4782,7 +4783,7 @@ def _test_event(cls, obj): def test_event(self): o = self.manager.Event() o.set() - self.run_test(self._test_event, o) + self.run_worker(self._test_event, o) assert not o.is_set() o.wait(0.001) @@ -4792,7 +4793,7 @@ def _test_lock(cls, obj): def test_lock(self, lname="Lock"): o = getattr(self.manager, lname)() - self.run_test(self._test_lock, o) + self.run_worker(self._test_lock, o) o.release() self.assertRaises(RuntimeError, o.release) # already released @@ -4805,7 +4806,7 @@ def _test_semaphore(cls, obj): def test_semaphore(self, sname="Semaphore"): o = getattr(self.manager, sname)() - self.run_test(self._test_semaphore, o) + self.run_worker(self._test_semaphore, o) o.release() def test_bounded_semaphore(self): @@ -4817,7 +4818,7 @@ def _test_condition(cls, obj): def test_condition(self): o = self.manager.Condition() - self.run_test(self._test_condition, o) + self.run_worker(self._test_condition, o) o.release() self.assertRaises(RuntimeError, o.release) # already released @@ -4828,7 +4829,7 @@ def _test_barrier(cls, obj): def test_barrier(self): o = self.manager.Barrier(5) - self.run_test(self._test_barrier, o) + self.run_worker(self._test_barrier, o) @classmethod def _test_pool(cls, obj): @@ -4838,7 +4839,7 @@ def _test_pool(cls, obj): def test_pool(self): o = self.manager.Pool(processes=4) - self.run_test(self._test_pool, o) + self.run_worker(self._test_pool, o) @classmethod def _test_list(cls, obj): @@ -4855,7 +4856,7 @@ def _test_list(cls, obj): def test_list(self): o = self.manager.list() o.append(5) - self.run_test(self._test_list, o) + self.run_worker(self._test_list, o) assert not o self.assertEqual(len(o), 0) @@ -4875,7 +4876,7 @@ def _test_dict(cls, obj): def test_dict(self): o = self.manager.dict() o['foo'] = 5 - self.run_test(self._test_dict, o) + self.run_worker(self._test_dict, o) assert not o self.assertEqual(len(o), 0) @@ -4887,7 +4888,7 @@ def _test_value(cls, obj): def test_value(self): o = self.manager.Value('i', 1) - self.run_test(self._test_value, o) + self.run_worker(self._test_value, o) self.assertEqual(o.value, 2) self.assertEqual(o.get(), 2) @@ -4900,7 +4901,7 @@ def _test_array(cls, obj): def test_array(self): o = self.manager.Array('i', [0, 1]) - self.run_test(self._test_array, o) + self.run_worker(self._test_array, o) @classmethod def _test_namespace(cls, obj): @@ -4911,7 +4912,7 @@ def test_namespace(self): o = self.manager.Namespace() o.x = 0 o.y = 1 - self.run_test(self._test_namespace, o) + self.run_worker(self._test_namespace, o) try: From 8c10bc4e75d9748e63ce86ce785955bf378546b3 Mon Sep 17 00:00:00 2001 From: Giampaolo Rodola Date: Thu, 7 Feb 2019 01:05:02 +0100 Subject: [PATCH 09/11] attempt adding synchronization primitive to wait for the subprocess to exit --- Lib/test/_test_multiprocessing.py | 25 +++++++++++++++++++++++-- 1 file changed, 23 insertions(+), 2 deletions(-) diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index 7c36038e3583cf..b57f5a125ce672 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -4742,14 +4742,35 @@ def tearDown(self): self.manager.shutdown() @classmethod - def tearDownClass(cls): + def setUpClass(cls): support.reap_children() + tearDownClass = setUpClass + + def wait_proc_exit(self): + # Only the manager process should be returned by active_children() + # but this can take a bit on slow machines, so wait a few seconds + # if there are other children too (see #17395). + join_process(self.proc) + start_time = time.monotonic() + t = 0.01 + while len(multiprocessing.active_children()) > 1: + time.sleep(t) + t *= 2 + dt = time.monotonic() - start_time + if dt >= 5.0: + test.support.environment_altered = True + print("Warning -- multiprocessing.Manager still has %s active " + "children after %s seconds" + % (multiprocessing.active_children(), dt), + file=sys.stderr) + break + def run_worker(self, worker, obj): self.proc = multiprocessing.Process(target=worker, args=(obj, )) self.proc.daemon = True self.proc.start() - join_process(self.proc) + self.wait_proc_exit() self.assertEqual(self.proc.exitcode, 0) @classmethod From 638dd490e5c787cbc7a4a197bdec16bc3e0e4718 Mon Sep 17 00:00:00 2001 From: Giampaolo Rodola Date: Thu, 7 Feb 2019 01:24:44 +0100 Subject: [PATCH 10/11] sync issue: try using a sleep() on windows --- Lib/test/_test_multiprocessing.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index b57f5a125ce672..d0a348a68fbcca 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -4815,6 +4815,9 @@ def _test_lock(cls, obj): def test_lock(self, lname="Lock"): o = getattr(self.manager, lname)() self.run_worker(self._test_lock, o) + # See: https://ci.appveyor.com/project/python/cpython/builds/22183338 + if os.name == 'nt': + time.sleep(0.3) o.release() self.assertRaises(RuntimeError, o.release) # already released @@ -4840,6 +4843,9 @@ def _test_condition(cls, obj): def test_condition(self): o = self.manager.Condition() self.run_worker(self._test_condition, o) + # See: https://ci.appveyor.com/project/python/cpython/builds/22183338 + if os.name == 'nt': + time.sleep(0.3) o.release() self.assertRaises(RuntimeError, o.release) # already released From ff613484095b47a205ea116372cecd5c68b5613c Mon Sep 17 00:00:00 2001 From: Giampaolo Rodola Date: Thu, 7 Feb 2019 10:46:29 +0100 Subject: [PATCH 11/11] Treat Lock and RLock differently. See: https://github.com/python/cpython/pull/11772#issuecomment-461343609 --- Lib/test/_test_multiprocessing.py | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index d0a348a68fbcca..2f839b952126a3 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -4815,14 +4815,17 @@ def _test_lock(cls, obj): def test_lock(self, lname="Lock"): o = getattr(self.manager, lname)() self.run_worker(self._test_lock, o) - # See: https://ci.appveyor.com/project/python/cpython/builds/22183338 - if os.name == 'nt': - time.sleep(0.3) o.release() self.assertRaises(RuntimeError, o.release) # already released - def test_rlock(self): - self.test_lock(lname="RLock") + @classmethod + def _test_rlock(cls, obj): + obj.acquire() + obj.release() + + def test_rlock(self, lname="Lock"): + o = getattr(self.manager, lname)() + self.run_worker(self._test_rlock, o) @classmethod def _test_semaphore(cls, obj): @@ -4839,15 +4842,11 @@ def test_bounded_semaphore(self): @classmethod def _test_condition(cls, obj): obj.acquire() + obj.release() def test_condition(self): o = self.manager.Condition() self.run_worker(self._test_condition, o) - # See: https://ci.appveyor.com/project/python/cpython/builds/22183338 - if os.name == 'nt': - time.sleep(0.3) - o.release() - self.assertRaises(RuntimeError, o.release) # already released @classmethod def _test_barrier(cls, obj):