From d4f945786c1469690dc92f02c98532f4d023ac3c Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Fri, 22 Jan 2021 00:54:18 -0800 Subject: [PATCH 1/5] Add pipe to Dataset. --- .../scala/org/apache/spark/rdd/PipedRDD.scala | 2 +- .../sql/catalyst/plans/logical/object.scala | 28 ++++++++++++++++- .../scala/org/apache/spark/sql/Dataset.scala | 18 +++++++++++ .../spark/sql/execution/SparkStrategies.scala | 2 ++ .../apache/spark/sql/execution/objects.scala | 31 ++++++++++++++++++- .../org/apache/spark/sql/DatasetSuite.scala | 16 +++++++++- .../spark/sql/streaming/StreamSuite.scala | 14 +++++++++ 7 files changed, 107 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala index 5dd8cb8440be6..24a43e4d4dddb 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala @@ -230,7 +230,7 @@ private[spark] class PipedRDD[T: ClassTag]( } } -private object PipedRDD { +object PipedRDD { // Split a string into words using a standard StringTokenizer def tokenize(command: String): Seq[String] = { val buf = new ArrayBuffer[String] diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala index d383532cbd3d3..4173f49ee2fba 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.api.java.function.FilterFunction import org.apache.spark.broadcast.Broadcast -import org.apache.spark.sql.{Encoder, Row} +import org.apache.spark.sql.{Encoder, Encoders, Row} import org.apache.spark.sql.catalyst.analysis.UnresolvedDeserializer import org.apache.spark.sql.catalyst.encoders._ import org.apache.spark.sql.catalyst.expressions._ @@ -585,3 +585,29 @@ case class CoGroup( outputObjAttr: Attribute, left: LogicalPlan, right: LogicalPlan) extends BinaryNode with ObjectProducer + +object PipeElements { + def apply[T : Encoder]( + command: String, + child: LogicalPlan): LogicalPlan = { + val deserialized = CatalystSerde.deserialize[T](child) + implicit val encoder = Encoders.STRING + val piped = PipeElements( + implicitly[Encoder[T]].clsTag.runtimeClass, + implicitly[Encoder[T]].schema, + CatalystSerde.generateObjAttr[String], + command, + deserialized) + CatalystSerde.serialize[String](piped) + } +} + +/** + * A relation produced by piping elements to a forked external process. + */ +case class PipeElements[T]( + argumentClass: Class[_], + argumentSchema: StructType, + outputObjAttr: Attribute, + command: String, + child: LogicalPlan) extends ObjectConsumer with ObjectProducer diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index f9590797434a1..63b267e95dc08 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -2889,6 +2889,24 @@ class Dataset[T] private[sql]( flatMap(func)(encoder) } + /** + * Return a new Dataset of string created by piping elements to a forked external process. + * The resulting Dataset is computed by executing the given process once per partition. 
+ * All elements of each input partition are written to a process's stdin as lines of input + * separated by a newline. The resulting partition consists of the process's stdout output, with + * each line of stdout resulting in one element of the output partition. A process is invoked + * even for empty partitions. + * + * @param command command to run in forked process. + * + * @group typedrel + * @since 3.2.0 + */ + def pipe(command: String): Dataset[String] = { + implicit val stringEncoder = Encoders.STRING + withTypedPlan[String](PipeElements[T](command, logicalPlan)) + } + /** * Applies a function `f` to all rows. * diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index a8d788f59d271..b0464274de8aa 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -666,6 +666,8 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { execution.python.MapInPandasExec(func, output, planLater(child)) :: Nil case logical.MapElements(f, _, _, objAttr, child) => execution.MapElementsExec(f, objAttr, planLater(child)) :: Nil + case logical.PipeElements(_, _, objAttr, command, child) => + execution.PipeElementsExec(objAttr, command, planLater(child)) :: Nil case logical.AppendColumns(f, _, _, in, out, child) => execution.AppendColumnsExec(f, in, out, planLater(child)) :: Nil case logical.AppendColumnsWithObject(f, childSer, newSer, child) => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala index c08db132c946f..da09f511a1e3a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala @@ -25,7 +25,7 @@ import scala.language.existentials import org.apache.spark.api.java.function.MapFunction import org.apache.spark.api.r._ import org.apache.spark.broadcast.Broadcast -import org.apache.spark.rdd.RDD +import org.apache.spark.rdd.{PipedRDD, RDD} import org.apache.spark.sql.Row import org.apache.spark.sql.api.r.SQLUtils._ import org.apache.spark.sql.catalyst.InternalRow @@ -624,3 +624,32 @@ case class CoGroupExec( } } } + +/** + * Piping elements to a forked external process. + * The output of its child must be a single-field row containing the input object. 
+ */ +case class PipeElementsExec( + outputObjAttr: Attribute, + command: String, + child: SparkPlan) + extends ObjectConsumerExec with ObjectProducerExec { + + override protected def doExecute(): RDD[InternalRow] = { + val getObject = ObjectOperator.unwrapObjectFromRow(child.output.head.dataType) + val printRDDElement: (InternalRow, String => Unit) => Unit = (row, printFunc) => { + printFunc(getObject(row).toString) + } + + child.execute() + .pipe(command = PipedRDD.tokenize(command), printRDDElement = printRDDElement) + .mapPartitionsInternal { iter => + val outputObject = ObjectOperator.wrapObjectToRow(outputObjectType) + iter.map(ele => outputObject(ele)) + } + } + + override def outputOrdering: Seq[SortOrder] = child.outputOrdering + + override def outputPartitioning: Partitioning = child.outputPartitioning +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala index 2ec4c6918a248..c3ce1a71a81dd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala @@ -24,7 +24,7 @@ import org.scalatest.Assertions._ import org.scalatest.exceptions.TestFailedException import org.scalatest.prop.TableDrivenPropertyChecks._ -import org.apache.spark.{SparkException, TaskContext} +import org.apache.spark.{SparkException, TaskContext, TestUtils} import org.apache.spark.sql.catalyst.{FooClassWithEnum, FooEnum, ScroogeLikeExample} import org.apache.spark.sql.catalyst.encoders.{OuterScopes, RowEncoder} import org.apache.spark.sql.catalyst.plans.{LeftAnti, LeftSemi} @@ -2007,6 +2007,20 @@ class DatasetSuite extends QueryTest checkAnswer(withUDF, Row(Row(1), null, null) :: Row(Row(1), null, null) :: Nil) } + + test("Pipe Dataset") { + assume(TestUtils.testCommandAvailable("cat")) + + val nums = spark.range(4) + val piped = nums.pipe("cat").toDF + + checkAnswer(piped, Row("0") :: Row("1") :: Row("2") :: Row("3") :: Nil) + + val piped2 = nums.pipe("wc -l").toDF.collect() + assert(piped2.size == 2) + assert(piped2(0).getString(0).trim == "2") + assert(piped2(1).getString(0).trim == "2") + } } case class Bar(a: Int) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala index 440fe997ae133..37d4bd8b8edc1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala @@ -1264,6 +1264,20 @@ class StreamSuite extends StreamTest { } } } + + test("Pipe Streaming Dataset") { + assume(TestUtils.testCommandAvailable("cat")) + + val inputData = MemoryStream[Int] + val piped = inputData.toDS() + .pipe("cat").toDF + + testStream(piped)( + AddData(inputData, 1, 2, 3), + CheckAnswer(Row("1"), Row("2"), Row("3")), + AddData(inputData, 4), + CheckAnswer(Row("1"), Row("2"), Row("3"), Row("4"))) + } } abstract class FakeSource extends StreamSourceProvider { From c6e57c1d93ac8839fa6261a5eb4e710507440b0e Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Fri, 22 Jan 2021 13:57:58 -0800 Subject: [PATCH 2/5] Address comments. 
--- .../src/main/scala/org/apache/spark/rdd/PipedRDD.scala | 2 +- .../test/scala/org/apache/spark/sql/DatasetSuite.scala | 10 +++++++++- .../org/apache/spark/sql/streaming/StreamSuite.scala | 2 +- 3 files changed, 11 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala index 24a43e4d4dddb..df03a2174dd9d 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala @@ -230,7 +230,7 @@ private[spark] class PipedRDD[T: ClassTag]( } } -object PipedRDD { +private[spark] object PipedRDD { // Split a string into words using a standard StringTokenizer def tokenize(command: String): Seq[String] = { val buf = new ArrayBuffer[String] diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala index c3ce1a71a81dd..cef05924bce74 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala @@ -2008,7 +2008,7 @@ class DatasetSuite extends QueryTest checkAnswer(withUDF, Row(Row(1), null, null) :: Row(Row(1), null, null) :: Nil) } - test("Pipe Dataset") { + test("SPARK-34205: Pipe Dataset") { assume(TestUtils.testCommandAvailable("cat")) val nums = spark.range(4) @@ -2021,6 +2021,14 @@ class DatasetSuite extends QueryTest assert(piped2(0).getString(0).trim == "2") assert(piped2(1).getString(0).trim == "2") } + + test("SPARK-34205: pipe Dataset with empty partition") { + val data = Seq(123, 4567).toDF("num").repartition(8, $"num") + val piped = data.pipe("wc -l") + assert(piped.count == 8) + val lineCounts = piped.map(_.trim.toInt).collect().toSet + assert(Set(0, 1, 1) == lineCounts) + } } case class Bar(a: Int) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala index 37d4bd8b8edc1..55290dd7098ec 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala @@ -1265,7 +1265,7 @@ class StreamSuite extends StreamTest { } } - test("Pipe Streaming Dataset") { + test("SPARK-34205: Pipe Streaming Dataset") { assume(TestUtils.testCommandAvailable("cat")) val inputData = MemoryStream[Int] From ff0da84bd3e194f8b2c1bd1fb8edff7539ff92ae Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Fri, 22 Jan 2021 14:33:09 -0800 Subject: [PATCH 3/5] For review comment. 
--- .../src/main/scala/org/apache/spark/sql/execution/objects.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala index da09f511a1e3a..d1d46366494e0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala @@ -645,7 +645,7 @@ case class PipeElementsExec( .pipe(command = PipedRDD.tokenize(command), printRDDElement = printRDDElement) .mapPartitionsInternal { iter => val outputObject = ObjectOperator.wrapObjectToRow(outputObjectType) - iter.map(ele => outputObject(ele)) + iter.map(e => outputObject(e)) } } From 45bd4a7d4f65f9db98e8f487fa0c3a0aa2195e15 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sun, 24 Jan 2021 22:29:44 -0800 Subject: [PATCH 4/5] printElement is required parameter. --- .../sql/catalyst/plans/logical/object.scala | 3 ++ .../scala/org/apache/spark/sql/Dataset.scala | 14 ++++++-- .../spark/sql/execution/SparkStrategies.scala | 4 +-- .../apache/spark/sql/execution/objects.scala | 4 ++- .../org/apache/spark/sql/DatasetSuite.scala | 32 +++++++++++++++++-- .../spark/sql/streaming/StreamSuite.scala | 2 +- 6 files changed, 49 insertions(+), 10 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala index 4173f49ee2fba..56f503624bfb6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala @@ -589,6 +589,7 @@ case class CoGroup( object PipeElements { def apply[T : Encoder]( command: String, + printElement: (Any, String => Unit) => Unit, child: LogicalPlan): LogicalPlan = { val deserialized = CatalystSerde.deserialize[T](child) implicit val encoder = Encoders.STRING @@ -597,6 +598,7 @@ object PipeElements { implicitly[Encoder[T]].schema, CatalystSerde.generateObjAttr[String], command, + printElement, deserialized) CatalystSerde.serialize[String](piped) } @@ -610,4 +612,5 @@ case class PipeElements[T]( argumentSchema: StructType, outputObjAttr: Attribute, command: String, + printElement: (Any, String => Unit) => Unit, child: LogicalPlan) extends ObjectConsumer with ObjectProducer diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 63b267e95dc08..683095b61978f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -2897,14 +2897,22 @@ class Dataset[T] private[sql]( * each line of stdout resulting in one element of the output partition. A process is invoked * even for empty partitions. * - * @param command command to run in forked process. + * Note that for micro-batch streaming Dataset, the effect of pipe is only per micro-batch, not + * cross entire stream. * + * @param command command to run in forked process. + * @param printElement Use this function to customize how to pipe elements. This function + * will be called with each Dataset element as the 1st parameter, and the + * print line function (like out.println()) as the 2nd parameter. 
* @group typedrel * @since 3.2.0 */ - def pipe(command: String): Dataset[String] = { + def pipe(command: String, printElement: (T, String => Unit) => Unit): Dataset[String] = { implicit val stringEncoder = Encoders.STRING - withTypedPlan[String](PipeElements[T](command, logicalPlan)) + withTypedPlan[String](PipeElements[T]( + command, + printElement.asInstanceOf[(Any, String => Unit) => Unit], + logicalPlan)) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index b0464274de8aa..d71b8ad715a81 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -666,8 +666,8 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { execution.python.MapInPandasExec(func, output, planLater(child)) :: Nil case logical.MapElements(f, _, _, objAttr, child) => execution.MapElementsExec(f, objAttr, planLater(child)) :: Nil - case logical.PipeElements(_, _, objAttr, command, child) => - execution.PipeElementsExec(objAttr, command, planLater(child)) :: Nil + case logical.PipeElements(_, _, objAttr, command, printElement, child) => + execution.PipeElementsExec(objAttr, command, printElement, planLater(child)) :: Nil case logical.AppendColumns(f, _, _, in, out, child) => execution.AppendColumnsExec(f, in, out, planLater(child)) :: Nil case logical.AppendColumnsWithObject(f, childSer, newSer, child) => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala index d1d46366494e0..fa208ac958b90 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala @@ -632,13 +632,15 @@ case class CoGroupExec( case class PipeElementsExec( outputObjAttr: Attribute, command: String, + printElement: (Any, String => Unit) => Unit, child: SparkPlan) extends ObjectConsumerExec with ObjectProducerExec { override protected def doExecute(): RDD[InternalRow] = { val getObject = ObjectOperator.unwrapObjectFromRow(child.output.head.dataType) val printRDDElement: (InternalRow, String => Unit) => Unit = (row, printFunc) => { - printFunc(getObject(row).toString) + val obj = getObject(row) + printElement(obj, printFunc) } child.execute() diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala index cef05924bce74..ce021b5e15470 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala @@ -2012,19 +2012,45 @@ class DatasetSuite extends QueryTest assume(TestUtils.testCommandAvailable("cat")) val nums = spark.range(4) - val piped = nums.pipe("cat").toDF + val piped = nums.pipe("cat", (l, printFunc) => printFunc(l.toString)).toDF checkAnswer(piped, Row("0") :: Row("1") :: Row("2") :: Row("3") :: Nil) - val piped2 = nums.pipe("wc -l").toDF.collect() + val piped2 = nums.pipe("wc -l", (l, printFunc) => printFunc(l.toString)).toDF.collect() assert(piped2.size == 2) assert(piped2(0).getString(0).trim == "2") assert(piped2(1).getString(0).trim == "2") } + test("SPARK-34205: Pipe DataFrame") { + assume(TestUtils.testCommandAvailable("cat")) + + val data = Seq((123, "first"), (4567, "second")).toDF("num", "word") + + def printElement(row: Row, 
printFunc: (String) => Unit): Unit = { + val line = s"num: ${row.getInt(0)}, word: ${row.getString(1)}" + printFunc.apply(line) + } + val piped = data.pipe("cat", printElement).toDF + checkAnswer(piped, Row("num: 123, word: first") :: Row("num: 4567, word: second") :: Nil) + } + + test("SPARK-34205: Pipe complex type Dataset") { + assume(TestUtils.testCommandAvailable("cat")) + + val data = Seq(DoubleData(123, "first"), DoubleData(4567, "second")).toDS + + def printElement(data: DoubleData, printFunc: (String) => Unit): Unit = { + val line = s"num: ${data.id}, word: ${data.val1}" + printFunc.apply(line) + } + val piped = data.pipe("cat", printElement).toDF + checkAnswer(piped, Row("num: 123, word: first") :: Row("num: 4567, word: second") :: Nil) + } + test("SPARK-34205: pipe Dataset with empty partition") { val data = Seq(123, 4567).toDF("num").repartition(8, $"num") - val piped = data.pipe("wc -l") + val piped = data.pipe("wc -l", (row, printFunc) => printFunc(row.getInt(0).toString)) assert(piped.count == 8) val lineCounts = piped.map(_.trim.toInt).collect().toSet assert(Set(0, 1, 1) == lineCounts) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala index 55290dd7098ec..66dfe2e7c1ddb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala @@ -1270,7 +1270,7 @@ class StreamSuite extends StreamTest { val inputData = MemoryStream[Int] val piped = inputData.toDS() - .pipe("cat").toDF + .pipe("cat", (n, printFunc) => printFunc(n.toString)).toDF testStream(piped)( AddData(inputData, 1, 2, 3), From ac7460b0431e7012f17c3ac293226bb4f428bd15 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Tue, 26 Jan 2021 00:18:28 -0800 Subject: [PATCH 5/5] Address comments. --- sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala | 5 ++++- .../scala/org/apache/spark/sql/streaming/StreamSuite.scala | 2 +- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 683095b61978f..06b23bcf64e7a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -2898,7 +2898,10 @@ class Dataset[T] private[sql]( * even for empty partitions. * * Note that for micro-batch streaming Dataset, the effect of pipe is only per micro-batch, not - * cross entire stream. + * cross entire stream. If your external process does aggregation-like on inputs, e.g. `wc -l`, + * the aggregation is applied per a partition in micro-batch. You may want to aggregate these + * outputs after calling pipe to get global aggregation across partitions and also across + * micro-batches. * * @param command command to run in forked process. * @param printElement Use this function to customize how to pipe elements. 
This function diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala index 66dfe2e7c1ddb..4e6b652dcc58d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala @@ -1276,7 +1276,7 @@ class StreamSuite extends StreamTest { AddData(inputData, 1, 2, 3), CheckAnswer(Row("1"), Row("2"), Row("3")), AddData(inputData, 4), - CheckAnswer(Row("1"), Row("2"), Row("3"), Row("4"))) + CheckNewAnswer(Row("4"))) } }
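
A rough usage sketch of the Dataset.pipe API added by this series (illustrative only; it assumes
the printElement overload from PATCH 4/5, an already-created SparkSession named `spark`, and
that the `cat` and `wc` commands are available on the executor hosts):

    import org.apache.spark.sql.Dataset
    import spark.implicits._

    // Each element is written as one stdin line to a `cat` process forked per partition;
    // every stdout line of that process becomes one element of the resulting Dataset[String].
    val nums = spark.range(4)
    val echoed: Dataset[String] =
      nums.pipe("cat", (l, printFunc) => printFunc(l.toString))
    echoed.show()  // rows "0", "1", "2", "3"

    // The command is invoked once per partition (including empty ones), so `wc -l` yields
    // per-partition line counts rather than a single global count.
    val df = Seq((1, "a"), (2, "b"), (3, "c")).toDF("id", "word")
    val counts: Dataset[String] =
      df.pipe("wc -l", (row, printFunc) => printFunc(row.mkString(",")))
    counts.show()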