Skip to content

Conversation

@lw-lin
Copy link
Contributor

@lw-lin lw-lin commented Nov 5, 2016

What changes were proposed in this pull request?

Right now, there is no way to join the output of a memory sink with any table:

UnsupportedOperationException: LeafNode MemoryPlan must implement statistics

This patch adds statistics to MemorySink, making joining snapshots of memory streams with tables possible.

How was this patch tested?

Added a test case.

@lw-lin lw-lin changed the title [SPARK-18261][SS] Add statistics to MemorySink for joining [SPARK-18261][Structured Streaming] Add statistics to MemorySink for joining Nov 5, 2016
@SparkQA
Copy link

SparkQA commented Nov 5, 2016

Test build #68209 has finished for PR 15786 at commit 6dc2c9e.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

memStream.crossJoin(memStream.withColumnRenamed("value", "value2")).as[(Int, Int)],
(1, 1), (1, 2), (2, 1), (2, 2))

query.stop()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you put this in a try-finally block please?

query.stop()
}

test("MemoryPlan statistics for joining") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it would also be nicer to just implement a test on the MemoryPlan itself, without needing to start a stream, purely on statistics calculation

@SparkQA
Copy link

SparkQA commented Nov 6, 2016

Test build #68221 has finished for PR 15786 at commit a7a6f6b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@lw-lin
Copy link
Contributor Author

lw-lin commented Nov 7, 2016

comments addressed; @brkyvz would you take another look


private val sizePerRow = sink.schema.toAttributes.map(_.dataType.defaultSize).sum

override def statistics: Statistics = Statistics(sizePerRow * sink.allData.size)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

everywhere else, this is defined as a lazy val. Can you make it such that:

override lazy val statistics: Statistics  = {
  val sizePerRow = sink.schema.toAttributes.map(_.dataType.defaultSize).sum
  Statistics(sizeInBytes = sizePerRow * sink.allData.size)
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks @brkyvz. I'm afraid it should not be lazy val as the statistics should vary from batch to batch -- sink.allData.sizeshould vary from batch to batch. sizePerRow, however, does not vary from batch to batch, thus it's private val as it is now.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh yeah! you're right! good catch!

}

test("MemoryPlan statistics") {
implicit val schema = new StructType().add(new StructField("value", IntegerType))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks! This will be much faster than a test where we actually join DataFrames.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

happy to do this

@brkyvz
Copy link
Contributor

brkyvz commented Nov 7, 2016

Thanks @lw-lin ! Left one last comment. @davies Can you also please take a look? I think you implemented most of the statistics code

@lw-lin
Copy link
Contributor Author

lw-lin commented Nov 8, 2016

@zsxwing could you also take a look

@rxin
Copy link
Contributor

rxin commented Nov 8, 2016

LGTM - merging in master/branch-2.1.

asfgit pushed a commit that referenced this pull request Nov 8, 2016
…joining

## What changes were proposed in this pull request?

Right now, there is no way to join the output of a memory sink with any table:

> UnsupportedOperationException: LeafNode MemoryPlan must implement statistics

This patch adds statistics to MemorySink, making joining snapshots of memory streams with tables possible.

## How was this patch tested?

Added a test case.

Author: Liwei Lin <[email protected]>

Closes #15786 from lw-lin/memory-sink-stat.

(cherry picked from commit c1a0c66)
Signed-off-by: Reynold Xin <[email protected]>
@asfgit asfgit closed this in c1a0c66 Nov 8, 2016
@lw-lin lw-lin deleted the memory-sink-stat branch November 8, 2016 02:29
uzadude pushed a commit to uzadude/spark that referenced this pull request Jan 27, 2017
…joining

## What changes were proposed in this pull request?

Right now, there is no way to join the output of a memory sink with any table:

> UnsupportedOperationException: LeafNode MemoryPlan must implement statistics

This patch adds statistics to MemorySink, making joining snapshots of memory streams with tables possible.

## How was this patch tested?

Added a test case.

Author: Liwei Lin <[email protected]>

Closes apache#15786 from lw-lin/memory-sink-stat.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants