Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
246 changes: 246 additions & 0 deletions Lib/test/_test_multiprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -4706,6 +4706,252 @@ def is_alive(self):
any(process.is_alive() for process in forked_processes))


class TestSyncManagerTypes(unittest.TestCase):
"""Test all the types which can be shared between a parent and a
child process by using a manager which acts as an intermediary
between them.

In the following unit-tests the base type is created in the parent
process, the @classmethod represents the worker process and the
shared object is readable and editable between the two.

# The child.
@classmethod
def _test_list(cls, obj):
assert obj[0] == 5
assert obj.append(6)

# The parent.
def test_list(self):
o = self.manager.list()
o.append(5)
self.run_worker(self._test_list, o)
assert o[1] == 6
"""
manager_class = multiprocessing.managers.SyncManager

def setUp(self):
self.manager = self.manager_class()
self.manager.start()
self.proc = None

def tearDown(self):
if self.proc is not None and self.proc.is_alive():
self.proc.terminate()
self.proc.join()
self.manager.shutdown()

@classmethod
def setUpClass(cls):
support.reap_children()

tearDownClass = setUpClass

def wait_proc_exit(self):
# Only the manager process should be returned by active_children()
# but this can take a bit on slow machines, so wait a few seconds
# if there are other children too (see #17395).
join_process(self.proc)
start_time = time.monotonic()
t = 0.01
while len(multiprocessing.active_children()) > 1:
time.sleep(t)
t *= 2
dt = time.monotonic() - start_time
if dt >= 5.0:
test.support.environment_altered = True
print("Warning -- multiprocessing.Manager still has %s active "
"children after %s seconds"
% (multiprocessing.active_children(), dt),
file=sys.stderr)
break

def run_worker(self, worker, obj):
self.proc = multiprocessing.Process(target=worker, args=(obj, ))
self.proc.daemon = True
self.proc.start()
self.wait_proc_exit()
self.assertEqual(self.proc.exitcode, 0)

@classmethod
def _test_queue(cls, obj):
assert obj.qsize() == 2
assert obj.full()
assert not obj.empty()
assert obj.get() == 5
assert not obj.empty()
assert obj.get() == 6
assert obj.empty()

def test_queue(self, qname="Queue"):
o = getattr(self.manager, qname)(2)
o.put(5)
o.put(6)
self.run_worker(self._test_queue, o)
assert o.empty()
assert not o.full()

def test_joinable_queue(self):
self.test_queue("JoinableQueue")

@classmethod
def _test_event(cls, obj):
assert obj.is_set()
obj.wait()
obj.clear()
obj.wait(0.001)

def test_event(self):
o = self.manager.Event()
o.set()
self.run_worker(self._test_event, o)
assert not o.is_set()
o.wait(0.001)

@classmethod
def _test_lock(cls, obj):
obj.acquire()

def test_lock(self, lname="Lock"):
o = getattr(self.manager, lname)()
self.run_worker(self._test_lock, o)
o.release()
self.assertRaises(RuntimeError, o.release) # already released

@classmethod
def _test_rlock(cls, obj):
obj.acquire()
obj.release()

def test_rlock(self, lname="Lock"):
o = getattr(self.manager, lname)()
self.run_worker(self._test_rlock, o)

@classmethod
def _test_semaphore(cls, obj):
obj.acquire()

def test_semaphore(self, sname="Semaphore"):
o = getattr(self.manager, sname)()
self.run_worker(self._test_semaphore, o)
o.release()

def test_bounded_semaphore(self):
self.test_semaphore(sname="BoundedSemaphore")

@classmethod
def _test_condition(cls, obj):
obj.acquire()
obj.release()

def test_condition(self):
o = self.manager.Condition()
self.run_worker(self._test_condition, o)

@classmethod
def _test_barrier(cls, obj):
assert obj.parties == 5
obj.reset()

def test_barrier(self):
o = self.manager.Barrier(5)
self.run_worker(self._test_barrier, o)

@classmethod
def _test_pool(cls, obj):
# TODO: fix https://bugs.python.org/issue35919
with obj:
pass

def test_pool(self):
o = self.manager.Pool(processes=4)
self.run_worker(self._test_pool, o)

@classmethod
def _test_list(cls, obj):
assert obj[0] == 5
assert obj.count(5) == 1
assert obj.index(5) == 0
obj.sort()
obj.reverse()
for x in obj:
pass
assert len(obj) == 1
assert obj.pop(0) == 5

def test_list(self):
o = self.manager.list()
o.append(5)
self.run_worker(self._test_list, o)
assert not o
self.assertEqual(len(o), 0)

@classmethod
def _test_dict(cls, obj):
assert len(obj) == 1
assert obj['foo'] == 5
assert obj.get('foo') == 5
# TODO: fix https://bugs.python.org/issue35918
# assert obj.has_key('foo')
assert list(obj.items()) == [('foo', 5)]
assert list(obj.keys()) == ['foo']
assert list(obj.values()) == [5]
assert obj.copy() == {'foo': 5}
assert obj.popitem() == ('foo', 5)

def test_dict(self):
o = self.manager.dict()
o['foo'] = 5
self.run_worker(self._test_dict, o)
assert not o
self.assertEqual(len(o), 0)

@classmethod
def _test_value(cls, obj):
assert obj.value == 1
assert obj.get() == 1
obj.set(2)

def test_value(self):
o = self.manager.Value('i', 1)
self.run_worker(self._test_value, o)
self.assertEqual(o.value, 2)
self.assertEqual(o.get(), 2)

@classmethod
def _test_array(cls, obj):
assert obj[0] == 0
assert obj[1] == 1
assert len(obj) == 2
assert list(obj) == [0, 1]

def test_array(self):
o = self.manager.Array('i', [0, 1])
self.run_worker(self._test_array, o)

@classmethod
def _test_namespace(cls, obj):
assert obj.x == 0
assert obj.y == 1

def test_namespace(self):
o = self.manager.Namespace()
o.x = 0
o.y = 1
self.run_worker(self._test_namespace, o)


try:
import multiprocessing.shared_memory
except ImportError:
@unittest.skip("SharedMemoryManager not available on this platform")
class TestSharedMemoryManagerTypes(TestSyncManagerTypes):
pass
else:
class TestSharedMemoryManagerTypes(TestSyncManagerTypes):
"""Same as above but by using SharedMemoryManager."""
manager_class = multiprocessing.shared_memory.SharedMemoryManager


class MiscTestCase(unittest.TestCase):
def test__all__(self):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
multiprocessing: provide unit tests for SyncManager and SharedMemoryManager
classes + all the shareable types which are supposed to be supported by
them. (patch by Giampaolo Rodola)