FileFormatWriter.scala
@@ -292,7 +292,10 @@ object FileFormatWriter extends Logging {
override def execute(iter: Iterator[InternalRow]): Set[String] = {
var fileCounter = 0
var recordsInFile: Long = 0L
newOutputWriter(fileCounter)
// Skip the empty partition to avoid creating a mass of 'empty' files.
if (iter.hasNext) {
newOutputWriter(fileCounter)
@HyukjinKwon (Member) commented on Mar 23, 2017:
I proposed a similar PR before (about a year ago?) but it got reverted. In this case, Parquet would not write out the footer and schema information, so this change will break the case below:

spark.range(100).filter("id > 100").write.parquet("/tmp/abc")
spark.read.parquet("/tmp/abc").show()

To my knowledge we did not have test cases for this, unless I have missed related PRs; it seems there is one now.

PR author (Contributor) commented:
@HyukjinKwon IIUC, this case should fail as expected, as there is no output. Am I missing something?

spark.range(100).filter("id > 100").write.parquet("/tmp/abc")
spark.read.parquet("/tmp/abc").show()

@HyukjinKwon (Member) commented on Mar 23, 2017:

Reading empty data should be fine too, and it should preserve the schema. I am pretty sure we want this case to work, because my change was reverted due to the case above.
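
To make the expected round trip concrete, here is a minimal sketch of the case being discussed, assuming a local SparkSession named spark; the /tmp/abc path is illustrative only:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()

// The filter matches no rows, so every partition handed to the writer is empty.
spark.range(100).filter("id > 100").write.mode("overwrite").parquet("/tmp/abc")

// Reading the directory back should not fail and should preserve the schema
// (a single LongType column named "id"), which requires that at least one
// Parquet file with a footer exists in the output directory.
val readBack = spark.read.parquet("/tmp/abc")
readBack.printSchema()
assert(readBack.schema.fieldNames.sameElements(Array("id")))
assert(readBack.count() == 0)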

PR author (Contributor) commented:

Thanks for pointing that out. How about leaving just one empty file containing the metadata when the DataFrame has empty partitions? Furthermore, we may leave just one metadata file? A rough sketch of the idea follows below.
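
A minimal, self-contained sketch of that idea, using hypothetical stand-ins (Row, OutputWriter, writePartition) rather than the real FileFormatWriter internals: one designated task always opens (and closes) a writer so a single file carrying the footer/schema metadata survives even for a completely empty DataFrame, while other empty partitions produce no file:

final case class Row(values: Seq[Any])

trait OutputWriter {
  def write(row: Row): Unit
  def close(): Unit
}

def writePartition(
    rows: Iterator[Row],
    isDesignatedMetadataTask: Boolean,
    newWriter: () => OutputWriter): Unit = {
  // Open a writer only when there is data, except for the one designated task,
  // which always writes a (possibly empty) file so the schema metadata is kept.
  if (rows.hasNext || isDesignatedMetadataTask) {
    val writer = newWriter()
    try {
      while (rows.hasNext) writer.write(rows.next())
    } finally {
      writer.close()
    }
  }
  // Other empty partitions produce no file at all.
}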

@HyukjinKwon (Member) commented on Mar 24, 2017:

Yes, I was thinking along those lines. I remember making several attempts at the time, but I failed to come up with a fix I was confident in and could not find the time to work on it further.

Another problem is that it might be tied to a datasource-specific issue because, for example, ORC does not write out an empty DataFrame at all:

scala> spark.range(100).filter("id > 100").write.orc("/tmp/abc1")

scala> spark.read.orc("/tmp/abc1").show()
org.apache.spark.sql.AnalysisException: Unable to infer schema for ORC. It must be specified manually.;
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$8.apply(DataSource.scala:182)
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$8.apply(DataSource.scala:182)

This issue is described in https://issues.apache.org/jira/browse/SPARK-15474.

FWIW, I happened to see https://issues.apache.org/jira/browse/SPARK-15693 around that time, and I felt we might be able to consolidate this issue with it, although that is only a rough idea.
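
For what it's worth, when the output directory contains no data files and schema inference therefore fails, supplying the schema explicitly on read sidesteps inference; this is also what the Parquet test change below does. A hedged sketch, assuming a SparkSession named spark and the empty /tmp/abc1 directory from the ORC example above:

import org.apache.spark.sql.types.{LongType, StructType}

val expectedSchema = new StructType().add("id", LongType, nullable = true)

// With a user-specified schema, Spark skips schema inference, so this should
// return an empty DataFrame instead of throwing the AnalysisException above.
val df = spark.read.schema(expectedSchema).orc("/tmp/abc1")
df.show()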

}
while (iter.hasNext) {
if (description.maxRecordsPerFile > 0 && recordsInFile >= description.maxRecordsPerFile) {
fileCounter += 1
ParquetFileFormat.scala
@@ -140,10 +140,10 @@ class ParquetFileFormat
// initialized.
private val parquetLogRedirector = ParquetLogRedirector.INSTANCE

override def newInstance(
path: String,
dataSchema: StructType,
context: TaskAttemptContext): OutputWriter = {
override def newInstance(
path: String,
dataSchema: StructType,
context: TaskAttemptContext): OutputWriter = {
new ParquetOutputWriter(path, context)
}

FileStreamSinkSuite.scala
@@ -22,7 +22,7 @@ import org.apache.spark.sql.execution.DataSourceScanExec
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.streaming.{MemoryStream, MetadataLogFileIndex}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
import org.apache.spark.sql.types.{IntegerType, LongType, StructField, StructType, TimestampType}
import org.apache.spark.util.Utils

class FileStreamSinkSuite extends StreamTest {
@@ -172,6 +172,10 @@ class FileStreamSinkSuite extends StreamTest {
.format("parquet")
.start(outputDir)

val userDefinedSchema = new StructType()
.add("start", TimestampType, nullable = true)
.add("end", TimestampType, nullable = true)
.add("count", LongType, nullable = true)

def addTimestamp(timestampInSecs: Int*): Unit = {
inputData.addData(timestampInSecs.map(_ * 1L): _*)
@@ -181,7 +185,7 @@
}

def check(expectedResult: ((Long, Long), Long)*): Unit = {
val outputDf = spark.read.parquet(outputDir)
val outputDf = spark.read.schema(userDefinedSchema).parquet(outputDir)
.selectExpr(
"CAST(start as BIGINT) AS start",
"CAST(end as BIGINT) AS end",
@@ -212,7 +216,6 @@

test("Update and Complete output mode not supported") {
val df = MemoryStream[Int].toDF().groupBy().count()
val outputDir = Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath

withTempDir { dir =>
