
Commit eacfaa6

Added FixedLengthBinaryInputFormat and RecordReader from freeman-lab and added them to both the JavaSparkContext and the SparkContext as fixedLengthBinaryFile
1 parent 1622935 commit eacfaa6

4 files changed: +290 −27 lines changed

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 26 additions & 12 deletions
@@ -17,40 +17,40 @@

 package org.apache.spark

-import scala.language.implicitConversions
-
 import java.io._
 import java.net.URI
+import java.util.UUID.randomUUID
 import java.util.concurrent.atomic.AtomicInteger
 import java.util.{Properties, UUID}
-import java.util.UUID.randomUUID
-import scala.collection.{Map, Set}
-import scala.collection.JavaConversions._
-import scala.collection.generic.Growable
-import scala.collection.mutable.HashMap
-import scala.reflect.{ClassTag, classTag}
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io.{ArrayWritable, BooleanWritable, BytesWritable, DoubleWritable, FloatWritable, IntWritable, LongWritable, NullWritable, Text, Writable}
 import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf, SequenceFileInputFormat, TextInputFormat}
-import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, Job => NewHadoopJob}
 import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
+import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, Job => NewHadoopJob}
 import org.apache.mesos.MesosNativeLibrary
-
 import org.apache.spark.annotation.{DeveloperApi, Experimental}
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
-import org.apache.spark.input.{StreamInputFormat, StreamFileInputFormat, WholeTextFileInputFormat, ByteInputFormat}
+import org.apache.spark.input.{ByteInputFormat, FixedLengthBinaryInputFormat, StreamInputFormat, WholeTextFileInputFormat}
 import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
 import org.apache.spark.rdd._
 import org.apache.spark.scheduler._
-import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SparkDeploySchedulerBackend, SimrSchedulerBackend}
 import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
+import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SimrSchedulerBackend, SparkDeploySchedulerBackend}
 import org.apache.spark.scheduler.local.LocalBackend
 import org.apache.spark.storage.{BlockManagerSource, RDDInfo, StorageStatus, StorageUtils}
 import org.apache.spark.ui.SparkUI
 import org.apache.spark.util.{CallSite, ClosureCleaner, MetadataCleaner, MetadataCleanerType, TimeStampedWeakValueHashMap, Utils}

+import scala.collection.JavaConversions._
+import scala.collection.generic.Growable
+import scala.collection.mutable.HashMap
+import scala.collection.{Map, Set}
+import scala.language.implicitConversions
+import scala.reflect.{ClassTag, classTag}
+
 /**
  * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
  * cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster.
@@ -557,6 +557,20 @@ class SparkContext(config: SparkConf) extends Logging {
       minPartitions).setName(path)
   }

+  /**
+   * Load data from a flat binary file, assuming each record is a set of numbers
+   * with the specified numerical format (see ByteBuffer), and the number of
+   * bytes per record is constant (see FixedLengthBinaryInputFormat)
+   *
+   * @param path Directory to the input data files
+   * @return An RDD of data with values, RDD[(Array[Byte])]
+   */
+  def fixedLengthBinaryFiles(path: String): RDD[Array[Byte]] = {
+    val lines = newAPIHadoopFile[LongWritable, BytesWritable, FixedLengthBinaryInputFormat](path)
+    val data = lines.map{ case (k, v) => v.getBytes}
+    data
+  }
+
   /**
    * Get an RDD for a Hadoop-readable dataset from a Hadoop JobConf given its InputFormat and other
    * necessary info (e.g. file name for a filesystem-based dataset, table name for HyperTable),
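Usage note (not part of the diff): FixedLengthBinaryInputFormat reads the record size from the Hadoop configuration key "recordLength", so a driver would set that key before calling the new method. A minimal Scala sketch, with an illustrative path and an 8-byte (one Double per record) layout:

    import java.nio.ByteBuffer
    import org.apache.spark.{SparkConf, SparkContext}

    val sc = new SparkContext(new SparkConf().setAppName("fixed-length-demo"))

    // the input format reads this key via FixedLengthBinaryInputFormat.getRecordLength
    sc.hadoopConfiguration.set("recordLength", "8")

    // hypothetical input directory; any Hadoop-supported URI should work
    val records = sc.fixedLengthBinaryFiles("hdfs:///data/fixed-records")

    // each element is the raw Array[Byte] of one record
    val doubles = records.map(bytes => ByteBuffer.wrap(bytes).getDouble)
    println(doubles.take(5).mkString(", "))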

core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala

Lines changed: 25 additions & 15 deletions
@@ -17,27 +17,25 @@

 package org.apache.spark.api.java

+import java.io.DataInputStream
 import java.util
 import java.util.{Map => JMap}

-import java.io.DataInputStream
-
-import scala.collection.JavaConversions
-import scala.collection.JavaConversions._
-import scala.language.implicitConversions
-import scala.reflect.ClassTag
-
 import com.google.common.base.Optional
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapred.{InputFormat, JobConf}
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
-
-import org.apache.spark._
 import org.apache.spark.SparkContext.{DoubleAccumulatorParam, IntAccumulatorParam}
+import org.apache.spark._
 import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.{EmptyRDD, RDD}

+import scala.collection.JavaConversions
+import scala.collection.JavaConversions._
+import scala.language.implicitConversions
+import scala.reflect.ClassTag
+
 /**
  * A Java-friendly version of [[org.apache.spark.SparkContext]] that returns
  * [[org.apache.spark.api.java.JavaRDD]]s and works with Java collections instead of Scala ones.
@@ -214,6 +212,16 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
   def wholeTextFiles(path: String, minPartitions: Int): JavaPairRDD[String, String] =
     new JavaPairRDD(sc.wholeTextFiles(path, minPartitions))

+  /**
+   * Read a directory of text files from HDFS, a local file system (available on all nodes), or any
+   * Hadoop-supported file system URI. Each file is read as a single record and returned in a
+   * key-value pair, where the key is the path of each file, the value is the content of each file.
+   *
+   * @see `wholeTextFiles(path: String, minPartitions: Int)`.
+   */
+  def wholeTextFiles(path: String): JavaPairRDD[String, String] =
+    new JavaPairRDD(sc.wholeTextFiles(path))
+
   /**
    * Read a directory of binary files from HDFS, a local file system (available on all nodes),
    * or any Hadoop-supported file system URI as a byte array. Each file is read as a single
@@ -279,14 +287,16 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
     JavaPairRDD[String,Array[Byte]] = new JavaPairRDD(sc.binaryFiles(path,minPartitions))

   /**
-   * Read a directory of text files from HDFS, a local file system (available on all nodes), or any
-   * Hadoop-supported file system URI. Each file is read as a single record and returned in a
-   * key-value pair, where the key is the path of each file, the value is the content of each file.
+   * Load data from a flat binary file, assuming each record is a set of numbers
+   * with the specified numerical format (see ByteBuffer), and the number of
+   * bytes per record is constant (see FixedLengthBinaryInputFormat)
    *
-   * @see `wholeTextFiles(path: String, minPartitions: Int)`.
+   * @param path Directory to the input data files
+   * @return An RDD of data with values, JavaRDD[(Array[Byte])]
    */
-  def wholeTextFiles(path: String): JavaPairRDD[String, String] =
-    new JavaPairRDD(sc.wholeTextFiles(path))
+  def fixedLengthBinaryFiles(path: String): JavaRDD[Array[Byte]] = {
+    new JavaRDD(sc.fixedLengthBinaryFiles(path))
+  }

   /** Get an RDD for a Hadoop SequenceFile with given key and value types.
    *
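The Java-side wrapper simply delegates to sc.fixedLengthBinaryFiles and wraps the result in a JavaRDD. A sketch of calling it (written in Scala against the Java API; the path and 16-byte record size are illustrative):

    import org.apache.spark.SparkConf
    import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}

    val jsc = new JavaSparkContext(new SparkConf().setAppName("java-fixed-length-demo"))

    // same Hadoop configuration key as on the Scala side
    jsc.hadoopConfiguration().set("recordLength", "16")

    val records: JavaRDD[Array[Byte]] = jsc.fixedLengthBinaryFiles("hdfs:///data/fixed-records")
    println(records.count())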
core/src/main/scala/org/apache/spark/input/FixedLengthBinaryInputFormat.scala

Lines changed: 95 additions & 0 deletions
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.input
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.io.{BytesWritable, LongWritable}
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
+import org.apache.hadoop.mapreduce.{InputSplit, JobContext, RecordReader, TaskAttemptContext}
+
+/**
+ * Custom Input Format for reading and splitting flat binary files that contain records, each of which
+ * are a fixed size in bytes. The fixed record size is specified through a parameter recordLength
+ * in the Hadoop configuration.
+ */
+
+object FixedLengthBinaryInputFormat {
+
+  /**
+   * This function retrieves the recordLength by checking the configuration parameter
+   *
+   */
+  def getRecordLength(context: JobContext): Int = {
+
+    // retrieve record length from configuration
+    context.getConfiguration.get("recordLength").toInt
+  }
+
+}
+
+class FixedLengthBinaryInputFormat extends FileInputFormat[LongWritable, BytesWritable] {
+
+
+  /**
+   * Override of isSplitable to ensure initial computation of the record length
+   */
+  override def isSplitable(context: JobContext, filename: Path): Boolean = {
+
+    if (recordLength == -1) {
+      recordLength = FixedLengthBinaryInputFormat.getRecordLength(context)
+    }
+    if (recordLength <= 0) {
+      println("record length is less than 0, file cannot be split")
+      false
+    } else {
+      true
+    }
+
+  }
+
+  /**
+   * This input format overrides computeSplitSize() to make sure that each split
+   * only contains full records. Each InputSplit passed to FixedLengthBinaryRecordReader
+   * will start at the first byte of a record, and the last byte will the last byte of a record.
+   */
+  override def computeSplitSize(blockSize: Long, minSize: Long, maxSize: Long): Long = {
+
+    val defaultSize = super.computeSplitSize(blockSize, minSize, maxSize)
+
+    // If the default size is less than the length of a record, make it equal to it
+    // Otherwise, make sure the split size is as close to possible as the default size,
+    // but still contains a complete set of records, with the first record
+    // starting at the first byte in the split and the last record ending with the last byte
+
+    defaultSize match {
+      case x if x < recordLength => recordLength.toLong
+      case _ => (Math.floor(defaultSize / recordLength) * recordLength).toLong
+    }
+  }
+
+  /**
+   * Create a FixedLengthBinaryRecordReader
+   */
+  override def createRecordReader(split: InputSplit, context: TaskAttemptContext):
+  RecordReader[LongWritable, BytesWritable] = {
+    new FixedLengthBinaryRecordReader
+  }
+
+  var recordLength = -1
+
+}
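To make the computeSplitSize() rule concrete: the split size is rounded down to a whole number of records, unless the default split is smaller than one record, in which case it is widened to a single record. A standalone sketch of the same arithmetic (exampleSplitSize is a hypothetical helper, not part of the commit):

    def exampleSplitSize(defaultSize: Long, recordLength: Int): Long =
      if (defaultSize < recordLength) recordLength.toLong
      else (defaultSize / recordLength) * recordLength

    // e.g. a 128 MB default split with 100-byte records:
    // 134217728 / 100 = 1342177 full records, so the split becomes 134217700 bytes
    println(exampleSplitSize(134217728L, 100))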
core/src/main/scala/org/apache/spark/input/FixedLengthBinaryRecordReader.scala

Lines changed: 144 additions & 0 deletions
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.input
+
+import java.io.IOException
+
+import org.apache.hadoop.fs.FSDataInputStream
+import org.apache.hadoop.io.compress.CompressionCodecFactory
+import org.apache.hadoop.io.{BytesWritable, LongWritable}
+import org.apache.hadoop.mapreduce.lib.input.FileSplit
+import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, TaskAttemptContext}
+
+/**
+ *
+ * FixedLengthBinaryRecordReader is returned by FixedLengthBinaryInputFormat.
+ * It uses the record length set in FixedLengthBinaryInputFormat to
+ * read one record at a time from the given InputSplit.
+ *
+ * Each call to nextKeyValue() updates the LongWritable KEY and BytesWritable VALUE.
+ *
+ * KEY = record index (Long)
+ * VALUE = the record itself (BytesWritable)
+ *
+ */
+class FixedLengthBinaryRecordReader extends RecordReader[LongWritable, BytesWritable] {
+
+  override def close() {
+    if (fileInputStream != null) {
+      fileInputStream.close()
+    }
+  }
+
+  override def getCurrentKey: LongWritable = {
+    recordKey
+  }
+
+  override def getCurrentValue: BytesWritable = {
+    recordValue
+  }
+
+  override def getProgress: Float = {
+    splitStart match {
+      case x if x == splitEnd => 0.0.toFloat
+      case _ => Math.min(((currentPosition - splitStart) / (splitEnd - splitStart)).toFloat, 1.0).toFloat
+    }
+  }
+
+  override def initialize(inputSplit: InputSplit, context: TaskAttemptContext) {
+
+    // the file input
+    val fileSplit = inputSplit.asInstanceOf[FileSplit]
+
+    // the byte position this fileSplit starts at
+    splitStart = fileSplit.getStart
+
+    // splitEnd byte marker that the fileSplit ends at
+    splitEnd = splitStart + fileSplit.getLength
+
+    // the actual file we will be reading from
+    val file = fileSplit.getPath
+
+    // job configuration
+    val job = context.getConfiguration
+
+    // check compression
+    val codec = new CompressionCodecFactory(job).getCodec(file)
+    if (codec != null) {
+      throw new IOException("FixedLengthRecordReader does not support reading compressed files")
+    }
+
+    // get the record length
+    recordLength = FixedLengthBinaryInputFormat.getRecordLength(context)
+
+    // get the filesystem
+    val fs = file.getFileSystem(job)
+
+    // open the File
+    fileInputStream = fs.open(file)
+
+    // seek to the splitStart position
+    fileInputStream.seek(splitStart)
+
+    // set our current position
+    currentPosition = splitStart
+
+  }
+
+  override def nextKeyValue(): Boolean = {
+
+    if (recordKey == null) {
+      recordKey = new LongWritable()
+    }
+
+    // the key is a linear index of the record, given by the
+    // position the record starts divided by the record length
+    recordKey.set(currentPosition / recordLength)
+
+    // the recordValue to place the bytes into
+    if (recordValue == null) {
+      recordValue = new BytesWritable(new Array[Byte](recordLength))
+    }
+
+    // read a record if the currentPosition is less than the split end
+    if (currentPosition < splitEnd) {
+
+      // setup a buffer to store the record
+      val buffer = recordValue.getBytes
+
+      fileInputStream.read(buffer, 0, recordLength)
+
+      // update our current position
+      currentPosition = currentPosition + recordLength
+
+      // return true
+      return true
+    }
+
+    false
+  }
+
+  var splitStart: Long = 0L
+  var splitEnd: Long = 0L
+  var currentPosition: Long = 0L
+  var recordLength: Int = 0
+  var fileInputStream: FSDataInputStream = null
+  var recordKey: LongWritable = null
+  var recordValue: BytesWritable = null
+
+}
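Interpreting the bytes of each record is left to the caller; the new RDD methods point to java.nio.ByteBuffer for that. A hedged sketch of such a decoder, assuming (purely for illustration) records of four little-endian Ints, i.e. recordLength = 16:

    import java.nio.{ByteBuffer, ByteOrder}

    def decodeRecord(bytes: Array[Byte]): Array[Int] = {
      val buf = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN)
      Array.fill(bytes.length / 4)(buf.getInt)
    }

    // applied to the RDD returned by fixedLengthBinaryFiles:
    // val parsed = sc.fixedLengthBinaryFiles("hdfs:///data/fixed-records").map(decodeRecord)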
