Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
0fa37cc
Add failing star tests
ForeverWintr Jul 16, 2017
1d3e7f3
Add more tests
ForeverWintr Jul 16, 2017
4097365
Fix order of operations in test
ForeverWintr Jul 16, 2017
51a6283
Add some documentation to FunctionMerge
ForeverWintr Jul 16, 2017
45a4b57
Partial implementation of BroadcastMerge
ForeverWintr Jul 16, 2017
c089e85
renamed: test_star.py -> test_broadcast.py
ForeverWintr Jul 16, 2017
f6a0ee0
Implement star loop test
ForeverWintr Jul 16, 2017
6b06a48
Make star upgrade regular functions
ForeverWintr Jul 16, 2017
e77ff24
Implement len_mismatch test
ForeverWintr Jul 20, 2017
88f92ff
Add some tests for the new star implementation
ForeverWintr Jul 23, 2017
fa85e7c
Clean up SimpleFunction __str__ by adding _name
ForeverWintr Jul 23, 2017
dc01315
Temporarily skip broadcast tests
ForeverWintr Jul 23, 2017
debcec2
Add more expected star str
ForeverWintr Jul 23, 2017
fe3c7d5
Merge branch 'dev' into new_star
ForeverWintr Jul 25, 2017
7f11b6e
Implement star decorator
ForeverWintr Jul 25, 2017
2802b83
Remove broadcast
ForeverWintr Jul 25, 2017
db9bd8a
WIP on converting broadcast to use star
ForeverWintr Jul 25, 2017
0eeefd1
Distribute old broadcast tests to correct modules
ForeverWintr Jul 26, 2017
eed294d
Remove BroadcastChain
ForeverWintr Jul 26, 2017
257410c
Convert test_broadcast_chain to test_at_operator
ForeverWintr Jul 26, 2017
a7f0b6d
Move broadcast tests to test_at_operator
ForeverWintr Jul 26, 2017
4a96b8e
Fix FunctionMerge tests
ForeverWintr Jul 26, 2017
7fea6b7
Extract iterator creation to its own method
ForeverWintr Jul 26, 2017
9883513
Fix star tests
ForeverWintr Jul 26, 2017
d61adb5
Update concurrent to __call__ like MergeFunc
ForeverWintr Jul 26, 2017
21a3869
Add some more test code
ForeverWintr Jul 26, 2017
6ede818
Coverage golf
ForeverWintr Jul 26, 2017
e26105d
Fix concurrent enumeration for order maintenance.
ForeverWintr Jul 26, 2017
38685ed
Actually fix the order error for real this time
ForeverWintr Jul 26, 2017
1c61911
Remove redundant `if call_state`
ForeverWintr Jul 26, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 25 additions & 6 deletions metafunctions/concurrent.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,22 @@

from metafunctions.core import FunctionMerge
from metafunctions.core import inject_call_state
from metafunctions.exceptions import ConcurrentException
from metafunctions import exceptions

_no_value = object()

class ConcurrentMerge(FunctionMerge):
def __init__(self, function_merge: FunctionMerge):
'''A subclass of FunctionMerge that calls each of its component functions in parallel.

ConcurrentMerge takes a FunctionMerge object and upgrades it.
'''
if not isinstance(function_merge, FunctionMerge):
#This check is necessary because functools.wraps will copy FunctionMerge attributes to
#objects that are not FunctionMerges, so this init will succeed, then result in errors
#at call time.
raise exceptions.CompositionError(f'{type(self)} can only upgrade FunctionMerges')

