diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index e091634b4eb90..8133e7fc67d0a 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -1228,244 +1228,6 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     }
   }
 
-  //////////////////////////////////////////////////
-  // CSD async saveAs(New)HadoopFile extensions   //
-  // code is duplicated from synchronized version //
-  // to maintain binary-compatibility             //
-  //////////////////////////////////////////////////
-
-
-  /**
-   * Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class
-   * supporting the key and value types K and V in this RDD.
-   */
-  def saveAsHadoopFileAsync[F <: OutputFormat[K, V]](
-      path: String)(
-      implicit fm: ClassTag[F], executor: ExecutionContext): FutureAction[Unit] = {
-    saveAsHadoopFileAsync(path, keyClass, valueClass, fm.runtimeClass.asInstanceOf[Class[F]])
-  }
-
-  /**
-   * Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class
-   * supporting the key and value types K and V in this RDD. Compress the result with the
-   * supplied codec.
-   */
-  def saveAsHadoopFileAsync[F <: OutputFormat[K, V]](
-      path: String, codec: Class[_ <: CompressionCodec])(
-      implicit fm: ClassTag[F], executor: ExecutionContext): FutureAction[Unit] = {
-    val runtimeClass = fm.runtimeClass
-    saveAsHadoopFileAsync(path, keyClass, valueClass, runtimeClass.asInstanceOf[Class[F]], codec)
-  }
-
-  /**
-   * Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class
-   * supporting the key and value types K and V in this RDD. Compress with the supplied codec.
-   */
-  def saveAsHadoopFileAsync(
-      path: String,
-      keyClass: Class[_],
-      valueClass: Class[_],
-      outputFormatClass: Class[_ <: OutputFormat[_, _]],
-      codec: Class[_ <: CompressionCodec])(
-      implicit executor: ExecutionContext): FutureAction[Unit] = {
-    saveAsHadoopFileAsync(path, keyClass, valueClass, outputFormatClass,
-      new JobConf(self.context.hadoopConfiguration), Some(codec))
-  }
-
-  /**
-   * Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class
-   * supporting the key and value types K and V in this RDD.
-   */
-  def saveAsHadoopFileAsync(
-      path: String,
-      keyClass: Class[_],
-      valueClass: Class[_],
-      outputFormatClass: Class[_ <: OutputFormat[_, _]],
-      conf: JobConf = new JobConf(self.context.hadoopConfiguration),
-      codec: Option[Class[_ <: CompressionCodec]] = None)(
-      implicit executor: ExecutionContext): FutureAction[Unit] = {
-    // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
-    val hadoopConf = conf
-    hadoopConf.setOutputKeyClass(keyClass)
-    hadoopConf.setOutputValueClass(valueClass)
-    // Doesn't work in Scala 2.9 due to what may be a generics bug
-    // TODO: Should we uncomment this for Scala 2.10?
-    // conf.setOutputFormat(outputFormatClass)
-    hadoopConf.set("mapred.output.format.class", outputFormatClass.getName)
-    for (c <- codec) {
-      hadoopConf.setCompressMapOutput(true)
-      hadoopConf.set("mapred.output.compress", "true")
-      hadoopConf.setMapOutputCompressorClass(c)
-      hadoopConf.set("mapred.output.compression.codec", c.getCanonicalName)
-      hadoopConf.set("mapred.output.compression.type", CompressionType.BLOCK.toString)
-    }
-    hadoopConf.setOutputCommitter(classOf[FileOutputCommitter])
-    FileOutputFormat.setOutputPath(hadoopConf,
-      SparkHadoopWriter.createPathFromString(path, hadoopConf))
-    saveAsHadoopDatasetAsync(hadoopConf)
-  }
-
-  /**
-   * Output the RDD to any Hadoop-supported storage system, using a Hadoop JobConf object for
-   * that storage system. The JobConf should set an OutputFormat and any output paths required
-   * (e.g. a table name to write to) in the same way as it would be configured for a Hadoop
-   * MapReduce job.
-   */
-  def saveAsHadoopDatasetAsync(conf: JobConf)(
-      implicit executor: ExecutionContext): FutureAction[Unit] = {
-    // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
-    val hadoopConf = conf
-    val outputFormatInstance = hadoopConf.getOutputFormat
-    val keyClass = hadoopConf.getOutputKeyClass
-    val valueClass = hadoopConf.getOutputValueClass
-    if (outputFormatInstance == null) {
-      throw new SparkException("Output format class not set")
-    }
-    if (keyClass == null) {
-      throw new SparkException("Output key class not set")
-    }
-    if (valueClass == null) {
-      throw new SparkException("Output value class not set")
-    }
-    SparkHadoopUtil.get.addCredentials(hadoopConf)
-
-    logDebug("Saving as hadoop file of type (" + keyClass.getSimpleName + ", " +
-      valueClass.getSimpleName + ")")
-
-    if (self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true)) {
-      // FileOutputFormat ignores the filesystem parameter
-      val ignoredFs = FileSystem.get(hadoopConf)
-      hadoopConf.getOutputFormat.checkOutputSpecs(ignoredFs, hadoopConf)
-    }
-
-    val writer = new SparkHadoopWriter(hadoopConf)
-    writer.preSetup()
-
-    val writeToFile = (context: TaskContext, iter: Iterator[(K, V)]) => {
-      // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
-      // around by taking a mod. We expect that no task will be attempted 2 billion times.
-      val attemptNumber = (context.attemptId % Int.MaxValue).toInt
-
-      writer.setup(context.stageId, context.partitionId, attemptNumber)
-      writer.open()
-      try {
-        var count = 0
-        while (iter.hasNext) {
-          val record = iter.next()
-          count += 1
-          writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef])
-        }
-      } finally {
-        writer.close()
-      }
-      writer.commit()
-    }
-
-    self.context.submitJobWithTaskContext(
-      self,
-      writeToFile,
-      self.partitions.indices,
-      (_, _: Unit) => {},
-      { writer.commitJob() }
-    )
-  }
-
-  /**
-   * Output the RDD to any Hadoop-supported file system, using a new Hadoop API `OutputFormat`
-   * (mapreduce.OutputFormat) object supporting the key and value types K and V in this RDD.
-   */
-  def saveAsNewAPIHadoopFileAsync[F <: NewOutputFormat[K, V]](
-      path: String)(
-      implicit fm: ClassTag[F]): FutureAction[Unit] = {
-    saveAsNewAPIHadoopFileAsync(path, keyClass, valueClass, fm.runtimeClass.asInstanceOf[Class[F]])
-  }
-
-  /**
-   * Output the RDD to any Hadoop-supported file system, using a new Hadoop API `OutputFormat`
-   * (mapreduce.OutputFormat) object supporting the key and value types K and V in this RDD.
-   */
-  def saveAsNewAPIHadoopFileAsync(
-      path: String,
-      keyClass: Class[_],
-      valueClass: Class[_],
-      outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
-      conf: Configuration = self.context.hadoopConfiguration): FutureAction[Unit] =
-  {
-    // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
-    val hadoopConf = conf
-    val job = new NewAPIHadoopJob(hadoopConf)
-    job.setOutputKeyClass(keyClass)
-    job.setOutputValueClass(valueClass)
-    job.setOutputFormatClass(outputFormatClass)
-    SparkHadoopUtil.get.getConfigurationFromJobContext(job).set("mapred.output.dir", path)
-    saveAsNewAPIHadoopDatasetAsync(SparkHadoopUtil.get.getConfigurationFromJobContext(job))
-  }
-
-  /**
-   * Output the RDD to any Hadoop-supported storage system with new Hadoop API, using a Hadoop
-   * Configuration object for that storage system. The Conf should set an OutputFormat and any
-   * output paths required (e.g. a table name to write to) in the same way as it would be
-   * configured for a Hadoop MapReduce job.
-   */
-  def saveAsNewAPIHadoopDatasetAsync(conf: Configuration): FutureAction[Unit] = {
-    // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
-    val hadoopConf = conf
-    val job = new NewAPIHadoopJob(hadoopConf)
-    val jobConf = SparkHadoopUtil.get.getConfigurationFromJobContext(job)
-    val formatter = new SimpleDateFormat("yyyyMMddHHmm")
-    val jobtrackerID = formatter.format(new Date())
-    val stageId = self.id
-    val wrappedConf = new SerializableWritable(jobConf)
-    val outfmt = job.getOutputFormatClass
-    val jobFormat = outfmt.newInstance
-
-    if (self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true)) {
-      // FileOutputFormat ignores the filesystem parameter
-      jobFormat.checkOutputSpecs(job)
-    }
-
-    val writeShard = (context: TaskContext, iter: Iterator[(K, V)]) => {
-      // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
-      // around by taking a mod. We expect that no task will be attempted 2 billion times.
-      val attemptNumber = (context.attemptId % Int.MaxValue).toInt
-      /* "reduce task" */
-      val attemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = false, context.partitionId,
-        attemptNumber)
-      val hadoopContext = newTaskAttemptContext(wrappedConf.value, attemptId)
-      val format = outfmt.newInstance
-      format match {
-        case c: Configurable => c.setConf(wrappedConf.value)
-        case _ => ()
-      }
-      val committer = format.getOutputCommitter(hadoopContext)
-      committer.setupTask(hadoopContext)
-      val writer = format.getRecordWriter(hadoopContext).asInstanceOf[NewRecordWriter[K, V]]
-      try {
-        while (iter.hasNext) {
-          val pair = iter.next()
-          writer.write(pair._1, pair._2)
-        }
-      } finally {
-        writer.close(hadoopContext)
-      }
-      committer.commitTask(hadoopContext)
-      1
-    } : Int
-
-    val jobAttemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = true, 0, 0)
-    val jobTaskContext = newTaskAttemptContext(wrappedConf.value, jobAttemptId)
-    val jobCommitter = jobFormat.getOutputCommitter(jobTaskContext)
-    jobCommitter.setupJob(jobTaskContext)
-
-    self.context.submitJobWithTaskContext(
-      self,
-      writeShard,
-      self.partitions.indices,
-      (_, _: Int) => {},
-      { jobCommitter.commitJob(jobTaskContext) }
-    )
-  }
-
   /**
    * Return an RDD with the keys of each tuple.
    */
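
Reviewer note (not part of the patch): the removed *Async variants duplicated the synchronous save implementations and submitted them through the CSD-specific SparkContext.submitJobWithTaskContext hook in order to return a FutureAction. A minimal sketch of how a caller can keep non-blocking behavior with the surviving public API, assuming an RDD[(String, String)] named rdd, an output path, and an ExecutionContext in scope (saveAsync, rdd, and path are placeholder names); unlike FutureAction, a plain Future gives no handle for job cancellation:

  import scala.concurrent.{ExecutionContext, Future}
  import org.apache.hadoop.mapred.TextOutputFormat
  import org.apache.spark.rdd.RDD

  // Illustrative sketch: wrap the standard blocking saveAsHadoopFile in a Future.
  def saveAsync(rdd: RDD[(String, String)], path: String)(
      implicit ec: ExecutionContext): Future[Unit] =
    Future {
      // Same OutputFormat-driven write the removed saveAsHadoopFileAsync[F](path)
      // performed, minus the custom job-submission hook.
      rdd.saveAsHadoopFile[TextOutputFormat[String, String]](path)
    }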