[SPARK-2094][SQL] "Exactly once" semantics for DDL and command statements #1071
Changes from all commits: `0ad343a`, `74789c1`, `cc64f32`, `48aa2e5`, `5c7e680`, `1d00937`, `f6c7715`, `d005b03`
```diff
@@ -22,45 +22,69 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{SQLContext, Row}
 import org.apache.spark.sql.catalyst.expressions.{GenericRow, Attribute}
 
+trait Command {
+  /**
+   * A concrete command should override this lazy field to wrap up any side effects caused by the
+   * command or any other computation that should be evaluated exactly once. The value of this
+   * field can be used as the contents of the corresponding RDD generated from the physical plan
+   * of this command.
+   *
+   * The `execute()` method of all the physical command classes should reference
+   * `sideEffectResult` so that the command can be executed eagerly right after the command query
+   * is created.
+   */
+  protected[sql] lazy val sideEffectResult: Seq[Any] = Seq.empty[Any]
+}
```
Inline review thread on `sideEffectResult`:

**Contributor:** By default, maybe we could have this return "OK" like Hive does?

**Author:** Makes sense. And a little off topic, why is the physical command class named `SetCommandPhysical` rather than sharing the logical plan's name?

**Contributor:** +1 for following the same-name convention.

**Author:** @marmbrus Checked Hive 0.12 source code and confirmed that the "OK" is actually a log rather than the result. Thus essentially we can't distinguish a command (e.g. …) that produces no result from a query whose result set just happens to be empty.

**Contributor:** Okay, fine to not return OK. Regarding `emptyResult`, that is a bad name, but it is used for things like …

**Author:** After some thought I think it's not only about naming; the semantics are wrong: … There does exist a … Currently …

**Contributor:** We use a `Project` to build the actual result. However, looks like I already did the separation… so that plan sounds good to me :)
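To make "evaluated exactly once" concrete, here is a minimal, self-contained sketch of the pattern the trait relies on (hypothetical names, not the actual Spark classes): a `lazy val` body runs at most once per instance, however many times `execute()` references it.

```scala
// Minimal sketch of the exactly-once pattern (hypothetical names, not Spark code).
trait SketchCommand {
  protected lazy val sideEffectResult: Seq[Any] = Seq.empty
}

class SketchSetCommand(key: String, value: String) extends SketchCommand {
  private val conf = scala.collection.mutable.Map.empty[String, String]
  private var evaluations = 0 // for demonstration only

  // The side effect and its result live in a lazy val: the body runs on the
  // first access, and the cached result is returned on every later access.
  override protected lazy val sideEffectResult: Seq[Any] = {
    evaluations += 1
    conf(key) = value // the side effect
    Seq(key -> value)
  }

  def execute(): Seq[Any] = sideEffectResult

  def timesEvaluated: Int = evaluations
}

object ExactlyOnceDemo extends App {
  val cmd = new SketchSetCommand("some.key", "some.value")
  cmd.execute()
  cmd.execute() // cached; the side effect does not run again
  assert(cmd.timesEvaluated == 1)
  println(s"side effect ran ${cmd.timesEvaluated} time(s)")
}
```

Note that the guarantee is per instance, which is exactly why the review thread at the end of this page matters: if a new physical node is instantiated for every action, the `lazy val` protects nothing.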
```diff
 /**
  * :: DeveloperApi ::
  */
 @DeveloperApi
-case class SetCommandPhysical(key: Option[String], value: Option[String], output: Seq[Attribute])
-    (@transient context: SQLContext) extends LeafNode {
-  def execute(): RDD[Row] = (key, value) match {
-    // Set value for key k; the action itself would
-    // have been performed in QueryExecution eagerly.
-    case (Some(k), Some(v)) => context.emptyResult
+case class SetCommand(
+    key: Option[String], value: Option[String], output: Seq[Attribute])(
+    @transient context: SQLContext)
+  extends LeafNode with Command {
+
+  override protected[sql] lazy val sideEffectResult: Seq[(String, String)] = (key, value) match {
+    // Set value for key k.
+    case (Some(k), Some(v)) =>
+      context.set(k, v)
+      Array(k -> v)
 
     // Query the value bound to key k.
-    case (Some(k), None) =>
-      val resultString = context.getOption(k) match {
-        case Some(v) => s"$k=$v"
-        case None => s"$k is undefined"
-      }
-      context.sparkContext.parallelize(Seq(new GenericRow(Array[Any](resultString))), 1)
+    case (Some(k), _) =>
+      Array(k -> context.getOption(k).getOrElse("<undefined>"))
 
     // Query all key-value pairs that are set in the SQLConf of the context.
     case (None, None) =>
-      val pairs = context.getAll
-      val rows = pairs.map { case (k, v) =>
-        new GenericRow(Array[Any](s"$k=$v"))
-      }.toSeq
-      // Assume config parameters can fit into one split (machine) ;)
-      context.sparkContext.parallelize(rows, 1)
-    // The only other case is invalid semantics and is impossible.
-    case _ => context.emptyResult
+      context.getAll
+
+    case _ =>
+      throw new IllegalArgumentException()
   }
 
+  def execute(): RDD[Row] = {
+    val rows = sideEffectResult.map { case (k, v) => new GenericRow(Array[Any](k, v)) }
+    context.sparkContext.parallelize(rows, 1)
+  }
+
   override def otherCopyArgs = context :: Nil
 }
```
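For illustration, a self-contained sketch of `SetCommand`'s dispatch over `(key, value)`, with a plain mutable `Map` standing in for `SQLConf` (hypothetical names, not the actual Spark code):

```scala
import scala.collection.mutable

// Toy stand-in for SQLConf (hypothetical; for illustration only).
object SetDispatchSketch extends App {
  val conf = mutable.Map("a" -> "1")

  def run(key: Option[String], value: Option[String]): Seq[(String, String)] =
    (key, value) match {
      case (Some(k), Some(v)) => // SET k=v: perform the side effect, echo the pair
        conf(k) = v
        Seq(k -> v)
      case (Some(k), _) =>       // SET k: read one key (value is necessarily None here)
        Seq(k -> conf.getOrElse(k, "<undefined>"))
      case (None, None) =>       // SET: dump all key-value pairs
        conf.toSeq
      case _ =>                  // (None, Some(_)) is not a meaningful SET statement
        throw new IllegalArgumentException()
    }

  println(run(Some("b"), Some("2"))) // List((b,2))
  println(run(Some("c"), None))      // List((c,<undefined>))
  println(run(None, None))           // all pairs currently set
}
```

Note the `(Some(k), _)` arm: since `(Some(k), Some(v))` has already been tried, it can only ever see a `None` value, so one arm covers the whole "read one key" path.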
```diff
 /**
  * :: DeveloperApi ::
  */
 @DeveloperApi
-case class ExplainCommandPhysical(child: SparkPlan, output: Seq[Attribute])
-    (@transient context: SQLContext) extends UnaryNode {
+case class ExplainCommand(
+    child: SparkPlan, output: Seq[Attribute])(
+    @transient context: SQLContext)
+  extends UnaryNode with Command {
+
+  // Actually "EXPLAIN" command doesn't cause any side effect.
+  override protected[sql] lazy val sideEffectResult: Seq[String] = this.toString.split("\n")
+
   def execute(): RDD[Row] = {
-    val planString = new GenericRow(Array[Any](child.toString))
-    context.sparkContext.parallelize(Seq(planString))
+    val explanation = sideEffectResult.mkString("\n")
+    context.sparkContext.parallelize(Seq(new GenericRow(Array[Any](explanation))), 1)
   }
 
   override def otherCopyArgs = context :: Nil
```
```diff
@@ -70,19 +94,20 @@ case class ExplainCommandPhysical(child: SparkPlan, output: Seq[Attribute])
  * :: DeveloperApi ::
  */
 @DeveloperApi
-case class CacheCommandPhysical(tableName: String, doCache: Boolean)(@transient context: SQLContext)
-  extends LeafNode {
+case class CacheCommand(tableName: String, doCache: Boolean)(@transient context: SQLContext)
+  extends LeafNode with Command {
 
-  lazy val commandSideEffect = {
+  override protected[sql] lazy val sideEffectResult = {
     if (doCache) {
       context.cacheTable(tableName)
     } else {
       context.uncacheTable(tableName)
     }
+    Seq.empty[Any]
   }
 
   override def execute(): RDD[Row] = {
-    commandSideEffect
+    sideEffectResult
     context.emptyResult
   }
```
Review thread on the follow-up commit:

**Author:** Realized that many `SchemaRDD` actions other than `collect()` and DSL methods reuse `logicalPlan`, which breaks the "exactly once" constraint when planning the local plan (a new physical plan node for the DDL/command statement is created, causing the side effect to take place again). So I replaced `logicalPlan` with the executed physical plan wrapped in a `SparkLogicalPlan`, to prevent multiple physical plan instantiations for the same DDL/command statement.

**Contributor:** Good catch. This is probably the problem I was seeing with double "UNCACHE TABLE".
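The failure mode described in this thread is easy to model. In the sketch below (all names hypothetical; `AlreadyPlanned` plays the role of `SparkLogicalPlan` wrapping the `executedPlan`), re-planning the logical command manufactures a fresh physical node each time, so the per-instance `lazy val` guard is defeated; freezing the executed physical plan restores exactly-once behavior:

```scala
// Toy model of the bug and the fix (hypothetical names, not Spark code).
sealed trait LogicalOp
case object UncacheTableOp extends LogicalOp
final case class AlreadyPlanned(physical: PhysicalUncache) extends LogicalOp

final class PhysicalUncache {
  lazy val sideEffectResult: Seq[Any] = {
    println("uncaching table...") // the side effect; runs at most once per instance
    Seq.empty
  }
}

object Planner {
  def plan(logical: LogicalOp): PhysicalUncache = logical match {
    case UncacheTableOp    => new PhysicalUncache // fresh node per planning round: the bug
    case AlreadyPlanned(p) => p                   // reuse the executed plan: the fix
  }
}

object ReplanDemo extends App {
  // Buggy path: each action re-plans the logical plan, creating a new physical
  // node whose lazy val has never fired -- the side effect repeats.
  Planner.plan(UncacheTableOp).sideEffectResult
  Planner.plan(UncacheTableOp).sideEffectResult // prints "uncaching" a second time

  // Fixed path: plan once eagerly, then freeze the physical plan so every
  // later planning round returns the very same instance.
  val frozen = AlreadyPlanned(Planner.plan(UncacheTableOp))
  Planner.plan(frozen).sideEffectResult
  Planner.plan(frozen).sideEffectResult // cached; no further side effect
}
```

This is precisely the double "UNCACHE TABLE" symptom above: every action on the same SchemaRDD re-planned the command and re-ran its side effect.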