Conversation

@cloud-fan (Contributor)

What changes were proposed in this pull request?

Currently, RowDataSourceScanExec and FileSourceScanExec rely on a "metadata" string map to implement equality comparison, since the RDDs they depend on cannot be directly compared. This has resulted in a number of correctness bugs around exchange reuse, e.g. SPARK-17673 and SPARK-16818.

To make these comparisons less brittle, we should refactor these classes to compare constructor parameters directly instead of relying on the metadata map.

This PR refactors RowDataSourceScanExec; FileSourceScanExec will be fixed in a follow-up PR.
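To illustrate the direction of the refactor with a minimal sketch (hypothetical, simplified types, not the actual RowDataSourceScanExec signature): a case class gets structural equality over its constructor parameters for free, so two scans compare equal exactly when every parameter matches, with no string map in between.

```scala
// Hypothetical, simplified stand-in for a scan node; the field names only
// mirror the spirit of the refactor, not the real Spark classes.
case class ScanNode(
    fullOutput: Seq[String],        // all columns the relation can produce
    requiredColumnsIndex: Seq[Int], // which of those columns this scan reads
    filters: Set[String],           // pushed-down filter descriptions
    tableIdentifier: Option[String])

object ScanNodeEquality {
  def main(args: Array[String]): Unit = {
    val a = ScanNode(Seq("a", "b"), Seq(0, 1), Set("a > 1"), Some("t1"))
    val b = ScanNode(Seq("a", "b"), Seq(0, 1), Set("a > 1"), Some("t1"))
    val c = a.copy(filters = Set("a > 2"))
    assert(a == b) // structurally identical scans are equal: safe to reuse
    assert(a != c) // any differing parameter breaks equality: no false reuse
  }
}
```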

How was this patch tested?

Existing tests.

case r: HadoopFsRelation if r.fileFormat.isInstanceOf[ParquetSource] =>
  !SparkSession.getActiveSession.get.sessionState.conf.getConf(
    SQLConf.PARQUET_VECTORIZED_READER_ENABLED)
case _: HadoopFsRelation => true
Contributor Author

HadoopFsRelation never goes into RowDataSourceScanExec.

// Match original case of attributes.
.map(relation.attributeMap)
// Don't request columns that are only referenced by pushed filters.
.filterNot(handledSet.contains)
Contributor Author

In this branch, filterSet is a subset of projectSet, so this is a no-op.

@cloud-fan (Contributor Author)

cc @ericl @gatorsmile

@SparkQA

SparkQA commented Jul 11, 2017

Test build #79523 has finished for PR 18600 at commit 5008eb6.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan (Contributor Author)

retest this please

@SparkQA

SparkQA commented Jul 11, 2017

Test build #79527 has finished for PR 18600 at commit 5008eb6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

    override val tableIdentifier: Option[TableIdentifier])
  extends DataSourceScanExec {

  def output: Seq[Attribute] = requiredColumnsIndex.map(fullOutput)
@dongjoon-hyun (Member) Jul 11, 2017

`def output: Seq[Attribute]` -> `override def output: Seq[Attribute]`?
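For context, a generic Scala note rather than anything specific to this PR: `override` is optional when implementing an abstract member, so the original line compiles either way, but the explicit keyword documents the relationship and fails the build if the parent member is ever renamed. A minimal sketch with a hypothetical Node trait:

```scala
trait Node {
  def output: Seq[String] // abstract member, standing in for SparkPlan's output
}

// Compiles: `override` may be omitted when implementing an abstract member.
class ScanA extends Node {
  def output: Seq[String] = Seq("a")
}

// Also compiles, and the explicit keyword turns a later rename or removal of
// Node.output into a compile error instead of a silently dead method.
class ScanB extends Node {
  override def output: Seq[String] = Seq("b")
}
```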

  scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
- relation.relation, UnknownPartitioning(0), metadata,
- relation.catalogTable.map(_.identifier))
+ relation.relation, relation.catalogTable.map(_.identifier))
@dongjoon-hyun (Member) Jul 11, 2017

nit: can we make this into two lines during refactoring?

relation.relation,
relation.catalogTable.map(_.identifier))

  scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
- relation.relation, UnknownPartitioning(0), metadata,
- relation.catalogTable.map(_.identifier))
+ relation.relation, relation.catalogTable.map(_.identifier))
Member

ditto.

- output: Seq[Attribute],
+ fullOutput: Seq[Attribute],
+ requiredColumnsIndex: Seq[Int],
+ filters: Set[Filter],
Member

Start it in this PR?

    rdd: RDD[InternalRow],
    @transient relation: BaseRelation,
    override val outputPartitioning: Partitioning,
    override val metadata: Map[String, String],
Member

uh... This is not being used after our previous refactoring.

Member

metadata is still needed. It is being used here.


// These metadata values make scan plans uniquely identifiable for equality checking.
// TODO(SPARK-17701) using strings for equality checking is brittle
val metadata: Map[String, String] = {
Member

We need to keep it.

Contributor Author

The target of this cleanup PR is to remove the metadata...

@gatorsmile (Member)

LGTM except the above three comments.


/**
 * @return Metadata that describes more details of this SparkPlan.
 */
Contributor Author

We introduced metadata to work around the equality issue of data source scans. Now that it's fixed, we can remove it.

    val nodeName: String,
    val simpleString: String,
    val children: Seq[SparkPlanInfo],
    val metadata: Map[String, String],
Contributor Author

This is a developer API, and I don't think users can do anything useful with metadata because it was just a hack. It should be safe to remove.

Member

It seems that @LantaoJin brought this back for event logging in #22353.

@SparkQA

SparkQA commented Jul 12, 2017

Test build #79552 has finished for PR 18600 at commit 2dc4ce1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile (Member)

LGTM

@gatorsmile (Member)

Thanks! Merging to master.

@asfgit closed this in 780586a on Jul 12, 2017
asfgit pushed a commit that referenced this pull request Sep 15, 2017
## What changes were proposed in this pull request?

In #18600 we removed the `metadata` field from `SparkPlanInfo`. This causes a problem when we replay event logs that are generated by older Spark versions.

## How was this patch tested?

A regression test.

Author: Wenchen Fan <[email protected]>

Closes #19237 from cloud-fan/event.
asfgit pushed a commit that referenced this pull request Sep 13, 2018
…tion like file path to event log

## What changes were proposed in this pull request?

The metadata field was removed from SparkPlanInfo in #18600. Correspondingly, a lot of metadata was also removed from the SparkListenerSQLExecutionStart event in the Spark event log. If we want to analyze the event log to get all the input paths, we can no longer find them; the simpleString of the SparkPlanInfo JSON displays only 100 characters, which doesn't help.

Before 2.3, the SparkListenerSQLExecutionStart fragment in the event log looked like the example below (it contains the metadata field with the intact information):
>{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart", Location: InMemoryFileIndex[hdfs://cluster1/sys/edw/test1/test2/test3/test4..., "metadata": {"Location": "InMemoryFileIndex[hdfs://cluster1/sys/edw/test1/test2/test3/test4/test5/snapshot/dt=20180904]","ReadSchema":"struct<snpsht_start_dt:date,snpsht_end_dt:date,am_ntlogin_name:string,am_first_name:string,am_last_name:string,isg_name:string,CRE_DATE:date,CRE_USER:string,UPD_DATE:timestamp,UPD_USER:string>"}

After #18600, the metadata field is gone:
>{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart", Location: InMemoryFileIndex[hdfs://cluster1/sys/edw/test1/test2/test3/test4...,

So I add the field back to the SparkPlanInfo class, so that the metadata is logged to the event log again. Intact information in the event log is very useful for offline job analysis.

## How was this patch tested?
Unit test.

Closes #22353 from LantaoJin/SPARK-25357.

Authored-by: LantaoJin <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 6dc5921)
Signed-off-by: Wenchen Fan <[email protected]>
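As a rough sketch of the offline analysis this commit message motivates (hypothetical code with an assumed, abbreviated event-line shape; not part of Spark): once the metadata map is back in the event log, input paths can be recovered from each SparkListenerSQLExecutionStart line, e.g. with a plain regex.

```scala
// Hypothetical offline-analysis helper; the event-line shape is an
// abbreviated assumption based on the example above, not real Spark output.
object EventLogLocations {
  // Matches "Location":"InMemoryFileIndex[...]" entries in the metadata map.
  private val LocationPattern =
    "\"Location\"\\s*:\\s*\"(InMemoryFileIndex\\[[^\"]*\\])\"".r

  def locations(eventLogLine: String): Seq[String] =
    LocationPattern.findAllMatchIn(eventLogLine).map(_.group(1)).toSeq

  def main(args: Array[String]): Unit = {
    val line =
      "{\"Event\":\"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart\"," +
        "\"metadata\":{\"Location\":\"InMemoryFileIndex[hdfs://cluster1/sys/edw/test1]\"}}"
    // Prints: List(InMemoryFileIndex[hdfs://cluster1/sys/edw/test1])
    println(locations(line))
  }
}
```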