From 751ded07c7f4e8ab888b3174e3310f603b79ae68 Mon Sep 17 00:00:00 2001 From: Salil Surendran Date: Fri, 20 Jan 2017 12:08:12 -0800 Subject: [PATCH 1/4] [SPARK-18120 ][SQL] Call QueryExecutionListener callback methods for DataFrameWriter methods QueryExecutionListener has two methods onSuccess() and onFailure() that takes a QueryExecution object as a parameter that gets called when a query is executed. It gets called for several of the DataSet methods like take, head, first, collect etc. but doesn't get called for any of the DataFrameWriter methods like saveAsTable, save etc. This commit fixes this issue and makes calls to these two methods from DataFrameWriter output methods. Also, added a new property "spark.sql.queryExecutionListeners" that can be used to specify instances of QueryExecutionListeners that should be attached to the SparkSession when the spark application starts up. Testing was done using unit tests. --- docs/sql-programming-guide.md | 15 +- project/MimaExcludes.scala | 8 +- .../apache/spark/sql/DataFrameWriter.scala | 87 +++++++++- .../org/apache/spark/sql/SparkSession.scala | 13 +- .../apache/spark/sql/internal/SQLConf.scala | 7 + .../sql/util/QueryExecutionListener.scala | 50 ++++-- .../SparkSQLQueryExecutionListenerSuite.scala | 149 ++++++++++++++++++ .../sql/util/DataFrameCallbackSuite.scala | 124 +++++++++++++-- 8 files changed, 423 insertions(+), 30 deletions(-) create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/SparkSQLQueryExecutionListenerSuite.scala diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index f4c89e58fa431..fcf4f9cf43723 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -1256,8 +1256,9 @@ Configuration of in-memory caching can be done using the `setConf` method on `Sp ## Other Configuration Options -The following options can also be used to tune the performance of query execution. It is possible -that these options will be deprecated in future release as more optimizations are performed automatically. +The following options can also be used to tune the performance of query execution and attaching +query execution listeners. It is possible that these options will be deprecated in future release as +more optimizations are performed automatically. @@ -1304,6 +1305,16 @@ that these options will be deprecated in future release as more optimizations ar Configures the number of partitions to use when shuffling data for joins or aggregations. + + + + +
<tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
spark.sql.queryExecutionListeners + A comma-separated list of classes that implement QueryExecutionListener. When creating a SparkSession, + instances of these listeners will be added to it. These classes needs to have a zero-argument + constructor. If the specified class can't be found or the class specified doesn't have a valid + constructor the SparkSession creation will fail with an exception. +
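As a concrete illustration of the property documented above, the sketch below shows a listener with the required zero-argument constructor, written against the four-argument callbacks this patch introduces, together with a session that attaches it through `spark.sql.queryExecutionListeners`. The package name, class name, and output path are hypothetical; only the Spark APIs added or shown in this patch are assumed.

```scala
// Hypothetical user code; not part of this patch.
package com.example.listeners

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.{OutputParams, QueryExecutionListener}

// Must have a zero-argument constructor so SparkSession creation can instantiate it
// from the class name listed in spark.sql.queryExecutionListeners.
class AuditQueryExecutionListener extends QueryExecutionListener {

  override def onSuccess(
      funcName: String,
      qe: QueryExecution,
      durationNs: Long,
      outputParams: Option[OutputParams]): Unit = {
    // outputParams is defined only when the query was triggered by a DataFrameWriter method.
    val destination = outputParams.flatMap(_.destination).getOrElse("<none>")
    println(s"$funcName succeeded in ${durationNs / 1e6} ms, wrote to: $destination")
  }

  override def onFailure(
      funcName: String,
      qe: QueryExecution,
      exception: Exception,
      outputParams: Option[OutputParams]): Unit = {
    println(s"$funcName failed: ${exception.getMessage}")
  }
}

object AuditExample {
  def main(args: Array[String]): Unit = {
    // The listener is instantiated and registered while the session is being created.
    val spark = SparkSession.builder()
      .master("local[*]")
      .config("spark.sql.queryExecutionListeners",
        "com.example.listeners.AuditQueryExecutionListener")
      .getOrCreate()

    import spark.implicits._
    // A DataFrameWriter action now triggers onSuccess (or onFailure) on the listener.
    Seq(1 -> "a", 2 -> "b").toDF("id", "value").write.mode("overwrite").parquet("/tmp/audit_demo")
    spark.stop()
  }
}
```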
# Distributed SQL Engine diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index bf628210a16e6..9815d41cb3b27 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -128,7 +128,13 @@ object MimaExcludes { ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryException.startOffset"), ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryException.endOffset"), ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryException.this"), - ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQueryException.query") + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQueryException.query"), + + // [SPARK-18120 ][SQL] Call QueryExecutionListener callback methods for DataFrameWriter methods + ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.util.QueryExecutionListener.onSuccess"), + ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.util.QueryExecutionListener.onFailure"), + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.util.QueryExecutionListener.onSuccess"), + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.util.QueryExecutionListener.onFailure") ) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index ff1f0177e8ba0..8738f281e2801 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -26,10 +26,12 @@ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, UnresolvedRelation} import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogRelation, CatalogTable, CatalogTableType} import org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable +import org.apache.spark.sql.execution.QueryExecution import org.apache.spark.sql.execution.command.DDLUtils import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, LogicalRelation} import org.apache.spark.sql.sources.BaseRelation import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.util.{OutputParams, QueryExecutionListener} /** * Interface used to write a [[Dataset]] to external storage systems (e.g. file systems, @@ -189,6 +191,33 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { this } + /** + * Executes the query and calls the {@link org.apache.spark.sql.util.QueryExecutionListener} + * methods. + * + * @param funcName A identifier for the method executing the query + * @param qe the @see [[QueryExecution]] object associated with the + * query + * @param outputParams The output parameters useful for query analysis + * @param action the function that executes the query after which the listener methods gets + * called. 
+ */ + private def executeAndCallQEListener( + funcName: String, + qe: QueryExecution, + outputParams: OutputParams)(action: => Unit) = { + try { + val start = System.nanoTime() + action + val end = System.nanoTime() + df.sparkSession.listenerManager.onSuccess(funcName, qe, end - start, Some(outputParams)) + } catch { + case e: Exception => + df.sparkSession.listenerManager.onFailure(funcName, qe, e, Some(outputParams)) + throw e + } + } + /** * Saves the content of the `DataFrame` at the specified path. * @@ -218,7 +247,17 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { bucketSpec = getBucketSpec, options = extraOptions.toMap) - dataSource.write(mode, df) + val destination = source match { + case "jdbc" => extraOptions.get("dbtable") + case _ => extraOptions.get("path") + } + + executeAndCallQEListener( + "save", + df.queryExecution, + OutputParams(source, destination, extraOptions.toMap)) { + dataSource.write(mode, df) + } } /** @@ -244,6 +283,11 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { * * Because it inserts data to an existing table, format or options will be ignored. * + * Calls the callback methods of @see[[QueryExecutionListener]] after query execution with + * @see[[OutputParams]] having datasourceType set as the string parameter passed to the + * @see[[DataFrameWriter#format]] method and destination set as the name of the table into which + * data is being inserted into. + * * @since 1.4.0 */ def insertInto(tableName: String): Unit = { @@ -261,13 +305,19 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { ) } - df.sparkSession.sessionState.executePlan( + val qe = df.sparkSession.sessionState.executePlan( InsertIntoTable( table = UnresolvedRelation(tableIdent), partition = Map.empty[String, Option[String]], child = df.logicalPlan, overwrite = mode == SaveMode.Overwrite, - ifNotExists = false)).toRdd + ifNotExists = false)) + executeAndCallQEListener( + "insertInto", + qe, + new OutputParams(source, Some(tableIdent.unquotedString), extraOptions.toMap)) { + qe.toRdd + } } private def normalizedParCols: Option[Seq[String]] = partitioningColumns.map { cols => @@ -324,7 +374,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { private def assertNotPartitioned(operation: String): Unit = { if (partitioningColumns.isDefined) { - throw new AnalysisException( s"'$operation' does not support partitioning") + throw new AnalysisException(s"'$operation' does not support partitioning") } } @@ -359,6 +409,10 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { * like Hive will be able to read this table. Otherwise, the table is persisted in a Spark SQL * specific format. 
* + * Calls the callback methods of @see[[QueryExecutionListener]] after query execution with a + * @see[[OutputParams]] object having datasourceType set as the string parameter passed to the + * @see[[DataFrameWriter#format]] and destination set as the name of the table being + * written to * @since 1.4.0 */ def saveAsTable(tableName: String): Unit = { @@ -428,8 +482,14 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { partitionColumnNames = partitioningColumns.getOrElse(Nil), bucketSpec = getBucketSpec ) - df.sparkSession.sessionState.executePlan( - CreateTable(tableDesc, mode, Some(df.logicalPlan))).toRdd + val qe = df.sparkSession.sessionState.executePlan( + CreateTable(tableDesc, mode, Some(df.logicalPlan))) + executeAndCallQEListener( + "saveAsTable", + qe, + new OutputParams(source, Some(tableIdent.unquotedString), extraOptions.toMap)) { + qe.toRdd + } } /** @@ -493,6 +553,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { * indicates a timestamp format. Custom date formats follow the formats at * `java.text.SimpleDateFormat`. This applies to timestamp type. * + * Calls the callback methods in @see[[QueryExecutionListener]] methods after query execution with + * @see[[OutputParams]] having datasourceType set as string constant "json" and + * destination set as the path to which the data is written * * @since 1.4.0 */ @@ -514,6 +577,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { * shorten names(none, `snappy`, `gzip`, and `lzo`). This will override * `spark.sql.parquet.compression.codec`. * + * Calls the callback methods in @see[[QueryExecutionListener]] methods after query execution with + * @see[[OutputParams]] having datasourceType set as string constant "parquet" and + * destination set as the path to which the data is written * * @since 1.4.0 */ @@ -534,6 +600,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { * one of the known case-insensitive shorten names(`none`, `snappy`, `zlib`, and `lzo`). * This will override `orc.compress`. * + * Calls the callback methods in @see[[QueryExecutionListener]] methods after query execution with + * @see[[OutputParams]] having datasourceType set as string constant "orc" and + * destination set as the path to which the data is written * * @since 1.5.0 * @note Currently, this method can only be used after enabling Hive support @@ -560,6 +629,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { * one of the known case-insensitive shorten names (`none`, `bzip2`, `gzip`, `lz4`, * `snappy` and `deflate`). * + * Calls the callback methods in e@see[[QueryExecutionListener]] methods after query execution + * with @see[[OutputParams]] having datasourceType set as string constant "text" and + * destination set as the path to which the data is written * * @since 1.6.0 */ @@ -599,6 +671,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { * indicates a timestamp format. Custom date formats follow the formats at * `java.text.SimpleDateFormat`. This applies to timestamp type. 
* + * Calls the callback methods in @see[[QueryExecutionListener]] methods after query execution with + * @see[[OutputParams]] having datasourceType set as string constant "csv" and + * destination set as the path to which the data is written * * @since 2.0.0 */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala index f3dde480eabe0..f1d2c57c2d58d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -40,12 +40,12 @@ import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Range} import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.execution.ui.SQLListener -import org.apache.spark.sql.internal.{CatalogImpl, SessionState, SharedState} +import org.apache.spark.sql.internal.{CatalogImpl, SessionState, SharedState, SQLConf} import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION import org.apache.spark.sql.sources.BaseRelation import org.apache.spark.sql.streaming._ import org.apache.spark.sql.types.{DataType, LongType, StructType} -import org.apache.spark.sql.util.ExecutionListenerManager +import org.apache.spark.sql.util.{ExecutionListenerManager, QueryExecutionListener} import org.apache.spark.util.Utils @@ -876,6 +876,9 @@ object SparkSession { } session = new SparkSession(sparkContext) options.foreach { case (k, v) => session.sessionState.conf.setConfString(k, v) } + for (qeListener <- createQueryExecutionListeners(session.sparkContext.getConf)) { + session.listenerManager.register(qeListener) + } defaultSession.set(session) // Register a successfully instantiated context to the singleton. This should be at the @@ -893,6 +896,12 @@ object SparkSession { } } + private def createQueryExecutionListeners(conf: SparkConf): Seq[QueryExecutionListener] = { + conf.get(SQLConf.QUERY_EXECUTION_LISTENERS) + .map(Utils.classForName(_)) + .map(_.newInstance().asInstanceOf[QueryExecutionListener]) + } + /** * Creates a [[SparkSession.Builder]] for constructing a [[SparkSession]]. * diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 645b0fa13ee3f..a95b8c2d33caa 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -655,6 +655,13 @@ object SQLConf { .booleanConf .createWithDefault(false) + val QUERY_EXECUTION_LISTENERS = + ConfigBuilder("spark.sql.queryExecutionListeners") + .doc("QueryExecutionListeners to be attached to the SparkSession") + .stringConf + .toSequence + .createWithDefault(Nil) + object Deprecated { val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks" } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala index 26ad0eadd9d4c..c33a196f932b6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala @@ -44,12 +44,15 @@ trait QueryExecutionListener { * @param qe the QueryExecution object that carries detail information like logical plan, * physical plan, etc. * @param durationNs the execution time for this query in nanoseconds. 
- * - * @note This can be invoked by multiple different threads. + * @param outputParams The output parameters in case the method is invoked as a result of a + * write operation. In case of a read will be @see[[None]] */ @DeveloperApi - def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit - + def onSuccess( + funcName: String, + qe: QueryExecution, + durationNs: Long, + outputParams: Option[OutputParams]): Unit /** * A callback function that will be called when a query execution failed. * @@ -57,14 +60,33 @@ trait QueryExecutionListener { * @param qe the QueryExecution object that carries detail information like logical plan, * physical plan, etc. * @param exception the exception that failed this query. + * @param outputParams The output parameters in case the method is invoked as a result of a + * write operation. In case of a read will be @see[[None]] * * @note This can be invoked by multiple different threads. */ @DeveloperApi - def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit + def onFailure( + funcName: String, + qe: QueryExecution, + exception: Exception, + outputParams: Option[OutputParams]): Unit } - +/** + * Contains extra information useful for query analysis passed on from the methods in + * @see[[org.apache.spark.sql.DataFrameWriter]] while writing to a datasource + * @param datasourceType type of data source written to like csv, parquet, json, hive, jdbc etc. + * @param destination path or table name written to + * @param options the map containing the output options for the underlying datasource + * specified by using the @see [[org.apache.spark.sql.DataFrameWriter#option]] method + * @param writeParams will contain any extra information that the write method wants to provide + */ +case class OutputParams( + datasourceType: String, + destination: Option[String], + options: Map[String, String], + writeParams: Map[String, String] = Map.empty) /** * :: Experimental :: * @@ -98,18 +120,26 @@ class ExecutionListenerManager private[sql] () extends Logging { listeners.clear() } - private[sql] def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = { + private[sql] def onSuccess( + funcName: String, + qe: QueryExecution, + duration: Long, + outputParams: Option[OutputParams] = None): Unit = { readLock { withErrorHandling { listener => - listener.onSuccess(funcName, qe, duration) + listener.onSuccess(funcName, qe, duration, outputParams) } } } - private[sql] def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = { + private[sql] def onFailure( + funcName: String, + qe: QueryExecution, + exception: Exception, + outputParams: Option[OutputParams] = None): Unit = { readLock { withErrorHandling { listener => - listener.onFailure(funcName, qe, exception) + listener.onFailure(funcName, qe, exception, outputParams) } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SparkSQLQueryExecutionListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SparkSQLQueryExecutionListenerSuite.scala new file mode 100644 index 0000000000000..bbe804a33258c --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/SparkSQLQueryExecutionListenerSuite.scala @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql + +import org.scalatest.BeforeAndAfterEach +import org.scalatest.mock.MockitoSugar + +import org.apache.spark.{SparkContext, SparkFunSuite} +import org.apache.spark.sql.execution.QueryExecution +import org.apache.spark.sql.util.{OutputParams, QueryExecutionListener} + +/** + * Test cases for the property 'spark.sql.queryExecutionListeners' that adds the + * @see[[QueryExecutionListener]] to a @see[[SparkSession]] + */ +class SparkSQLQueryExecutionListenerSuite + extends SparkFunSuite + with MockitoSugar + with BeforeAndAfterEach { + + override def afterEach(): Unit = { + SparkSession.clearActiveSession() + SparkSession.clearDefaultSession() + SparkContext.clearActiveContext() + } + + test("Creation of SparkContext with non-existent QueryExecutionListener class fails fast") { + intercept[ClassNotFoundException] { + SparkSession + .builder() + .master("local") + .config("spark.sql.queryExecutionListeners", "non.existent.QueryExecutionListener") + .getOrCreate() + } + assert(!SparkSession.getDefaultSession.isDefined) + } + + test("QueryExecutionListener that doesn't have a default constructor fails fast") { + intercept[InstantiationException] { + SparkSession + .builder() + .master("local") + .config("spark.sql.queryExecutionListeners", classOf[NoZeroArgConstructorListener].getName) + .getOrCreate() + } + assert(!SparkSession.getDefaultSession.isDefined) + } + + test("Normal QueryExecutionListeners gets added as listeners") { + val sparkSession = SparkSession + .builder() + .master("local") + .config("mykey", "myvalue") + .config("spark.sql.queryExecutionListeners", + classOf[NormalQueryExecutionListener].getName + " ," + + classOf[AnotherQueryExecutionListener].getName) + .getOrCreate() + assert(SparkSession.getDefaultSession.isDefined) + assert(NormalQueryExecutionListener.successCount === 0) + assert(NormalQueryExecutionListener.failureCount === 0) + assert(AnotherQueryExecutionListener.successCount === 0) + assert(AnotherQueryExecutionListener.failureCount === 0) + sparkSession.listenerManager.onSuccess("test1", mock[QueryExecution], 0) + assert(NormalQueryExecutionListener.successCount === 1) + assert(NormalQueryExecutionListener.failureCount === 0) + assert(AnotherQueryExecutionListener.successCount === 1) + assert(AnotherQueryExecutionListener.failureCount === 0) + sparkSession.listenerManager.onFailure("test2", mock[QueryExecution], new Exception) + assert(NormalQueryExecutionListener.successCount === 1) + assert(NormalQueryExecutionListener.failureCount === 1) + assert(AnotherQueryExecutionListener.successCount === 1) + assert(AnotherQueryExecutionListener.failureCount === 1) + } +} + +class NoZeroArgConstructorListener(myString: String) extends QueryExecutionListener { + + override def onSuccess( + funcName: String, + qe: QueryExecution, + durationNs: Long, + options: Option[OutputParams] + ): Unit = {} + + override def onFailure( + funcName: String, + qe: QueryExecution, 
+ exception: Exception, + options: Option[OutputParams] + ): Unit = {} +} + +class NormalQueryExecutionListener extends QueryExecutionListener { + + override def onSuccess( + funcName: String, + qe: QueryExecution, + durationNs: Long, + options: Option[OutputParams] + ): Unit = { NormalQueryExecutionListener.successCount += 1 } + + override def onFailure( + funcName: String, + qe: QueryExecution, + exception: Exception, + options: Option[OutputParams] + ): Unit = { NormalQueryExecutionListener.failureCount += 1 } +} + +object NormalQueryExecutionListener { + var successCount = 0; + var failureCount = 0; +} + +class AnotherQueryExecutionListener extends QueryExecutionListener { + + override def onSuccess( + funcName: String, + qe: QueryExecution, + durationNs: Long, + options: Option[OutputParams] + ): Unit = { AnotherQueryExecutionListener.successCount += 1 } + + override def onFailure( + funcName: String, + qe: QueryExecution, + exception: Exception, + options: Option[OutputParams] + ): Unit = { AnotherQueryExecutionListener.failureCount += 1 } +} + +object AnotherQueryExecutionListener { + var successCount = 0; + var failureCount = 0; +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala index 3ae5ce610d2a6..5761d0c0b1fd4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.util import scala.collection.mutable.ArrayBuffer import org.apache.spark._ -import org.apache.spark.sql.{functions, QueryTest} +import org.apache.spark.sql.{functions, DataFrame, QueryTest} import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Project} import org.apache.spark.sql.execution.{QueryExecution, WholeStageCodegenExec} import org.apache.spark.sql.test.SharedSQLContext @@ -33,9 +33,17 @@ class DataFrameCallbackSuite extends QueryTest with SharedSQLContext { val metrics = ArrayBuffer.empty[(String, QueryExecution, Long)] val listener = new QueryExecutionListener { // Only test successful case here, so no need to implement `onFailure` - override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {} + override def onFailure( + funcName: String, + qe: QueryExecution, + exception: Exception, + outputParams: Option[OutputParams]): Unit = {} - override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = { + override def onSuccess( + funcName: String, + qe: QueryExecution, + duration: Long, + outputParams: Option[OutputParams]): Unit = { metrics += ((funcName, qe, duration)) } } @@ -61,12 +69,20 @@ class DataFrameCallbackSuite extends QueryTest with SharedSQLContext { test("execute callback functions when a DataFrame action failed") { val metrics = ArrayBuffer.empty[(String, QueryExecution, Exception)] val listener = new QueryExecutionListener { - override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = { + override def onFailure( + funcName: String, + qe: QueryExecution, + exception: Exception, + outputParams: Option[OutputParams]): Unit = { metrics += ((funcName, qe, exception)) } // Only test failed case here, so no need to implement `onSuccess` - override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = {} + override def onSuccess( + funcName: String, + qe: QueryExecution, + duration: Long, + 
outputParams: Option[OutputParams]): Unit = {} } spark.listenerManager.register(listener) @@ -89,9 +105,17 @@ class DataFrameCallbackSuite extends QueryTest with SharedSQLContext { val metrics = ArrayBuffer.empty[Long] val listener = new QueryExecutionListener { // Only test successful case here, so no need to implement `onFailure` - override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {} + override def onFailure( + funcName: String, + qe: QueryExecution, + exception: Exception, + outputParams: Option[OutputParams]): Unit = {} - override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = { + override def onSuccess( + funcName: String, + qe: QueryExecution, + duration: Long, + outputParams: Option[OutputParams]): Unit = { val metric = qe.executedPlan match { case w: WholeStageCodegenExec => w.child.longMetric("numOutputRows") case other => other.longMetric("numOutputRows") @@ -114,6 +138,55 @@ class DataFrameCallbackSuite extends QueryTest with SharedSQLContext { spark.listenerManager.unregister(listener) } + test("QueryExecutionListener gets called on DataFrameWriter.parquet method") { + callSave("parquet", (df: DataFrame, path: String) => df.write.parquet(path)) + } + + test("QueryExecutionListener gets called on DataFrameWriter.json method") { + callSave("json", (df: DataFrame, path: String) => df.write.json(path)) + } + + test("QueryExecutionListener gets called on DataFrameWriter.csv method") { + callSave("csv", (df: DataFrame, path: String) => df.write.csv(path)) + } + + test("QueryExecutionListener gets called on DataFrameWriter.saveAsTable method") { + var onWriteSuccessCalled = false + spark.listenerManager.register(new QueryExecutionListener { + + override def onFailure( + funcName: String, + qe: QueryExecution, + exception: Exception, + outputParams: Option[OutputParams]): Unit = {} + + override def onSuccess( + funcName: String, + qe: QueryExecution, + durationNs: Long, + outputParams: Option[OutputParams]): Unit = { + assert(durationNs > 0) + assert(qe ne null) + onWriteSuccessCalled = true + } + }) + withTable("bar") { + Seq(1 -> 100).toDF("x", "y").write.saveAsTable("bar") + } + assert(onWriteSuccessCalled) + spark.listenerManager.clear() + } + + private def callSave(source: String, callSaveFunction: (DataFrame, String) => Unit): Unit = { + val testQueryExecutionListener = new TestQueryExecutionListener(source) + spark.listenerManager.register(testQueryExecutionListener) + withTempPath { path => + callSaveFunction(Seq(1 -> 100).toDF("x", "y"), path.getAbsolutePath) + } + assert(testQueryExecutionListener.onWriteSuccessCalled) + spark.listenerManager.clear() + } + // TODO: Currently some LongSQLMetric use -1 as initial value, so if the accumulator is never // updated, we can filter it out later. However, when we aggregate(sum) accumulator values at // driver side for SQL physical operators, these -1 values will make our result smaller. 
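Outside the test harness, the same behaviour these tests exercise through `callSave` and `TestQueryExecutionListener` can be observed from user code by registering a listener programmatically. The sketch below is spark-shell style and assumes only the four-argument listener API and the `OutputParams` fields introduced in this patch; the output path is hypothetical.

```scala
// Hypothetical spark-shell usage mirroring the assertions in the tests above.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.{OutputParams, QueryExecutionListener}

val spark = SparkSession.builder().master("local[*]").appName("listener-demo").getOrCreate()
import spark.implicits._

spark.listenerManager.register(new QueryExecutionListener {
  override def onSuccess(
      funcName: String,
      qe: QueryExecution,
      durationNs: Long,
      outputParams: Option[OutputParams]): Unit = {
    outputParams.foreach { params =>
      // For df.write.parquet(path) the patch reports funcName "save",
      // datasourceType "parquet", destination Some(path), and the writer options map.
      println(s"$funcName -> ${params.datasourceType} at ${params.destination.getOrElse("?")}")
    }
  }

  override def onFailure(
      funcName: String,
      qe: QueryExecution,
      exception: Exception,
      outputParams: Option[OutputParams]): Unit = {
    println(s"$funcName failed: ${exception.getMessage}")
  }
})

Seq(1 -> 100).toDF("x", "y").write.mode("overwrite").parquet("/tmp/listener_demo")
```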
@@ -123,9 +196,17 @@ class DataFrameCallbackSuite extends QueryTest with SharedSQLContext { val metrics = ArrayBuffer.empty[Long] val listener = new QueryExecutionListener { // Only test successful case here, so no need to implement `onFailure` - override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {} + override def onFailure( + funcName: String, + qe: QueryExecution, + exception: Exception, + outputParams: Option[OutputParams]): Unit = {} - override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = { + override def onSuccess( + funcName: String, + qe: QueryExecution, + duration: Long, + outputParams: Option[OutputParams]): Unit = { metrics += qe.executedPlan.longMetric("dataSize").value val bottomAgg = qe.executedPlan.children(0).children(0) metrics += bottomAgg.longMetric("dataSize").value @@ -159,4 +240,29 @@ class DataFrameCallbackSuite extends QueryTest with SharedSQLContext { spark.listenerManager.unregister(listener) } + + class TestQueryExecutionListener(source: String) extends QueryExecutionListener { + var onWriteSuccessCalled = false + + // Only test successful case here, so no need to implement `onFailure` + override def onFailure( + funcName: String, + qe: QueryExecution, + exception: Exception, + outputParams: Option[OutputParams]): Unit = {} + + override def onSuccess( + funcName: String, + qe: QueryExecution, + durationNs: Long, + outputParams: Option[OutputParams]): Unit = { + assert(qe ne null) + assert(outputParams.isDefined) + assert(!outputParams.get.destination.isEmpty) + assert(!outputParams.get.datasourceType.isEmpty) + assert(durationNs > 0) + onWriteSuccessCalled = true + } + } + } From 752125a10253ca15e260f317868ef7aacd3c510e Mon Sep 17 00:00:00 2001 From: Salil Surendran Date: Mon, 30 Jan 2017 15:59:02 -0800 Subject: [PATCH 2/4] Fixing code review comments --- .../scala/org/apache/spark/sql/DataFrameWriter.scala | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 8738f281e2801..93c9838ac3515 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -196,16 +196,15 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { * methods. * * @param funcName A identifier for the method executing the query - * @param qe the @see [[QueryExecution]] object associated with the - * query + * @param qe the @see [[QueryExecution]] object associated with the query * @param outputParams The output parameters useful for query analysis * @param action the function that executes the query after which the listener methods gets * called. */ private def executeAndCallQEListener( - funcName: String, - qe: QueryExecution, - outputParams: OutputParams)(action: => Unit) = { + funcName: String, + qe: QueryExecution, + outputParams: OutputParams)(action: => Unit) = { try { val start = System.nanoTime() action From ecf9f34addb772e9a09936420c1ad43cdd930685 Mon Sep 17 00:00:00 2001 From: Salil Surendran Date: Fri, 3 Feb 2017 14:54:11 -0800 Subject: [PATCH 3/4] Committing to fix code review issues. 
--- docs/sql-programming-guide.md | 33 ++++++----- .../apache/spark/sql/DataFrameWriter.scala | 55 ++++--------------- .../org/apache/spark/sql/SparkSession.scala | 4 +- .../apache/spark/sql/internal/SQLConf.scala | 19 ++++--- .../sql/util/QueryExecutionListener.scala | 9 +-- .../SparkSQLQueryExecutionListenerSuite.scala | 2 +- .../sql/util/DataFrameCallbackSuite.scala | 7 ++- 7 files changed, 53 insertions(+), 76 deletions(-) diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index faa3879e5b3dd..1a44cf20075dc 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -1300,11 +1300,28 @@ Configuration of in-memory caching can be done using the `setConf` method on `Sp +## QueryExecutionListener Options +Use this configuration option to attach query execution listeners + + + + + + + + +
<tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
spark.sql.queryExecutionListeners + A comma-separated list of classes that implement QueryExecutionListener. When creating a SparkSession, + instances of these listeners will be added to it. These classes needs to have a zero-argument + constructor. If the specified class can't be found or the class specified doesn't have a valid + constructor the SparkSession creation will fail with an exception. +
+ ## Other Configuration Options -The following options can also be used to tune the performance of query execution and attaching -query execution listeners. It is possible that these options will be deprecated in future release as -more optimizations are performed automatically. +The following options can also be used to tune the performance of query execution. It is possible +that these options will be deprecated in future release as more optimizations are performed +automatically. @@ -1351,16 +1368,6 @@ more optimizations are performed automatically. Configures the number of partitions to use when shuffling data for joins or aggregations. - - - - -
<tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
spark.sql.queryExecutionListeners - A comma-separated list of classes that implement QueryExecutionListener. When creating a SparkSession, - instances of these listeners will be added to it. These classes needs to have a zero-argument - constructor. If the specified class can't be found or the class specified doesn't have a valid - constructor the SparkSession creation will fail with an exception. -
# Distributed SQL Engine diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 93c9838ac3515..8273cf7f355ac 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -192,16 +192,16 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { } /** - * Executes the query and calls the {@link org.apache.spark.sql.util.QueryExecutionListener} - * methods. + * Wrap a DataFrameWriter action to track the query execution and time cost, then report to the + * user-registered callback functions. * * @param funcName A identifier for the method executing the query - * @param qe the @see [[QueryExecution]] object associated with the query + * @param qe the @see `QueryExecution` object associated with the query * @param outputParams The output parameters useful for query analysis * @param action the function that executes the query after which the listener methods gets * called. */ - private def executeAndCallQEListener( + private def withAction( funcName: String, qe: QueryExecution, outputParams: OutputParams)(action: => Unit) = { @@ -250,11 +250,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { case "jdbc" => extraOptions.get("dbtable") case _ => extraOptions.get("path") } - - executeAndCallQEListener( - "save", - df.queryExecution, - OutputParams(source, destination, extraOptions.toMap)) { + val outputParams = OutputParams(source, destination, extraOptions.toMap) + withAction("save", df.queryExecution, outputParams) { dataSource.write(mode, df) } } @@ -282,11 +279,6 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { * * Because it inserts data to an existing table, format or options will be ignored. * - * Calls the callback methods of @see[[QueryExecutionListener]] after query execution with - * @see[[OutputParams]] having datasourceType set as the string parameter passed to the - * @see[[DataFrameWriter#format]] method and destination set as the name of the table into which - * data is being inserted into. - * * @since 1.4.0 */ def insertInto(tableName: String): Unit = { @@ -311,12 +303,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { child = df.logicalPlan, overwrite = mode == SaveMode.Overwrite, ifNotExists = false)) - executeAndCallQEListener( - "insertInto", - qe, - new OutputParams(source, Some(tableIdent.unquotedString), extraOptions.toMap)) { - qe.toRdd - } + val outputParams = OutputParams(source, Some(tableIdent.unquotedString), extraOptions.toMap) + withAction("insertInto", qe, outputParams)(qe.toRdd) } private def normalizedParCols: Option[Seq[String]] = partitioningColumns.map { cols => @@ -408,10 +396,6 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { * like Hive will be able to read this table. Otherwise, the table is persisted in a Spark SQL * specific format. 
* - * Calls the callback methods of @see[[QueryExecutionListener]] after query execution with a - * @see[[OutputParams]] object having datasourceType set as the string parameter passed to the - * @see[[DataFrameWriter#format]] and destination set as the name of the table being - * written to * @since 1.4.0 */ def saveAsTable(tableName: String): Unit = { @@ -483,12 +467,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { ) val qe = df.sparkSession.sessionState.executePlan( CreateTable(tableDesc, mode, Some(df.logicalPlan))) - executeAndCallQEListener( - "saveAsTable", - qe, - new OutputParams(source, Some(tableIdent.unquotedString), extraOptions.toMap)) { - qe.toRdd - } + val outputParams = new OutputParams(source, Some(tableIdent.unquotedString), extraOptions.toMap) + withAction("saveAsTable", qe, outputParams)(qe.toRdd) } /** @@ -552,9 +532,6 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { * indicates a timestamp format. Custom date formats follow the formats at * `java.text.SimpleDateFormat`. This applies to timestamp type. * - * Calls the callback methods in @see[[QueryExecutionListener]] methods after query execution with - * @see[[OutputParams]] having datasourceType set as string constant "json" and - * destination set as the path to which the data is written * * @since 1.4.0 */ @@ -576,9 +553,6 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { * shorten names(none, `snappy`, `gzip`, and `lzo`). This will override * `spark.sql.parquet.compression.codec`. * - * Calls the callback methods in @see[[QueryExecutionListener]] methods after query execution with - * @see[[OutputParams]] having datasourceType set as string constant "parquet" and - * destination set as the path to which the data is written * * @since 1.4.0 */ @@ -599,9 +573,6 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { * one of the known case-insensitive shorten names(`none`, `snappy`, `zlib`, and `lzo`). * This will override `orc.compress`. * - * Calls the callback methods in @see[[QueryExecutionListener]] methods after query execution with - * @see[[OutputParams]] having datasourceType set as string constant "orc" and - * destination set as the path to which the data is written * * @since 1.5.0 * @note Currently, this method can only be used after enabling Hive support @@ -628,9 +599,6 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { * one of the known case-insensitive shorten names (`none`, `bzip2`, `gzip`, `lz4`, * `snappy` and `deflate`). * - * Calls the callback methods in e@see[[QueryExecutionListener]] methods after query execution - * with @see[[OutputParams]] having datasourceType set as string constant "text" and - * destination set as the path to which the data is written * * @since 1.6.0 */ @@ -670,9 +638,6 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { * indicates a timestamp format. Custom date formats follow the formats at * `java.text.SimpleDateFormat`. This applies to timestamp type. 
* - * Calls the callback methods in @see[[QueryExecutionListener]] methods after query execution with - * @see[[OutputParams]] having datasourceType set as string constant "csv" and - * destination set as the path to which the data is written * * @since 2.0.0 */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala index f1d2c57c2d58d..8a027c56cdf5b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -40,7 +40,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Range} import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.execution.ui.SQLListener -import org.apache.spark.sql.internal.{CatalogImpl, SessionState, SharedState, SQLConf} +import org.apache.spark.sql.internal._ import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION import org.apache.spark.sql.sources.BaseRelation import org.apache.spark.sql.streaming._ @@ -897,7 +897,7 @@ object SparkSession { } private def createQueryExecutionListeners(conf: SparkConf): Seq[QueryExecutionListener] = { - conf.get(SQLConf.QUERY_EXECUTION_LISTENERS) + conf.get(StaticSQLConf.QUERY_EXECUTION_LISTENERS) .map(Utils.classForName(_)) .map(_.newInstance().asInstanceOf[QueryExecutionListener]) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 3d1a8373388bc..b8b9a2e03f638 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -660,21 +660,12 @@ object SQLConf { .booleanConf .createWithDefault(false) - - val QUERY_EXECUTION_LISTENERS = - ConfigBuilder("spark.sql.queryExecutionListeners") - .doc("QueryExecutionListeners to be attached to the SparkSession") - .stringConf - .toSequence - .createWithDefault(Nil) - val SESSION_LOCAL_TIMEZONE = SQLConfigBuilder("spark.sql.session.timeZone") .doc("""The ID of session local timezone, e.g. "GMT", "America/Los_Angeles", etc.""") .stringConf .createWithDefault(TimeZone.getDefault().getID()) - object Deprecated { val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks" } @@ -1056,4 +1047,14 @@ object StaticSQLConf { "SQL configuration and the current database.") .booleanConf .createWithDefault(false) + + val QUERY_EXECUTION_LISTENERS = buildConf("spark.sql.queryExecutionListeners") + .doc("A comma-separated list of classes that implement QueryExecutionListener. When creating " + + "a SparkSession, instances of these listeners will be added to it. These classes " + + "needs to have a zero-argument constructor. If the specified class can't be found or" + + " the class specified doesn't have a valid constructor the SparkSession creation " + + "will fail with an exception.") + .stringConf + .toSequence + .createWithDefault(Nil) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala index c33a196f932b6..2f0b39d8a7990 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala @@ -45,7 +45,7 @@ trait QueryExecutionListener { * physical plan, etc. 
* @param durationNs the execution time for this query in nanoseconds. * @param outputParams The output parameters in case the method is invoked as a result of a - * write operation. In case of a read will be @see[[None]] + * write operation. In case of a read will be @see `None` */ @DeveloperApi def onSuccess( @@ -61,7 +61,7 @@ trait QueryExecutionListener { * physical plan, etc. * @param exception the exception that failed this query. * @param outputParams The output parameters in case the method is invoked as a result of a - * write operation. In case of a read will be @see[[None]] + * write operation. In case of a read will be @see `None` * * @note This can be invoked by multiple different threads. */ @@ -75,13 +75,14 @@ trait QueryExecutionListener { /** * Contains extra information useful for query analysis passed on from the methods in - * @see[[org.apache.spark.sql.DataFrameWriter]] while writing to a datasource + * @see `org.apache.spark.sql.DataFrameWriter` while writing to a datasource * @param datasourceType type of data source written to like csv, parquet, json, hive, jdbc etc. * @param destination path or table name written to * @param options the map containing the output options for the underlying datasource - * specified by using the @see [[org.apache.spark.sql.DataFrameWriter#option]] method + * specified by using the @see `org.apache.spark.sql.DataFrameWriter#option` method * @param writeParams will contain any extra information that the write method wants to provide */ +@DeveloperApi case class OutputParams( datasourceType: String, destination: Option[String], diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SparkSQLQueryExecutionListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SparkSQLQueryExecutionListenerSuite.scala index bbe804a33258c..1e823a9840f48 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SparkSQLQueryExecutionListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SparkSQLQueryExecutionListenerSuite.scala @@ -25,7 +25,7 @@ import org.apache.spark.sql.util.{OutputParams, QueryExecutionListener} /** * Test cases for the property 'spark.sql.queryExecutionListeners' that adds the - * @see[[QueryExecutionListener]] to a @see[[SparkSession]] + * @see `QueryExecutionListener` to a @see `SparkSession` */ class SparkSQLQueryExecutionListenerSuite extends SparkFunSuite diff --git a/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala index 5761d0c0b1fd4..d6bb793d9b620 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala @@ -174,7 +174,6 @@ class DataFrameCallbackSuite extends QueryTest with SharedSQLContext { Seq(1 -> 100).toDF("x", "y").write.saveAsTable("bar") } assert(onWriteSuccessCalled) - spark.listenerManager.clear() } private def callSave(source: String, callSaveFunction: (DataFrame, String) => Unit): Unit = { @@ -184,7 +183,6 @@ class DataFrameCallbackSuite extends QueryTest with SharedSQLContext { callSaveFunction(Seq(1 -> 100).toDF("x", "y"), path.getAbsolutePath) } assert(testQueryExecutionListener.onWriteSuccessCalled) - spark.listenerManager.clear() } // TODO: Currently some LongSQLMetric use -1 as initial value, so if the accumulator is never @@ -265,4 +263,9 @@ class DataFrameCallbackSuite extends QueryTest with SharedSQLContext { } } + protected override def afterEach(): 
Unit = { + super.afterEach() + spark.listenerManager.clear() + } + } From a0c7c22e4097adbdf12e52db37c26a2246d4eddd Mon Sep 17 00:00:00 2001 From: Salil Surendran Date: Sun, 5 Feb 2017 14:05:46 -0800 Subject: [PATCH 4/4] Committing to fix code review comments --- .../main/scala/org/apache/spark/sql/DataFrameWriter.scala | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 8273cf7f355ac..9e8afcff6c097 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -29,9 +29,10 @@ import org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable import org.apache.spark.sql.execution.QueryExecution import org.apache.spark.sql.execution.command.DDLUtils import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, LogicalRelation} +import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions import org.apache.spark.sql.sources.BaseRelation import org.apache.spark.sql.types.StructType -import org.apache.spark.sql.util.{OutputParams, QueryExecutionListener} +import org.apache.spark.sql.util.{OutputParams} /** * Interface used to write a [[Dataset]] to external storage systems (e.g. file systems, @@ -247,7 +248,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { options = extraOptions.toMap) val destination = source match { - case "jdbc" => extraOptions.get("dbtable") + case "jdbc" => extraOptions.get(JDBCOptions.JDBC_TABLE_NAME) case _ => extraOptions.get("path") } val outputParams = OutputParams(source, destination, extraOptions.toMap) @@ -467,7 +468,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { ) val qe = df.sparkSession.sessionState.executePlan( CreateTable(tableDesc, mode, Some(df.logicalPlan))) - val outputParams = new OutputParams(source, Some(tableIdent.unquotedString), extraOptions.toMap) + val outputParams = OutputParams(source, Some(tableIdent.unquotedString), extraOptions.toMap) withAction("saveAsTable", qe, outputParams)(qe.toRdd) }
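Taken together, the four patches route every DataFrameWriter output path (save, insertInto, saveAsTable, and the format shortcuts that delegate to save) through a single wrapper, renamed withAction during review. The following is a simplified, standalone restatement of that pattern for illustration only: the object name is made up, it is declared in the org.apache.spark.sql package solely because the listener-manager hooks are private[sql], and it is not the code the patch itself adds.

```scala
// Illustrative sketch of the timing/callback wrapper, restated outside DataFrameWriter.
package org.apache.spark.sql

import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.OutputParams

object WithActionSketch {
  def withAction(
      spark: SparkSession,
      funcName: String,
      qe: QueryExecution,
      outputParams: OutputParams)(action: => Unit): Unit = {
    try {
      val start = System.nanoTime()
      action // run the physical write, e.g. qe.toRdd or dataSource.write(mode, df)
      val end = System.nanoTime()
      spark.listenerManager.onSuccess(funcName, qe, end - start, Some(outputParams))
    } catch {
      case e: Exception =>
        // Listeners observe the failure first; the exception still propagates to the caller.
        spark.listenerManager.onFailure(funcName, qe, e, Some(outputParams))
        throw e
    }
  }
}
```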