Conversation

@cloud-fan
Contributor

This PR adds bucketed-write support to Spark SQL. Users can specify bucketing columns, numBuckets, and sorting columns, with or without partition columns. For example:

df.write.partitionBy("year").bucketBy(8, "country").sortBy("amount").saveAsTable("sales")

When bucketing is used, we calculate a bucket id for each record and group the records by bucket id. For each group, we create a file with the bucket id in its name and write the data into it. For each bucket file, if sorting columns are specified, the data is sorted before being written.
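As a rough illustration of the bucket id computation (a minimal sketch, not the PR's actual code, which uses the dedicated Hash expression added by this patch):

// Minimal sketch: hash the bucketing columns' values and take a
// non-negative modulo over the bucket count. Illustrative only; the PR
// uses its own Hash expression rather than Scala's hashCode.
def bucketId(bucketColumnValues: Seq[Any], numBuckets: Int): Int = {
  val h = bucketColumnValues.hashCode()
  ((h % numBuckets) + numBuckets) % numBuckets  // non-negative modulo (pmod)
}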

Note that there may be multiple files for one bucket, as the data is distributed.

Currently we store the bucket metadata in the Hive metastore in a non-Hive-compatible way. We use a different bucketing hash function than Hive, so we can't be compatible anyway.

Limitations:

  • Can't write bucketed data without hive metastore.
  • Can't insert bucketed data into existing hive tables.

@cloud-fan
Contributor Author

cc @yhuai @nongli

@SparkQA

SparkQA commented Dec 28, 2015

Test build #48367 has finished for PR 10498 at commit 8cb2494.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class Hash(children: Seq[Expression]) extends Expression
    • class TextOutputWriter(

@yhuai
Contributor

yhuai commented Dec 28, 2015

This one also includes #10435, right?

@nongli
Contributor

nongli commented Dec 28, 2015

@cloud-fan

currently we don't shuffle before writing partitioned data, which means the same partition's data can appear in different RDD blocks; that's why we have multiple files for one partition, and we will also have multiple files for one bucket. Is that safe?

This is safe, but how can we get into this state from a single write? There must have been a partitionBy before, right?

Hive supports bucketing without partitioning; should we support it?

Why not? Is this hard to support?

@rxin
Contributor

rxin commented Dec 28, 2015

BTW, on GitHub you can use square brackets to create a checklist, e.g.

- [ ] item a
- [ ] item b

becomes

  • item a
  • item b

@cloud-fan
Contributor Author

This one also includes #10435; we can merge that first.

@SparkQA

SparkQA commented Dec 29, 2015

Test build #48415 has finished for PR 10498 at commit a9dc997.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class BucketSpec(

Contributor

Please also add the @Since annotations and comments; these are public APIs.

@cloud-fan cloud-fan changed the title [SPARK-12539][SQL][WIP] support writing bucketed table [SPARK-12539][SQL] support writing bucketed table Dec 30, 2015
Contributor

This method can be simplified to:

if (sortingColumns.isDefined) {
  require(numBuckets.isDefined, "sortBy must be used together with bucketBy")
}

for {
  n <- numBuckets
  cols <- normalizedBucketCols
} yield {
  require(n > 0, "Bucket number must be greater than 0.")
  BucketSpec(n, cols, normalizedSortCols)
}

(require throws IllegalArgumentException when the condition is not met.)

@SparkQA

SparkQA commented Dec 30, 2015

Test build #48489 has finished for PR 10498 at commit d2dc9b3.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor Author

I'm a little worried here. It was a simple != operator for the non-bucket path before, but now it's a function call.

Contributor

I think this is fine. The rest of this path is much more expensive than this function call.

@SparkQA

SparkQA commented Jan 6, 2016

Test build #48854 has finished for PR 10498 at commit d3200cf.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor Author

Shall we put the bucket id at the very end, i.e. after the file extension, so that it's much easier to get the bucket id given a file name? e.g. part-r-00009-ea518ad4-455a-4431-b471-d24e03814677.gz.parquet.00002

cc @nongli

Contributor

I don't have a strong opinion here. Let's go either way for now and take another pass before shipping this. We should try this with Hive as well, just to get another data point.

Contributor

Having the bucket id at the very end could break other applications that rely on the file extension to recognize the file format, so we shouldn't do that.
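For illustration, a hypothetical naming helper for the extension-preserving option (the helper name and exact layout here are made up, not what the patch actually does):

// Hypothetical helper: embed the bucket id before the extension so that
// extension-based format detection keeps working.
def bucketedFileName(taskUniqueId: String, bucketId: Int, extension: String): String =
  f"part-r-$taskUniqueId%s-$bucketId%05d$extension%s"

// bucketedFileName("00009-ea518ad4-455a-4431-b471-d24e03814677", 2, ".gz.parquet")
// returns "part-r-00009-ea518ad4-455a-4431-b471-d24e03814677-00002.gz.parquet"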

@SparkQA

SparkQA commented Jan 6, 2016

Test build #48856 has finished for PR 10498 at commit 1afd3ee.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

I'd remove this TODO.

@nongli
Contributor

nongli commented Jan 6, 2016

This looks good to me to merge.

@davies
Contributor

davies commented Jan 6, 2016

@cloud-fan Can we write a bucketed table without partitions?

Just saw you have a test case for that, but I didn't see you update DefaultWriterContainer; how can that work?

Contributor

Minor question: why do we support bucketing for the JSON writer? Bucketing can only be recognized by Spark SQL, and in that case Parquet is much more efficient.

As we embed CSV into Spark, I think we don't need to support bucketing for it either.

Contributor Author

Just because we can... I felt it was cheap to add bucketing support for JSON, so I went for it.
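For reference, a quick usage sketch (table names are illustrative); the same bucketing write path applies regardless of format:

df.write.bucketBy(8, "country").sortBy("amount").saveAsTable("bucketed_default")  // default format

df.write.format("json").bucketBy(8, "country").saveAsTable("bucketed_json")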

@rxin
Contributor

rxin commented Jan 7, 2016

I'm going to merge this. @cloud-fan, can you create a follow-up PR to address some of the comments above?

@asfgit asfgit closed this in 917d3fc Jan 7, 2016
asfgit pushed a commit that referenced this pull request Jan 11, 2016
address comments in #10498, especially #10498 (comment)

Author: Wenchen Fan <[email protected]>

This patch had conflicts when merged, resolved by
Committer: Reynold Xin <[email protected]>

Closes #10638 from cloud-fan/bucket-write.
@l15k4

l15k4 commented May 21, 2016

Guys, do you have a rough guess about when bucketing will be implemented for org.apache.spark.sql.DataFrameWriter#save?

@infinitymittal

infinitymittal commented Jan 13, 2017

Hi,

There is the limitation "Can't insert bucketed data into existing Hive tables." Are there any plans to relax it? I want to insert data into an already existing table using a query.

@cloud-fan Do we have a JIRA for this?

@tejasapatil
Contributor

@infinitymittal

infinitymittal commented Jan 13, 2017

@tejasapatil Thanks for the response. SPARK-17729 says "Spark still won't produce bucketed data as per Hive's bucketing guarantees". I want the data to be bucketed when written. Any further leads?

Just to be clear: even with "hive.enforce.bucketing" set to true, the data won't be written. Is that correct? I'm referencing pull request #15300's comment "Added test to ensure that INSERTs fail if strict bucket / sort is enforced".

@tejasapatil
Contributor

@infinitymittal : It will take time to have fully functional support added. I initiated a design proposal to get consensus on how this could be done: https://issues.apache.org/jira/browse/SPARK-19256

In Spark, "hive.enforce.bucketing" is not respected. #15300 won't guarantee that the data written adheres to Hive's bucketing spec so approach taken there is to fail in user sets configs to enforce bucketing. This will avoid wrong data being written when user is expecting correct outputs after setting "hive.enforce.bucketing" to true. The longer term plan is to get rid of these configs and always write properly bucketed data (hive 2.x follows this model).

@FelixKJose

@tejasapatil Is there any update on this, regarding "always write properly bucketed data (Hive 2.x follows this model)"? Does Spark provide this now, or is your PR ready to be merged into master?
