From 00b9757716da5520c9cd5c974d8ed3618bfda8df Mon Sep 17 00:00:00 2001 From: Takeshi YAMAMURO Date: Thu, 26 May 2016 15:58:34 +0900 Subject: [PATCH 1/4] Add a datasource-specific option 'minPartitions' --- .../datasources/FileSourceStrategySuite.scala | 27 ++++++++++++++++++- 1 file changed, 26 insertions(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala index d900ce7bb237..ae37a46dad29 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala @@ -274,6 +274,29 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi } } + test("datasource-specific minPartitions") { + val table = + createTable( + files = Seq( + "file1" -> 1, + "file2" -> 1, + "file3" -> 1, + "file4" -> 1, + "file5" -> 1, + "file6" -> 1, + "file7" -> 1, + "file8" -> 1, + "file9" -> 1), + options = Map("minPartitions" -> "3")) + + checkScan(table.select('c1)) { partitions => + assert(partitions.size == 3) + assert(partitions(0).files.size == 3) + assert(partitions(1).files.size == 3) + assert(partitions(2).files.size == 3) + } + } + test("Locality support for FileScanRDD") { val partition = FilePartition(0, Seq( PartitionedFile(InternalRow.empty, "fakePath0", 0, 10, Array("host0", "host1")), @@ -526,7 +549,8 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi */ def createTable( files: Seq[(String, Int)], - buckets: Int = 0): DataFrame = { + buckets: Int = 0, + options: Map[String, String] = Map.empty): DataFrame = { val tempDir = Utils.createTempDir() files.foreach { case (name, size) => @@ -537,6 +561,7 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi val df = spark.read .format(classOf[TestFileFormat].getName) + .options(options) .load(tempDir.getCanonicalPath) if (buckets > 0) { From ad5121fe940a0cd6cb1d823254af073c5477e698 Mon Sep 17 00:00:00 2001 From: Takeshi YAMAMURO Date: Fri, 27 May 2016 00:50:39 +0900 Subject: [PATCH 2/4] Override the options of SparkConf/SQLConf with ones for datasources --- core/src/main/scala/org/apache/spark/SparkConf.scala | 5 +++++ .../main/scala/org/apache/spark/sql/DataFrameReader.scala | 8 +++++++- .../execution/datasources/FileSourceStrategySuite.scala | 2 +- 3 files changed, 13 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala index c9c342df82c9..6e3436c429d7 100644 --- a/core/src/main/scala/org/apache/spark/SparkConf.scala +++ b/core/src/main/scala/org/apache/spark/SparkConf.scala @@ -20,6 +20,7 @@ package org.apache.spark import java.util.concurrent.ConcurrentHashMap import scala.collection.JavaConverters._ +import scala.collection.immutable import scala.collection.mutable.LinkedHashSet import org.apache.avro.{Schema, SchemaNormalization} @@ -384,6 +385,10 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria .map { case (k, v) => (k.substring(prefix.length), v) } } + /** Get all parameters as a Map */ + def getAllAsMap: immutable.Map[String, String] = { + settings.asScala.toMap + } /** Get a parameter as an integer, falling back to a default if not set */ def getInt(key: String, defaultValue: Int): Int = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index a77937efd7e1..e719e9860bc9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -149,7 +149,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { paths = paths, userSpecifiedSchema = userSpecifiedSchema, className = source, - options = extraOptions.toMap).resolveRelation()) + options = optionsOverriddenWith(extraOptions.toMap)).resolveRelation()) } /** @@ -551,4 +551,10 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { private var extraOptions = new scala.collection.mutable.HashMap[String, String] + // Returns all option set in the `SparkConf`, the `SQLConf`, and a given `options`. + // If the same keys exist, they are overridden with ones in the `options`. + private def optionsOverriddenWith(options: Map[String, String]): Map[String, String] = { + sparkSession.sparkContext.conf.getAllAsMap ++ sparkSession.conf.getAll ++ options + } + } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala index ae37a46dad29..653e72fafbee 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala @@ -287,7 +287,7 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi "file7" -> 1, "file8" -> 1, "file9" -> 1), - options = Map("minPartitions" -> "3")) + options = Map("spark.default.parallelism" -> "3")) checkScan(table.select('c1)) { partitions => assert(partitions.size == 3) From debd8ed10a82bf2108acebadccdb77e6d2e0084e Mon Sep 17 00:00:00 2001 From: Takeshi YAMAMURO Date: Thu, 30 Jun 2016 13:18:41 -0700 Subject: [PATCH 3/4] Override all the options in datasources --- .../apache/spark/sql/DataFrameReader.scala | 2 +- .../datasources/WriterContainer.scala | 447 ++++++++++++++++++ .../parquet/ParquetFileFormat.scala | 12 +- .../datasources/parquet/ParquetOptions.scala | 8 +- 4 files changed, 460 insertions(+), 9 deletions(-) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index e719e9860bc9..4dd100623216 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -551,7 +551,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { private var extraOptions = new scala.collection.mutable.HashMap[String, String] - // Returns all option set in the `SparkConf`, the `SQLConf`, and a given `options`. + // Returns all option set in the `SparkConf`, the `SQLConf`, and a given data source `options`. // If the same keys exist, they are overridden with ones in the `options`. private def optionsOverriddenWith(options: Map[String, String]): Map[String, String] = { sparkSession.sparkContext.conf.getAllAsMap ++ sparkSession.conf.getAll ++ options diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala new file mode 100644 index 000000000000..e6c0f961266e --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala @@ -0,0 +1,447 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources + +import java.util.{Date, UUID} + +import org.apache.hadoop.fs.Path +import org.apache.hadoop.mapreduce._ +import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter => MapReduceFileOutputCommitter} +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl + +import org.apache.spark._ +import org.apache.spark.internal.Logging +import org.apache.spark.mapred.SparkHadoopMapRedUtil +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.catalog.BucketSpec +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.UnsafeKVExternalSorter +import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._ +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} +import org.apache.spark.util.{SerializableConfiguration, Utils} +import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter + + +/** A container for all the details required when writing to a table. */ +private[datasources] case class WriteRelation( + sparkSession: SparkSession, + dataSchema: StructType, + path: String, + prepareJobForWrite: Job => OutputWriterFactory, + bucketSpec: Option[BucketSpec]) + +private[datasources] abstract class BaseWriterContainer( + @transient val relation: WriteRelation, + @transient private val job: Job, + isAppend: Boolean) + extends Logging with Serializable { + + protected val dataSchema = relation.dataSchema + + protected val serializableConf = + new SerializableConfiguration(job.getConfiguration) + + // This UUID is used to avoid output file name collision between different appending write jobs. + // These jobs may belong to different SparkContext instances. Concrete data source implementations + // may use this UUID to generate unique file names (e.g., `part-r--.parquet`). + // The reason why this ID is used to identify a job rather than a single task output file is + // that, speculative tasks must generate the same output file name as the original task. + private val uniqueWriteJobId = UUID.randomUUID() + + // This is only used on driver side. + @transient private val jobContext: JobContext = job + + // The following fields are initialized and used on both driver and executor side. + @transient protected var outputCommitter: OutputCommitter = _ + @transient private var jobId: JobID = _ + @transient private var taskId: TaskID = _ + @transient private var taskAttemptId: TaskAttemptID = _ + @transient protected var taskAttemptContext: TaskAttemptContext = _ + + protected val outputPath: String = relation.path + + protected var outputWriterFactory: OutputWriterFactory = _ + + private var outputFormatClass: Class[_ <: OutputFormat[_, _]] = _ + + def writeRows(taskContext: TaskContext, iterator: Iterator[InternalRow]): Unit + + def driverSideSetup(): Unit = { + setupIDs(0, 0, 0) + setupConf() + + // This UUID is sent to executor side together with the serialized `Configuration` object within + // the `Job` instance. `OutputWriters` on the executor side should use this UUID to generate + // unique task output files. + job.getConfiguration.set(DATASOURCE_WRITEJOBUUID, uniqueWriteJobId.toString) + + // Order of the following two lines is important. For Hadoop 1, TaskAttemptContext constructor + // clones the Configuration object passed in. If we initialize the TaskAttemptContext first, + // configurations made in prepareJobForWrite(job) are not populated into the TaskAttemptContext. + // + // Also, the `prepareJobForWrite` call must happen before initializing output format and output + // committer, since their initialization involve the job configuration, which can be potentially + // decorated in `prepareJobForWrite`. + outputWriterFactory = relation.prepareJobForWrite(job) + taskAttemptContext = new TaskAttemptContextImpl(serializableConf.value, taskAttemptId) + + outputFormatClass = job.getOutputFormatClass + outputCommitter = newOutputCommitter(taskAttemptContext) + outputCommitter.setupJob(jobContext) + } + + def executorSideSetup(taskContext: TaskContext): Unit = { + setupIDs(taskContext.stageId(), taskContext.partitionId(), taskContext.attemptNumber()) + setupConf() + taskAttemptContext = new TaskAttemptContextImpl(serializableConf.value, taskAttemptId) + outputCommitter = newOutputCommitter(taskAttemptContext) + outputCommitter.setupTask(taskAttemptContext) + } + + protected def getWorkPath: String = { + outputCommitter match { + // FileOutputCommitter writes to a temporary location returned by `getWorkPath`. + case f: MapReduceFileOutputCommitter => f.getWorkPath.toString + case _ => outputPath + } + } + + protected def newOutputWriter(path: String, bucketId: Option[Int] = None): OutputWriter = { + try { + outputWriterFactory.newInstance(path, bucketId, dataSchema, taskAttemptContext) + } catch { + case e: org.apache.hadoop.fs.FileAlreadyExistsException => + if (outputCommitter.getClass.getName.contains("Direct")) { + // SPARK-11382: DirectParquetOutputCommitter is not idempotent, meaning on retry + // attempts, the task will fail because the output file is created from a prior attempt. + // This often means the most visible error to the user is misleading. Augment the error + // to tell the user to look for the actual error. + throw new SparkException("The output file already exists but this could be due to a " + + "failure from an earlier attempt. Look through the earlier logs or stage page for " + + "the first error.\n File exists error: " + e, e) + } else { + throw e + } + } + } + + private def newOutputCommitter(context: TaskAttemptContext): OutputCommitter = { + val defaultOutputCommitter = outputFormatClass.newInstance().getOutputCommitter(context) + + if (isAppend) { + // If we are appending data to an existing dir, we will only use the output committer + // associated with the file output format since it is not safe to use a custom + // committer for appending. For example, in S3, direct parquet output committer may + // leave partial data in the destination dir when the appending job fails. + // + // See SPARK-8578 for more details + logInfo( + s"Using default output committer ${defaultOutputCommitter.getClass.getCanonicalName} " + + "for appending.") + defaultOutputCommitter + } else { + val configuration = context.getConfiguration + val committerClass = configuration.getClass( + SQLConf.OUTPUT_COMMITTER_CLASS.key, null, classOf[OutputCommitter]) + + Option(committerClass).map { clazz => + logInfo(s"Using user defined output committer class ${clazz.getCanonicalName}") + + // Every output format based on org.apache.hadoop.mapreduce.lib.output.OutputFormat + // has an associated output committer. To override this output committer, + // we will first try to use the output committer set in SQLConf.OUTPUT_COMMITTER_CLASS. + // If a data source needs to override the output committer, it needs to set the + // output committer in prepareForWrite method. + if (classOf[MapReduceFileOutputCommitter].isAssignableFrom(clazz)) { + // The specified output committer is a FileOutputCommitter. + // So, we will use the FileOutputCommitter-specified constructor. + val ctor = clazz.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext]) + ctor.newInstance(new Path(outputPath), context) + } else { + // The specified output committer is just an OutputCommitter. + // So, we will use the no-argument constructor. + val ctor = clazz.getDeclaredConstructor() + ctor.newInstance() + } + }.getOrElse { + // If output committer class is not set, we will use the one associated with the + // file output format. + logInfo( + s"Using output committer class ${defaultOutputCommitter.getClass.getCanonicalName}") + defaultOutputCommitter + } + } + } + + private def setupIDs(jobId: Int, splitId: Int, attemptId: Int): Unit = { + this.jobId = SparkHadoopWriter.createJobID(new Date, jobId) + this.taskId = new TaskID(this.jobId, TaskType.MAP, splitId) + this.taskAttemptId = new TaskAttemptID(taskId, attemptId) + } + + private def setupConf(): Unit = { + serializableConf.value.set("mapred.job.id", jobId.toString) + serializableConf.value.set("mapred.tip.id", taskAttemptId.getTaskID.toString) + serializableConf.value.set("mapred.task.id", taskAttemptId.toString) + serializableConf.value.setBoolean("mapred.task.is.map", true) + serializableConf.value.setInt("mapred.task.partition", 0) + } + + def commitTask(): Unit = { + SparkHadoopMapRedUtil.commitTask(outputCommitter, taskAttemptContext, jobId.getId, taskId.getId) + } + + def abortTask(): Unit = { + if (outputCommitter != null) { + outputCommitter.abortTask(taskAttemptContext) + } + logError(s"Task attempt $taskAttemptId aborted.") + } + + def commitJob(): Unit = { + outputCommitter.commitJob(jobContext) + logInfo(s"Job $jobId committed.") + } + + def abortJob(): Unit = { + if (outputCommitter != null) { + outputCommitter.abortJob(jobContext, JobStatus.State.FAILED) + } + logError(s"Job $jobId aborted.") + } +} + +/** + * A writer that writes all of the rows in a partition to a single file. + */ +private[datasources] class DefaultWriterContainer( + relation: WriteRelation, + job: Job, + isAppend: Boolean) + extends BaseWriterContainer(relation, job, isAppend) { + + def writeRows(taskContext: TaskContext, iterator: Iterator[InternalRow]): Unit = { + executorSideSetup(taskContext) + val configuration = taskAttemptContext.getConfiguration + configuration.set(DATASOURCE_OUTPUTPATH, outputPath) + var writer = newOutputWriter(getWorkPath) + writer.initConverter(dataSchema) + + // If anything below fails, we should abort the task. + try { + Utils.tryWithSafeFinallyAndFailureCallbacks { + while (iterator.hasNext) { + val internalRow = iterator.next() + writer.writeInternal(internalRow) + } + commitTask() + }(catchBlock = abortTask()) + } catch { + case t: Throwable => + throw new SparkException("Task failed while writing rows", t) + } + + def commitTask(): Unit = { + try { + if (writer != null) { + writer.close() + writer = null + } + super.commitTask() + } catch { + case cause: Throwable => + // This exception will be handled in `InsertIntoHadoopFsRelation.insert$writeRows`, and + // will cause `abortTask()` to be invoked. + throw new RuntimeException("Failed to commit task", cause) + } + } + + def abortTask(): Unit = { + try { + if (writer != null) { + writer.close() + } + } finally { + super.abortTask() + } + } + } +} + +/** + * A writer that dynamically opens files based on the given partition columns. Internally this is + * done by maintaining a HashMap of open files until `maxFiles` is reached. If this occurs, the + * writer externally sorts the remaining rows and then writes out them out one file at a time. + */ +private[datasources] class DynamicPartitionWriterContainer( + relation: WriteRelation, + job: Job, + partitionColumns: Seq[Attribute], + dataColumns: Seq[Attribute], + inputSchema: Seq[Attribute], + defaultPartitionName: String, + maxOpenFiles: Int, + isAppend: Boolean) + extends BaseWriterContainer(relation, job, isAppend) { + + private val bucketSpec = relation.bucketSpec + + private val bucketColumns: Seq[Attribute] = bucketSpec.toSeq.flatMap { + spec => spec.bucketColumnNames.map(c => inputSchema.find(_.name == c).get) + } + + private val sortColumns: Seq[Attribute] = bucketSpec.toSeq.flatMap { + spec => spec.sortColumnNames.map(c => inputSchema.find(_.name == c).get) + } + + private def bucketIdExpression: Option[Expression] = bucketSpec.map { spec => + // Use `HashPartitioning.partitionIdExpression` as our bucket id expression, so that we can + // guarantee the data distribution is same between shuffle and bucketed data source, which + // enables us to only shuffle one side when join a bucketed table and a normal one. + HashPartitioning(bucketColumns, spec.numBuckets).partitionIdExpression + } + + // Expressions that given a partition key build a string like: col1=val/col2=val/... + private def partitionStringExpression: Seq[Expression] = { + partitionColumns.zipWithIndex.flatMap { case (c, i) => + val escaped = + ScalaUDF( + PartitioningUtils.escapePathName _, + StringType, + Seq(Cast(c, StringType)), + Seq(StringType)) + val str = If(IsNull(c), Literal(defaultPartitionName), escaped) + val partitionName = Literal(c.name + "=") :: str :: Nil + if (i == 0) partitionName else Literal(Path.SEPARATOR) :: partitionName + } + } + + private def getBucketIdFromKey(key: InternalRow): Option[Int] = bucketSpec.map { _ => + key.getInt(partitionColumns.length) + } + + /** + * Open and returns a new OutputWriter given a partition key and optional bucket id. + * If bucket id is specified, we will append it to the end of the file name, but before the + * file extension, e.g. part-r-00009-ea518ad4-455a-4431-b471-d24e03814677-00002.gz.parquet + */ + private def newOutputWriter( + key: InternalRow, + getPartitionString: UnsafeProjection): OutputWriter = { + val configuration = taskAttemptContext.getConfiguration + val path = if (partitionColumns.nonEmpty) { + val partitionPath = getPartitionString(key).getString(0) + configuration.set(DATASOURCE_OUTPUTPATH, new Path(outputPath, partitionPath).toString) + new Path(getWorkPath, partitionPath).toString + } else { + configuration.set(DATASOURCE_OUTPUTPATH, outputPath) + getWorkPath + } + val bucketId = getBucketIdFromKey(key) + val newWriter = super.newOutputWriter(path, bucketId) + newWriter.initConverter(dataSchema) + newWriter + } + + def writeRows(taskContext: TaskContext, iterator: Iterator[InternalRow]): Unit = { + executorSideSetup(taskContext) + + // We should first sort by partition columns, then bucket id, and finally sorting columns. + val sortingExpressions: Seq[Expression] = partitionColumns ++ bucketIdExpression ++ sortColumns + val getSortingKey = UnsafeProjection.create(sortingExpressions, inputSchema) + + val sortingKeySchema = StructType(sortingExpressions.map { + case a: Attribute => StructField(a.name, a.dataType, a.nullable) + // The sorting expressions are all `Attribute` except bucket id. + case _ => StructField("bucketId", IntegerType, nullable = false) + }) + + // Returns the data columns to be written given an input row + val getOutputRow = UnsafeProjection.create(dataColumns, inputSchema) + + // Returns the partition path given a partition key. + val getPartitionString = + UnsafeProjection.create(Concat(partitionStringExpression) :: Nil, partitionColumns) + + // Sorts the data before write, so that we only need one writer at the same time. + // TODO: inject a local sort operator in planning. + val sorter = new UnsafeKVExternalSorter( + sortingKeySchema, + StructType.fromAttributes(dataColumns), + SparkEnv.get.blockManager, + SparkEnv.get.serializerManager, + TaskContext.get().taskMemoryManager().pageSizeBytes, + SparkEnv.get.conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold", + UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD)) + + while (iterator.hasNext) { + val currentRow = iterator.next() + sorter.insertKV(getSortingKey(currentRow), getOutputRow(currentRow)) + } + logInfo(s"Sorting complete. Writing out partition files one at a time.") + + val getBucketingKey: InternalRow => InternalRow = if (sortColumns.isEmpty) { + identity + } else { + UnsafeProjection.create(sortingExpressions.dropRight(sortColumns.length).zipWithIndex.map { + case (expr, ordinal) => BoundReference(ordinal, expr.dataType, expr.nullable) + }) + } + + val sortedIterator = sorter.sortedIterator() + + // If anything below fails, we should abort the task. + var currentWriter: OutputWriter = null + try { + Utils.tryWithSafeFinallyAndFailureCallbacks { + var currentKey: UnsafeRow = null + while (sortedIterator.next()) { + val nextKey = getBucketingKey(sortedIterator.getKey).asInstanceOf[UnsafeRow] + if (currentKey != nextKey) { + if (currentWriter != null) { + currentWriter.close() + currentWriter = null + } + currentKey = nextKey.copy() + logDebug(s"Writing partition: $currentKey") + + currentWriter = newOutputWriter(currentKey, getPartitionString) + } + currentWriter.writeInternal(sortedIterator.getValue) + } + if (currentWriter != null) { + currentWriter.close() + currentWriter = null + } + + commitTask() + }(catchBlock = { + if (currentWriter != null) { + currentWriter.close() + } + abortTask() + }) + } catch { + case t: Throwable => + throw new SparkException("Task failed while writing rows", t) + } + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index 031a0fe57893..e02751e22943 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -232,7 +232,7 @@ class ParquetFileFormat .orElse(filesByType.data.headOption) .toSeq } - ParquetFileFormat.mergeSchemasInParallel(filesToTouch, sparkSession) + ParquetFileFormat.mergeSchemasInParallel(filesToTouch, sparkSession, parameters) } case class FileTypes( @@ -561,9 +561,10 @@ object ParquetFileFormat extends Logging { * slow. And basically locality is not available when using S3 (you can't run computation on * S3 nodes). */ - def mergeSchemasInParallel( + private def mergeSchemasInParallel( filesToTouch: Seq[FileStatus], - sparkSession: SparkSession): Option[StructType] = { + sparkSession: SparkSession, + parameters: Map[String, String]): Option[StructType] = { val assumeBinaryIsString = sparkSession.sessionState.conf.isParquetBinaryAsString val assumeInt96IsTimestamp = sparkSession.sessionState.conf.isParquetINT96AsTimestamp val writeLegacyParquetFormat = sparkSession.sessionState.conf.writeLegacyParquetFormat @@ -584,8 +585,9 @@ object ParquetFileFormat extends Logging { // Set the number of partitions to prevent following schema reads from generating many tasks // in case of a small number of parquet files. - val numParallelism = Math.min(Math.max(partialFileStatusInfo.size, 1), - sparkSession.sparkContext.defaultParallelism) + val defaultParallelism = parameters.get("spark.default.parallelism").map(_.toInt) + .getOrElse(sparkSession.sparkContext.defaultParallelism) + val numParallelism = Math.min(Math.max(partialFileStatusInfo.size, 1), defaultParallelism) // Issues a Spark job to read Parquet schema in parallel. val partiallyMergedSchemas = diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala index a81a95d51085..e51b7e5fc4c3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala @@ -40,7 +40,8 @@ private[parquet] class ParquetOptions( * Acceptable values are defined in [[shortParquetCompressionCodecNames]]. */ val compressionCodecClassName: String = { - val codecName = parameters.getOrElse("compression", sqlConf.parquetCompressionCodec).toLowerCase + val codecName = parameters.getOrElse("compression", parameters.getOrElse( + "spark.sql.parquet.compression.codec", sqlConf.parquetCompressionCodec)).toLowerCase if (!shortParquetCompressionCodecNames.contains(codecName)) { val availableCodecs = shortParquetCompressionCodecNames.keys.map(_.toLowerCase) throw new IllegalArgumentException(s"Codec [$codecName] " + @@ -55,8 +56,9 @@ private[parquet] class ParquetOptions( */ val mergeSchema: Boolean = parameters .get(MERGE_SCHEMA) - .map(_.toBoolean) - .getOrElse(sqlConf.isParquetSchemaMergingEnabled) + .map(_.toBoolean) + .getOrElse(parameters.get("spark.sql.parquet.mergeSchema").map(_.toBoolean) + .getOrElse(sqlConf.getConf(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED))) } From ba3f2a092db5740e894b0cdc79278ef43d36fce7 Mon Sep 17 00:00:00 2001 From: Takeshi YAMAMURO Date: Sat, 20 Aug 2016 01:28:48 +0900 Subject: [PATCH 4/4] Fix a bug --- .../sql/execution/DataSourceScanExec.scala | 3 +- .../datasources/WriterContainer.scala | 447 ------------------ 2 files changed, 2 insertions(+), 448 deletions(-) delete mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala index e485b52b43f7..49450a062e09 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala @@ -470,7 +470,8 @@ case class FileSourceScanExec( val defaultMaxSplitBytes = fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes - val defaultParallelism = fsRelation.sparkSession.sparkContext.defaultParallelism + val defaultParallelism = fsRelation.options.get("spark.default.parallelism").map(_.toInt) + .getOrElse(fsRelation.sparkSession.sparkContext.defaultParallelism) val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum val bytesPerCore = totalBytes / defaultParallelism diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala deleted file mode 100644 index e6c0f961266e..000000000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala +++ /dev/null @@ -1,447 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.execution.datasources - -import java.util.{Date, UUID} - -import org.apache.hadoop.fs.Path -import org.apache.hadoop.mapreduce._ -import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter => MapReduceFileOutputCommitter} -import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl - -import org.apache.spark._ -import org.apache.spark.internal.Logging -import org.apache.spark.mapred.SparkHadoopMapRedUtil -import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalyst.catalog.BucketSpec -import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.execution.UnsafeKVExternalSorter -import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._ -import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} -import org.apache.spark.util.{SerializableConfiguration, Utils} -import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter - - -/** A container for all the details required when writing to a table. */ -private[datasources] case class WriteRelation( - sparkSession: SparkSession, - dataSchema: StructType, - path: String, - prepareJobForWrite: Job => OutputWriterFactory, - bucketSpec: Option[BucketSpec]) - -private[datasources] abstract class BaseWriterContainer( - @transient val relation: WriteRelation, - @transient private val job: Job, - isAppend: Boolean) - extends Logging with Serializable { - - protected val dataSchema = relation.dataSchema - - protected val serializableConf = - new SerializableConfiguration(job.getConfiguration) - - // This UUID is used to avoid output file name collision between different appending write jobs. - // These jobs may belong to different SparkContext instances. Concrete data source implementations - // may use this UUID to generate unique file names (e.g., `part-r--.parquet`). - // The reason why this ID is used to identify a job rather than a single task output file is - // that, speculative tasks must generate the same output file name as the original task. - private val uniqueWriteJobId = UUID.randomUUID() - - // This is only used on driver side. - @transient private val jobContext: JobContext = job - - // The following fields are initialized and used on both driver and executor side. - @transient protected var outputCommitter: OutputCommitter = _ - @transient private var jobId: JobID = _ - @transient private var taskId: TaskID = _ - @transient private var taskAttemptId: TaskAttemptID = _ - @transient protected var taskAttemptContext: TaskAttemptContext = _ - - protected val outputPath: String = relation.path - - protected var outputWriterFactory: OutputWriterFactory = _ - - private var outputFormatClass: Class[_ <: OutputFormat[_, _]] = _ - - def writeRows(taskContext: TaskContext, iterator: Iterator[InternalRow]): Unit - - def driverSideSetup(): Unit = { - setupIDs(0, 0, 0) - setupConf() - - // This UUID is sent to executor side together with the serialized `Configuration` object within - // the `Job` instance. `OutputWriters` on the executor side should use this UUID to generate - // unique task output files. - job.getConfiguration.set(DATASOURCE_WRITEJOBUUID, uniqueWriteJobId.toString) - - // Order of the following two lines is important. For Hadoop 1, TaskAttemptContext constructor - // clones the Configuration object passed in. If we initialize the TaskAttemptContext first, - // configurations made in prepareJobForWrite(job) are not populated into the TaskAttemptContext. - // - // Also, the `prepareJobForWrite` call must happen before initializing output format and output - // committer, since their initialization involve the job configuration, which can be potentially - // decorated in `prepareJobForWrite`. - outputWriterFactory = relation.prepareJobForWrite(job) - taskAttemptContext = new TaskAttemptContextImpl(serializableConf.value, taskAttemptId) - - outputFormatClass = job.getOutputFormatClass - outputCommitter = newOutputCommitter(taskAttemptContext) - outputCommitter.setupJob(jobContext) - } - - def executorSideSetup(taskContext: TaskContext): Unit = { - setupIDs(taskContext.stageId(), taskContext.partitionId(), taskContext.attemptNumber()) - setupConf() - taskAttemptContext = new TaskAttemptContextImpl(serializableConf.value, taskAttemptId) - outputCommitter = newOutputCommitter(taskAttemptContext) - outputCommitter.setupTask(taskAttemptContext) - } - - protected def getWorkPath: String = { - outputCommitter match { - // FileOutputCommitter writes to a temporary location returned by `getWorkPath`. - case f: MapReduceFileOutputCommitter => f.getWorkPath.toString - case _ => outputPath - } - } - - protected def newOutputWriter(path: String, bucketId: Option[Int] = None): OutputWriter = { - try { - outputWriterFactory.newInstance(path, bucketId, dataSchema, taskAttemptContext) - } catch { - case e: org.apache.hadoop.fs.FileAlreadyExistsException => - if (outputCommitter.getClass.getName.contains("Direct")) { - // SPARK-11382: DirectParquetOutputCommitter is not idempotent, meaning on retry - // attempts, the task will fail because the output file is created from a prior attempt. - // This often means the most visible error to the user is misleading. Augment the error - // to tell the user to look for the actual error. - throw new SparkException("The output file already exists but this could be due to a " + - "failure from an earlier attempt. Look through the earlier logs or stage page for " + - "the first error.\n File exists error: " + e, e) - } else { - throw e - } - } - } - - private def newOutputCommitter(context: TaskAttemptContext): OutputCommitter = { - val defaultOutputCommitter = outputFormatClass.newInstance().getOutputCommitter(context) - - if (isAppend) { - // If we are appending data to an existing dir, we will only use the output committer - // associated with the file output format since it is not safe to use a custom - // committer for appending. For example, in S3, direct parquet output committer may - // leave partial data in the destination dir when the appending job fails. - // - // See SPARK-8578 for more details - logInfo( - s"Using default output committer ${defaultOutputCommitter.getClass.getCanonicalName} " + - "for appending.") - defaultOutputCommitter - } else { - val configuration = context.getConfiguration - val committerClass = configuration.getClass( - SQLConf.OUTPUT_COMMITTER_CLASS.key, null, classOf[OutputCommitter]) - - Option(committerClass).map { clazz => - logInfo(s"Using user defined output committer class ${clazz.getCanonicalName}") - - // Every output format based on org.apache.hadoop.mapreduce.lib.output.OutputFormat - // has an associated output committer. To override this output committer, - // we will first try to use the output committer set in SQLConf.OUTPUT_COMMITTER_CLASS. - // If a data source needs to override the output committer, it needs to set the - // output committer in prepareForWrite method. - if (classOf[MapReduceFileOutputCommitter].isAssignableFrom(clazz)) { - // The specified output committer is a FileOutputCommitter. - // So, we will use the FileOutputCommitter-specified constructor. - val ctor = clazz.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext]) - ctor.newInstance(new Path(outputPath), context) - } else { - // The specified output committer is just an OutputCommitter. - // So, we will use the no-argument constructor. - val ctor = clazz.getDeclaredConstructor() - ctor.newInstance() - } - }.getOrElse { - // If output committer class is not set, we will use the one associated with the - // file output format. - logInfo( - s"Using output committer class ${defaultOutputCommitter.getClass.getCanonicalName}") - defaultOutputCommitter - } - } - } - - private def setupIDs(jobId: Int, splitId: Int, attemptId: Int): Unit = { - this.jobId = SparkHadoopWriter.createJobID(new Date, jobId) - this.taskId = new TaskID(this.jobId, TaskType.MAP, splitId) - this.taskAttemptId = new TaskAttemptID(taskId, attemptId) - } - - private def setupConf(): Unit = { - serializableConf.value.set("mapred.job.id", jobId.toString) - serializableConf.value.set("mapred.tip.id", taskAttemptId.getTaskID.toString) - serializableConf.value.set("mapred.task.id", taskAttemptId.toString) - serializableConf.value.setBoolean("mapred.task.is.map", true) - serializableConf.value.setInt("mapred.task.partition", 0) - } - - def commitTask(): Unit = { - SparkHadoopMapRedUtil.commitTask(outputCommitter, taskAttemptContext, jobId.getId, taskId.getId) - } - - def abortTask(): Unit = { - if (outputCommitter != null) { - outputCommitter.abortTask(taskAttemptContext) - } - logError(s"Task attempt $taskAttemptId aborted.") - } - - def commitJob(): Unit = { - outputCommitter.commitJob(jobContext) - logInfo(s"Job $jobId committed.") - } - - def abortJob(): Unit = { - if (outputCommitter != null) { - outputCommitter.abortJob(jobContext, JobStatus.State.FAILED) - } - logError(s"Job $jobId aborted.") - } -} - -/** - * A writer that writes all of the rows in a partition to a single file. - */ -private[datasources] class DefaultWriterContainer( - relation: WriteRelation, - job: Job, - isAppend: Boolean) - extends BaseWriterContainer(relation, job, isAppend) { - - def writeRows(taskContext: TaskContext, iterator: Iterator[InternalRow]): Unit = { - executorSideSetup(taskContext) - val configuration = taskAttemptContext.getConfiguration - configuration.set(DATASOURCE_OUTPUTPATH, outputPath) - var writer = newOutputWriter(getWorkPath) - writer.initConverter(dataSchema) - - // If anything below fails, we should abort the task. - try { - Utils.tryWithSafeFinallyAndFailureCallbacks { - while (iterator.hasNext) { - val internalRow = iterator.next() - writer.writeInternal(internalRow) - } - commitTask() - }(catchBlock = abortTask()) - } catch { - case t: Throwable => - throw new SparkException("Task failed while writing rows", t) - } - - def commitTask(): Unit = { - try { - if (writer != null) { - writer.close() - writer = null - } - super.commitTask() - } catch { - case cause: Throwable => - // This exception will be handled in `InsertIntoHadoopFsRelation.insert$writeRows`, and - // will cause `abortTask()` to be invoked. - throw new RuntimeException("Failed to commit task", cause) - } - } - - def abortTask(): Unit = { - try { - if (writer != null) { - writer.close() - } - } finally { - super.abortTask() - } - } - } -} - -/** - * A writer that dynamically opens files based on the given partition columns. Internally this is - * done by maintaining a HashMap of open files until `maxFiles` is reached. If this occurs, the - * writer externally sorts the remaining rows and then writes out them out one file at a time. - */ -private[datasources] class DynamicPartitionWriterContainer( - relation: WriteRelation, - job: Job, - partitionColumns: Seq[Attribute], - dataColumns: Seq[Attribute], - inputSchema: Seq[Attribute], - defaultPartitionName: String, - maxOpenFiles: Int, - isAppend: Boolean) - extends BaseWriterContainer(relation, job, isAppend) { - - private val bucketSpec = relation.bucketSpec - - private val bucketColumns: Seq[Attribute] = bucketSpec.toSeq.flatMap { - spec => spec.bucketColumnNames.map(c => inputSchema.find(_.name == c).get) - } - - private val sortColumns: Seq[Attribute] = bucketSpec.toSeq.flatMap { - spec => spec.sortColumnNames.map(c => inputSchema.find(_.name == c).get) - } - - private def bucketIdExpression: Option[Expression] = bucketSpec.map { spec => - // Use `HashPartitioning.partitionIdExpression` as our bucket id expression, so that we can - // guarantee the data distribution is same between shuffle and bucketed data source, which - // enables us to only shuffle one side when join a bucketed table and a normal one. - HashPartitioning(bucketColumns, spec.numBuckets).partitionIdExpression - } - - // Expressions that given a partition key build a string like: col1=val/col2=val/... - private def partitionStringExpression: Seq[Expression] = { - partitionColumns.zipWithIndex.flatMap { case (c, i) => - val escaped = - ScalaUDF( - PartitioningUtils.escapePathName _, - StringType, - Seq(Cast(c, StringType)), - Seq(StringType)) - val str = If(IsNull(c), Literal(defaultPartitionName), escaped) - val partitionName = Literal(c.name + "=") :: str :: Nil - if (i == 0) partitionName else Literal(Path.SEPARATOR) :: partitionName - } - } - - private def getBucketIdFromKey(key: InternalRow): Option[Int] = bucketSpec.map { _ => - key.getInt(partitionColumns.length) - } - - /** - * Open and returns a new OutputWriter given a partition key and optional bucket id. - * If bucket id is specified, we will append it to the end of the file name, but before the - * file extension, e.g. part-r-00009-ea518ad4-455a-4431-b471-d24e03814677-00002.gz.parquet - */ - private def newOutputWriter( - key: InternalRow, - getPartitionString: UnsafeProjection): OutputWriter = { - val configuration = taskAttemptContext.getConfiguration - val path = if (partitionColumns.nonEmpty) { - val partitionPath = getPartitionString(key).getString(0) - configuration.set(DATASOURCE_OUTPUTPATH, new Path(outputPath, partitionPath).toString) - new Path(getWorkPath, partitionPath).toString - } else { - configuration.set(DATASOURCE_OUTPUTPATH, outputPath) - getWorkPath - } - val bucketId = getBucketIdFromKey(key) - val newWriter = super.newOutputWriter(path, bucketId) - newWriter.initConverter(dataSchema) - newWriter - } - - def writeRows(taskContext: TaskContext, iterator: Iterator[InternalRow]): Unit = { - executorSideSetup(taskContext) - - // We should first sort by partition columns, then bucket id, and finally sorting columns. - val sortingExpressions: Seq[Expression] = partitionColumns ++ bucketIdExpression ++ sortColumns - val getSortingKey = UnsafeProjection.create(sortingExpressions, inputSchema) - - val sortingKeySchema = StructType(sortingExpressions.map { - case a: Attribute => StructField(a.name, a.dataType, a.nullable) - // The sorting expressions are all `Attribute` except bucket id. - case _ => StructField("bucketId", IntegerType, nullable = false) - }) - - // Returns the data columns to be written given an input row - val getOutputRow = UnsafeProjection.create(dataColumns, inputSchema) - - // Returns the partition path given a partition key. - val getPartitionString = - UnsafeProjection.create(Concat(partitionStringExpression) :: Nil, partitionColumns) - - // Sorts the data before write, so that we only need one writer at the same time. - // TODO: inject a local sort operator in planning. - val sorter = new UnsafeKVExternalSorter( - sortingKeySchema, - StructType.fromAttributes(dataColumns), - SparkEnv.get.blockManager, - SparkEnv.get.serializerManager, - TaskContext.get().taskMemoryManager().pageSizeBytes, - SparkEnv.get.conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold", - UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD)) - - while (iterator.hasNext) { - val currentRow = iterator.next() - sorter.insertKV(getSortingKey(currentRow), getOutputRow(currentRow)) - } - logInfo(s"Sorting complete. Writing out partition files one at a time.") - - val getBucketingKey: InternalRow => InternalRow = if (sortColumns.isEmpty) { - identity - } else { - UnsafeProjection.create(sortingExpressions.dropRight(sortColumns.length).zipWithIndex.map { - case (expr, ordinal) => BoundReference(ordinal, expr.dataType, expr.nullable) - }) - } - - val sortedIterator = sorter.sortedIterator() - - // If anything below fails, we should abort the task. - var currentWriter: OutputWriter = null - try { - Utils.tryWithSafeFinallyAndFailureCallbacks { - var currentKey: UnsafeRow = null - while (sortedIterator.next()) { - val nextKey = getBucketingKey(sortedIterator.getKey).asInstanceOf[UnsafeRow] - if (currentKey != nextKey) { - if (currentWriter != null) { - currentWriter.close() - currentWriter = null - } - currentKey = nextKey.copy() - logDebug(s"Writing partition: $currentKey") - - currentWriter = newOutputWriter(currentKey, getPartitionString) - } - currentWriter.writeInternal(sortedIterator.getValue) - } - if (currentWriter != null) { - currentWriter.close() - currentWriter = null - } - - commitTask() - }(catchBlock = { - if (currentWriter != null) { - currentWriter.close() - } - abortTask() - }) - } catch { - case t: Throwable => - throw new SparkException("Task failed while writing rows", t) - } - } -}