Skip to content

Conversation

@viirya
Copy link
Member

@viirya viirya commented Jan 28, 2017

What changes were proposed in this pull request?

As reported, there is a sorting issue on relatively big datasets. This issue is not seen when using a smaller sized dataset.

For example, when trying to save a sorted dataset like:

val inpth = "...."
spark
    .read
    .option("inferSchema", "true")
    .option("header", "true")
    .csv(inpth)
    .repartition($"userId")
    .sortWithinPartitions("timestamp")
    .write
    .partitionBy("userId")
    .option("header", "true")
    .csv(inpth + "_sorted")

On relatively big datasets, the data will not be sorted anymore after writing out.

That is because when we write partitioned dataset, we will use UnsafeKVExternalSorter to sort data before writing. It sorts data based on partition columns + bucketId + sortBy columns. The sort ordering of the data to write is not considered.

Even in the above case where there is not bucket and sortBy, and data is already partitioned by "userId". In theory we won't change the order of data, but we still can't get correctly sorted data in output files on relatively big datasets. That is because if the data is big enough, UnsafeKVExternalSorter will spill data to files. When we merge the spilled files, we will interlace the readers of spilled files. Thus the order of data in the spilled files is not guaranteed, if we don't explicitly ask to sort on specified order.

This change extends `UnsafeKVExternalSorter` to accept a `List[SortOrder]` parameter used to control how the keys are sorted. Then it considers the output ordering of the data to write when constructing the `UnsafeKVExternalSorter`.

Note: one thing needs to discuss is when we have sortBy column with bucket, do we need to keep output sort ordering and how to do it? Should we sort by partition columns + bucketId + data sort ordering + sortBy columns? Or partition columns + bucketId + sort columns + data sort ordering?

With the goal not to change or add implicit penalty to existing API, this patch solves the issue of preserving sort order of written data. After this patch, you can save sorted and partitioned data as:

spark
    .read
    .option("inferSchema", "true")
    .option("header", "true")
    .csv(inpth)
    .repartition($"userId")
    .sortWithinPartitions($"userId", "timestamp")  // Explicitly ask to sort by partition column(s)
    .write
    .partitionBy("userId")
    .option("header", "true")
    .csv(inpth + "_sorted")

Once the data to write is already sorted by the partition columns and no bucketing is specified, FileFormatWriter will not sort the data again. And the sort order is preserved in output files.

How was this patch tested?

Jenkins tests.

Please review http://spark.apache.org/contributing.html before opening a pull request

@viirya
Copy link
Member Author

viirya commented Jan 28, 2017

cc @cloud-fan @rxin @hvanhovell

@viirya viirya force-pushed the keep-sort-order-after-external-sorter branch from 93d3806 to b84c08b Compare January 28, 2017 02:30
@SparkQA
Copy link

SparkQA commented Jan 28, 2017

Test build #72096 has finished for PR 16724 at commit 93d3806.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya viirya force-pushed the keep-sort-order-after-external-sorter branch from b84c08b to aaa3c3d Compare January 28, 2017 04:22
@SparkQA
Copy link

SparkQA commented Jan 28, 2017

Test build #72097 has finished for PR 16724 at commit b84c08b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jan 28, 2017

Test build #72100 has finished for PR 16724 at commit aaa3c3d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya viirya force-pushed the keep-sort-order-after-external-sorter branch 2 times, most recently from 88a9e72 to c341cfa Compare January 28, 2017 13:46
@SparkQA
Copy link

SparkQA commented Jan 28, 2017

Test build #72106 has finished for PR 16724 at commit c341cfa.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya viirya changed the title [SPARK-19352][WIP][SQL] Keep sort order of rows after external sorter when writing [SPARK-19352][SQL] Keep sort order of rows after external sorter when writing Jan 29, 2017
@viirya viirya force-pushed the keep-sort-order-after-external-sorter branch from c341cfa to 3c040b6 Compare January 29, 2017 01:38
@SparkQA
Copy link

SparkQA commented Jan 29, 2017

Test build #72116 has finished for PR 16724 at commit 3c040b6.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya
Copy link
Member Author

viirya commented Jan 29, 2017

retest this please.

@SparkQA
Copy link

SparkQA commented Jan 29, 2017

Test build #72119 has finished for PR 16724 at commit 3c040b6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Copy link
Contributor

I don't think we should respect the ordering of input data in the writer. The current DataFrameWriter doesn't allow users to write data out orderly.

However, there is an opportunity for optimization: when the data is already partitioned, the writer doesn't need to sort the data by partition columns anymore.

