[SPARK-34205][SQL][SS] Add pipe to Dataset to enable Streaming Dataset pipe #31296
Changes from all commits
@@ -2889,6 +2889,35 @@ class Dataset[T] private[sql](

```scala
    flatMap(func)(encoder)
  }

  /**
   * Return a new Dataset of string created by piping elements to a forked external process.
   * The resulting Dataset is computed by executing the given process once per partition.
   * All elements of each input partition are written to a process's stdin as lines of input
   * separated by a newline. The resulting partition consists of the process's stdout output, with
   * each line of stdout resulting in one element of the output partition. A process is invoked
   * even for empty partitions.
   *
   * Note that for micro-batch streaming Dataset, the effect of pipe is only per micro-batch, not
```
Contributor: I'd kindly explain the case where users need to be careful, like aggregation-style commands such as `wc -l`.
```scala
   * cross entire stream. If your external process does aggregation-like on inputs, e.g. `wc -l`,
   * the aggregation is applied per a partition in micro-batch. You may want to aggregate these
   * outputs after calling pipe to get global aggregation across partitions and also across
   * micro-batches.
   *
   * @param command command to run in forked process.
   * @param printElement Use this function to customize how to pipe elements. This function
   *                     will be called with each Dataset element as the 1st parameter, and the
   *                     print line function (like out.println()) as the 2nd parameter.
   * @group typedrel
   * @since 3.2.0
   */
  def pipe(command: String, printElement: (T, String => Unit) => Unit): Dataset[String] = {
```
Contributor: I see all examples simply call the print function with a converted string. Could we simply take a serializer function like `T => String` instead?

Member (Author): I think it is okay. In most cases there should be no difference. The only difference might be when we want to print multiple lines per object:

```scala
def printElement(obj: T, printFunc: String => Unit) = {
  printFunc(obj.a)
  printFunc(obj.b)
  ...
}
```

```scala
def serializeFn(obj: T): String = {
  s"${obj.a}\n${obj.b}\n..."
}
```

I'm fine with either one, as they achieve the same effect although they take different forms.
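To make the trade-off in this exchange concrete, here is a small sketch; the `Person` case class, `printPerson`, and `peopleDs` are hypothetical names for illustration only, and only the `pipe` signature from this diff is assumed:

```scala
// Hypothetical element type, used only to make the discussion above concrete.
case class Person(name: String, age: Int)

// Callback style from this PR: may emit any number of stdin lines per element.
def printPerson(p: Person, printLine: String => Unit): Unit = {
  printLine(p.name)
  printLine(p.age.toString)
}

// Serializer style discussed above: the same content folded into one string
// with embedded newlines.
def serializePerson(p: Person): String = s"${p.name}\n${p.age}"

// With the API in this PR only the callback form is needed, e.g.:
// val piped = peopleDs.pipe("cat", printPerson)
```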
```scala
    implicit val stringEncoder = Encoders.STRING
    withTypedPlan[String](PipeElements[T](
      command,
      printElement.asInstanceOf[(Any, String => Unit) => Unit],
      logicalPlan))
  }

  /**
   * Applies a function `f` to all rows.
   *
```
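For illustration, a minimal sketch of how a caller might follow the Scaladoc's advice and aggregate the per-partition outputs of an aggregation-like command; only the `pipe` signature from this diff is assumed, plus a local `wc` binary.

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: assumes the Dataset.pipe(command, printElement) API added in this PR.
val spark = SparkSession.builder().appName("pipe-sketch").getOrCreate()
import spark.implicits._

val nums = spark.range(100)

// Each partition is fed to its own `wc -l` process, so each output row is a
// per-partition line count rather than a global one.
val perPartitionCounts = nums.pipe("wc -l", (n, printLine) => printLine(n.toString))

// Aggregate the per-partition outputs afterwards to recover the global count.
val total = perPartitionCounts.map(_.trim.toLong).reduce(_ + _)  // 100
```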
@@ -24,7 +24,7 @@ import org.scalatest.Assertions._

```diff
 import org.scalatest.exceptions.TestFailedException
 import org.scalatest.prop.TableDrivenPropertyChecks._

-import org.apache.spark.{SparkException, TaskContext}
+import org.apache.spark.{SparkException, TaskContext, TestUtils}
 import org.apache.spark.sql.catalyst.{FooClassWithEnum, FooEnum, ScroogeLikeExample}
 import org.apache.spark.sql.catalyst.encoders.{OuterScopes, RowEncoder}
 import org.apache.spark.sql.catalyst.plans.{LeftAnti, LeftSemi}
```
@@ -2007,6 +2007,54 @@ class DatasetSuite extends QueryTest

```scala
    checkAnswer(withUDF, Row(Row(1), null, null) :: Row(Row(1), null, null) :: Nil)
  }

  test("SPARK-34205: Pipe Dataset") {
    assume(TestUtils.testCommandAvailable("cat"))

    val nums = spark.range(4)
    val piped = nums.pipe("cat", (l, printFunc) => printFunc(l.toString)).toDF
```
Member: @viirya, what do you think about exposing an interface like the following instead?

```scala
scala> val data = Seq((123, "first"), (4567, "second")).toDF("num", "word")
data: org.apache.spark.sql.DataFrame = [num: int, word: string]

scala> data.createOrReplaceTempView("t1")

scala> sql("select transform(*) using 'cat' from t1").show()
+----+------+
| key| value|
+----+------+
| 123| first|
|4567|second|
+----+------+

scala> data.repartition(1).createOrReplaceTempView("t1")

scala> sql("select transform(*) using 'wc -l' as (echo) from t1").show()
+--------+
|    echo|
+--------+
|       2|
+--------+
```

Spark lately added native support for script transformation, and I think it could do what you want.

Member: I think this could address most of the comments here, such as #31296 (comment), about being typed or non-standard (as it follows Hive's feature) - at least we have one format to follow, etc.

Member: It would be great if we can leverage the script transform #29414.

Contributor: Great point! I don't know how exhaustively Spark implements Hive's transform feature, but the description of transform in Hive's manual looks pretty powerful, and goes much beyond what we plan to provide with pipe.

Member (Author): I considered `transform`. Although I asked our customer and they only use primitive type Datasets for now, so an untyped Dataset should be enough for the purpose. Another reason is that although the query looks like "SELECT TRANSFORM(...) FROM ...", it is actually not an expression but is implemented as an operator. If we have it as a DSL expression, there will be some problems. Unlike a window function, it seems to me that we cannot have a query like "SELECT a, TRANSFORM(...), c FROM ..." or the DSL form:

```scala
df.select($"a", $"b", transform(...) ...)
```

But for a window function we can do:

```scala
df.select($"a", $"b", lead("key", 1).over(window) ...)
```

That being said, in the end it is also ...

Contributor: I see what @viirya said. I'd agree that transform looks to behave as an operation (not sure whether that is intended, but that's how it looks at least for now), and transform also requires a top-level API to cover it, like we did for ... If we are OK with adding the top-level API (again, not yet decided, so just my 2 cents), then which one? I'd rather say ...

Member: @AngersZhuuuu The top-level API here means the new API added in Dataset. @viirya Sure. I followed the comment ... Two limitations here might need more discussion:

Member (Author): Thanks @HeartSaVioR. At least I am glad that the discussion can go forward no matter which one you prefer to add. Honestly, I think transform is a weird thing; it exists only to provide the pipe feature under Hive SQL syntax. I don't like the transform syntax, which is inconvenient to use and verbose. It is not as flexible as pipe's custom print-out function. BTW, regarding typed datasets: because transform is untyped, it is bound to its serialization row format. In the early discussion there were some comments against that, although it was clarified later that pipe doesn't suffer from this issue. If we still cannot reach a consensus, maybe I should raise a discussion on the dev mailing list to decide whether the pipe or transform top-level API should be added. @xuanyuanking @AngersZhuuuu The SQL syntax of transform, "SELECT TRANSFORM(...)", is pretty confusing. It looks like an expression but it is actually an operator, and IMHO you cannot turn it into an expression. If you force it to be an expression, you will create inconsistencies and weird cases. transform is like pipe: their input/output relation is not 1:1 or N:1 but arbitrary.

Contributor: Hmmm, just to clarify, we mean we can add an expression (or function?) where the input can be a list of input columns such as ... Then when executed we can make it just run with the default format such as ...

Member (Author): Do you think this expression can cover all kinds of external process output? Transform and pipe have an arbitrary relation between input and output. An external process can output a line for each input line, or can do aggregation-like output like ...

Contributor: nit: any reason for ...?

Member (Author): It is just for ...
```scala

    checkAnswer(piped, Row("0") :: Row("1") :: Row("2") :: Row("3") :: Nil)

    val piped2 = nums.pipe("wc -l", (l, printFunc) => printFunc(l.toString)).toDF.collect()
```

Contributor: nit: Why ...?
```scala
    assert(piped2.size == 2)
    assert(piped2(0).getString(0).trim == "2")
    assert(piped2(1).getString(0).trim == "2")
  }

  test("SPARK-34205: Pipe DataFrame") {
    assume(TestUtils.testCommandAvailable("cat"))

    val data = Seq((123, "first"), (4567, "second")).toDF("num", "word")

    def printElement(row: Row, printFunc: (String) => Unit): Unit = {
      val line = s"num: ${row.getInt(0)}, word: ${row.getString(1)}"
      printFunc.apply(line)
    }
    val piped = data.pipe("cat", printElement).toDF
    checkAnswer(piped, Row("num: 123, word: first") :: Row("num: 4567, word: second") :: Nil)
  }

  test("SPARK-34205: Pipe complex type Dataset") {
    assume(TestUtils.testCommandAvailable("cat"))

    val data = Seq(DoubleData(123, "first"), DoubleData(4567, "second")).toDS

    def printElement(data: DoubleData, printFunc: (String) => Unit): Unit = {
      val line = s"num: ${data.id}, word: ${data.val1}"
      printFunc.apply(line)
    }
    val piped = data.pipe("cat", printElement).toDF
    checkAnswer(piped, Row("num: 123, word: first") :: Row("num: 4567, word: second") :: Nil)
  }

  test("SPARK-34205: pipe Dataset with empty partition") {
```
Member: Thank you for making sure of this!
```scala
    val data = Seq(123, 4567).toDF("num").repartition(8, $"num")
    val piped = data.pipe("wc -l", (row, printFunc) => printFunc(row.getInt(0).toString))
    assert(piped.count == 8)
    val lineCounts = piped.map(_.trim.toInt).collect().toSet
    assert(Set(0, 1, 1) == lineCounts)
  }
}

case class Bar(a: Int)
```
@@ -1264,6 +1264,20 @@ class StreamSuite extends StreamTest {

```scala
      }
    }
  }

  test("SPARK-34205: Pipe Streaming Dataset") {
    assume(TestUtils.testCommandAvailable("cat"))

    val inputData = MemoryStream[Int]
    val piped = inputData.toDS()
      .pipe("cat", (n, printFunc) => printFunc(n.toString)).toDF
```
Contributor: nit: Why ...?
```scala

    testStream(piped)(
      AddData(inputData, 1, 2, 3),
      CheckAnswer(Row("1"), Row("2"), Row("3")),
      AddData(inputData, 4),
      CheckNewAnswer(Row("4")))
  }
}

abstract class FakeSource extends StreamSourceProvider
```
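Tying the streaming test back to the Scaladoc's micro-batch caveat, a minimal sketch of aggregating pipe outputs across partitions and micro-batches; it assumes the `pipe` API from this diff, a local `wc` binary, and a socket source chosen purely for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.sum

// Sketch only: assumes the Dataset.pipe(command, printElement) API added in this PR.
val spark = SparkSession.builder().appName("streaming-pipe-sketch").getOrCreate()
import spark.implicits._

val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()
  .as[String]

// Each micro-batch partition is piped to its own `wc -l` process, so every output
// row is a per-partition, per-micro-batch line count.
val perPartitionCounts = lines.pipe("wc -l", (s, printLine) => printLine(s))

// Aggregating after pipe (as the Scaladoc suggests) turns those partial counts into
// a running global count across partitions and micro-batches.
val totals = perPartitionCounts
  .map(_.trim.toLong)
  .select(sum($"value").as("total_rows"))

val query = totals.writeStream
  .outputMode("complete")
  .format("console")
  .start()
```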