@@ -24,6 +24,7 @@ import scala.annotation.meta.getter
 
 import org.apache.spark.{SPARK_VERSION, SparkContext}
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.kyuubi.SparkDatasetHelper
 import org.apache.spark.util.kvstore.KVIndex
 
 import org.apache.kyuubi.Logging
@@ -48,7 +49,7 @@ object KyuubiSparkUtil extends Logging {
         interruptOnCancel = true)
       debug(s"Execute initialization sql: $sql")
       try {
-        spark.sql(sql).isEmpty
+        SparkDatasetHelper.commandResultOptimized(spark.sql(sql)).isEmpty
       } finally {
         spark.sparkContext.clearJobGroup()
       }
@@ -24,7 +24,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.network.util.{ByteUnit, JavaUtils}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, Dataset, Row}
-import org.apache.spark.sql.catalyst.plans.logical.GlobalLimit
+import org.apache.spark.sql.catalyst.plans.logical.{CommandResult, GlobalLimit, LocalRelation, LogicalPlan}
 import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils
 import org.apache.spark.sql.execution.{CollectLimitExec, CommandResultExec, HiveResult, LocalTableScanExec, QueryExecution, SparkPlan, SQLExecution}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
@@ -290,4 +290,13 @@ object SparkDatasetHelper extends Logging {
     nodeName == "org.apache.spark.sql.execution.command.ExecutedCommandExec" ||
     nodeName == "org.apache.spark.sql.execution.CommandResultExec"
   }
+
+  /** SPARK-47270: Returns an optimized plan for CommandResult, converting it to a `LocalRelation`. */
+  def commandResultOptimized[T](dataset: Dataset[T]): Dataset[T] = {
Member:
I'm -0 on this change; I would consider it a Spark-side issue. Spark's master branch is undergoing a major refactoring, and I'm worried about accessing Spark's non-public API in the engine's "core code path".

Users who want to avoid triggering an executor launch can either patch their Spark or set the init SQL to something like: SET spark.app.id
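
For reference, a minimal sketch of that workaround, assuming the standard Kyuubi initialization SQL setting (kyuubi.engine.initialize.sql, whose default is 'SHOW DATABASES'); a SET statement is answered entirely on the driver, so no executor is needed:

# kyuubi-defaults.conf (illustrative)
kyuubi.engine.initialize.sql=SET spark.app.id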

Member Author:
OK, I will cherry-pick the PR into our managed Spark 3.5.

+    dataset.logicalPlan match {
+      case c: CommandResult =>
+        Dataset(dataset.sparkSession, LocalRelation(c.output, c.rows))(dataset.encoder)
+      case _ => dataset
+    }
+  }
 }
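
A minimal usage sketch of the new helper (the SQL text and variable names are illustrative): spark.sql executes a command such as SHOW DATABASES eagerly and keeps its rows in the driver-side CommandResult plan node, so re-planning it as a LocalRelation lets actions like isEmpty be answered from those local rows instead of launching a Spark job.

// Sketch only; assumes a live SparkSession named `spark`.
val df = spark.sql("SHOW DATABASES")
// Re-plan the CommandResult as a LocalRelation, backed by LocalTableScanExec.
val optimized = SparkDatasetHelper.commandResultOptimized(df)
// Served from the local rows on the driver; no job is submitted.
optimized.isEmpty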
@@ -21,7 +21,6 @@ import org.apache.spark.sql.SparkSession
 
 import org.apache.kyuubi.{KyuubiFunSuite, Utils}
 import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.SPARK_ENGINE_RUNTIME_VERSION
 
 trait WithSparkSQLEngine extends KyuubiFunSuite {
   protected var spark: SparkSession = _
@@ -35,7 +34,9 @@ trait WithSparkSQLEngine extends KyuubiFunSuite {
   // Behavior is affected by the initialization SQL: 'SHOW DATABASES'
   // SPARK-35378 (3.2.0) makes it triggers job
   // SPARK-43124 (4.0.0) makes it avoid triggering job
-  protected val initJobId: Int = if (SPARK_ENGINE_RUNTIME_VERSION >= "4.0") 0 else 1
+  // KYUUBI #6789 makes it avoid triggering job
+  // protected val initJobId: Int = if (SPARK_ENGINE_RUNTIME_VERSION >= "4.0") 0 else 1
+  protected val initJobId: Int = 0
Member Author:
What do you think? @pan3793

Member Author:
In our use case, I set the initial/minimum number of executors to 0 for notebook connections.

If a job is triggered by the initialization SQL, at least one executor is needed before the connection is ready.

Member Author (@turboFei, Nov 1, 2024):
And for notebook connections, we initialize the Spark driver in a temporary queue and eventually move it to the user queue, so the connection should become ready quickly even if the user queue has no available resources.

But if a job is triggered during initialization, the connection might become ready slowly when the user queue has no available resources.


   override def beforeAll(): Unit = {
     startSparkEngine()
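
For context on the comment above, a sketch of the described notebook setup (illustrative values, not part of this PR): with dynamic allocation and zero initial/minimum executors, any job triggered by the initialization SQL forces an executor request before the connection can become ready.

# spark-defaults.conf (illustrative)
spark.dynamicAllocation.enabled=true
spark.dynamicAllocation.initialExecutors=0
spark.dynamicAllocation.minExecutors=0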