@viirya
Copy link
Member Author

viirya commented Feb 3, 2017

A data save API which doesn't respect the ordering of data sounds strange for me. If this is true, then, extremely said, we can remove the final Sort operator if any, when we want to output the data. Besides, you can't perform a simple task like "write out the csv files for customers sorted by buying amount in each city".

Actually, when the data is not partitioned, then saving sorted data will keep the ordering, as we don't do the external sorting, if I think it correctly. For end-users, it might be hard for them to figure out why saving partitioned and sorted data can't keep the ordering.

@viirya
Copy link
Member Author

viirya commented Feb 3, 2017

The point of optimization is good for me. I will create another JIRA/PR for it if the current change is not considered to merge in the end.

@cloud-fan
Copy link
Contributor

I think it's hard to reason about how/when to preserve the data ordering when writing, considering partitioning and bucketing. For now it looks like the ordering should be preserved if the writer doesn't specify partitioning and bucketing, or the specified partitioning is same as the input data.

If we implement the optimization mentioned above, then we can preserve the ordering if the specified partitioning is same as the input data.

@igozali
Copy link
Contributor

igozali commented Feb 3, 2017

My original use case for sorting the output files based on timestamp using Spark was to use the output files with some other machine learning framework which might not readily work well with very large data files, like TensorFlow or Theano. The benefit that I was trying to get was to offload the sorting to Spark, since even if I ended up with large CSV files I could potentially mmap the CSV files to be used with the subsequent frameworks (TF/Theano).

I thought this could be a relatively common use case, but from the impressions I'm getting from this discussion, I wonder if this is not a paradigm that Spark supports or encourages?

@viirya
Copy link
Member Author

viirya commented Feb 4, 2017

@cloud-fan Yeah, I think it is reasonable that the ordering should be preserved if the writer doesn't specify partitioning and bucketing (we already did this now), or the specified partitioning is same as the input data (currently we don't do this). It is reasonable is because bucketing can be thought as a kind of data partition. And the data ordering after a data partition can't be guaranteed, I think.

@igozali I think you don't need to bucket the data in each partition, right?

@viirya
Copy link
Member Author

viirya commented Feb 10, 2017

@cloud-fan Even the specified partitioning is same as the input data, we still need the sorting. Because the rows with same partition values are not guaranteed to be continuous in all rows, so you can't write all rows of same partition values at once with an outputwriter.

@cloud-fan
Copy link
Contributor

oh good catch. Then it seems like df.repartition($"userId").sortWithinPartitions("timestamp") won't produce a result set as we expected.

So there is no way to write out partitioned sorted data currently, @viirya can you think of a workaround? Adding a new API maybe not a good idea

@viirya
Copy link
Member Author

viirya commented Feb 11, 2017

@cloud-fan Is the current change not suitable? We can change it to only preserve data order when specifying partitioning and no bucketing for the output.

This change only adds a new constructor to UnsafeKVExternalSorter. No other API change I think. As the data output is going through this external sorter, it definitely changes the data order without this change. I think we may not be able to preserve data order with a workaround which doesn't touch UnsafeKVExternalSorter.

@cloud-fan
Copy link
Contributor

If we admit that preserving the sort order is not guaranteed by the API, then the change in this PR is not reasonable, as it has performance penalty.

@viirya
Copy link
Member Author

viirya commented Feb 12, 2017

@cloud-fan OK. I see. If we don't want to add implicit penalty into the existing API, the only way I can think now, is a config to preserve the sort order. This config can be in SQLConf, or we can just have an option for it in DataFrameWriter like maxRecordsPerFile.

E.g.,

val df = spark.range(100)
  .select($"id", explode(array(col("id") + 1, col("id") + 2, col("id") + 3)).as("value"))
  .repartition($"id")
  .sortWithinPartitions($"value".desc).toDF()

df.write
  .option("perserveSortOrder", true) // default is false
  .partitionBy("id")
  .parquet(tempDir)

@cloud-fan
Copy link
Contributor

oh good catch. Then it seems like df.repartition($"userId").sortWithinPartitions("timestamp") won't produce a result set as we expected.

Just realized df.repartition($"userId").sortWithinPartitions("userId", "timestamp") will produce a result set as we expected, can we optimize this case?

@viirya
Copy link
Member Author

viirya commented Feb 12, 2017

Just realized df.repartition($"userId").sortWithinPartitions("userId", "timestamp") will produce a result set as we expected, can we optimize this case?

I may not understand you correctly. The result sets should be partitioned by "userId" and sorted by "timestamp". But in each partition, the rows with the same "userId" are not continuous.

