2 changes: 1 addition & 1 deletion docs/structured-streaming-programming-guide.md
@@ -493,7 +493,7 @@ returned by `SparkSession.readStream()`. In [R](api/R/read.stream.html), with th
#### Input Sources
There are a few built-in sources.

- **File source** - Reads files written in a directory as a stream of data. Supported file formats are text, csv, json, parquet. See the docs of the DataStreamReader interface for a more up-to-date list, and supported options for each file format. Note that the files must be atomically placed in the given directory, which, in most file systems, can be achieved by file move operations.
- **File source** - Reads files written in a directory as a stream of data. Supported file formats are text, csv, json, orc, parquet. See the docs of the DataStreamReader interface for a more up-to-date list, and supported options for each file format. Note that the files must be atomically placed in the given directory, which, in most file systems, can be achieved by file move operations.

- **Kafka source** - Reads data from Kafka. It's compatible with Kafka broker versions 0.10.0 or higher. See the [Kafka Integration Guide](structured-streaming-kafka-integration.html) for more details.

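For orientation, a minimal sketch of using the file source described above with the newly listed ORC format from Scala (for example in spark-shell). The input directory, schema, and console sink are illustrative assumptions, not part of this change:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructType}

val spark = SparkSession.builder().appName("orc-file-source-sketch").getOrCreate()

// A schema is required up front unless streaming schema inference is enabled.
val schema = new StructType().add("value", StringType)

// Watch a directory for newly added ORC files.
val lines = spark.readStream
  .schema(schema)
  .orc("/tmp/orc-input")   // hypothetical input directory

// Echo each micro-batch to the console for demonstration purposes.
val query = lines.writeStream
  .format("console")
  .start()
query.awaitTermination()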
17 changes: 17 additions & 0 deletions python/pyspark/sql/streaming.py
@@ -490,6 +490,23 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
else:
raise TypeError("path can be only a single string")

@since(2.3)
def orc(self, path):
"""Loads a ORC file stream, returning the result as a :class:`DataFrame`.

.. note:: Evolving.

>>> orc_sdf = spark.readStream.schema(sdf_schema).orc(tempfile.mkdtemp())
>>> orc_sdf.isStreaming
True
>>> orc_sdf.schema == sdf_schema
True
"""
if isinstance(path, basestring):
return self._df(self._jreader.orc(path))
else:
raise TypeError("path can be only a single string")

@since(2.0)
def parquet(self, path):
"""Loads a Parquet file stream, returning the result as a :class:`DataFrame`.
@@ -298,6 +298,21 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
*/
def csv(path: String): DataFrame = format("csv").load(path)

/**
* Loads an ORC file stream, returning the result as a `DataFrame`.
*
* You can set the following ORC-specific option(s) for reading ORC files:
* <ul>
* <li>`maxFilesPerTrigger` (default: no max limit): sets the maximum number of new files to be
* considered in every trigger.</li>
* </ul>
*
* @since 2.3.0
*/
def orc(path: String): DataFrame = {
format("orc").load(path)
}

/**
* Loads a Parquet file stream, returning the result as a `DataFrame`.
*
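A short sketch of the `maxFilesPerTrigger` option documented in the new `orc` Scaladoc above, combined with the `orc(path)` method it introduces. The schema, directory, and option value are assumptions for illustration:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

val spark = SparkSession.builder().appName("orc-max-files-per-trigger").getOrCreate()

val eventSchema = new StructType()
  .add("id", IntegerType)
  .add("payload", StringType)

// Process at most one newly discovered ORC file per micro-batch.
val events = spark.readStream
  .schema(eventSchema)
  .option("maxFilesPerTrigger", "1")
  .orc("/data/events-orc")   // hypothetical input directory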
@@ -305,6 +305,10 @@ class FileStreamSinkSuite extends StreamTest {
testFormat(Some("parquet"))
}

test("orc") {
testFormat(Some("orc"))
}

test("text") {
testFormat(Some("text"))
}
@@ -86,6 +86,28 @@ abstract class FileStreamSourceTest
}
}

case class AddOrcFileData(data: DataFrame, src: File, tmp: File) extends AddFileData {
override def addData(source: FileStreamSource): Unit = {
AddOrcFileData.writeToFile(data, src, tmp)
}
}

object AddOrcFileData {
def apply(seq: Seq[String], src: File, tmp: File): AddOrcFileData = {
AddOrcFileData(seq.toDS().toDF(), src, tmp)
}

/** Writes ORC files to a temp dir, then moves the individual files into the 'src' dir. */
def writeToFile(df: DataFrame, src: File, tmp: File): Unit = {
val tmpDir = Utils.tempFileWith(new File(tmp, "orc"))
df.write.orc(tmpDir.getCanonicalPath)
src.mkdirs()
tmpDir.listFiles().foreach { f =>
f.renameTo(new File(src, s"${f.getName}"))
}
}
}
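The `writeToFile` helper above follows the programming guide's requirement that files be placed atomically in the watched directory. As a standalone illustration of the same move-into-place pattern outside the test harness (both paths are hypothetical):

import java.nio.file.{Files, Paths, StandardCopyOption}

// Write the ORC file to a staging location first...
val staged = Paths.get("/tmp/staging/part-00000.orc")
// ...then move it into the watched directory, so the stream never observes a partially written file.
val target = Paths.get("/data/orc-input/part-00000.orc")
Files.move(staged, target, StandardCopyOption.ATOMIC_MOVE)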

case class AddParquetFileData(data: DataFrame, src: File, tmp: File) extends AddFileData {
override def addData(source: FileStreamSource): Unit = {
AddParquetFileData.writeToFile(data, src, tmp)
@@ -248,6 +270,42 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
}
}

// =============== ORC file stream schema tests ================

test("FileStreamSource schema: orc, existing files, no schema") {
withTempDir { src =>
Seq("a", "b", "c").toDS().as("userColumn").toDF().write
.mode(org.apache.spark.sql.SaveMode.Overwrite)
.orc(src.getCanonicalPath)

// Without schema inference, should throw error
withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "false") {
intercept[IllegalArgumentException] {
createFileStreamSourceAndGetSchema(
format = Some("orc"), path = Some(src.getCanonicalPath), schema = None)
}
}

// With schema inference, should infer correct schema
withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "true") {
val schema = createFileStreamSourceAndGetSchema(
format = Some("orc"), path = Some(src.getCanonicalPath), schema = None)
assert(schema === new StructType().add("value", StringType))
}
}
}

test("FileStreamSource schema: orc, existing files, schema") {
withTempPath { src =>
Seq("a", "b", "c").toDS().as("oldUserColumn").toDF()
.write.orc(new File(src, "1").getCanonicalPath)
val userSchema = new StructType().add("userColumn", StringType)
val schema = createFileStreamSourceAndGetSchema(
format = Some("orc"), path = Some(src.getCanonicalPath), schema = Some(userSchema))
assert(schema === userSchema)
}
}
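Outside the test harness, the schema-inference switch exercised above maps to a runtime configuration key; a sketch assuming the input directory already contains ORC files to infer from (the path is illustrative):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("orc-schema-inference").getOrCreate()

// The runtime equivalent of SQLConf.STREAMING_SCHEMA_INFERENCE used in the tests above.
spark.conf.set("spark.sql.streaming.schemaInference", "true")

// With inference enabled, no explicit schema is needed for the ORC file stream.
val inferred = spark.readStream.orc("/data/orc-with-existing-files")   // hypothetical directory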

// =============== Parquet file stream schema tests ================

test("FileStreamSource schema: parquet, existing files, no schema") {
@@ -507,6 +565,59 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
}
}

// =============== ORC file stream tests ================

test("read from orc files") {
withTempDirs { case (src, tmp) =>
val fileStream = createFileStream("orc", src.getCanonicalPath, Some(valueSchema))
val filtered = fileStream.filter($"value" contains "keep")

testStream(filtered)(
AddOrcFileData(Seq("drop1", "keep2", "keep3"), src, tmp),
CheckAnswer("keep2", "keep3"),
StopStream,
AddOrcFileData(Seq("drop4", "keep5", "keep6"), src, tmp),
StartStream(),
CheckAnswer("keep2", "keep3", "keep5", "keep6"),
AddOrcFileData(Seq("drop7", "keep8", "keep9"), src, tmp),
CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9")
)
}
}

test("read from orc files with changing schema") {
withTempDirs { case (src, tmp) =>
withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "true") {

// Add a file so that we can infer its schema
AddOrcFileData.writeToFile(Seq("value0").toDF("k"), src, tmp)

val fileStream = createFileStream("orc", src.getCanonicalPath)

// FileStreamSource should infer the column "k"
assert(fileStream.schema === StructType(Seq(StructField("k", StringType))))

// After creating DF and before starting stream, add data with different schema
// Should not affect the inferred schema any more
AddOrcFileData.writeToFile(Seq(("value1", 0)).toDF("k", "v"), src, tmp)

testStream(fileStream)(
// Should not pick up column v in the file added before start
AddOrcFileData(Seq("value2").toDF("k"), src, tmp),
CheckAnswer("value0", "value1", "value2"),

// Should read data in column k, and ignore v
AddOrcFileData(Seq(("value3", 1)).toDF("k", "v"), src, tmp),
CheckAnswer("value0", "value1", "value2", "value3"),

// Should ignore rows that do not have the necessary k column
AddOrcFileData(Seq("value5").toDF("v"), src, tmp),
CheckAnswer("value0", "value1", "value2", "value3", null)
)
}
}
}

// =============== Parquet file stream tests ================

test("read from parquet files") {