super().__init__(
function_merge._merge_func,
function_merge._functions,
Expand All @@ -29,16 +36,25 @@ def __call__(self, *args, **kwargs):
'''We fork here, and execute each function in a child process before joining the results
with _merge_func
'''
arg_iter, func_iter = self._get_call_iterators(args)
result_q = Queue()
error_q = Queue()

#spawn a child for each function
children = []
for i, f in enumerate(self.functions):
for i, (arg, f) in enumerate(zip(arg_iter, func_iter)):
pid = os.fork()
if not pid:
#we are the child
self._process_and_die(i, f, result_q, error_q, kwargs, arg)
children.append(pid)

#iterate over any remaining functions for which we have no args
for j, f in enumerate(func_iter, i+1):
pid = os.fork()
if not pid:
#we are the child
self._process_and_die(i, f, result_q, error_q, args, kwargs)
self._process_and_die(j, f, result_q, error_q, kwargs)
children.append(pid)

#the parent waits for all children to complete
Expand All @@ -51,20 +67,23 @@ def __call__(self, *args, **kwargs):
except queue.Empty:
pass
else:
raise ConcurrentException('Caught exception in child process') from error
raise exceptions.ConcurrentException('Caught exception in child process') from error

result_q.put(None)
results = [r[1] for r in sorted(iter(result_q.get, None), key=itemgetter(0))]

return self._merge_func(*results)

@staticmethod
def _process_and_die(idx, func, result_q, error_q, args, kwargs):
def _process_and_die(idx, func, result_q, error_q, kwargs, arg=_no_value):
'''This function is only called by child processes. Call the given function with the given
args and kwargs, put the result in result_q, then die.
'''
try:
r = func(*args, **kwargs)
if arg is _no_value:
r = func(**kwargs)
else:
r = func(arg, **kwargs)
except Exception as e:
error_q.put(e)
else:
Expand Down
61 changes: 28 additions & 33 deletions metafunctions/core/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,13 @@ def __rtruediv__(self, other):

@binary_operation
def __matmul__(self, other):
return BroadcastChain.combine(self, other)
from metafunctions.util import star
return FunctionChain.combine(self, star(other))

@binary_operation
def __rmatmul__(self, other):
return BroadcastChain.combine(other, self)
from metafunctions.util import star
return FunctionChain.combine(other, star(self))


class FunctionChain(MetaFunction):
Expand Down Expand Up @@ -143,18 +145,6 @@ def combine(cls, *funcs):
return cls(*new_funcs)


class BroadcastChain(FunctionChain):
_function_join_str = '@'

@inject_call_state
def __call__(self, *args, **kwargs):
f_iter = iter(self._functions)
result = next(f_iter)(*args, **kwargs)
for f in f_iter:
result = f(*result, **kwargs)
return result


class FunctionMerge(MetaFunction):
_character_to_operator = {
'+': operator.add,
Expand Down Expand Up @@ -193,14 +183,7 @@ def __init__(self, merge_func:tp.Callable, functions:tuple, function_join_str=''

@inject_call_state
def __call__(self, *args, **kwargs):
args_iter = iter(args)
func_iter = iter(self.functions)
if len(args) > len(self.functions):
raise exceptions.CallError(
f'{self} takes 1 or <= {len(self.functions)} '
f'arguments, but {len(args)} were given')
if len(args) == 1:
args_iter = itertools.repeat(next(args_iter))
args_iter, func_iter = self._get_call_iterators(args)

results = []
# Note that args_iter appears first in the zip. This is because I know its len is <=
Expand All @@ -223,7 +206,8 @@ def combine(cls, merge_func: tp.Callable, *funcs, function_join_str=None):
them into a single FunctionMerge.

NOTE: combine does not check to make sure the merge_func can accept the new number of
arguments.
arguments, or that combining is appropriate for the operator. (e.g., it is inappropriate to
combine FunctionMerges where order of operations matter. 5 / 2 / 3 != 5 / (2 / 3))
'''
new_funcs = []
for f in funcs:
Expand All @@ -233,9 +217,25 @@ def combine(cls, merge_func: tp.Callable, *funcs, function_join_str=None):
new_funcs.append(f)
return cls(merge_func, tuple(new_funcs), function_join_str=function_join_str)

def _get_call_iterators(self, args):
'''Do length checking and return (`args_iter`, `call_iter`), iterables of arguments and
self.functions. Call them using zip. Note that len(args) can be less than
len(self.functions), and remaining functions should be called with no argument.
'''
args_iter = iter(args)
func_iter = iter(self.functions)
if len(args) > len(self.functions):
raise exceptions.CallError(
f'{self} takes 1 or <= {len(self.functions)} '
f'arguments, but {len(args)} were given')
if len(args) == 1:
args_iter = itertools.repeat(next(args_iter))

return args_iter, func_iter


class SimpleFunction(MetaFunction):
def __init__(self, function, print_location_in_traceback=True):
def __init__(self, function, name=None, print_location_in_traceback=True):
'''A MetaFunction-aware wrapper around a single function
The `bind` parameter causes us to pass a meta object as the first argument to our inherited function, but it is only respected if the wrapped function is not another metafunction.
'''
Expand All @@ -248,6 +248,7 @@ def __init__(self, function, print_location_in_traceback=True):
super().__init__()
self._function = function
self.add_location_to_traceback = print_location_in_traceback
self._name = name or getattr(function, '__name__', False) or str(function)

@inject_call_state
def __call__(self, *args, call_state, **kwargs):
Expand All @@ -263,12 +264,7 @@ def __repr__(self):
return f'{self.__class__.__name__}({self.functions[0]!r})'

def __str__(self):
try:
return self.__name__
except AttributeError:
# We're usually wrapping a function, but it's possible we're wrapping another
# metafunction
return str(self.functions[0])
return self._name

@property
def functions(self):
Expand All @@ -278,8 +274,7 @@ def _handle_exception(self, call_state, e):
if self.add_location_to_traceback:
from metafunctions.util import highlight_current_function
detailed_message = str(e)
if call_state:
detailed_message = f"{str(e)} \n\nOccured in the following function: {highlight_current_function(call_state)}"
detailed_message = f"{str(e)} \n\nOccured in the following function: {highlight_current_function(call_state)}"
raise type(e)(detailed_message).with_traceback(e.__traceback__)
raise

Expand All @@ -289,7 +284,7 @@ def __init__(self, value):
'''A simple Deferred Value. Returns `value` when called. Equivalent to lambda x: x.
'''
self._value = value
self.__name__ = repr(value)
self._name = repr(value)

def __call__(self, *args, **kwargs):
return self._value
Expand Down
66 changes: 66 additions & 0 deletions metafunctions/tests/test_at_operator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import unittest

from metafunctions.util import node
from metafunctions.tests.util import BaseTestCase


class TestUnit(BaseTestCase):
def test_broadcast(self):
@node
def f(*args):
return args

cmp = (a | b) @ f
self.assertEqual(cmp('_'), ('_', 'a', 'b'))

rcmp = (1, 2, 3) @ f
self.assertEqual(rcmp(), (1, 2, 3))

def test_str_repr(self):
c = a @ b
self.assertEqual(str(c), '(a | star(b))')
self.assertEqual(repr(c),
f"FunctionChain{(a, c.functions[1])}")

c = a | a @ b * 5 / 7 | b & b @ a
self.assertEqual(str(c), '(a | (((a | star(b)) * 5) / 7) | (b & (b | star(a))))')

c = (1, 2, 3) @ a
self.assertEqual(str(c), '((1, 2, 3) | star(a))')

def test_upgrade_merge(self):
aabbcc = (a & b & c) @ (a&b&c)
self.assertEqual(aabbcc('_'), ('_aa', '_bb', '_cc'))

@unittest.skip('TODO')
def test_recursive_upgrade(self):
aabbcc = (a & b & c) @ star(a+b+c)
self.assertEqual(aabbcc('_'), '_aa_bb_cc')

def test_str_repr(self):
cmp = a @ (b&c)
self.assertEqual(str(cmp), '(a | star(b & c))')
self.assertEqual(repr(cmp),
f'FunctionChain({a!r}, SimpleFunction({cmp._functions[1]._function}))')

@unittest.skip('Map')
def test_loop(self):
cmp = (b & c & 'stoke') @ star(a)
self.assertEqual(cmp('_'), ('_ba', '_ca', 'stokea'))

@unittest.skip('Map')
def test_loop_with_non_meta(self):
cmp = (b & c & 'stoke') @ star(len)
self.assertEqual(cmp('_'), (2, 2, 5))

@node
def a(x):
return x + 'a'
@node
def b(x):
return x + 'b'

@node
def c(x):
return x + 'c'

33 changes: 0 additions & 33 deletions metafunctions/tests/test_broadcast_chain.py

This file was deleted.

19 changes: 16 additions & 3 deletions metafunctions/tests/test_concurrent.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from metafunctions.util import highlight_current_function
from metafunctions.util import concurrent
from metafunctions.concurrent import ConcurrentMerge
from metafunctions.exceptions import ConcurrentException
from metafunctions.exceptions import ConcurrentException, CompositionError, CallError


class TestUnit(BaseTestCase):
Expand Down Expand Up @@ -70,6 +70,19 @@ def i(call_state, x):

# how do pretty tracebacks work in multiprocessing?

def test_call(self):
c = concurrent(a+b)
self.assertEqual(c('_'), '_a_b')
self.assertEqual(c('-', '_'), '-a_b')
with self.assertRaises(CallError):
c('_', '_', '_')

@node
def d():
return 'd'
abd = concurrent(a & b & d)
self.assertEqual(abd('-', '_'), ('-a', '_b', 'd'))

def test_concurrent(self):
c = concurrent(a + b)
self.assertIsInstance(c, ConcurrentMerge)
Expand All @@ -78,9 +91,9 @@ def test_concurrent(self):
def test_not_concurrent(self):
#can only upgrade FunctionMerges

with self.assertRaises(AttributeError):
with self.assertRaises(CompositionError):
concurrent(a)
with self.assertRaises(AttributeError):
with self.assertRaises(CompositionError):
concurrent(a | b)

def test_str_repr(self):
Expand Down
Loading