
Commit 1cfa38a

Added Apache headers, added DataInputStream directly as an output option for more complicated readers (HDF5, perhaps), and renamed several of the functions and files to be more consistent. Also added parallel functions to the Java API.
1 parent 84035f1 commit 1cfa38a

File tree

5 files changed: +271 -108 lines


core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 25 additions & 3 deletions
@@ -40,7 +40,7 @@ import org.apache.mesos.MesosNativeLibrary
 import org.apache.spark.annotation.{DeveloperApi, Experimental}
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
-import org.apache.spark.input.{WholeTextFileInputFormat,ByteInputFormat}
+import org.apache.spark.input.{StreamInputFormat, StreamFileInputFormat, WholeTextFileInputFormat, ByteInputFormat}
 import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
 import org.apache.spark.rdd._
 import org.apache.spark.scheduler._
@@ -517,11 +517,11 @@ class SparkContext(config: SparkConf) extends Logging {
    *
    * @note Small files are preferred, large file is also allowable, but may cause bad performance.
    */
-  def byteFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, Array[Byte])] = {
+  def binaryFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, Array[Byte])] = {
     val job = new NewHadoopJob(hadoopConfiguration)
     NewFileInputFormat.addInputPath(job, new Path(path))
     val updateConf = job.getConfiguration
-    new BinaryFileRDD(
+    new RawFileRDD(
       this,
       classOf[ByteInputFormat],
       classOf[String],
@@ -530,6 +530,28 @@ class SparkContext(config: SparkConf) extends Logging {
       minPartitions).setName(path)
   }

+  /**
+   * Get an RDD for a Hadoop-readable dataset as DataInputStreams for each file (useful for binary data)
+   * Care must be taken to close the files afterwards
+   *
+   * @param minPartitions A suggestion value of the minimal splitting number for input data.
+   *
+   * @note Small files are preferred, large file is also allowable, but may cause bad performance.
+   */
+  @DeveloperApi
+  def dataStreamFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, DataInputStream)] = {
+    val job = new NewHadoopJob(hadoopConfiguration)
+    NewFileInputFormat.addInputPath(job, new Path(path))
+    val updateConf = job.getConfiguration
+    new RawFileRDD(
+      this,
+      classOf[StreamInputFormat],
+      classOf[String],
+      classOf[DataInputStream],
+      updateConf,
+      minPartitions).setName(path)
+  }
+
   /**
    * Get an RDD for a Hadoop-readable dataset from a Hadoop JobConf given its InputFormat and other
    * necessary info (e.g. file name for a filesystem-based dataset, table name for HyperTable),
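
Taken together, the two SparkContext methods added above read each whole file as either a byte array or an open stream. A minimal Scala usage sketch under assumed settings (the master, app name, input path, and the readInt parsing are hypothetical, and dataStreamFiles is a @DeveloperApi):

import java.io.DataInputStream

import org.apache.spark.{SparkConf, SparkContext}

object BinaryFilesSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("binary-files-sketch").setMaster("local[2]"))

    // One record per file: (path, whole file as Array[Byte]).
    val bytes = sc.binaryFiles("hdfs://a-hdfs-path", minPartitions = 8)
    val sizes = bytes.map { case (path, content) => (path, content.length) }
    sizes.collect().foreach(println)

    // One record per file: (path, open DataInputStream), for readers that parse incrementally.
    // The caller is responsible for closing each stream once it has been consumed.
    val streams = sc.dataStreamFiles("hdfs://a-hdfs-path")
    val headers = streams.map { case (path, in) =>
      try { (path, in.readInt()) } finally { in.close() }
    }
    headers.collect().foreach(println)

    sc.stop()
  }
}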

core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala

Lines changed: 64 additions & 0 deletions
@@ -20,6 +20,8 @@ package org.apache.spark.api.java
 import java.util
 import java.util.{Map => JMap}

+import java.io.DataInputStream
+
 import scala.collection.JavaConversions
 import scala.collection.JavaConversions._
 import scala.language.implicitConversions
@@ -180,6 +182,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
   def textFile(path: String, minPartitions: Int): JavaRDD[String] =
     sc.textFile(path, minPartitions)

+
+
   /**
    * Read a directory of text files from HDFS, a local file system (available on all nodes), or any
    * Hadoop-supported file system URI. Each file is read as a single record and returned in a
@@ -210,6 +214,66 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
   def wholeTextFiles(path: String, minPartitions: Int): JavaPairRDD[String, String] =
     new JavaPairRDD(sc.wholeTextFiles(path, minPartitions))

+  /**
+   * Read a directory of files as DataInputStreams from HDFS, a local file system (available on all nodes), or any
+   * Hadoop-supported file system URI. Each file is read as a single record and returned in a
+   * key-value pair, where the key is the path of each file, the value is a DataInputStream over the content of each file.
+   *
+   * <p> For example, if you have the following files:
+   * {{{
+   *   hdfs://a-hdfs-path/part-00000
+   *   hdfs://a-hdfs-path/part-00001
+   *   ...
+   *   hdfs://a-hdfs-path/part-nnnnn
+   * }}}
+   *
+   * Do `JavaPairRDD<String, DataInputStream> rdd = sparkContext.dataStreamFiles("hdfs://a-hdfs-path")`,
+   *
+   * <p> then `rdd` contains
+   * {{{
+   *   (a-hdfs-path/part-00000, its content)
+   *   (a-hdfs-path/part-00001, its content)
+   *   ...
+   *   (a-hdfs-path/part-nnnnn, its content)
+   * }}}
+   *
+   * @note Small files are preferred, large file is also allowable, but may cause bad performance.
+   *
+   * @param minPartitions A suggestion value of the minimal splitting number for input data.
+   */
+  def dataStreamFiles(path: String, minPartitions: Int = defaultMinPartitions): JavaPairRDD[String, DataInputStream] =
+    new JavaPairRDD(sc.dataStreamFiles(path, minPartitions))
+
+  /**
+   * Read a directory of binary files from HDFS, a local file system (available on all nodes), or any
+   * Hadoop-supported file system URI as a byte array. Each file is read as a single record and returned in a
+   * key-value pair, where the key is the path of each file, the value is the content of each file.
+   *
+   * <p> For example, if you have the following files:
+   * {{{
+   *   hdfs://a-hdfs-path/part-00000
+   *   hdfs://a-hdfs-path/part-00001
+   *   ...
+   *   hdfs://a-hdfs-path/part-nnnnn
+   * }}}
+   *
+   * Do `JavaPairRDD<String, byte[]> rdd = sparkContext.binaryFiles("hdfs://a-hdfs-path")`,
+   *
+   * <p> then `rdd` contains
+   * {{{
+   *   (a-hdfs-path/part-00000, its content)
+   *   (a-hdfs-path/part-00001, its content)
+   *   ...
+   *   (a-hdfs-path/part-nnnnn, its content)
+   * }}}
+   *
+   * @note Small files are preferred, large file is also allowable, but may cause bad performance.
+   *
+   * @param minPartitions A suggestion value of the minimal splitting number for input data.
+   */
+  def binaryFiles(path: String, minPartitions: Int = defaultMinPartitions): JavaPairRDD[String, Array[Byte]] =
+    new JavaPairRDD(sc.binaryFiles(path, minPartitions))
+
   /**
    * Read a directory of text files from HDFS, a local file system (available on all nodes), or any
    * Hadoop-supported file system URI. Each file is read as a single record and returned in a
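
A corresponding sketch from the Java side, following the Javadoc examples above (the class name and path are hypothetical; note that `minPartitions` is a Scala default argument, so Java callers have to pass it explicitly):

import java.io.DataInputStream;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class BinaryFilesExample {
  public static void main(String[] args) {
    JavaSparkContext sparkContext = new JavaSparkContext("local[2]", "binary-files-example");

    // One record per file: (path, whole file as byte[]).
    // minPartitions is passed explicitly; Scala default arguments are not visible to Java callers.
    JavaPairRDD<String, byte[]> bytes = sparkContext.binaryFiles("hdfs://a-hdfs-path", 8);
    System.out.println("files read: " + bytes.count());

    // One record per file: (path, open DataInputStream); each stream must be closed by the caller.
    JavaPairRDD<String, DataInputStream> streams = sparkContext.dataStreamFiles("hdfs://a-hdfs-path", 8);

    sparkContext.stop();
  }
}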

core/src/main/scala/org/apache/spark/input/BinaryFileInput.scala

Lines changed: 0 additions & 102 deletions
This file was deleted.
