diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md
index 93bef8d5bb7e..31fcfabb9cac 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -493,7 +493,7 @@ returned by `SparkSession.readStream()`. In [R](api/R/read.stream.html), with th
#### Input Sources
There are a few built-in sources.
- - **File source** - Reads files written in a directory as a stream of data. Supported file formats are text, csv, json, parquet. See the docs of the DataStreamReader interface for a more up-to-date list, and supported options for each file format. Note that the files must be atomically placed in the given directory, which in most file systems, can be achieved by file move operations.
+ - **File source** - Reads files written in a directory as a stream of data. Supported file formats are text, csv, json, orc, parquet. See the docs of the DataStreamReader interface for a more up-to-date list and the supported options for each file format. Note that the files must be atomically placed in the given directory, which, in most file systems, can be achieved by file move operations. (A short reading sketch follows this list.)
- **Kafka source** - Reads data from Kafka. It's compatible with Kafka broker versions 0.10.0 or higher. See the [Kafka Integration Guide](structured-streaming-kafka-integration.html) for more details.
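
For illustration only, here is a minimal Scala sketch of reading the newly supported ORC format through the file source; the input directory and the one-column schema are hypothetical placeholders, not part of the patch:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructType}

// Hypothetical session, schema, and input directory; any directory that
// receives atomically moved ORC files behaves the same way.
val spark = SparkSession.builder().appName("OrcFileSourceSketch").getOrCreate()
val schema = new StructType().add("value", StringType)

val orcStream = spark.readStream
  .schema(schema)        // required unless streaming schema inference is enabled
  .orc("/tmp/orc-input") // the ORC shortcut added by this patch

// orcStream.isStreaming == true; transformations and sinks attach exactly as
// for the other file formats.
```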
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index 0cf702143c77..d0aba28788ac 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -490,6 +490,23 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
else:
raise TypeError("path can be only a single string")
+ @since(2.3)
+ def orc(self, path):
+ """Loads an ORC file stream, returning the result as a :class:`DataFrame`.
+
+ .. note:: Evolving.
+
+ >>> orc_sdf = spark.readStream.schema(sdf_schema).orc(tempfile.mkdtemp())
+ >>> orc_sdf.isStreaming
+ True
+ >>> orc_sdf.schema == sdf_schema
+ True
+ """
+ if isinstance(path, basestring):
+ return self._df(self._jreader.orc(path))
+ else:
+ raise TypeError("path can be only a single string")
+
@since(2.0)
def parquet(self, path):
"""Loads a Parquet file stream, returning the result as a :class:`DataFrame`.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index a42e28053a96..41aa02c2b5e3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -298,6 +298,21 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
*/
def csv(path: String): DataFrame = format("csv").load(path)
+ /**
+ * Loads an ORC file stream, returning the result as a `DataFrame`.
+ *
+ * You can set the following ORC-specific option(s) for reading ORC files:
+ *
+ * - `maxFilesPerTrigger` (default: no max limit): sets the maximum number of new files to be
+ * considered in every trigger.
+ *
+ * @since 2.3.0
+ */
+ def orc(path: String): DataFrame = {
+ format("orc").load(path)
+ }
+
/**
* Loads a Parquet file stream, returning the result as a `DataFrame`.
*
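
As a small follow-up to the `maxFilesPerTrigger` note in the Scaladoc above, the option is set through the generic `option(...)` call before invoking `orc(...)`; this sketch reuses the hypothetical `spark` session and `schema` from the reading example earlier in this patch:

```scala
// Sketch: throttle the ORC file source to at most one new file per trigger.
// The directory path is a placeholder.
val throttled = spark.readStream
  .schema(schema)
  .option("maxFilesPerTrigger", "1")
  .orc("/tmp/orc-input")
```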
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
index 08db06b94904..2a2552211857 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
@@ -305,6 +305,10 @@ class FileStreamSinkSuite extends StreamTest {
testFormat(Some("parquet"))
}
+ test("orc") {
+ testFormat(Some("orc"))
+ }
+
test("text") {
testFormat(Some("text"))
}
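
For the sink side that this new test exercises, a rough sketch of writing a streaming DataFrame out as ORC files; the checkpoint and output directories are placeholders, and `orcStream` is the streaming DataFrame from the earlier reading sketch:

```scala
// Sketch: stream results out as ORC files. A file sink always needs a
// checkpoint location; both paths below are placeholders.
val query = orcStream.writeStream
  .format("orc")
  .option("checkpointLocation", "/tmp/orc-checkpoint")
  .start("/tmp/orc-output")

// query.awaitTermination() would then block until the query stops.
```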
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index b6baaed1927e..af18e03a7c34 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -86,6 +86,28 @@ abstract class FileStreamSourceTest
}
}
+ case class AddOrcFileData(data: DataFrame, src: File, tmp: File) extends AddFileData {
+ override def addData(source: FileStreamSource): Unit = {
+ AddOrcFileData.writeToFile(data, src, tmp)
+ }
+ }
+
+ object AddOrcFileData {
+ def apply(seq: Seq[String], src: File, tmp: File): AddOrcFileData = {
+ AddOrcFileData(seq.toDS().toDF(), src, tmp)
+ }
+
+ /** Write ORC files in a temp dir, and move the individual files to the 'src' dir */
+ def writeToFile(df: DataFrame, src: File, tmp: File): Unit = {
+ val tmpDir = Utils.tempFileWith(new File(tmp, "orc"))
+ df.write.orc(tmpDir.getCanonicalPath)
+ src.mkdirs()
+ tmpDir.listFiles().foreach { f =>
+ f.renameTo(new File(src, f.getName))
+ }
+ }
+ }
+
case class AddParquetFileData(data: DataFrame, src: File, tmp: File) extends AddFileData {
override def addData(source: FileStreamSource): Unit = {
AddParquetFileData.writeToFile(data, src, tmp)
@@ -248,6 +270,42 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
}
}
+ // =============== ORC file stream schema tests ================
+
+ test("FileStreamSource schema: orc, existing files, no schema") {
+ withTempDir { src =>
+ Seq("a", "b", "c").toDS().as("userColumn").toDF().write
+ .mode(org.apache.spark.sql.SaveMode.Overwrite)
+ .orc(src.getCanonicalPath)
+
+ // Without schema inference, should throw error
+ withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "false") {
+ intercept[IllegalArgumentException] {
+ createFileStreamSourceAndGetSchema(
+ format = Some("orc"), path = Some(src.getCanonicalPath), schema = None)
+ }
+ }
+
+ // With schema inference, should infer correct schema
+ withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "true") {
+ val schema = createFileStreamSourceAndGetSchema(
+ format = Some("orc"), path = Some(src.getCanonicalPath), schema = None)
+ assert(schema === new StructType().add("value", StringType))
+ }
+ }
+ }
+
+ test("FileStreamSource schema: orc, existing files, schema") {
+ withTempPath { src =>
+ Seq("a", "b", "c").toDS().as("oldUserColumn").toDF()
+ .write.orc(new File(src, "1").getCanonicalPath)
+ val userSchema = new StructType().add("userColumn", StringType)
+ val schema = createFileStreamSourceAndGetSchema(
+ format = Some("orc"), path = Some(src.getCanonicalPath), schema = Some(userSchema))
+ assert(schema === userSchema)
+ }
+ }
+
// =============== Parquet file stream schema tests ================
test("FileStreamSource schema: parquet, existing files, no schema") {
@@ -507,6 +565,59 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
}
}
+ // =============== ORC file stream tests ================
+
+ test("read from orc files") {
+ withTempDirs { case (src, tmp) =>
+ val fileStream = createFileStream("orc", src.getCanonicalPath, Some(valueSchema))
+ val filtered = fileStream.filter($"value" contains "keep")
+
+ testStream(filtered)(
+ AddOrcFileData(Seq("drop1", "keep2", "keep3"), src, tmp),
+ CheckAnswer("keep2", "keep3"),
+ StopStream,
+ AddOrcFileData(Seq("drop4", "keep5", "keep6"), src, tmp),
+ StartStream(),
+ CheckAnswer("keep2", "keep3", "keep5", "keep6"),
+ AddOrcFileData(Seq("drop7", "keep8", "keep9"), src, tmp),
+ CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9")
+ )
+ }
+ }
+
+ test("read from orc files with changing schema") {
+ withTempDirs { case (src, tmp) =>
+ withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "true") {
+
+ // Add a file so that we can infer its schema
+ AddOrcFileData.writeToFile(Seq("value0").toDF("k"), src, tmp)
+
+ val fileStream = createFileStream("orc", src.getCanonicalPath)
+
+ // FileStreamSource should infer the column "k"
+ assert(fileStream.schema === StructType(Seq(StructField("k", StringType))))
+
+ // After creating DF and before starting stream, add data with different schema
+ // Should not affect the inferred schema any more
+ AddOrcFileData.writeToFile(Seq(("value1", 0)).toDF("k", "v"), src, tmp)
+
+ testStream(fileStream)(
+ // Should not pick up column v in the file added before start
+ AddOrcFileData(Seq("value2").toDF("k"), src, tmp),
+ CheckAnswer("value0", "value1", "value2"),
+
+ // Should read data in column k, and ignore v
+ AddOrcFileData(Seq(("value3", 1)).toDF("k", "v"), src, tmp),
+ CheckAnswer("value0", "value1", "value2", "value3"),
+
+ // Should ignore rows that do not have the necessary k column
+ AddOrcFileData(Seq("value5").toDF("v"), src, tmp),
+ CheckAnswer("value0", "value1", "value2", "value3", null)
+ )
+ }
+ }
+ }
+
// =============== Parquet file stream tests ================
test("read from parquet files") {
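
The schema behaviour that the ORC source tests above pin down corresponds to the `spark.sql.streaming.schemaInference` setting (the key behind `SQLConf.STREAMING_SCHEMA_INFERENCE`); a rough user-facing sketch, with a placeholder input directory:

```scala
// Sketch: let the ORC file source infer its schema from files already present,
// instead of passing .schema(...) explicitly. The path is a placeholder.
spark.conf.set("spark.sql.streaming.schemaInference", "true")

val inferred = spark.readStream.orc("/tmp/orc-input")
// As the "changing schema" test above shows, the schema is fixed when the
// stream is created: files added later with extra columns contribute only the
// originally inferred columns, and rows missing an inferred column surface as nulls.
```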