From cc32ac942e84302fa8a67ebd5adb11e8b623195b Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Tue, 29 Apr 2014 17:10:47 -0700 Subject: [PATCH] fix interrupted system call error in pyspark's RDD.pipe --- python/pyspark/rdd.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index a59778c72130e..3a1c56af5b221 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -537,8 +537,8 @@ def pipe(self, command, env={}): """ Return an RDD created by piping elements to a forked external process. - >>> sc.parallelize([1, 2, 3]).pipe('cat').collect() - ['1', '2', '3'] + >>> sc.parallelize(['1', '2', '', '3']).pipe('cat').collect() + ['1', '2', '', '3'] """ def func(iterator): pipe = Popen(shlex.split(command), env=env, stdin=PIPE, stdout=PIPE) @@ -547,7 +547,7 @@ def pipe_objs(out): out.write(str(obj).rstrip('\n') + '\n') out.close() Thread(target=pipe_objs, args=[pipe.stdin]).start() - return (x.rstrip('\n') for x in pipe.stdout) + return (x.rstrip('\n') for x in iter(pipe.stdout.readline, '')) return self.mapPartitions(func) def foreach(self, f):