diff --git a/core/pom.xml b/core/pom.xml index a46292c13bcc0..8fac5eb62fbc6 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -51,6 +51,14 @@ com.twitter chill-java + + org.apache.parquet + parquet-avro + + + org.apache.parquet + parquet-hadoop + org.apache.hadoop hadoop-client diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index c6fef7f91f00c..95de09785ce16 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -155,6 +155,13 @@ class SparkEnv ( object SparkEnv extends Logging { @volatile private var env: SparkEnv = _ + // Let the user specify short names for shuffle managers + val shuffleManagerAliases = Map( + "hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager", + "sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager", + "tungsten-sort" -> "org.apache.spark.shuffle.unsafe.UnsafeShuffleManager", + "parquet" -> "org.apache.spark.shuffle.parquet.ParquetShuffleManager") + private[spark] val driverActorSystemName = "sparkDriver" private[spark] val executorActorSystemName = "sparkExecutor" @@ -314,13 +321,9 @@ object SparkEnv extends Logging { new MapOutputTrackerMasterEndpoint( rpcEnv, mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], conf)) - // Let the user specify short names for shuffle managers - val shortShuffleMgrNames = Map( - "hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager", - "sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager", - "tungsten-sort" -> "org.apache.spark.shuffle.unsafe.UnsafeShuffleManager") val shuffleMgrName = conf.get("spark.shuffle.manager", "sort") - val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName) + val shuffleMgrClass = shuffleManagerAliases + .getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName) val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass) val shuffleMemoryManager = ShuffleMemoryManager.create(conf, numUsableCores) diff --git a/core/src/main/scala/org/apache/spark/shuffle/parquet/ErrorShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/parquet/ErrorShuffleManager.scala new file mode 100644 index 0000000000000..437f9aae9b680 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/shuffle/parquet/ErrorShuffleManager.scala @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.shuffle.parquet + +import org.apache.spark.network.buffer.ManagedBuffer +import org.apache.spark.shuffle._ +import org.apache.spark.storage.ShuffleBlockId +import org.apache.spark.{ShuffleDependency, TaskContext} + +class ErrorShuffleManager extends ShuffleManager { + + private def throwError(error: String) = { + throw new NotImplementedError( + s"${ParquetShuffleConfig.fallbackShuffleManager} not defined: ${error}") + } + + /** + * Register a shuffle with the manager and obtain a handle for it to pass to tasks. + */ + override def registerShuffle[K, V, C](shuffleId: Int, + numMaps: Int, + dependency: ShuffleDependency[K, V, C]): ShuffleHandle = { + throwError(s"Unable to register shuffle for keyClass=${dependency.keyClassName} " + + s"valueClass=${dependency.valueClassName} combineClass=${dependency.combinerClassName}") + } + + /** + * Return a resolver capable of retrieving shuffle block data based on block coordinates. + */ + override def shuffleBlockResolver: ShuffleBlockResolver = new ShuffleBlockResolver { + + override def stop(): Unit = {} // no-op + + /** + * Retrieve the data for the specified block. If the data for that block is not available, + * throws an unspecified exception. + */ + override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = { + throwError(s"Unable to get block data for ${blockId}") + } + } + + /** Shut down this ShuffleManager. */ + override def stop(): Unit = {} // no-op + + /** + * Remove a shuffle's metadata from the ShuffleManager. + * @return true if the metadata removed successfully, otherwise false. + */ + override def unregisterShuffle(shuffleId: Int): Boolean = { + throwError("Unable to unregister shuffle") + } + + /** Get a writer for a given partition. Called on executors by map tasks. */ + override def getWriter[K, V](handle: ShuffleHandle, + mapId: Int, + context: TaskContext): ShuffleWriter[K, V] = { + throwError("Unable to get a writer") + } + + /** + * Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive). + * Called on executors by reduce tasks. + */ + override def getReader[K, C](handle: ShuffleHandle, + startPartition: Int, + endPartition: Int, + context: TaskContext): ShuffleReader[K, C] = { + throwError("Unable to get a reader") + } +} diff --git a/core/src/main/scala/org/apache/spark/shuffle/parquet/ParquetShuffleConfig.scala b/core/src/main/scala/org/apache/spark/shuffle/parquet/ParquetShuffleConfig.scala new file mode 100644 index 0000000000000..e999d3363471e --- /dev/null +++ b/core/src/main/scala/org/apache/spark/shuffle/parquet/ParquetShuffleConfig.scala @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.shuffle.parquet + +import org.apache.parquet.hadoop.ParquetWriter +import org.apache.parquet.hadoop.metadata.CompressionCodecName + +import org.apache.spark.shuffle.ShuffleManager +import org.apache.spark.util.Utils +import org.apache.spark.{SparkConf, SparkEnv} + +object ParquetShuffleConfig { + private val sparkManagerConfKey = "spark.shuffle.manager" + private val parquetManagerAlias = "parquet" + private val namespace = "spark.shuffle.parquet." + private val compressionKey = namespace + "compression" + private val blocksizeKey = namespace + "blocksize" + private val pagesizeKey = namespace + "pagesize" + private val enableDictionaryKey = namespace + "enabledictionary" + private[parquet] val fallbackShuffleManager = namespace + "fallback" + + def isParquetShuffleEnabled: Boolean = { + isParquetShuffleEnabled(SparkEnv.get.conf) + } + + def isParquetShuffleEnabled(conf: SparkConf): Boolean = { + val confValue = conf.get(sparkManagerConfKey, "") + confValue == parquetManagerAlias || confValue == classOf[ParquetShuffleManager].getName + } + + def enableParquetShuffle(): Unit = { + enableParquetShuffle(SparkEnv.get.conf) + } + + def enableParquetShuffle(conf: SparkConf): Unit = { + conf.set(ParquetShuffleConfig.sparkManagerConfKey, classOf[ParquetShuffleManager].getName) + } + + def getCompression: CompressionCodecName = { + getCompression(SparkEnv.get.conf) + } + + def getCompression(conf: SparkConf): CompressionCodecName = { + val confValue = conf.get(compressionKey, null) + if (confValue == null) { + ParquetWriter.DEFAULT_COMPRESSION_CODEC_NAME + } else { + CompressionCodecName.fromConf(confValue) + } + } + + def getBlockSize: Int = { + getBlockSize(SparkEnv.get.conf) + } + + def getBlockSize(conf: SparkConf): Int = { + val confValue = conf.get(blocksizeKey, null) + if (confValue == null) { + ParquetWriter.DEFAULT_BLOCK_SIZE + } else { + confValue.toInt + } + } + + def getPageSize: Int = { + getPageSize(SparkEnv.get.conf) + } + + def getPageSize(conf: SparkConf): Int = { + val confValue = conf.get(pagesizeKey, null) + if (confValue == null) { + ParquetWriter.DEFAULT_PAGE_SIZE + } else { + confValue.toInt + } + } + + def isDictionaryEnabled: Boolean = { + isDictionaryEnabled(SparkEnv.get.conf) + } + + def isDictionaryEnabled(conf: SparkConf): Boolean = { + val confValue = conf.get(enableDictionaryKey, null) + if (confValue == null) { + ParquetWriter.DEFAULT_IS_DICTIONARY_ENABLED + } else { + confValue.toBoolean + } + } + + def setFallbackShuffleManager(managerName: String): Unit = { + setFallbackShuffleManager(SparkEnv.get.conf, managerName) + } + + def setFallbackShuffleManager(conf: SparkConf, managerName: String): Unit = { + conf.set(fallbackShuffleManager, managerName) + } + + def getFallbackShuffleManager: ShuffleManager = { + getFallbackShuffleManager(SparkEnv.get.conf) + } + + def getFallbackShuffleManager(conf: SparkConf): ShuffleManager = { + val confValue = conf.get(fallbackShuffleManager, null) + if (confValue == null) { + new ErrorShuffleManager + } else { + val fullName = SparkEnv.shuffleManagerAliases.getOrElse(confValue, confValue) + val cls = Utils.classForName(fullName) + cls.getConstructor(classOf[SparkConf]).newInstance(conf).asInstanceOf[ShuffleManager] + } + } + +} diff --git a/core/src/main/scala/org/apache/spark/shuffle/parquet/ParquetShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/parquet/ParquetShuffleManager.scala new file mode 100644 index 0000000000000..3e216cbe27e36 --- /dev/null +++ 
b/core/src/main/scala/org/apache/spark/shuffle/parquet/ParquetShuffleManager.scala @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.shuffle.parquet + +import java.util.Collections +import java.util.concurrent.ConcurrentHashMap + +import scala.util.{Success, Try} + +import org.apache.avro.Schema +import org.apache.avro.generic.IndexedRecord + +import org.apache.spark._ +import org.apache.spark.network.buffer.ManagedBuffer +import org.apache.spark.shuffle._ +import org.apache.spark.shuffle.parquet.avro.AvroPair +import org.apache.spark.storage.ShuffleBlockId +import org.apache.spark.util.Utils + +// Returned on shuffle registration, contains schema information for readers/writers +private[spark] class ParquetShuffleHandle[K, V, C](shuffleId: Int, + numMaps: Int, + dependency: ShuffleDependency[K, V, C], + val avroPairSchema: String) + extends BaseShuffleHandle(shuffleId, numMaps, dependency) + +private[spark] object ParquetShuffleManager extends Logging { + + def parquetShuffleCanBeUsed[K, V, C](shuffleId: Int, + numMaps: Int, + dependency: ShuffleDependency[K, V, C]): Option[Schema] = { + + def getSchema(className: String): Option[Schema] = { + Try(Utils.classForName(className).newInstance().asInstanceOf[IndexedRecord]) match { + case Success(indexedRecord) => Some(indexedRecord.getSchema) + case _ => None + } + } + + getSchema(dependency.keyClassName) match { + case None => + // Can't use Parquet, the key class has no schema + None + case Some(keySchema) => + if (dependency.mapSideCombine) { + dependency.aggregator match { + case None => + throw new AssertionError("Map-side combine requested but no aggregator defined!") + case Some(aggregator) => + // If the combiner class has no schema, return None so we fall back + dependency.combinerClassName.flatMap(getSchema) + .map(combinerSchema => AvroPair.makePairSchema(keySchema, combinerSchema)) + } + } else { + // We are *not* doing a map-side combine + getSchema(dependency.valueClassName) match { + case None => + // We can't use Parquet, the value class has no schema + None + case Some(valueSchema) => + // Parquet shuffle files will contain key and value class pairs + Some(AvroPair.makePairSchema(keySchema, valueSchema)) + } + } + } + } +} + +private[spark] class ParquetShuffleManager(conf: SparkConf) extends ShuffleManager with Logging { + private val fileShuffleBlockManager = new FileShuffleBlockResolver(conf) + private val fallbackManager = ParquetShuffleConfig.getFallbackShuffleManager(conf) + private val fallbackShuffleIds = + Collections.newSetFromMap(new ConcurrentHashMap[Int, java.lang.Boolean]()) + private val delegatingShuffleBlockResolver = new ShuffleBlockResolver { + + override def stop(): Unit = { + fallbackManager.shuffleBlockResolver.stop() + fileShuffleBlockManager.stop() + } + + /** + * Retrieve the data for the
specified block. If the data for that block is not available, + * throws an unspecified exception. + */ + override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = { + if (fallbackShuffleIds.contains(blockId.shuffleId)) { + fallbackManager.shuffleBlockResolver.getBlockData(blockId) + } else { + fileShuffleBlockManager.getBlockData(blockId) + } + } + } + + /** + * Register a shuffle with the manager and obtain a handle for it to pass to tasks. + */ + override def registerShuffle[K, V, C](shuffleId: Int, + numMaps: Int, + dependency: ShuffleDependency[K, V, C]): ShuffleHandle = { + // If Parquet is supported for this shuffle, use it + ParquetShuffleManager.parquetShuffleCanBeUsed(shuffleId, numMaps, dependency) match { + case Some(schema) => + new ParquetShuffleHandle(shuffleId, numMaps, dependency, schema.toString) + case _ => + // ... otherwise, use the fallback shuffle manager + fallbackShuffleIds.add(shuffleId) + fallbackManager.registerShuffle(shuffleId, numMaps, dependency) + } + } + + /** Shut down this ShuffleManager. */ + override def stop(): Unit = delegatingShuffleBlockResolver.stop() + + /** + * Remove a shuffle's metadata from the ShuffleManager. + * @return true if the metadata removed successfully, otherwise false. + */ + override def unregisterShuffle(shuffleId: Int): Boolean = { + if (fallbackShuffleIds.remove(shuffleId)) { + // Notify the fallback shuffle manager, if it was used for this shuffle + fallbackManager.unregisterShuffle(shuffleId) + } else { + // Otherwise, remove it from the Parquet block resolver + fileShuffleBlockManager.removeShuffle(shuffleId) + } + } + + /** Get a writer for a given partition. Called on executors by map tasks. */ + override def getWriter[K, V](handle: ShuffleHandle, + mapId: Int, + context: TaskContext): ShuffleWriter[K, V] = { + handle match { + case parquetHandle: ParquetShuffleHandle[K, V, _] => + new ParquetShuffleWriter[K, V](fileShuffleBlockManager, parquetHandle, mapId, context) + case _ => + fallbackManager.getWriter(handle, mapId, context) + } + } + + override def shuffleBlockResolver: ShuffleBlockResolver = delegatingShuffleBlockResolver + + /** + * Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive). + * Called on executors by reduce tasks. + */ + override def getReader[K, C](handle: ShuffleHandle, + startPartition: Int, + endPartition: Int, + context: TaskContext): ShuffleReader[K, C] = { + handle match { + case parquetHandle: ParquetShuffleHandle[K, _, C] => + new ParquetShuffleReader(parquetHandle, startPartition, endPartition, context) + case _ => + fallbackManager.getReader(handle, startPartition, endPartition, context) + } + } +} diff --git a/core/src/main/scala/org/apache/spark/shuffle/parquet/ParquetShuffleReader.scala b/core/src/main/scala/org/apache/spark/shuffle/parquet/ParquetShuffleReader.scala new file mode 100644 index 0000000000000..685be3deeff5c --- /dev/null +++ b/core/src/main/scala/org/apache/spark/shuffle/parquet/ParquetShuffleReader.scala @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.shuffle.parquet + +import java.nio.file.Files + +import org.apache.hadoop.fs.Path +import org.apache.parquet.avro.AvroParquetReader + +import org.apache.spark.serializer.Serializer +import org.apache.spark.shuffle.parquet.avro.AvroPair + +import org.apache.spark.shuffle.ShuffleReader +import org.apache.spark.storage.{ShuffleBlockFetcherIterator, BlockManager} +import org.apache.spark.util.CompletionIterator +import org.apache.spark.util.collection.ExternalSorter +import org.apache.spark._ + +class ParquetShuffleReader[K, V, C]( + handle: ParquetShuffleHandle[K, _, C], + startPartition: Int, + endPartition: Int, + context: TaskContext, + blockManager: BlockManager = SparkEnv.get.blockManager, + mapOutputTracker: MapOutputTracker = SparkEnv.get.mapOutputTracker) + extends ShuffleReader[K, C] with Logging { + require(endPartition == startPartition + 1, + "Parquet shuffle currently only supports fetching one partition") + + private val dep = handle.dependency + private val shuffleId = handle.shuffleId + private val reduceId = startPartition + + /** Read the combined key-values for this reduce task */ + override def read(): Iterator[Product2[K, C]] = { + val blockStreams = new ShuffleBlockFetcherIterator( + context, + blockManager.shuffleClient, + blockManager, + mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startPartition), + // Note: we use getSizeAsMb when no suffix is provided for backwards compatibility + SparkEnv.get.conf.getSizeAsMb("spark.reducer.maxSizeInFlight", "48m") * 1024 * 1024) + + val readMetrics = context.taskMetrics().createShuffleReadMetricsForDependency() + + val recordIterator = CompletionIterator[Product2[Any, Any], + Iterator[Product2[Any, Any]]]( + for ((blockId, inputStream) <- blockStreams; + record <- { + // Parquet needs to work with Files instead of InputStreams, so we + // (1) Request a local, temporary block to write the remote data to + val (tempBlockId, tempBlock) = blockManager.diskBlockManager.createTempLocalBlock() + // (2) Copy all data from the InputStream to the local, temporary block File. 
+ Files.copy(inputStream, tempBlock.toPath) + // (3) Close the InputStream, and + inputStream.close() + // (4) Read the Parquet records from the local temporary block File + val reader = AvroParquetReader.builder[AvroPair[K, Any]]( + new Path(tempBlock.getCanonicalPath)) + .build() + val iterator = Iterator.continually(reader.read()).takeWhile(_ != null) + CompletionIterator[Product2[Any, Any], Iterator[Product2[Any, Any]]](iterator, { + reader.close() + tempBlock.delete() + }) + }) yield { + // Update the read metrics for each record that is read + readMetrics.incRecordsRead(1) + record + }, + // When the iterator completes, update all the shuffle metrics + context.taskMetrics().updateShuffleReadMetrics()) + + // An interruptible iterator must be used here in order to support task cancellation + val interruptibleIter = new InterruptibleIterator[Product2[Any, Any]](context, recordIterator) + + val aggregatedIter: Iterator[Product2[K, C]] = if (dep.aggregator.isDefined) { + if (dep.mapSideCombine) { + // We are reading values that are already combined + val combinedKeyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, C)]] + dep.aggregator.get.combineCombinersByKey(combinedKeyValuesIterator, context) + } else { + // We don't know the value type, but also don't care -- the dependency *should* + // have made sure its compatible w/ this aggregator, which will convert the value + // type to the combined type C + val keyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, Nothing)]] + dep.aggregator.get.combineValuesByKey(keyValuesIterator, context) + } + } else { + require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!") + interruptibleIter.asInstanceOf[Iterator[Product2[K, C]]] + } + + // Sort the output if there is a sort ordering defined. + dep.keyOrdering match { + case Some(keyOrd: Ordering[K]) => + // TODO: Create a sorter that can spill to Parquet files + val ser = Serializer.getSerializer(dep.serializer) + val sorter = new ExternalSorter[K, C, C](ordering = Some(keyOrd), serializer = Some(ser)) + sorter.insertAll(aggregatedIter) + context.taskMetrics.incMemoryBytesSpilled(sorter.memoryBytesSpilled) + context.taskMetrics.incDiskBytesSpilled(sorter.diskBytesSpilled) + sorter.iterator + case None => + aggregatedIter + } + } +} diff --git a/core/src/main/scala/org/apache/spark/shuffle/parquet/ParquetShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/parquet/ParquetShuffleWriter.scala new file mode 100644 index 0000000000000..1c6697748e69a --- /dev/null +++ b/core/src/main/scala/org/apache/spark/shuffle/parquet/ParquetShuffleWriter.scala @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.shuffle.parquet + +import java.io.File + +import scala.util.{Failure, Success, Try} + +import org.apache.avro.Schema +import org.apache.hadoop.fs.Path +import org.apache.parquet.avro.AvroParquetWriter + +import org.apache.spark.executor.ShuffleWriteMetrics +import org.apache.spark.scheduler.MapStatus +import org.apache.spark.serializer.Serializer +import org.apache.spark.shuffle.parquet.avro.AvroPair +import org.apache.spark.shuffle.{FileShuffleBlockResolver, ShuffleWriter} +import org.apache.spark.storage.ShuffleBlockId +import org.apache.spark.{Logging, SparkEnv, TaskContext} + +case class AvroFileWriter[K](file: File, writer: AvroParquetWriter[AvroPair[K, Any]]) + +class ParquetShuffleWriter[K, V](shuffleBlockResolver: FileShuffleBlockResolver, + handle: ParquetShuffleHandle[K, V, _], + mapId: Int, + context: TaskContext) extends ShuffleWriter[K, V] with Logging { + private val dep = handle.dependency + private val numOutputSplits = dep.partitioner.numPartitions + private val blockManager = SparkEnv.get.blockManager + private var stopping = false + + private val metrics = context.taskMetrics() + private val writeMetrics = new ShuffleWriteMetrics() + metrics.shuffleWriteMetrics = Some(writeMetrics) + + // Parse the serialized avro schema (json) into an Avro Schema object + private val avroSchema = new Schema.Parser().parse(handle.avroPairSchema) + + private val ser = Serializer.getSerializer(dep.serializer.orNull) + private val writers = Array.tabulate[AvroFileWriter[K]](numOutputSplits) { + bucketId => + val blockId = ShuffleBlockId(dep.shuffleId, mapId, bucketId) + val outputFile = blockManager.diskBlockManager.getFile(blockId) + val outputPath = new Path(outputFile.getCanonicalPath) + AvroFileWriter(outputFile, + new AvroParquetWriter[AvroPair[K, Any]](outputPath, + avroSchema, ParquetShuffleConfig.getCompression, ParquetShuffleConfig.getBlockSize, + ParquetShuffleConfig.getPageSize, ParquetShuffleConfig.isDictionaryEnabled)) + } + + /** Write a bunch of records to this task's output */ + override def write(records: Iterator[_ <: Product2[K, V]]): Unit = { + val iter = if (dep.mapSideCombine) { + dep.aggregator match { + case None => + throw new AssertionError("Map-side combine requested but no aggregator defined!") + case Some(aggregator) => + aggregator.combineValuesByKey(records, context) + } + } else { + records + } + + for (elem <- iter) { + val bucketId = dep.partitioner.getPartition(elem._1) + writers(bucketId).writer.write( + new AvroPair[K, Any](elem._1, elem._2, avroSchema)) + writeMetrics.incShuffleRecordsWritten(1) + } + } + + /** Close this writer, passing along whether the map completed */ + override def stop(initiallySuccess: Boolean): Option[MapStatus] = { + var success = initiallySuccess + stopping match { + case true => None + case false => + stopping = true + val status = Try(writers.map { avro: AvroFileWriter[K] => + avro.writer.close() + val bytesWritten = avro.file.length() + writeMetrics.incShuffleBytesWritten(bytesWritten) + bytesWritten + }) + status match { + case Success(sizes) => + Some(MapStatus(blockManager.shuffleServerId, sizes)) + case f: Failure[Array[Long]] => + throw f.exception + } + } + } +} diff --git a/core/src/main/scala/org/apache/spark/shuffle/parquet/avro/AvroPair.scala b/core/src/main/scala/org/apache/spark/shuffle/parquet/avro/AvroPair.scala new file mode 100644 index 0000000000000..792c1ba90e2db --- /dev/null +++ b/core/src/main/scala/org/apache/spark/shuffle/parquet/avro/AvroPair.scala @@ -0,0 +1,83 @@ +/* + *
Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.shuffle.parquet.avro + +import scala.collection.JavaConverters._ + +import org.apache.avro.Schema +import org.apache.avro.Schema.Field +import org.apache.avro.generic.IndexedRecord +import org.apache.avro.specific.SpecificData.SchemaConstructable + +/** + * Helper class for wrapping two Avro objects inside a key-value object + */ +object AvroPair { + private val PAIR: String = classOf[AvroPair[_, _]].getName + private val KEY: String = "key" + private val VALUE: String = "value" + private val NULL_SCHEMA = Schema.create(Schema.Type.NULL) + + def checkIsPairSchema(schema: Schema): Boolean = PAIR == schema.getFullName + + /** + * Creates a pair schema with the key and value fields being optional to + * support null values + * @param keySchema The Avro schema for the key + * @param valueSchema The Avro schema for the value + * @return The combined pair schema + */ + def makePairSchema(keySchema: Schema, valueSchema: Schema): Schema = { + val pair: Schema = Schema.createRecord(PAIR, null, null, false) + pair.setFields(List( + new Schema.Field(KEY, Schema.createUnion(List(NULL_SCHEMA, keySchema).asJava), "", null), + new Schema.Field(VALUE, Schema.createUnion(List(NULL_SCHEMA, valueSchema).asJava), "", null, + Field.Order.IGNORE)).asJava) + pair + } +} + +class AvroPair[K, V](var _1: K, var _2: V, schema: Schema) + extends IndexedRecord with Product2[K, V] with SchemaConstructable { + assert(AvroPair.checkIsPairSchema(schema), + "AvroPair can only be created with a pair schema") + + // ctor for SchemaConstructable + def this(schema: Schema) = this(null.asInstanceOf[K], null.asInstanceOf[V], schema) + + def update(key: K, value: V): AvroPair[K, V] = { + this._1 = key + this._2 = value + this + } + + override def get(i: Int): AnyRef = i match { + case 0 => _1.asInstanceOf[AnyRef] + case 1 => _2.asInstanceOf[AnyRef] + case _ => throw new IndexOutOfBoundsException(i.toString) + } + + override def put(i: Int, v: scala.Any): Unit = i match { + case 0 => _1 = v.asInstanceOf[K] + case 1 => _2 = v.asInstanceOf[V] + case _ => throw new IndexOutOfBoundsException(i.toString) + } + + override def getSchema: Schema = schema + + override def canEqual(that: Any): Boolean = that.isInstanceOf[AvroPair[_, _]] +} diff --git a/core/src/test/java/org/apache/spark/shuffle/parquet/avro/AvroTestEntity.java b/core/src/test/java/org/apache/spark/shuffle/parquet/avro/AvroTestEntity.java new file mode 100644 index 0000000000000..7d880335ba99c --- /dev/null +++ b/core/src/test/java/org/apache/spark/shuffle/parquet/avro/AvroTestEntity.java @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Autogenerated by Avro + * + * DO NOT EDIT DIRECTLY + */ +package org.apache.spark.shuffle.parquet.avro; +@SuppressWarnings("all") +@org.apache.avro.specific.AvroGenerated +public class AvroTestEntity extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord { + private static final long serialVersionUID = 6618460632626642454L; + public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"AvroTestEntity\",\"namespace\":\"org.apache.spark.shuffle.parquet.avro\",\"fields\":[{\"name\":\"a\",\"type\":[\"null\",\"string\"]},{\"name\":\"b\",\"type\":[\"null\",\"int\"]}]}"); + public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; } + @Deprecated public java.lang.CharSequence a; + @Deprecated public java.lang.Integer b; + + /** + * Default constructor. Note that this does not initialize fields + * to their default values from the schema. If that is desired then + * one should use newBuilder(). + */ + public AvroTestEntity() {} + + /** + * All-args constructor. + */ + public AvroTestEntity(java.lang.CharSequence a, java.lang.Integer b) { + this.a = a; + this.b = b; + } + + public org.apache.avro.Schema getSchema() { return SCHEMA$; } + // Used by DatumWriter. Applications should not call. + public java.lang.Object get(int field$) { + switch (field$) { + case 0: return a; + case 1: return b; + default: throw new org.apache.avro.AvroRuntimeException("Bad index"); + } + } + // Used by DatumReader. Applications should not call. + @SuppressWarnings(value="unchecked") + public void put(int field$, java.lang.Object value$) { + switch (field$) { + case 0: a = (java.lang.CharSequence)value$; break; + case 1: b = (java.lang.Integer)value$; break; + default: throw new org.apache.avro.AvroRuntimeException("Bad index"); + } + } + + /** + * Gets the value of the 'a' field. + */ + public java.lang.CharSequence getA() { + return a; + } + + /** + * Sets the value of the 'a' field. + * @param value the value to set. + */ + public void setA(java.lang.CharSequence value) { + this.a = value; + } + + /** + * Gets the value of the 'b' field. + */ + public java.lang.Integer getB() { + return b; + } + + /** + * Sets the value of the 'b' field. + * @param value the value to set. 
+ */ + public void setB(java.lang.Integer value) { + this.b = value; + } + + /** Creates a new AvroTestEntity RecordBuilder */ + public static org.apache.spark.shuffle.parquet.avro.AvroTestEntity.Builder newBuilder() { + return new org.apache.spark.shuffle.parquet.avro.AvroTestEntity.Builder(); + } + + /** Creates a new AvroTestEntity RecordBuilder by copying an existing Builder */ + public static org.apache.spark.shuffle.parquet.avro.AvroTestEntity.Builder newBuilder(org.apache.spark.shuffle.parquet.avro.AvroTestEntity.Builder other) { + return new org.apache.spark.shuffle.parquet.avro.AvroTestEntity.Builder(other); + } + + /** Creates a new AvroTestEntity RecordBuilder by copying an existing AvroTestEntity instance */ + public static org.apache.spark.shuffle.parquet.avro.AvroTestEntity.Builder newBuilder(org.apache.spark.shuffle.parquet.avro.AvroTestEntity other) { + return new org.apache.spark.shuffle.parquet.avro.AvroTestEntity.Builder(other); + } + + /** + * RecordBuilder for AvroTestEntity instances. + */ + public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase + implements org.apache.avro.data.RecordBuilder { + + private java.lang.CharSequence a; + private java.lang.Integer b; + + /** Creates a new Builder */ + private Builder() { + super(org.apache.spark.shuffle.parquet.avro.AvroTestEntity.SCHEMA$); + } + + /** Creates a Builder by copying an existing Builder */ + private Builder(org.apache.spark.shuffle.parquet.avro.AvroTestEntity.Builder other) { + super(other); + if (isValidValue(fields()[0], other.a)) { + this.a = data().deepCopy(fields()[0].schema(), other.a); + fieldSetFlags()[0] = true; + } + if (isValidValue(fields()[1], other.b)) { + this.b = data().deepCopy(fields()[1].schema(), other.b); + fieldSetFlags()[1] = true; + } + } + + /** Creates a Builder by copying an existing AvroTestEntity instance */ + private Builder(org.apache.spark.shuffle.parquet.avro.AvroTestEntity other) { + super(org.apache.spark.shuffle.parquet.avro.AvroTestEntity.SCHEMA$); + if (isValidValue(fields()[0], other.a)) { + this.a = data().deepCopy(fields()[0].schema(), other.a); + fieldSetFlags()[0] = true; + } + if (isValidValue(fields()[1], other.b)) { + this.b = data().deepCopy(fields()[1].schema(), other.b); + fieldSetFlags()[1] = true; + } + } + + /** + * Gets the value of the 'a' field. + */ + public java.lang.CharSequence getA() { + return a; + } + + /** + * Sets the value of the 'a' field. + * @param value the value to set. + */ + public org.apache.spark.shuffle.parquet.avro.AvroTestEntity.Builder setA(java.lang.CharSequence value) { + validate(fields()[0], value); + this.a = value; + fieldSetFlags()[0] = true; + return this; + } + + /** + * Checks whether the 'a' field has been set. + */ + public boolean hasA() { + return fieldSetFlags()[0]; + } + + + /** + * Clears the value of the 'a' field. + */ + public org.apache.spark.shuffle.parquet.avro.AvroTestEntity.Builder clearA() { + a = null; + fieldSetFlags()[0] = false; + return this; + } + + /** + * Gets the value of the 'b' field. + */ + public java.lang.Integer getB() { + return b; + } + + /** + * Sets the value of the 'b' field. + * @param value the value to set. + */ + public org.apache.spark.shuffle.parquet.avro.AvroTestEntity.Builder setB(java.lang.Integer value) { + validate(fields()[1], value); + this.b = value; + fieldSetFlags()[1] = true; + return this; + } + + /** + * Checks whether the 'b' field has been set. 
+ */ + public boolean hasB() { + return fieldSetFlags()[1]; + } + + + /** + * Clears the value of the 'b' field. + */ + public org.apache.spark.shuffle.parquet.avro.AvroTestEntity.Builder clearB() { + b = null; + fieldSetFlags()[1] = false; + return this; + } + + @Override + public AvroTestEntity build() { + try { + AvroTestEntity record = new AvroTestEntity(); + record.a = fieldSetFlags()[0] ? this.a : (java.lang.CharSequence) defaultValue(fields()[0]); + record.b = fieldSetFlags()[1] ? this.b : (java.lang.Integer) defaultValue(fields()[1]); + return record; + } catch (Exception e) { + throw new org.apache.avro.AvroRuntimeException(e); + } + } + } + + private static final org.apache.avro.io.DatumWriter + WRITER$ = new org.apache.avro.specific.SpecificDatumWriter(SCHEMA$); + + private static final org.apache.avro.io.DatumReader + READER$ = new org.apache.avro.specific.SpecificDatumReader(SCHEMA$); + +} diff --git a/core/src/test/resources/org/apache/spark/shuffle/parquet/avro/tests.avdl b/core/src/test/resources/org/apache/spark/shuffle/parquet/avro/tests.avdl new file mode 100644 index 0000000000000..e04526774d20f --- /dev/null +++ b/core/src/test/resources/org/apache/spark/shuffle/parquet/avro/tests.avdl @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +// avrotools idl tests.avdl > tests.json +// avrotools compile protocol tests.json core/src/test/java/ + +@namespace("org.apache.spark.shuffle.parquet.avro") +protocol AvroParquetTest { + +record AvroTestEntity { + union {null, string} a; + union {null, int} b; +} + +} diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala index d91b799ecfc08..491a874f1cb8e 100644 --- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala +++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala @@ -296,7 +296,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC assert(metrics.recordsRead === numRecords) assert(metrics.recordsWritten === numRecords) - assert(metrics.bytesWritten === metrics.byresRead) + assert(metrics.bytesWritten === metrics.bytesRead) assert(metrics.bytesWritten > 0) } @@ -312,7 +312,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC assert(metrics.recordsRead === numRecords) assert(metrics.recordsWritten === numRecords) - assert(metrics.bytesWritten === metrics.byresRead) + assert(metrics.bytesWritten === metrics.bytesRead) assert(metrics.bytesWritten > 0) } } @@ -333,7 +333,7 @@ object ShuffleSuite { recordsWritten: Long, recordsRead: Long, bytesWritten: Long, - byresRead: Long) + bytesRead: Long) def runAndReturnMetrics(sc: SparkContext)(job: => Unit): AggregatedShuffleMetrics = { @volatile var recordsWritten: Long = 0 diff --git a/core/src/test/scala/org/apache/spark/shuffle/parquet/ParquetShuffleSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/parquet/ParquetShuffleSuite.scala new file mode 100644 index 0000000000000..2036349437267 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/shuffle/parquet/ParquetShuffleSuite.scala @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.shuffle.parquet + +import org.apache.spark.serializer.KryoSerializer +import org.apache.spark.shuffle.parquet.avro.AvroTestEntity +import org.apache.spark._ + +class ParquetShuffleSuite extends SparkFunSuite with LocalSparkContext { + + def newConf(withFallback: Boolean = false): SparkConf = { + val conf = new SparkConf() + ParquetShuffleConfig.enableParquetShuffle(conf) + if(withFallback) { + ParquetShuffleConfig.setFallbackShuffleManager(conf, "sort") + } + conf.set("spark.serializer", classOf[KryoSerializer].getName) + } + + val fallbackConf = newConf(withFallback = true) + val noFallbackConf = newConf(withFallback = false) + + test("fallback shuffle without aggregation") { + sc = new SparkContext("local", "test", fallbackConf) + val numRecords = 10000 + + val metrics = ShuffleSuite.runAndReturnMetrics(sc) { + sc.parallelize(1 to numRecords, 4) + .map(key => (key, 1)) + .groupByKey() + .collect() + } + + assert(metrics.recordsRead === numRecords) + assert(metrics.recordsWritten === numRecords) + assert(metrics.bytesWritten === metrics.bytesRead) + assert(metrics.bytesWritten > 0) + } + + test("fallback for shuffle with aggregation") { + sc = new SparkContext("local", "test", fallbackConf) + val numRecords = 10000 + + val metrics = ShuffleSuite.runAndReturnMetrics(sc) { + sc.parallelize(1 to numRecords, 4) + .flatMap(key => Array.fill(100)((key, 1))) + .countByKey() + } + + assert(metrics.recordsRead === numRecords) + assert(metrics.recordsWritten === numRecords) + assert(metrics.bytesWritten === metrics.bytesRead) + assert(metrics.bytesWritten > 0) + } + + test("shuffle without aggregation") { + sc = new SparkContext("local", "test", noFallbackConf) + val numRecords = 10000 + val records = for (i <- 1 to numRecords) yield { + val obj = AvroTestEntity.newBuilder().setA("test").setB(i).build() + (obj, if (i % 10 == 0) null else obj) + } + + val metrics = ShuffleSuite.runAndReturnMetrics(sc) { + sc.parallelize(records, 4) + .groupByKey() + .collect() + } + + assert(metrics.recordsRead === numRecords) + assert(metrics.recordsWritten === numRecords) + assert(metrics.bytesWritten === metrics.bytesRead) + assert(metrics.bytesWritten > 0) + } + + test("shuffle with aggregation") { + sc = new SparkContext("local", "test", noFallbackConf) + val numRecords = 10000 + val records = for (i <- 1 to numRecords) yield { + val obj = AvroTestEntity.newBuilder().setA("agg").setB(i).build() + (obj, if (i % 10 == 0) null else obj) + } + + val metrics = ShuffleSuite.runAndReturnMetrics(sc) { + sc.parallelize(records, 4) + .reduceByKey({(a, b) => AvroTestEntity.newBuilder().setA("agg").build()}) + .collect() + } + + assert(metrics.recordsRead === numRecords) + assert(metrics.recordsWritten === numRecords) + assert(metrics.bytesWritten === metrics.bytesRead) + assert(metrics.bytesWritten > 0) + } + +} diff --git a/docs/configuration.md b/docs/configuration.md index 1a701f18881fe..9d29ac5dba21e 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -447,12 +447,12 @@ Apart from these, the following properties are also available, and may be useful spark.shuffle.manager sort - Implementation to use for shuffling data. There are three implementations available: - sort, hash and the new (1.5+) tungsten-sort. - Sort-based shuffle is more memory-efficient and is the default option starting in 1.2. 
- Tungsten-sort is similar to the sort based shuffle, with a direct binary cache-friendly - implementation with a fall back to regular sort based shuffle if its requirements are not - met. + Implementation to use for shuffling data. There are four implementations available: + sort, hash, the new (1.5+) tungsten-sort, and + parquet. Sort-based shuffle is more memory-efficient and is the default + option starting in 1.2. Tungsten-sort is similar to the sort based shuffle, with a direct + binary cache-friendly implementation with a fall back to regular sort based shuffle if + its requirements are not met. diff --git a/pom.xml b/pom.xml index 88ebceca769e9..afe5b56089e19 100644 --- a/pom.xml +++ b/pom.xml @@ -1601,7 +1601,7 @@ org.apache.parquet parquet-avro ${parquet.version} - ${parquet.test.deps.scope} + ${parquet.deps.scope} com.twitter
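Usage note (not part of the patch): the sketch below shows how a job might opt into the Parquet shuffle path this patch adds. It is a minimal example that only uses the configuration keys defined in ParquetShuffleConfig above ("spark.shuffle.parquet.*") and the "parquet" and "sort" aliases registered in SparkEnv.shuffleManagerAliases; the application name and tuning values are illustrative assumptions, and it presumes the shuffled key/value classes implement Avro's IndexedRecord (otherwise the shuffle is delegated to the fallback manager, or fails via ErrorShuffleManager when no fallback is configured).

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("parquet-shuffle-example")              // hypothetical app name
  .set("spark.shuffle.manager", "parquet")            // alias resolved via SparkEnv.shuffleManagerAliases
  .set("spark.shuffle.parquet.fallback", "sort")      // delegate schema-less shuffles to the sort shuffle
  .set("spark.shuffle.parquet.compression", "SNAPPY") // parsed by CompressionCodecName.fromConf
  .set("spark.shuffle.parquet.blocksize", (64 * 1024 * 1024).toString)
  .set("spark.shuffle.parquet.pagesize", (1024 * 1024).toString)
  .set("spark.shuffle.parquet.enabledictionary", "true")

val sc = new SparkContext(conf)
// Pairs whose key and value classes are Avro IndexedRecord types (for example the
// AvroTestEntity record used in ParquetShuffleSuite) are shuffled through Parquet files;
// any other shuffle goes through the configured fallback manager.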