From 6dc2c9e18072756d51c4318b9d8a3e99fd4e2c4d Mon Sep 17 00:00:00 2001
From: Liwei Lin
Date: Sat, 5 Nov 2016 21:27:26 +0800
Subject: [PATCH 1/2] MemorySink statistics for joining

---
 .../sql/execution/streaming/memory.scala           |  6 +++++-
 .../spark/sql/streaming/MemorySinkSuite.scala      | 25 +++++++++++++++++++++++++
 2 files changed, 30 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
index 48d9791faf1e9..613c7ccdd226a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
@@ -27,7 +27,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.encoders.encoderFor
 import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.catalyst.plans.logical.LeafNode
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.Utils
@@ -212,4 +212,8 @@ class MemorySink(val schema: StructType, outputMode: OutputMode) extends Sink wi
  */
 case class MemoryPlan(sink: MemorySink, output: Seq[Attribute]) extends LeafNode {
   def this(sink: MemorySink) = this(sink, sink.schema.toAttributes)
+
+  private val sizePerRow = sink.schema.toAttributes.map(_.dataType.defaultSize).sum
+
+  override def statistics: Statistics = Statistics(sizePerRow * sink.allData.size)
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala
index 310d75630272b..ec8d2c2009e68 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala
@@ -187,6 +187,31 @@ class MemorySinkSuite extends StreamTest with BeforeAndAfter {
     query.stop()
   }
 
+  test("MemoryPlan statistics for joining") {
+    val input = MemoryStream[Int]
+    val query = input.toDF()
+      .writeStream
+      .format("memory")
+      .queryName("memStream")
+      .start()
+
+    val memStream = spark.table("memStream").as[Int]
+
+    input.addData(1)
+    query.processAllAvailable()
+    checkDatasetUnorderly(
+      memStream.crossJoin(memStream.withColumnRenamed("value", "value2")).as[(Int, Int)],
+      (1, 1))
+
+    input.addData(2)
+    query.processAllAvailable()
+    checkDatasetUnorderly(
+      memStream.crossJoin(memStream.withColumnRenamed("value", "value2")).as[(Int, Int)],
+      (1, 1), (1, 2), (2, 1), (2, 2))
+
+    query.stop()
+  }
+
   ignore("stress test") {
     // Ignore the stress test as it takes several minutes to run
     (0 until 1000).foreach { _ =>

From a7a6f6bbe3d7b4245a1a568cd16332ce6af61e73 Mon Sep 17 00:00:00 2001
From: Liwei Lin
Date: Sun, 6 Nov 2016 10:59:53 +0800
Subject: [PATCH 2/2] Address comments

---
 .../spark/sql/streaming/MemorySinkSuite.scala      | 31 +++++++++++--------------------
 1 file changed, 11 insertions(+), 20 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala
index ec8d2c2009e68..4e9fba9dbaa1e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala
@@ -187,29 +187,20 @@ class MemorySinkSuite extends StreamTest with BeforeAndAfter {
     query.stop()
   }
 
-  test("MemoryPlan statistics for joining") {
-    val input = MemoryStream[Int]
-    val query = input.toDF()
-      .writeStream
-      .format("memory")
-      .queryName("memStream")
-      .start()
-
-    val memStream = spark.table("memStream").as[Int]
+  test("MemoryPlan statistics") {
+    implicit val schema = new StructType().add(new StructField("value", IntegerType))
+    val sink = new MemorySink(schema, InternalOutputModes.Append)
+    val plan = new MemoryPlan(sink)
 
-    input.addData(1)
-    query.processAllAvailable()
-    checkDatasetUnorderly(
-      memStream.crossJoin(memStream.withColumnRenamed("value", "value2")).as[(Int, Int)],
-      (1, 1))
+    // Before adding data, check output
+    checkAnswer(sink.allData, Seq.empty)
+    assert(plan.statistics.sizeInBytes === 0)
 
-    input.addData(2)
-    query.processAllAvailable()
-    checkDatasetUnorderly(
-      memStream.crossJoin(memStream.withColumnRenamed("value", "value2")).as[(Int, Int)],
-      (1, 1), (1, 2), (2, 1), (2, 2))
+    sink.addBatch(0, 1 to 3)
+    assert(plan.statistics.sizeInBytes === 12)
 
-    query.stop()
+    sink.addBatch(1, 4 to 6)
+    assert(plan.statistics.sizeInBytes === 24)
   }
 
   ignore("stress test") {
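
Note on the asserted sizes (not part of the patch series above): the sizeInBytes values in the reworked test follow directly from the statistics override added in PATCH 1/2, which multiplies the schema's per-row default size by the number of rows the sink has collected. A single IntegerType column has a default size of 4 bytes, so 3 rows give 12 bytes and 6 rows give 24. The sketch below reproduces only that arithmetic with the public Spark SQL types API; the object name is made up for illustration and nothing here is part of the patch itself.

import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

// Hypothetical standalone sketch: mirrors the estimate MemoryPlan.statistics
// computes (per-row default size multiplied by the current row count).
object MemoryPlanSizeSketch {
  def main(args: Array[String]): Unit = {
    // Same single-column schema as the reworked test.
    val schema = new StructType().add(StructField("value", IntegerType))

    // MemoryPlan sums the default size of each attribute's data type.
    val sizePerRow = schema.fields.map(_.dataType.defaultSize).sum // 4 bytes for IntegerType

    println(sizePerRow * 3) // 12 -- matches the assertion after sink.addBatch(0, 1 to 3)
    println(sizePerRow * 6) // 24 -- matches the assertion after sink.addBatch(1, 4 to 6)
  }
}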