Conversation

@massie
Contributor

@massie massie commented Jul 7, 2015

This commit adds a new Spark shuffle manager that reads and writes shuffle data to Apache Parquet files. Parquet exposes a File interface (not a streaming interface) because it is column-oriented and seeks within the file for metadata, e.g. schemas and statistics. As such, this implementation fetches remote data into local, temporary blocks before the data is handed to Parquet for reading.
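
As a rough sketch of that read path (this is not the PR's code, and the file path is hypothetical): once a remote shuffle block has been copied to a local temporary Parquet file, it can be opened through Parquet's file-based Avro reader.

```scala
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetReader

// Hypothetical local copy of a fetched remote shuffle block.
val localTempBlock = new Path("/tmp/shuffle_0_0_0.parquet")

val reader = AvroParquetReader.builder[GenericRecord](localTempBlock).build()
try {
  var record = reader.read() // returns null once the block is exhausted
  while (record != null) {
    // hand each record to the shuffle reader / aggregator here
    record = reader.read()
  }
} finally {
  reader.close()
}
```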

This manager uses the following Spark configuration parameters to configure Parquet: spark.shuffle.parquet.{compression, blocksize, pagesize, enabledictionary}.
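
For illustration, a minimal sketch of how the manager might be configured; the spark.shuffle.parquet.* property names come from the description above, while the manager class name and the concrete values are assumptions.

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  // Hypothetical fully-qualified name for the new manager.
  .set("spark.shuffle.manager", "org.apache.spark.shuffle.parquet.ParquetShuffleManager")
  .set("spark.shuffle.parquet.compression", "GZIP")                    // example codec
  .set("spark.shuffle.parquet.blocksize", (64 * 1024 * 1024).toString) // example row-group size
  .set("spark.shuffle.parquet.pagesize", (1024 * 1024).toString)       // example page size
  .set("spark.shuffle.parquet.enabledictionary", "true")
```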

There is a spark.shuffle.parquet.fallback configuration option which allows users to specify a fallback shuffle manager. If the Parquet manager finds that the classes being shuffled carry no schema information, and therefore can't be handled, it falls back to the specified manager. With this PR, only Avro IndexedRecords are supported in the Parquet shuffle; however, it is straightforward to extend this to other serialization systems that Parquet supports, e.g. Apache Thrift. If spark.shuffle.parquet.fallback is not defined, any shuffle objects that are not compatible with Parquet cause an error to be thrown listing the incompatible objects.
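
Continuing the SparkConf sketch above, the fallback might be wired up like this; whether the option takes a fully-qualified ShuffleManager class name (as assumed here) or a short alias is a detail of this PR.

```scala
// Delegate shuffles that Parquet/Avro cannot handle to the sort-based manager.
conf.set("spark.shuffle.parquet.fallback", "org.apache.spark.shuffle.sort.SortShuffleManager")
```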

Because the ShuffleDependency forwards the key, value, and combiner class information, a full schema can be generated before the first read/write. This results in fewer errors (since reflection isn't used) and makes support for null values possible without complex code.
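
A minimal sketch (not the PR's AvroPair code) of how a pair schema could be assembled up front from the key and value Avro schemas; the record and field names are placeholders. Wrapping the value schema in a union with null is what makes null values representable without extra handling code.

```scala
import java.util.Arrays

import org.apache.avro.{Schema, SchemaBuilder}

// Build the (key, value) record schema once, before any records are written,
// so no per-record reflection is needed.
def pairSchema(keySchema: Schema, valueSchema: Schema): Schema = {
  // A union with "null" lets the value field hold null without special cases.
  val nullableValue = Schema.createUnion(
    Arrays.asList(Schema.create(Schema.Type.NULL), valueSchema))
  SchemaBuilder.record("ShufflePair").namespace("example.shuffle")
    .fields()
    .name("key").`type`(keySchema).noDefault()
    .name("value").`type`(nullableValue).noDefault()
    .endRecord()
}
```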

The ExternalSorter, if needed, is set up not to spill to disk when Parquet is used. In the future, an ExternalSorter that can read/write Parquet would need to be created.

Only record-level metrics are supported at this time. Byte-level metrics are not currently supported and are complicated somewhat by column compression.

@massie
Contributor Author

massie commented Jul 7, 2015

This PR currently has only four new tests, which show that:

  • the fallback shuffle for non-Avro objects works
  • the Parquet shuffle is able to shuffle with and without map-side aggregation (combiner)
  • the Parquet shuffle is able to shuffle null values

I'm submitting this PR to solicit feedback about the overall approach. Once reviewers have given this approach their blessing, I'll spend a few days writing targeted tests. Please let me know which parts of this change concern you most to help inform my test writing.

@SparkQA

SparkQA commented Jul 7, 2015

Test build #36713 has finished for PR 7265 at commit a6d276b.

  • This patch fails RAT tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
    • class CoGroupedRDD[K: ClassTag](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]],
    • class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
    • class ShuffledRDD[K: ClassTag, V: ClassTag, C: ClassTag](
    • class ErrorShuffleManager extends ShuffleManager
    • class ParquetShuffleReader[K, V, C](
    • case class AvroFileWriter[K](file: File, writer: AvroParquetWriter[AvroPair[K, Any]])
    • class ParquetShuffleWriter[K, V](shuffleBlockResolver: FileShuffleBlockResolver,
    • class AvroPair[K, V](var _1: K, var _2: V, schema: Schema)

@sryza
Contributor

sryza commented Jul 7, 2015

Is my understanding correct that, with this shuffle manager, we wouldn't be able to do reduce-side spilling that sorts records, or any map-side spilling (because the default shuffle writer involves sorting)?

@massie
Contributor Author

massie commented Jul 7, 2015

Map-side aggregation is supported in this pull request. If you look at the ParquetShuffleManager.parquetShuffleCanBeUsed() method, you'll see that it generates an Avro schema that takes into account which classes of objects will be spilled. The ParquetShuffleWriter.write() method aggregates and then writes using this schema (ParquetShuffleReader.read() just uses the schema stored in Parquet by the writer).
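
For concreteness, a rough illustration of the kind of check parquetShuffleCanBeUsed() performs (not the actual implementation): test whether the shuffled key, value, and combiner classes are Avro IndexedRecords, and let the manager fall back otherwise.

```scala
import org.apache.avro.generic.IndexedRecord

// Illustrative only: true when every shuffled class can be represented as an
// Avro record, so a Parquet/Avro schema can be generated for the shuffle.
def avroCompatible(classes: Seq[Class[_]]): Boolean =
  classes.forall(c => classOf[IndexedRecord].isAssignableFrom(c))
```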

The ExternalSorter currently uses the Spark Serializer interface to read/write objects during the spill. If you like, I can easily have the Parquet shuffle reader follow that same pattern for now since it will work, even though it's not ideal.

@sryza
Contributor

sryza commented Jul 7, 2015

Having the Parquet shuffle reader follow that pattern seems preferable to me over failing when spilling would be required.

@massie
Contributor Author

massie commented Jul 7, 2015

Done. The Parquet shuffle reader behaves identically to the hash shuffle reader now and uses the defined Spark Serializer.

@SparkQA

SparkQA commented Jul 7, 2015

Test build #36723 has started for PR 7265 at commit 92852db.

@SparkQA

SparkQA commented Jul 7, 2015

Test build #36718 has finished for PR 7265 at commit d9b72cc.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@massie
Contributor Author

massie commented Jul 7, 2015

Jenkins has just been reconfigured to fix a testing infra bug. I'm going to kill the current build and start a new one.

Jenkins, test this please.

@massie
Contributor Author

massie commented Jul 7, 2015

Jenkins, test this please.

@SparkQA

SparkQA commented Jul 8, 2015

Test build #36739 has finished for PR 7265 at commit 92852db.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
    • class CoGroupedRDD[K: ClassTag](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]],
    • class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
    • class ShuffledRDD[K: ClassTag, V: ClassTag, C: ClassTag](
    • class ErrorShuffleManager extends ShuffleManager
    • class ParquetShuffleReader[K, V, C](
    • case class AvroFileWriter[K](file: File, writer: AvroParquetWriter[AvroPair[K, Any]])
    • class ParquetShuffleWriter[K, V](shuffleBlockResolver: FileShuffleBlockResolver,
    • class AvroPair[K, V](var _1: K, var _2: V, schema: Schema)
    • class DecisionTreeClassificationModel(DecisionTreeModel):
    • class RandomForestClassificationModel(TreeEnsembleModels):
    • class GBTClassificationModel(TreeEnsembleModels):
    • class DecisionTreeModel(JavaModel):
    • class TreeEnsembleModels(JavaModel):
    • class DecisionTreeRegressionModel(DecisionTreeModel):
    • class RandomForestRegressionModel(TreeEnsembleModels):
    • class GBTRegressionModel(TreeEnsembleModels):

@massie
Contributor Author

massie commented Jul 8, 2015

I'm not sure why Jenkins is calling out changes to

class DecisionTreeClassificationModel(DecisionTreeModel):
class RandomForestClassificationModel(TreeEnsembleModels):
class GBTClassificationModel(TreeEnsembleModels):
class DecisionTreeModel(JavaModel):
class TreeEnsembleModels(JavaModel):
class DecisionTreeRegressionModel(DecisionTreeModel):
class RandomForestRegressionModel(TreeEnsembleModels):
class GBTRegressionModel(TreeEnsembleModels):

since this PR makes no changes to them.

@massie massie force-pushed the parquet-shuffle branch from 92852db to 040517c Compare July 9, 2015 00:25
@massie
Contributor Author

massie commented Jul 9, 2015

I just rebased on master to fix a pom.xml conflict. Both parquet-avro and parquet-thrift were added in #7231, so they don't need to be added here.

@SparkQA

SparkQA commented Jul 9, 2015

Test build #36863 has finished for PR 7265 at commit 040517c.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
    • class CoGroupedRDD[K: ClassTag](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]],
    • class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
    • class ShuffledRDD[K: ClassTag, V: ClassTag, C: ClassTag](
    • class ErrorShuffleManager extends ShuffleManager
    • class ParquetShuffleReader[K, V, C](
    • case class AvroFileWriter[K](file: File, writer: AvroParquetWriter[AvroPair[K, Any]])
    • class ParquetShuffleWriter[K, V](shuffleBlockResolver: FileShuffleBlockResolver,
    • class AvroPair[K, V](var _1: K, var _2: V, schema: Schema)
    • class FPGrowthModel[Item: ClassTag](val freqItemsets: RDD[FreqItemset[Item]]) extends Serializable
    • public final class Interval implements Serializable

@massie
Contributor Author

massie commented Jul 9, 2015

Jenkins, test this please.

@massie
Contributor Author

massie commented Jul 9, 2015

Looks like Jenkins was in a bad state. I'll kick it to test again.

@massie
Contributor Author

massie commented Jul 9, 2015

Jenkins, test this please.

@SparkQA

SparkQA commented Jul 9, 2015

Test build #36954 has finished for PR 7265 at commit 040517c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
    • class CoGroupedRDD[K: ClassTag](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]],
    • class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
    • class ShuffledRDD[K: ClassTag, V: ClassTag, C: ClassTag](
    • class ErrorShuffleManager extends ShuffleManager
    • class ParquetShuffleReader[K, V, C](
    • case class AvroFileWriter[K](file: File, writer: AvroParquetWriter[AvroPair[K, Any]])
    • class ParquetShuffleWriter[K, V](shuffleBlockResolver: FileShuffleBlockResolver,
    • class AvroPair[K, V](var _1: K, var _2: V, schema: Schema)

@massie
Contributor Author

massie commented Jul 14, 2015

I've opened PR #7403, which includes only the changes to the Spark shuffle that serialize the key, value, and combiner class names. I'll rebase this PR to contain only the Parquet shuffle implementation. I'm hoping that separating these will make it easier to review.

@massie massie force-pushed the parquet-shuffle branch from 040517c to 56be3a2 Compare July 14, 2015 21:46
@SparkQA

SparkQA commented Jul 14, 2015

Test build #37270 has finished for PR 7265 at commit 56be3a2.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@massie
Contributor Author

massie commented Jul 15, 2015

Jenkins, test this please.

@SparkQA

SparkQA commented Jul 15, 2015

Test build #37374 has finished for PR 7265 at commit 56be3a2.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
    • class CoGroupedRDD[K: ClassTag](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]],
    • class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
    • class ShuffledRDD[K: ClassTag, V: ClassTag, C: ClassTag](
    • class ErrorShuffleManager extends ShuffleManager
    • class ParquetShuffleReader[K, V, C](
    • case class AvroFileWriter[K](file: File, writer: AvroParquetWriter[AvroPair[K, Any]])
    • class ParquetShuffleWriter[K, V](shuffleBlockResolver: FileShuffleBlockResolver,
    • class AvroPair[K, V](var _1: K, var _2: V, schema: Schema)

@massie
Contributor Author

massie commented Jul 27, 2015

Once #7403 is merged, I'll rebase this PR on top of it and fix the conflicts.

@SparkQA

SparkQA commented Sep 10, 2015

Test build #42301 has finished for PR 7265 at commit 2f424f0.

  • This patch fails Scala style tests.
  • This patch does not merge cleanly.
  • This patch adds the following public classes (experimental):
    • class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
    • class CoGroupedRDD[K: ClassTag](
    • class ShuffledRDD[K: ClassTag, V: ClassTag, C: ClassTag](
    • class ErrorShuffleManager extends ShuffleManager
    • class ParquetShuffleReader[K, V, C](
    • case class AvroFileWriter[K](file: File, writer: AvroParquetWriter[AvroPair[K, Any]])
    • class ParquetShuffleWriter[K, V](shuffleBlockResolver: FileShuffleBlockResolver,
    • class AvroPair[K, V](var _1: K, var _2: V, schema: Schema)

@massie
Contributor Author

massie commented Sep 11, 2015

Now that #7403 is merged, I've rebased this PR on top of master. It's ready for review whenever someone has the time. Thanks.

@SparkQA

SparkQA commented Sep 11, 2015

Test build #42302 has finished for PR 7265 at commit a670789.

  • This patch passes all tests.
  • This patch does not merge cleanly.
  • This patch adds the following public classes (experimental):
    • class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
    • class CoGroupedRDD[K: ClassTag](
    • class ShuffledRDD[K: ClassTag, V: ClassTag, C: ClassTag](
    • class ErrorShuffleManager extends ShuffleManager
    • class ParquetShuffleReader[K, V, C](
    • case class AvroFileWriter[K](file: File, writer: AvroParquetWriter[AvroPair[K, Any]])
    • class ParquetShuffleWriter[K, V](shuffleBlockResolver: FileShuffleBlockResolver,
    • class AvroPair[K, V](var _1: K, var _2: V, schema: Schema)

@SparkQA

SparkQA commented Sep 11, 2015

Test build #42306 has finished for PR 7265 at commit 0a4c028.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
    • class CoGroupedRDD[K: ClassTag](
    • class ShuffledRDD[K: ClassTag, V: ClassTag, C: ClassTag](
    • class ErrorShuffleManager extends ShuffleManager
    • class ParquetShuffleReader[K, V, C](
    • case class AvroFileWriter[K](file: File, writer: AvroParquetWriter[AvroPair[K, Any]])
    • class ParquetShuffleWriter[K, V](shuffleBlockResolver: FileShuffleBlockResolver,
    • class AvroPair[K, V](var _1: K, var _2: V, schema: Schema)

@SparkQA

SparkQA commented May 2, 2016

Test build #57552 has finished for PR 7265 at commit 0a4c028.

  • This patch fails R style tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@cpwais

cpwais commented Jul 29, 2016

Might this make some 2.0.x release of Spark?

@SparkQA

SparkQA commented Sep 29, 2016

Test build #66134 has finished for PR 7265 at commit 0a4c028.

  • This patch fails R style tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Nov 23, 2016

Test build #69069 has finished for PR 7265 at commit 0a4c028.

  • This patch fails R style tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@rxin
Contributor

rxin commented Dec 7, 2016

I'm going to close this for now. Next year we might actually come back and revisit this, probably not with the current Parquet implementation since it is not very efficient, but with some sort of columnar format.

@asfgit asfgit closed this in 08d6441 Dec 7, 2016