
Commit 66475b7

[SPARK-22781][SS] Support creating streaming dataset with ORC file format
1 parent 2a29a60 commit 66475b7

File tree

docs/structured-streaming-programming-guide.md
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala

3 files changed (+121, -1 lines)

docs/structured-streaming-programming-guide.md

Lines changed: 1 addition & 1 deletion
@@ -493,7 +493,7 @@ returned by `SparkSession.readStream()`. In [R](api/R/read.stream.html), with th
 #### Input Sources
 There are a few built-in sources.

-  - **File source** - Reads files written in a directory as a stream of data. Supported file formats are text, csv, json, parquet. See the docs of the DataStreamReader interface for a more up-to-date list, and supported options for each file format. Note that the files must be atomically placed in the given directory, which in most file systems, can be achieved by file move operations.
+  - **File source** - Reads files written in a directory as a stream of data. Supported file formats are text, csv, json, orc, parquet. See the docs of the DataStreamReader interface for a more up-to-date list, and supported options for each file format. Note that the files must be atomically placed in the given directory, which in most file systems, can be achieved by file move operations.

   - **Kafka source** - Reads data from Kafka. It's compatible with Kafka broker versions 0.10.0 or higher. See the [Kafka Integration Guide](structured-streaming-kafka-integration.html) for more details.
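
For reference, a minimal sketch of how the newly documented ORC file source could be driven from Scala. This is not part of the commit; the session name, schema, and directory path below are hypothetical.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructType}

val spark = SparkSession.builder().appName("orc-file-stream").getOrCreate()

// Streaming file sources need an explicit schema unless
// spark.sql.streaming.schemaInference is enabled.
val schema = new StructType().add("value", StringType)

val orcStream = spark.readStream
  .schema(schema)
  .format("orc")
  .load("/data/incoming")  // hypothetical directory; files should be moved in atomically

val query = orcStream.writeStream
  .format("console")
  .start()

As the guide notes, new files should appear in the source directory atomically, typically by writing them elsewhere and moving them in; the test helper added below uses exactly that write-then-rename pattern.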

sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala

Lines changed: 9 additions & 0 deletions
@@ -298,6 +298,15 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
    */
   def csv(path: String): DataFrame = format("csv").load(path)

+  /**
+   * Loads a ORC file stream, returning the result as a `DataFrame`.
+   *
+   * @since 2.3.0
+   */
+  def orc(path: String): DataFrame = {
+    format("orc").load(path)
+  }
+
   /**
    * Loads a Parquet file stream, returning the result as a `DataFrame`.
    *
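
A hedged usage sketch: the new method is just shorthand for the generic `format("orc")` path. The `spark`, `schema`, and directory values are hypothetical and reuse the example above.

// Equivalent once DataStreamReader.orc is available (Spark 2.3.0+).
val viaShorthand = spark.readStream.schema(schema).orc("/data/incoming")
val viaFormat    = spark.readStream.schema(schema).format("orc").load("/data/incoming")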

sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala

Lines changed: 111 additions & 0 deletions
@@ -86,6 +86,28 @@ abstract class FileStreamSourceTest
     }
   }

+  case class AddOrcFileData(data: DataFrame, src: File, tmp: File) extends AddFileData {
+    override def addData(source: FileStreamSource): Unit = {
+      AddOrcFileData.writeToFile(data, src, tmp)
+    }
+  }
+
+  object AddOrcFileData {
+    def apply(seq: Seq[String], src: File, tmp: File): AddOrcFileData = {
+      AddOrcFileData(seq.toDS().toDF(), src, tmp)
+    }
+
+    /** Write orc files in a temp dir, and move the individual files to the 'src' dir */
+    def writeToFile(df: DataFrame, src: File, tmp: File): Unit = {
+      val tmpDir = Utils.tempFileWith(new File(tmp, "orc"))
+      df.write.orc(tmpDir.getCanonicalPath)
+      src.mkdirs()
+      tmpDir.listFiles().foreach { f =>
+        f.renameTo(new File(src, s"${f.getName}"))
+      }
+    }
+  }
+
   case class AddParquetFileData(data: DataFrame, src: File, tmp: File) extends AddFileData {
     override def addData(source: FileStreamSource): Unit = {
       AddParquetFileData.writeToFile(data, src, tmp)
@@ -248,6 +270,42 @@ class FileStreamSourceSuite extends FileStreamSourceTest
     }
   }

+  // =============== ORC file stream schema tests ================
+
+  test("FileStreamSource schema: orc, existing files, no schema") {
+    withTempDir { src =>
+      Seq("a", "b", "c").toDS().as("userColumn").toDF().write
+        .mode(org.apache.spark.sql.SaveMode.Overwrite)
+        .orc(src.getCanonicalPath)
+
+      // Without schema inference, should throw error
+      withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "false") {
+        intercept[IllegalArgumentException] {
+          createFileStreamSourceAndGetSchema(
+            format = Some("orc"), path = Some(src.getCanonicalPath), schema = None)
+        }
+      }
+
+      // With schema inference, should infer correct schema
+      withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "true") {
+        val schema = createFileStreamSourceAndGetSchema(
+          format = Some("orc"), path = Some(src.getCanonicalPath), schema = None)
+        assert(schema === new StructType().add("value", StringType))
+      }
+    }
+  }
+
+  test("FileStreamSource schema: orc, existing files, schema") {
+    withTempPath { src =>
+      Seq("a", "b", "c").toDS().as("oldUserColumn").toDF()
+        .write.orc(new File(src, "1").getCanonicalPath)
+      val userSchema = new StructType().add("userColumn", StringType)
+      val schema = createFileStreamSourceAndGetSchema(
+        format = Some("orc"), path = Some(src.getCanonicalPath), schema = Some(userSchema))
+      assert(schema === userSchema)
+    }
+  }
+
   // =============== Parquet file stream schema tests ================

   test("FileStreamSource schema: parquet, existing files, no schema") {
@@ -507,6 +565,59 @@ class FileStreamSourceSuite extends FileStreamSourceTest
     }
   }

+  // =============== ORC file stream tests ================
+
+  test("read from orc files") {
+    withTempDirs { case (src, tmp) =>
+      val fileStream = createFileStream("orc", src.getCanonicalPath, Some(valueSchema))
+      val filtered = fileStream.filter($"value" contains "keep")
+
+      testStream(filtered)(
+        AddOrcFileData(Seq("drop1", "keep2", "keep3"), src, tmp),
+        CheckAnswer("keep2", "keep3"),
+        StopStream,
+        AddOrcFileData(Seq("drop4", "keep5", "keep6"), src, tmp),
+        StartStream(),
+        CheckAnswer("keep2", "keep3", "keep5", "keep6"),
+        AddOrcFileData(Seq("drop7", "keep8", "keep9"), src, tmp),
+        CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9")
+      )
+    }
+  }
+
+  test("read from orc files with changing schema") {
+    withTempDirs { case (src, tmp) =>
+      withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "true") {
+
+        // Add a file so that we can infer its schema
+        AddOrcFileData.writeToFile(Seq("value0").toDF("k"), src, tmp)
+
+        val fileStream = createFileStream("orc", src.getCanonicalPath)
+
+        // FileStreamSource should infer the column "k"
+        assert(fileStream.schema === StructType(Seq(StructField("k", StringType))))
+
+        // After creating DF and before starting stream, add data with different schema
+        // Should not affect the inferred schema any more
+        AddOrcFileData.writeToFile(Seq(("value1", 0)).toDF("k", "v"), src, tmp)
+
+        testStream(fileStream)(
+          // Should not pick up column v in the file added before start
+          AddOrcFileData(Seq("value2").toDF("k"), src, tmp),
+          CheckAnswer("value0", "value1", "value2"),
+
+          // Should read data in column k, and ignore v
+          AddOrcFileData(Seq(("value3", 1)).toDF("k", "v"), src, tmp),
+          CheckAnswer("value0", "value1", "value2", "value3"),
+
+          // Should ignore rows that do not have the necessary k column
+          AddOrcFileData(Seq("value5").toDF("v"), src, tmp),
+          CheckAnswer("value0", "value1", "value2", "value3", null)
+        )
+      }
+    }
+  }
+
   // =============== Parquet file stream tests ================

   test("read from parquet files") {
