@@ -259,6 +259,11 @@ case class CatalogTable(
StructType(partitionFields)
}

/** Return true if the table is a stream table. */
def isStreaming: Boolean = {
provider.isDefined && storage.properties.getOrElse("isStreaming", "false").toBoolean
}

/**
* schema of this table's data columns
*/
@@ -637,6 +637,33 @@ object SQLConf {
.intConf
.createWithDefault(200)

val SQLSTREAM_WATERMARK_ENABLE = buildConf("spark.sql.streaming.watermark.enable")
.doc("Whether to use watermarks in sqlstreaming.")
.booleanConf
.createWithDefault(false)

val SQLSTREAM_OUTPUTMODE = buildConf("spark.sql.streaming.outputMode")
.doc("The output mode used in sqlstreaming.")
.stringConf
.createWithDefault("append")

val SQLSTREAM_TRIGGER = buildConf("spark.sql.streaming.trigger")
.doc("The structured streaming trigger used in sqlstreaming.")
.stringConf
.createWithDefault("0s")

val SQLSTREAM_QUERY_NAME = buildConf("spark.sql.streaming.queryName")
.doc("The structured streaming query name used in sqlstreaming. " +
"Users must set spark.sql.streaming.checkpointLocation together with " +
"spark.sql.streaming.queryName to ensure a unique checkpoint location per query.")
.stringConf
.createOptional

val SQLSTREAM_QUERY_ENABLE = buildConf("spark.sql.streaming.query.enable")
.doc("Whether to enable sqlstreaming in Spark.")
.booleanConf
.createWithDefault(false)

// This is used to set the default data source
val DEFAULT_DATA_SOURCE_NAME = buildConf("spark.sql.sources.default")
.doc("The default data source to use in input/output.")
@@ -1884,6 +1911,16 @@ class SQLConf extends Serializable with Logging {

def broadcastTimeout: Long = getConf(BROADCAST_TIMEOUT)

def sqlStreamWaterMarkEnable: Boolean = getConf(SQLSTREAM_WATERMARK_ENABLE)

def sqlStreamOutputMode: String = getConf(SQLSTREAM_OUTPUTMODE)

def sqlStreamTrigger: String = getConf(SQLSTREAM_TRIGGER)

def sqlStreamQueryName: Option[String] = getConf(SQLSTREAM_QUERY_NAME)

def sqlStreamQueryEnable: Boolean = getConf(SQLSTREAM_QUERY_ENABLE)

def defaultDataSourceName: String = getConf(DEFAULT_DATA_SOURCE_NAME)

def convertCTAS: Boolean = getConf(CONVERT_CTAS)
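
A minimal sketch of how the new entries above might be toggled per session — not part of this patch; the master, app name, query name, and checkpoint path are illustrative:

import org.apache.spark.sql.SparkSession

// Enable SQLStreaming and override the defaults registered in SQLConf above.
// "5s" matches the duration format accepted by spark.sql.streaming.trigger ("0s" by default).
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("sqlstreaming-conf-sketch")
  .config("spark.sql.streaming.query.enable", "true")
  .config("spark.sql.streaming.watermark.enable", "true")
  .config("spark.sql.streaming.outputMode", "update")
  .config("spark.sql.streaming.trigger", "5s")
  .config("spark.sql.streaming.queryName", "demo_query")
  .config("spark.sql.streaming.checkpointLocation", "/tmp/sqlstreaming/demo_query")
  .getOrCreate()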
@@ -820,6 +820,10 @@ object DDLUtils {
table.provider.isDefined && table.provider.get.toLowerCase(Locale.ROOT) != HIVE_PROVIDER
}

def isStreamingTable(table: CatalogTable): Boolean = {
table.isStreaming && table.provider.get.toLowerCase(Locale.ROOT) != HIVE_PROVIDER
}

def readHiveTable(table: CatalogTable): HiveTableRelation = {
HiveTableRelation(
table,
@@ -34,13 +34,16 @@ import org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoTable, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{RowDataSourceScanExec, SparkPlan}
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.streaming.{SQLStreamingSink, StreamingRelation}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}

/**
* Replaces generic operations with specific variants that are designed to work with Spark
@@ -221,6 +224,10 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast
* data source.
*/
class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan] {
private val sqlConf = sparkSession.sqlContext.conf
private val WATERMARK_COLUMN = "watermark.column"
private val WATERMARK_DELAY = "watermark.delay"

private def readDataSourceTable(table: CatalogTable): LogicalPlan = {
val qualifiedTableName = QualifiedTableName(table.database, table.identifier.table)
val catalog = sparkSession.sessionState.catalog
@@ -239,12 +246,55 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
options = table.storage.properties ++ pathOption,
catalogTable = Some(table))

LogicalRelation(dataSource.resolveRelation(checkFilesExist = false), table)
if (table.isStreaming && sqlConf.sqlStreamQueryEnable) {
val relation =
StreamingRelation(dataSource, table.provider.get, table.schema.toAttributes)
withWatermark(relation, table)
} else {
LogicalRelation(dataSource.resolveRelation(checkFilesExist = false), table)
}
}
})
}

/**
* Check whether the watermark is enabled. If so, wrap the relation in an EventTimeWatermark.
* @param relation the basic streaming relation
* @param metadata the table metadata carrying the watermark options
* @return the relation, with a watermark applied when enabled
*/
private def withWatermark(relation: LogicalPlan, metadata: CatalogTable): LogicalPlan = {
if (sqlConf.sqlStreamWaterMarkEnable) {
logInfo("Using watermark in sqlstreaming")
val options = metadata.storage.properties
val column = options.getOrElse(WATERMARK_COLUMN,
throw new IllegalArgumentException(s"$WATERMARK_COLUMN is empty"))
val delay = options.getOrElse(WATERMARK_DELAY,
throw new IllegalArgumentException(s"$WATERMARK_DELAY is empty"))
EventTimeWatermark(
UnresolvedAttribute(column),
CalendarInterval.fromString(s"interval $delay"),
relation
)
} else {
logInfo("None watermark found in sqlstreaming")
relation
}
}

private def readHiveTable(table: CatalogTable): LogicalPlan = {
HiveTableRelation(
table,
// Hive table columns are always nullable.
table.dataSchema.asNullable.toAttributes,
table.partitionSchema.asNullable.toAttributes)
}

override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case InsertIntoTable(UnresolvedCatalogRelation(tableMeta), _, child, _, _)
if DDLUtils.isStreamingTable(tableMeta) && sqlConf.sqlStreamQueryEnable =>
SQLStreamingSink(sparkSession, tableMeta, child)

case i @ InsertIntoTable(UnresolvedCatalogRelation(tableMeta), _, _, _, _)
if DDLUtils.isDatasourceTable(tableMeta) =>
i.copy(table = readDataSourceTable(tableMeta))
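
For reference, a hypothetical DDL that would exercise the withWatermark path above — the table name, Kafka options, and column are illustrative; only isStreaming, watermark.column, and watermark.delay are keys read by this patch, and an active SparkSession `spark` with spark.sql.streaming.query.enable set is assumed:

// Illustrative stream-table DDL, not part of this patch.
spark.sql(
  """CREATE TABLE kafka_events
    |USING kafka
    |OPTIONS (
    |  isStreaming 'true',
    |  subscribe 'events',
    |  kafka.bootstrap.servers 'localhost:9092',
    |  watermark.column 'timestamp',
    |  watermark.delay '10 seconds'
    |)
  """.stripMargin)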
@@ -0,0 +1,115 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.streaming

import java.util.concurrent.TimeUnit

import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
import org.apache.spark.sql.execution.command.RunnableCommand
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils
import org.apache.spark.sql.sources.v2.StreamingWriteSupportProvider
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.util.Utils

/**
* The basic RunnableCommand for SQLStreaming, using Command.run to start a streaming query.
*
* @param sparkSession the active SparkSession
* @param table the catalog table acting as the streaming sink
* @param child the logical plan of the streaming query to run
*/
case class SQLStreamingSink(
    sparkSession: SparkSession,
    table: CatalogTable,
    child: LogicalPlan)
  extends RunnableCommand {

private val sqlConf = sparkSession.sqlContext.conf

/**
* The given column name may not be equal to any of the existing column names if we were in
* case-insensitive context. Normalize the given column name to the real one so that we don't
* need to care about case sensitivity afterwards.
*/
private def normalize(df: DataFrame, columnName: String, columnType: String): String = {
val validColumnNames = df.logicalPlan.output.map(_.name)
validColumnNames.find(sparkSession.sessionState.analyzer.resolver(_, columnName))
.getOrElse(throw new AnalysisException(s"$columnType column $columnName not found in " +
s"existing columns (${validColumnNames.mkString(", ")})"))
}

/**
* Parse spark.sql.streaming.trigger into a Trigger.
*/
private def parseTrigger(): Trigger = {
val trigger = Utils.timeStringAsMs(sqlConf.sqlStreamTrigger)
Trigger.ProcessingTime(trigger, TimeUnit.MILLISECONDS)

Contributor review comment: Continuous processing mode is supported now, do you plan to support it? If so I think we can traverse the logical plan to find out whether this is a continuous query and create a ContinuousTrigger

}

/**
* Executed through queryExecution.executeCollect().
* @param sparkSession the active SparkSession
* @return an empty Seq, the same as other DDL commands
*/
override def run(sparkSession: SparkSession): Seq[Row] = {

///////////////////////////////////////////////////////////////////////////////////////
// Builder pattern config options
///////////////////////////////////////////////////////////////////////////////////////
val df = Dataset.ofRows(sparkSession, child)
val outputMode = InternalOutputModes(sqlConf.sqlStreamOutputMode)
val normalizedParCols = table.partitionColumnNames.map {
normalize(df, _, "Partition")
}

val ds = DataSource.lookupDataSource(table.provider.get, sparkSession.sessionState.conf)
val disabledSources = sparkSession.sqlContext.conf.disabledV2StreamingWriters.split(",")
var options = table.storage.properties
val sink = ds.newInstance() match {
case w: StreamingWriteSupportProvider
if !disabledSources.contains(w.getClass.getCanonicalName) =>
val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
w, df.sparkSession.sessionState.conf)
options = sessionOptions ++ options
w
case _ =>
val ds = DataSource(
df.sparkSession,
className = table.provider.get,
options = options,
partitionColumns = normalizedParCols)
ds.createSink(outputMode)
}

sparkSession.sessionState.streamingQueryManager.startQuery(
sqlConf.sqlStreamQueryName,
None,
df,
table.storage.properties,
sink,
outputMode,
trigger = parseTrigger()
).awaitTermination()

Seq.empty[Row]
}
}
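
A sketch of the reviewer's ContinuousTrigger suggestion above — not part of this patch; `intervalMs` stands for the parsed value of spark.sql.streaming.trigger and `isContinuous` for a flag derived from traversing the logical plan (both are assumptions):

import java.util.concurrent.TimeUnit
import org.apache.spark.sql.streaming.Trigger

// Pick a continuous trigger for continuous queries, otherwise keep micro-batch processing time.
def chooseTrigger(intervalMs: Long, isContinuous: Boolean): Trigger =
  if (isContinuous) Trigger.Continuous(intervalMs, TimeUnit.MILLISECONDS)
  else Trigger.ProcessingTime(intervalMs, TimeUnit.MILLISECONDS)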
@@ -297,6 +297,10 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
// it to Hive. If it fails, treat it as not hive compatible and go back to 2.1
val tableProperties = tableMetaToTableProps(table)

if (table.isStreaming) {
tableProperties.put(DATASOURCE_STREAM_TABLE, "true")
}

// put table provider and partition provider in table properties.
tableProperties.put(DATASOURCE_PROVIDER, provider)
if (table.tracksPartitionsInCatalog) {
@@ -1332,6 +1336,7 @@ object HiveExternalCatalog {

val HIVE_GENERATED_TABLE_PROPERTIES = Set(DDL_TIME)
val HIVE_GENERATED_STORAGE_PROPERTIES = Set(SERIALIZATION_FORMAT)
val DATASOURCE_STREAM_TABLE = DATASOURCE_PREFIX + "isStreaming"

// When storing data source tables in hive metastore, we need to set data schema to empty if the
// schema is hive-incompatible. However we need a hack to preserve existing behavior. Before
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hive

import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.test.SQLTestUtils

class StreamTableDDLCommandSuite extends SQLTestUtils with TestHiveSingleton {
private val catalog = spark.sessionState.catalog

test("CTAS: create data source stream table") {
withTempPath { dir =>
withTable("t") {
sql(
s"""CREATE TABLE t USING PARQUET
|OPTIONS (
|PATH = '${dir.toURI}',
|location = '${dir.toURI}',
|isStreaming = 'true')
|AS SELECT 1 AS a, 2 AS b, 3 AS c
""".stripMargin)
val streamTable = catalog.getTableMetadata(TableIdentifier("t"))
assert(streamTable.isStreaming)
}
}
}
}
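
For context, a hedged sketch of the end-to-end flow these changes enable — the table names are made up, and it assumes both tables were created with isStreaming 'true' and that the sqlstreaming confs are enabled; the INSERT is rewritten by FindDataSourceTable into a SQLStreamingSink, which starts the query through StreamingQueryManager:

// Illustrative only; `spark` is an active SparkSession.
spark.sql("SET spark.sql.streaming.query.enable=true")
spark.sql("SET spark.sql.streaming.checkpointLocation=/tmp/sqlstreaming")
spark.sql("INSERT INTO kafka_sink SELECT value FROM kafka_events")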