From 66475b737e2db96cb2bf491cf22d133bdcfe9b02 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Wed, 13 Dec 2017 20:14:53 -0800 Subject: [PATCH 1/4] [SPARK-22781][SS] Support creating streaming dataset with ORC file format --- .../structured-streaming-programming-guide.md | 2 +- .../sql/streaming/DataStreamReader.scala | 9 ++ .../sql/streaming/FileStreamSourceSuite.scala | 111 ++++++++++++++++++ 3 files changed, 121 insertions(+), 1 deletion(-) diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md index 93bef8d5bb7e2..31fcfabb9cacc 100644 --- a/docs/structured-streaming-programming-guide.md +++ b/docs/structured-streaming-programming-guide.md @@ -493,7 +493,7 @@ returned by `SparkSession.readStream()`. In [R](api/R/read.stream.html), with th #### Input Sources There are a few built-in sources. - - **File source** - Reads files written in a directory as a stream of data. Supported file formats are text, csv, json, parquet. See the docs of the DataStreamReader interface for a more up-to-date list, and supported options for each file format. Note that the files must be atomically placed in the given directory, which in most file systems, can be achieved by file move operations. + - **File source** - Reads files written in a directory as a stream of data. Supported file formats are text, csv, json, orc, parquet. See the docs of the DataStreamReader interface for a more up-to-date list, and supported options for each file format. Note that the files must be atomically placed in the given directory, which in most file systems, can be achieved by file move operations. - **Kafka source** - Reads data from Kafka. It's compatible with Kafka broker versions 0.10.0 or higher. See the [Kafka Integration Guide](structured-streaming-kafka-integration.html) for more details. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala index a42e28053a96a..4d37a28c1ad83 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala @@ -298,6 +298,15 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo */ def csv(path: String): DataFrame = format("csv").load(path) + /** + * Loads a ORC file stream, returning the result as a `DataFrame`. + * + * @since 2.3.0 + */ + def orc(path: String): DataFrame = { + format("orc").load(path) + } + /** * Loads a Parquet file stream, returning the result as a `DataFrame`. 
* diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index b6baaed1927e4..af18e03a7c34d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -86,6 +86,28 @@ abstract class FileStreamSourceTest } } + case class AddOrcFileData(data: DataFrame, src: File, tmp: File) extends AddFileData { + override def addData(source: FileStreamSource): Unit = { + AddOrcFileData.writeToFile(data, src, tmp) + } + } + + object AddOrcFileData { + def apply(seq: Seq[String], src: File, tmp: File): AddOrcFileData = { + AddOrcFileData(seq.toDS().toDF(), src, tmp) + } + + /** Write orc files in a temp dir, and move the individual files to the 'src' dir */ + def writeToFile(df: DataFrame, src: File, tmp: File): Unit = { + val tmpDir = Utils.tempFileWith(new File(tmp, "orc")) + df.write.orc(tmpDir.getCanonicalPath) + src.mkdirs() + tmpDir.listFiles().foreach { f => + f.renameTo(new File(src, s"${f.getName}")) + } + } + } + case class AddParquetFileData(data: DataFrame, src: File, tmp: File) extends AddFileData { override def addData(source: FileStreamSource): Unit = { AddParquetFileData.writeToFile(data, src, tmp) @@ -248,6 +270,42 @@ class FileStreamSourceSuite extends FileStreamSourceTest { } } + // =============== ORC file stream schema tests ================ + + test("FileStreamSource schema: orc, existing files, no schema") { + withTempDir { src => + Seq("a", "b", "c").toDS().as("userColumn").toDF().write + .mode(org.apache.spark.sql.SaveMode.Overwrite) + .orc(src.getCanonicalPath) + + // Without schema inference, should throw error + withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "false") { + intercept[IllegalArgumentException] { + createFileStreamSourceAndGetSchema( + format = Some("orc"), path = Some(src.getCanonicalPath), schema = None) + } + } + + // With schema inference, should infer correct schema + withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "true") { + val schema = createFileStreamSourceAndGetSchema( + format = Some("orc"), path = Some(src.getCanonicalPath), schema = None) + assert(schema === new StructType().add("value", StringType)) + } + } + } + + test("FileStreamSource schema: orc, existing files, schema") { + withTempPath { src => + Seq("a", "b", "c").toDS().as("oldUserColumn").toDF() + .write.orc(new File(src, "1").getCanonicalPath) + val userSchema = new StructType().add("userColumn", StringType) + val schema = createFileStreamSourceAndGetSchema( + format = Some("orc"), path = Some(src.getCanonicalPath), schema = Some(userSchema)) + assert(schema === userSchema) + } + } + // =============== Parquet file stream schema tests ================ test("FileStreamSource schema: parquet, existing files, no schema") { @@ -507,6 +565,59 @@ class FileStreamSourceSuite extends FileStreamSourceTest { } } + // =============== ORC file stream tests ================ + + test("read from orc files") { + withTempDirs { case (src, tmp) => + val fileStream = createFileStream("orc", src.getCanonicalPath, Some(valueSchema)) + val filtered = fileStream.filter($"value" contains "keep") + + testStream(filtered)( + AddOrcFileData(Seq("drop1", "keep2", "keep3"), src, tmp), + CheckAnswer("keep2", "keep3"), + StopStream, + AddOrcFileData(Seq("drop4", "keep5", "keep6"), src, tmp), + StartStream(), + CheckAnswer("keep2", "keep3", "keep5", "keep6"), + 
AddOrcFileData(Seq("drop7", "keep8", "keep9"), src, tmp), + CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9") + ) + } + } + + test("read from orc files with changing schema") { + withTempDirs { case (src, tmp) => + withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "true") { + + // Add a file so that we can infer its schema + AddOrcFileData.writeToFile(Seq("value0").toDF("k"), src, tmp) + + val fileStream = createFileStream("orc", src.getCanonicalPath) + + // FileStreamSource should infer the column "k" + assert(fileStream.schema === StructType(Seq(StructField("k", StringType)))) + + // After creating DF and before starting stream, add data with different schema + // Should not affect the inferred schema any more + AddOrcFileData.writeToFile(Seq(("value1", 0)).toDF("k", "v"), src, tmp) + + testStream(fileStream)( + // Should not pick up column v in the file added before start + AddOrcFileData(Seq("value2").toDF("k"), src, tmp), + CheckAnswer("value0", "value1", "value2"), + + // Should read data in column k, and ignore v + AddOrcFileData(Seq(("value3", 1)).toDF("k", "v"), src, tmp), + CheckAnswer("value0", "value1", "value2", "value3"), + + // Should ignore rows that do not have the necessary k column + AddOrcFileData(Seq("value5").toDF("v"), src, tmp), + CheckAnswer("value0", "value1", "value2", "value3", null) + ) + } + } + } + // =============== Parquet file stream tests ================ test("read from parquet files") { From 1ab78ed24c81903ae33d48f34d2e89f0a4eaa24e Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Wed, 13 Dec 2017 20:38:46 -0800 Subject: [PATCH 2/4] Add a ORC test case into FileStreamSinkSuite, too. --- .../org/apache/spark/sql/streaming/FileStreamSinkSuite.scala | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala index 08db06b94904b..2a2552211857a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala @@ -305,6 +305,10 @@ class FileStreamSinkSuite extends StreamTest { testFormat(Some("parquet")) } + test("orc") { + testFormat(Some("orc")) + } + test("text") { testFormat(Some("text")) } From d595aebe5139fc190492d5033ee6c0fd7fc8b076 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Wed, 13 Dec 2017 20:45:47 -0800 Subject: [PATCH 3/4] Add a supported option. --- .../org/apache/spark/sql/streaming/DataStreamReader.scala | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala index 4d37a28c1ad83..41aa02c2b5e35 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala @@ -301,6 +301,12 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo /** * Loads a ORC file stream, returning the result as a `DataFrame`. * + * You can set the following ORC-specific option(s) for reading ORC files: + *
+ * <ul>
+ * <li>`maxFilesPerTrigger` (default: no max limit): sets the maximum number of new files to be
+ * considered in every trigger.</li>
+ * </ul>
+ * * @since 2.3.0 */ def orc(path: String): DataFrame = { From 29094cc79d0025989df34061832b908880c64323 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Thu, 14 Dec 2017 00:12:10 -0800 Subject: [PATCH 4/4] Add python api, too. --- python/pyspark/sql/streaming.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py index 0cf702143c773..d0aba28788ac9 100644 --- a/python/pyspark/sql/streaming.py +++ b/python/pyspark/sql/streaming.py @@ -490,6 +490,23 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None, else: raise TypeError("path can be only a single string") + @since(2.3) + def orc(self, path): + """Loads a ORC file stream, returning the result as a :class:`DataFrame`. + + .. note:: Evolving. + + >>> orc_sdf = spark.readStream.schema(sdf_schema).orc(tempfile.mkdtemp()) + >>> orc_sdf.isStreaming + True + >>> orc_sdf.schema == sdf_schema + True + """ + if isinstance(path, basestring): + return self._df(self._jreader.orc(path)) + else: + raise TypeError("path can be only a single string") + @since(2.0) def parquet(self, path): """Loads a Parquet file stream, returning the result as a :class:`DataFrame`.
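A minimal usage sketch of the `DataStreamReader.orc` API added by this patch series, for illustration only: it assumes Spark 2.3.0+, a directory `/tmp/orc-input` into which ORC files are moved atomically, and a checkpoint directory `/tmp/orc-checkpoint` (both paths, the object name, and the console sink are hypothetical, not part of the patch). The explicit schema mirrors the single `value: String` column used in the new tests; alternatively, set `spark.sql.streaming.schemaInference=true` and let the source infer it from existing files, as the new schema tests exercise.

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.types.{StringType, StructType}

    object OrcStreamUsageSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("OrcStreamUsageSketch").getOrCreate()

        // Explicit schema for the streaming source (same shape as the test data: one string column).
        val schema = new StructType().add("value", StringType)

        val orcStream = spark.readStream
          .schema(schema)
          .option("maxFilesPerTrigger", "1")   // option documented in the new orc() Scaladoc
          .orc("/tmp/orc-input")               // new API introduced in PATCH 1/4, documented in PATCH 3/4

        // Echo each micro-batch to the console; any streaming sink would work here.
        val query = orcStream.writeStream
          .format("console")
          .option("checkpointLocation", "/tmp/orc-checkpoint")
          .start()

        query.awaitTermination()
      }
    }

The equivalent Python call added in PATCH 4/4 would be `spark.readStream.schema(schema).orc("/tmp/orc-input")`.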