But we want the rows with the same "userId" are continuous in each partition and their timestamp values are sorted.

@cloud-fan
Copy link
Contributor

sortWithinPartitions("userId", "timestamp") doesn't it make the userId continuous?

@viirya
Copy link
Member Author

viirya commented Feb 12, 2017

sortWithinPartitions("userId", "timestamp") doesn't it make the userId continuous?

Oh, yes. I miss looking...

So you recommend that we only optimize this case to preserve the sort order. Sounds good.

@SparkQA
Copy link

SparkQA commented Feb 12, 2017

Test build #72751 has finished for PR 16724 at commit b1ce030.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya viirya changed the title [SPARK-19352][SQL] Keep sort order of rows after external sorter when writing [SPARK-19352][SQL] Preserve sort order when saving dataset if data is sorted by partition columns Feb 12, 2017
description.dataColumns, description.allColumns)

override def execute(iter: Iterator[InternalRow]): Set[String] = {
val outputOrderingExprs = description.outputOrdering.map(_.child)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this duplicates too much code

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mainly is because there are two types of iterators, one is [UnsafeRow, UnsafeRow], another is just [UnsafeRow].

}
}

test("SPARK-19352: Keep sort order of rows after external sorter when writing") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

again, this is not guaranteed, we should not test it.

This is an optimization and advanced users can leverage this to preserve the sort order, but it may change in the future.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

got it.

@cloud-fan
Copy link
Contributor

@viirya can you review #16898 instead? thanks!

@viirya
Copy link
Member Author

viirya commented Feb 12, 2017

@cloud-fan ok. I will look into it.

@viirya
Copy link
Member Author

viirya commented Feb 12, 2017

Close this in favor of #16898.

@viirya viirya closed this Feb 12, 2017
ghost pushed a commit to dbtsai/spark that referenced this pull request Feb 20, 2017
## What changes were proposed in this pull request?

In `FileFormatWriter`, we will sort the input rows by partition columns and bucket id and sort columns, if we want to write data out partitioned or bucketed.

However, if the data is already sorted, we will sort it again, which is unnecssary.

This PR removes the sorting logic in `FileFormatWriter` and use `SortExec` instead. We will not add `SortExec` if the data is already sorted.

## How was this patch tested?

I did a micro benchmark manually
```
val df = spark.range(10000000).select($"id", $"id" % 10 as "part").sort("part")
spark.time(df.write.partitionBy("part").parquet("/tmp/test"))
```
The result was about 6.4 seconds before this PR, and is 5.7 seconds afterwards.

close apache#16724

Author: Wenchen Fan <[email protected]>

Closes apache#16898 from cloud-fan/writer.
Yunni pushed a commit to Yunni/spark that referenced this pull request Feb 27, 2017
## What changes were proposed in this pull request?

In `FileFormatWriter`, we will sort the input rows by partition columns and bucket id and sort columns, if we want to write data out partitioned or bucketed.

However, if the data is already sorted, we will sort it again, which is unnecssary.

This PR removes the sorting logic in `FileFormatWriter` and use `SortExec` instead. We will not add `SortExec` if the data is already sorted.

## How was this patch tested?

I did a micro benchmark manually
```
val df = spark.range(10000000).select($"id", $"id" % 10 as "part").sort("part")
spark.time(df.write.partitionBy("part").parquet("/tmp/test"))
```
The result was about 6.4 seconds before this PR, and is 5.7 seconds afterwards.

close apache#16724

Author: Wenchen Fan <[email protected]>

Closes apache#16898 from cloud-fan/writer.
jzhuge pushed a commit to jzhuge/spark that referenced this pull request Aug 20, 2018
In `FileFormatWriter`, we will sort the input rows by partition columns and bucket id and sort columns, if we want to write data out partitioned or bucketed.

However, if the data is already sorted, we will sort it again, which is unnecssary.

This PR removes the sorting logic in `FileFormatWriter` and use `SortExec` instead. We will not add `SortExec` if the data is already sorted.

I did a micro benchmark manually
```
val df = spark.range(10000000).select($"id", $"id" % 10 as "part").sort("part")
spark.time(df.write.partitionBy("part").parquet("/tmp/test"))
```
The result was about 6.4 seconds before this PR, and is 5.7 seconds afterwards.

close apache#16724

Author: Wenchen Fan <[email protected]>

Closes apache#16898 from cloud-fan/writer.
@viirya viirya deleted the keep-sort-order-after-external-sorter branch December 27, 2023 18:34
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants