@@ -657,6 +657,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    *
    * Load data from a flat binary file, assuming the length of each record is constant.
    *
+   * '''Note:''' We ensure that the byte array for each record in the resulting RDD
+   * has the provided record length.
+   *
    * @param path Directory to the input data files
    * @param recordLength The length at which to split the records
    * @return An RDD of data with values, represented as byte arrays
@@ -671,7 +674,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       classOf[LongWritable],
       classOf[BytesWritable],
       conf = conf)
-    val data = br.map { case (k, v) => v.getBytes }
+    val data = br.map { case (k, v) =>
+      val bytes = v.getBytes
+      assert(bytes.length == recordLength, "Byte array does not have correct length")
+      bytes
+    }
     data
   }
 
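With this change `binaryRecords` asserts that every record produced by `FixedLengthBinaryInputFormat` is exactly `recordLength` bytes, rather than silently passing along whatever backing array `BytesWritable.getBytes` returns. A minimal usage sketch, assuming an existing `sc: SparkContext` (the path and record length are illustrative):

    val records = sc.binaryRecords("hdfs:///data/fixed-width-events", recordLength = 16)
    // Each element is an Array[Byte] of exactly 16 bytes; a malformed
    // input file now fails the assert instead of yielding short records.
    records.take(1).foreach(r => println(r.length))  // 16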
@@ -1224,7 +1231,19 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
             null
           }
         } else {
-          env.httpFileServer.addJar(new File(uri.getPath))
+          try {
+            env.httpFileServer.addJar(new File(uri.getPath))
+          } catch {
+            case exc: FileNotFoundException =>
+              logError(s"Jar not found at $path")
+              null
+            case e: Exception =>
+              // For now just log an error but allow it to go through so the Spark examples work.
+              // The Spark examples don't really need the jar distributed, since it's also
+              // the app jar.
+              logError("Error adding jar (" + e + "), was the --addJars option used?")
+              null
+          }
         }
       // A JAR file which exists locally on every worker node
       case "local" =>
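The net effect is that `addJar` now degrades gracefully instead of crashing the driver: a missing jar behind a `file:` URI is logged and skipped. A sketch of the observable behavior, assuming an existing `sc` (paths are illustrative):

    sc.addJar("/opt/app/lib/missing.jar")       // logs "Jar not found at ..." and continues
    sc.addJar("local:/opt/app/lib/shared.jar")  // "local" scheme: assumed present on every worker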
@@ -1749,8 +1768,14 @@ object SparkContext extends Logging {
   @deprecated("Replaced by implicit functions in the RDD companion object. This is " +
     "kept here only for backward compatibility.", "1.3.0")
   def rddToSequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable: ClassTag](
-      rdd: RDD[(K, V)]) =
+      rdd: RDD[(K, V)]) = {
+    val kf = implicitly[K => Writable]
+    val vf = implicitly[V => Writable]
+    // Set the Writable class to null; `SequenceFileRDDFunctions` will use reflection to get it
+    implicit val keyWritableFactory = new WritableFactory[K](_ => null, kf)
+    implicit val valueWritableFactory = new WritableFactory[V](_ => null, vf)
     RDD.rddToSequenceFileRDDFunctions(rdd)
+  }
 
   @deprecated("Replaced by implicit functions in the RDD companion object. This is " +
     "kept here only for backward compatibility.", "1.3.0")
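The deprecated shim has to bridge the old view bounds (`K <% Writable`) to the new `WritableFactory`-based code path; passing `_ => null` for the class is fine because `SequenceFileRDDFunctions` falls back to reflection. Legacy callers are unaffected, as in this sketch (assuming an existing `sc`; the output path is illustrative):

    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._  // deprecated views: Int => IntWritable, String => Text
    val pairs = sc.parallelize(Seq((1, "one"), (2, "two")))
    // Compiles with a deprecation warning and forwards to RDD.rddToSequenceFileRDDFunctions:
    SparkContext.rddToSequenceFileRDDFunctions(pairs).saveAsSequenceFile("/tmp/pairs-seq")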
@@ -1767,20 +1792,35 @@ object SparkContext extends Logging {
   def numericRDDToDoubleRDDFunctions[T](rdd: RDD[T])(implicit num: Numeric[T]) =
     RDD.numericRDDToDoubleRDDFunctions(rdd)
 
-  // Implicit conversions to common Writable types, for saveAsSequenceFile
+  // The following deprecated functions have already been moved to `object WritableFactory` to
+  // make the compiler find them automatically. They are still kept here for backward compatibility.
 
+  @deprecated("Replaced by implicit functions in the WritableFactory companion object. This is " +
+    "kept here only for backward compatibility.", "1.3.0")
   implicit def intToIntWritable(i: Int): IntWritable = new IntWritable(i)
 
+  @deprecated("Replaced by implicit functions in the WritableFactory companion object. This is " +
+    "kept here only for backward compatibility.", "1.3.0")
   implicit def longToLongWritable(l: Long): LongWritable = new LongWritable(l)
 
+  @deprecated("Replaced by implicit functions in the WritableFactory companion object. This is " +
+    "kept here only for backward compatibility.", "1.3.0")
   implicit def floatToFloatWritable(f: Float): FloatWritable = new FloatWritable(f)
 
+  @deprecated("Replaced by implicit functions in the WritableFactory companion object. This is " +
+    "kept here only for backward compatibility.", "1.3.0")
   implicit def doubleToDoubleWritable(d: Double): DoubleWritable = new DoubleWritable(d)
 
+  @deprecated("Replaced by implicit functions in the WritableFactory companion object. This is " +
+    "kept here only for backward compatibility.", "1.3.0")
   implicit def boolToBoolWritable(b: Boolean): BooleanWritable = new BooleanWritable(b)
 
+  @deprecated("Replaced by implicit functions in the WritableFactory companion object. This is " +
+    "kept here only for backward compatibility.", "1.3.0")
   implicit def bytesToBytesWritable(aob: Array[Byte]): BytesWritable = new BytesWritable(aob)
 
+  @deprecated("Replaced by implicit functions in the WritableFactory companion object. This is " +
+    "kept here only for backward compatibility.", "1.3.0")
   implicit def stringToText(s: String): Text = new Text(s)
 
   private implicit def arrayToArrayWritable[T <% Writable: ClassTag](arr: Traversable[T])
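Code that used these views directly, not just through `saveAsSequenceFile`, still compiles against the old import and now merely emits deprecation warnings, e.g.:

    import org.apache.hadoop.io.IntWritable
    import org.apache.spark.SparkContext._
    val iw: IntWritable = 42  // resolved via the deprecated intToIntWritable view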
@@ -2070,7 +2110,7 @@ object WritableConverter {
     new WritableConverter[T](_ => wClass, x => convert(x.asInstanceOf[W]))
   }
 
-  // The following implicit functions were in SparkContext before 1.2 and users had to
+  // The following implicit functions were in SparkContext before 1.3 and users had to
   // `import SparkContext._` to enable them. Now we move them here to make the compiler find
   // them automatically. However, we still keep the old functions in SparkContext for backward
   // compatibility and forward to the following functions directly.
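`WritableConverter` is the read-side counterpart of `WritableFactory`: its implicits let `sc.sequenceFile` map Writable keys and values back to Scala types without the old import. A sketch that exercises them (assuming an existing `sc`; the path is illustrative):

    // WritableConverter[Int] and WritableConverter[String] are supplied
    // automatically from object WritableConverter; no `import SparkContext._` needed.
    val restored = sc.sequenceFile[Int, String]("/tmp/pairs-seq")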
@@ -2103,3 +2143,46 @@ object WritableConverter {
   implicit def writableWritableConverter[T <: Writable](): WritableConverter[T] =
     new WritableConverter[T](_.runtimeClass.asInstanceOf[Class[T]], _.asInstanceOf[T])
 }
+
+/**
+ * A class encapsulating how to convert some type T to Writable. It stores both the Writable class
+ * corresponding to T (e.g. IntWritable for Int) and a function for doing the conversion.
+ * The Writable class will be used in `SequenceFileRDDFunctions`.
+ */
+private[spark] class WritableFactory[T](
+    val writableClass: ClassTag[T] => Class[_ <: Writable],
+    val convert: T => Writable) extends Serializable
+
+object WritableFactory {
+
+  private[spark] def simpleWritableFactory[T: ClassTag, W <: Writable: ClassTag](convert: T => W)
+    : WritableFactory[T] = {
+    val writableClass = implicitly[ClassTag[W]].runtimeClass.asInstanceOf[Class[W]]
+    new WritableFactory[T](_ => writableClass, convert)
+  }
+
+  implicit def intWritableFactory: WritableFactory[Int] =
+    simpleWritableFactory(new IntWritable(_))
+
+  implicit def longWritableFactory: WritableFactory[Long] =
+    simpleWritableFactory(new LongWritable(_))
+
+  implicit def floatWritableFactory: WritableFactory[Float] =
+    simpleWritableFactory(new FloatWritable(_))
+
+  implicit def doubleWritableFactory: WritableFactory[Double] =
+    simpleWritableFactory(new DoubleWritable(_))
+
+  implicit def booleanWritableFactory: WritableFactory[Boolean] =
+    simpleWritableFactory(new BooleanWritable(_))
+
+  implicit def bytesWritableFactory: WritableFactory[Array[Byte]] =
+    simpleWritableFactory(new BytesWritable(_))
+
+  implicit def stringWritableFactory: WritableFactory[String] =
+    simpleWritableFactory(new Text(_))
+
+  implicit def writableWritableFactory[T <: Writable: ClassTag]: WritableFactory[T] =
+    simpleWritableFactory(w => w)
+
+}
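On the write side, `saveAsSequenceFile` now resolves one `WritableFactory` per type parameter from this companion object, so the common cases compile with no import at all. A sketch (assuming an existing `sc`; the path is illustrative):

    // longWritableFactory and stringWritableFactory are found implicitly; they record
    // LongWritable and Text for SequenceFileRDDFunctions to use as output classes.
    sc.parallelize(Seq((1L, "one"), (2L, "two"))).saveAsSequenceFile("/tmp/long-text-seq")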