@@ -277,6 +277,9 @@ object InMemoryRelation {
ser.get
}

/* Visible for testing */
private[spark] def clearSerializer(): Unit = synchronized { ser = None }
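For context, InMemoryRelation lazily builds its serializer once and caches it in a process-wide variable; the new clearSerializer() helper resets that cache between tests. A minimal, self-contained sketch of the pattern (plain Scala with hypothetical names, not Spark's actual code):

```scala
// Sketch of a lazily initialized, JVM-wide singleton with a test-only
// reset, mirroring the InMemoryRelation.ser pattern. All names here are
// illustrative, not Spark's actual API.
object SerializerHolder {
  private var ser: Option[String] = None

  // Build the value once from the configured class name and cache it
  // for the lifetime of the JVM; later calls ignore their argument.
  def get(configuredClass: => String): String = synchronized {
    if (ser.isEmpty) {
      ser = Some(configuredClass)
    }
    ser.get
  }

  /* Visible for testing */
  def clearSerializer(): Unit = synchronized { ser = None }
}
```

Without the reset, whichever test suite initializes the singleton first pins its serializer choice for every suite that runs later in the same JVM.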
Member:
Nice, @dongjoon-hyun. What about just fixing CachedBatchSerializerSuite not to extend SharedSparkSessionBase? For example, like ExecutorSideSQLConfSuite or SparkSessionExtensionSuite. I think that would be simpler and a more isolated fix.

Member Author:
Yes, I considered that first, but this is a more general fix because SPARK-32274 makes SQL cache serialization pluggable; we may add more such test suites in the future.

Member Author (@dongjoon-hyun, Aug 4, 2020):
Also, the test resource clean-up is better centralized in SharedSparkSession so that it is not forgotten.

Member Author (@dongjoon-hyun, Aug 4, 2020):
BTW, I'm still open to your idea. Let's see what the original author and the committers think. Thanks.

Contributor:
Does it really help? InMemoryRelation.ser doesn't belong to any session and is global.

I think a simpler fix is to clear it in CachedBatchSerializerSuite.beforeAll and afterAll.
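The suite-local alternative suggested here would look roughly like the following sketch (assuming ScalaTest's BeforeAndAfterAll, and that clearSerializer stays reachable from the suite's package; this is not code from the PR):

```scala
import org.scalatest.BeforeAndAfterAll
import org.scalatest.funsuite.AnyFunSuite

import org.apache.spark.sql.execution.columnar.InMemoryRelation

// Sketch: confine the cleanup to the one suite that swaps the serializer,
// resetting the JVM-wide cache before and after this suite runs.
class CachedBatchSerializerSuite extends AnyFunSuite with BeforeAndAfterAll {

  override def beforeAll(): Unit = {
    super.beforeAll()
    InMemoryRelation.clearSerializer()
  }

  override def afterAll(): Unit = {
    InMemoryRelation.clearSerializer()
    super.afterAll()
  }
}
```

Since clearSerializer is private[spark], such a suite would have to live under the org.apache.spark package, as Spark's test suites already do.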

Member Author:
> Does it really help? InMemoryRelation.ser doesn't belong to any session and is global.

For this question, yes. The root cause is that InMemoryRelation.ser is effectively a singleton. Since the new configuration is a static conf, that matches the semantics of InMemoryRelation.ser; the problem is only in the testing.

Member Author (@dongjoon-hyun, Aug 4, 2020):
Let me create another PR for @HyukjinKwon's or @cloud-fan's idea to compare with this one.

Member Author (@dongjoon-hyun, Aug 4, 2020):
First of all, @HyukjinKwon's idea cannot remove the failure, because InMemoryRelation.ser is a singleton.

> What about just fixing CachedBatchSerializerSuite not to extend SharedSparkSessionBase?

I'm moving to @cloud-fan's proposal.

Member:
Thanks @dongjoon-hyun!

Contributor:
I like @cloud-fan's proposal, as it will make testing easier as well.


def convertToColumnarIfPossible(plan: SparkPlan): SparkPlan = plan match {
  case gen: WholeStageCodegenExec => gen.child match {
    case c2r: ColumnarToRowTransition => c2r.child match {
@@ -26,6 +26,7 @@ import org.apache.spark.{DebugFilesystem, SparkConf}
import org.apache.spark.internal.config.UNSAFE_EXCEPTION_ON_MEMORY_LEAK
import org.apache.spark.sql.{SparkSession, SQLContext}
import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation
import org.apache.spark.sql.execution.columnar.InMemoryRelation.clearSerializer
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}

trait SharedSparkSession extends SQLTestUtils with SharedSparkSessionBase {
@@ -63,6 +64,7 @@ trait SharedSparkSessionBase
with Eventually { self: Suite =>

protected def sparkConf = {
clearSerializer()
val conf = new SparkConf()
.set("spark.hadoop.fs.file.impl", classOf[DebugFilesystem].getName)
.set(UNSAFE_EXCEPTION_ON_MEMORY_LEAK, true)