Skip to content

Commit 2848d9d

Browse files
giampaolopitrou
authored andcommitted
bpo-35917: Test multiprocessing manager classes and shareable types (GH-11772)
multiprocessing: provide unittests for manager classes and shareable types
1 parent bc09851 commit 2848d9d

File tree

2 files changed

+249
-0
lines changed

2 files changed

+249
-0
lines changed

Lib/test/_test_multiprocessing.py

Lines changed: 246 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4706,6 +4706,252 @@ def is_alive(self):
47064706
any(process.is_alive() for process in forked_processes))
47074707

47084708

4709+
class TestSyncManagerTypes(unittest.TestCase):
4710+
"""Test all the types which can be shared between a parent and a
4711+
child process by using a manager which acts as an intermediary
4712+
between them.
4713+
4714+
In the following unit-tests the base type is created in the parent
4715+
process, the @classmethod represents the worker process and the
4716+
shared object is readable and editable between the two.
4717+
4718+
# The child.
4719+
@classmethod
4720+
def _test_list(cls, obj):
4721+
assert obj[0] == 5
4722+
assert obj.append(6)
4723+
4724+
# The parent.
4725+
def test_list(self):
4726+
o = self.manager.list()
4727+
o.append(5)
4728+
self.run_worker(self._test_list, o)
4729+
assert o[1] == 6
4730+
"""
4731+
manager_class = multiprocessing.managers.SyncManager
4732+
4733+
def setUp(self):
4734+
self.manager = self.manager_class()
4735+
self.manager.start()
4736+
self.proc = None
4737+
4738+
def tearDown(self):
4739+
if self.proc is not None and self.proc.is_alive():
4740+
self.proc.terminate()
4741+
self.proc.join()
4742+
self.manager.shutdown()
4743+
4744+
@classmethod
4745+
def setUpClass(cls):
4746+
support.reap_children()
4747+
4748+
tearDownClass = setUpClass
4749+
4750+
def wait_proc_exit(self):
4751+
# Only the manager process should be returned by active_children()
4752+
# but this can take a bit on slow machines, so wait a few seconds
4753+
# if there are other children too (see #17395).
4754+
join_process(self.proc)
4755+
start_time = time.monotonic()
4756+
t = 0.01
4757+
while len(multiprocessing.active_children()) > 1:
4758+
time.sleep(t)
4759+
t *= 2
4760+
dt = time.monotonic() - start_time
4761+
if dt >= 5.0:
4762+
test.support.environment_altered = True
4763+
print("Warning -- multiprocessing.Manager still has %s active "
4764+
"children after %s seconds"
4765+
% (multiprocessing.active_children(), dt),
4766+
file=sys.stderr)
4767+
break
4768+
4769+
def run_worker(self, worker, obj):
4770+
self.proc = multiprocessing.Process(target=worker, args=(obj, ))
4771+
self.proc.daemon = True
4772+
self.proc.start()
4773+
self.wait_proc_exit()
4774+
self.assertEqual(self.proc.exitcode, 0)
4775+
4776+
@classmethod
4777+
def _test_queue(cls, obj):
4778+
assert obj.qsize() == 2
4779+
assert obj.full()
4780+
assert not obj.empty()
4781+
assert obj.get() == 5
4782+
assert not obj.empty()
4783+
assert obj.get() == 6
4784+
assert obj.empty()
4785+
4786+
def test_queue(self, qname="Queue"):
4787+
o = getattr(self.manager, qname)(2)
4788+
o.put(5)
4789+
o.put(6)
4790+
self.run_worker(self._test_queue, o)
4791+
assert o.empty()
4792+
assert not o.full()
4793+
4794+
def test_joinable_queue(self):
4795+
self.test_queue("JoinableQueue")
4796+
4797+
@classmethod
4798+
def _test_event(cls, obj):
4799+
assert obj.is_set()
4800+
obj.wait()
4801+
obj.clear()
4802+
obj.wait(0.001)
4803+
4804+
def test_event(self):
4805+
o = self.manager.Event()
4806+
o.set()
4807+
self.run_worker(self._test_event, o)
4808+
assert not o.is_set()
4809+
o.wait(0.001)
4810+
4811+
@classmethod
4812+
def _test_lock(cls, obj):
4813+
obj.acquire()
4814+
4815+
def test_lock(self, lname="Lock"):
4816+
o = getattr(self.manager, lname)()
4817+
self.run_worker(self._test_lock, o)
4818+
o.release()
4819+
self.assertRaises(RuntimeError, o.release) # already released
4820+
4821+
@classmethod
4822+
def _test_rlock(cls, obj):
4823+
obj.acquire()
4824+
obj.release()
4825+
4826+
def test_rlock(self, lname="Lock"):
4827+
o = getattr(self.manager, lname)()
4828+
self.run_worker(self._test_rlock, o)
4829+
4830+
@classmethod
4831+
def _test_semaphore(cls, obj):
4832+
obj.acquire()
4833+
4834+
def test_semaphore(self, sname="Semaphore"):
4835+
o = getattr(self.manager, sname)()
4836+
self.run_worker(self._test_semaphore, o)
4837+
o.release()
4838+
4839+
def test_bounded_semaphore(self):
4840+
self.test_semaphore(sname="BoundedSemaphore")
4841+
4842+
@classmethod
4843+
def _test_condition(cls, obj):
4844+
obj.acquire()
4845+
obj.release()
4846+
4847+
def test_condition(self):
4848+
o = self.manager.Condition()
4849+
self.run_worker(self._test_condition, o)
4850+
4851+
@classmethod
4852+
def _test_barrier(cls, obj):
4853+
assert obj.parties == 5
4854+
obj.reset()
4855+
4856+
def test_barrier(self):
4857+
o = self.manager.Barrier(5)
4858+
self.run_worker(self._test_barrier, o)
4859+
4860+
@classmethod
4861+
def _test_pool(cls, obj):
4862+
# TODO: fix https://bugs.python.org/issue35919
4863+
with obj:
4864+
pass
4865+
4866+
def test_pool(self):
4867+
o = self.manager.Pool(processes=4)
4868+
self.run_worker(self._test_pool, o)
4869+
4870+
@classmethod
4871+
def _test_list(cls, obj):
4872+
assert obj[0] == 5
4873+
assert obj.count(5) == 1
4874+
assert obj.index(5) == 0
4875+
obj.sort()
4876+
obj.reverse()
4877+
for x in obj:
4878+
pass
4879+
assert len(obj) == 1
4880+
assert obj.pop(0) == 5
4881+
4882+
def test_list(self):
4883+
o = self.manager.list()
4884+
o.append(5)
4885+
self.run_worker(self._test_list, o)
4886+
assert not o
4887+
self.assertEqual(len(o), 0)
4888+
4889+
@classmethod
4890+
def _test_dict(cls, obj):
4891+
assert len(obj) == 1
4892+
assert obj['foo'] == 5
4893+
assert obj.get('foo') == 5
4894+
# TODO: fix https://bugs.python.org/issue35918
4895+
# assert obj.has_key('foo')
4896+
assert list(obj.items()) == [('foo', 5)]
4897+
assert list(obj.keys()) == ['foo']
4898+
assert list(obj.values()) == [5]
4899+
assert obj.copy() == {'foo': 5}
4900+
assert obj.popitem() == ('foo', 5)
4901+
4902+
def test_dict(self):
4903+
o = self.manager.dict()
4904+
o['foo'] = 5
4905+
self.run_worker(self._test_dict, o)
4906+
assert not o
4907+
self.assertEqual(len(o), 0)
4908+
4909+
@classmethod
4910+
def _test_value(cls, obj):
4911+
assert obj.value == 1
4912+
assert obj.get() == 1
4913+
obj.set(2)
4914+
4915+
def test_value(self):
4916+
o = self.manager.Value('i', 1)
4917+
self.run_worker(self._test_value, o)
4918+
self.assertEqual(o.value, 2)
4919+
self.assertEqual(o.get(), 2)
4920+
4921+
@classmethod
4922+
def _test_array(cls, obj):
4923+
assert obj[0] == 0
4924+
assert obj[1] == 1
4925+
assert len(obj) == 2
4926+
assert list(obj) == [0, 1]
4927+
4928+
def test_array(self):
4929+
o = self.manager.Array('i', [0, 1])
4930+
self.run_worker(self._test_array, o)
4931+
4932+
@classmethod
4933+
def _test_namespace(cls, obj):
4934+
assert obj.x == 0
4935+
assert obj.y == 1
4936+
4937+
def test_namespace(self):
4938+
o = self.manager.Namespace()
4939+
o.x = 0
4940+
o.y = 1
4941+
self.run_worker(self._test_namespace, o)
4942+
4943+
4944+
try:
4945+
import multiprocessing.shared_memory
4946+
except ImportError:
4947+
@unittest.skip("SharedMemoryManager not available on this platform")
4948+
class TestSharedMemoryManagerTypes(TestSyncManagerTypes):
4949+
pass
4950+
else:
4951+
class TestSharedMemoryManagerTypes(TestSyncManagerTypes):
4952+
"""Same as above but by using SharedMemoryManager."""
4953+
manager_class = multiprocessing.shared_memory.SharedMemoryManager
4954+
47094955

47104956
class MiscTestCase(unittest.TestCase):
47114957
def test__all__(self):
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
multiprocessing: provide unit tests for SyncManager and SharedMemoryManager
2+
classes + all the shareable types which are supposed to be supported by
3+
them. (patch by Giampaolo Rodola)

0 commit comments

Comments
 (0)