
Commit 5c376e0

Merge branch 'master' of https://github.com/apache/spark into allocateExecutors

2 parents: 1faf4f4 + b77c19b

File tree

26 files changed: +976 -415 lines

core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala

Lines changed: 22 additions & 0 deletions

@@ -108,6 +108,28 @@ class JavaRDD[T](val rdd: RDD[T])(implicit val classTag: ClassTag[T])
   def sample(withReplacement: Boolean, fraction: Double, seed: Long): JavaRDD[T] =
     wrapRDD(rdd.sample(withReplacement, fraction, seed))
 
+  /**
+   * Randomly splits this RDD with the provided weights.
+   *
+   * @param weights weights for splits, will be normalized if they don't sum to 1
+   *
+   * @return split RDDs in an array
+   */
+  def randomSplit(weights: Array[Double]): Array[JavaRDD[T]] =
+    randomSplit(weights, Utils.random.nextLong)
+
+  /**
+   * Randomly splits this RDD with the provided weights.
+   *
+   * @param weights weights for splits, will be normalized if they don't sum to 1
+   * @param seed random seed
+   *
+   * @return split RDDs in an array
+   */
+  def randomSplit(weights: Array[Double], seed: Long): Array[JavaRDD[T]] =
+    rdd.randomSplit(weights, seed).map(wrapRDD)
+
   /**
    * Return the union of this RDD and another one. Any identical elements will appear multiple
    * times (use `.distinct()` to eliminate them).
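For context, here is a minimal sketch of how the new Java-facing method could be exercised from Scala. The local master, the 1-to-100 dataset, and the 0.7/0.3 weights are illustrative assumptions, not part of the commit:

import scala.collection.JavaConverters._

import org.apache.spark.SparkConf
import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}

object RandomSplitSketch {
  def main(args: Array[String]): Unit = {
    val jsc = new JavaSparkContext(
      new SparkConf().setAppName("randomSplit-sketch").setMaster("local[2]"))

    // Illustrative data only.
    val data: JavaRDD[java.lang.Integer] =
      jsc.parallelize((1 to 100).map(Int.box).asJava)

    // A fixed seed makes the split reproducible across runs; the seedless
    // overload would pick a random seed via Utils.random.nextLong.
    val Array(train, test) = data.randomSplit(Array(0.7, 0.3), 42L)

    println(s"train: ${train.count()}, test: ${test.count()}")
    jsc.stop()
  }
}

Per the Javadoc, the weights need not sum to 1: Array(7.0, 3.0) would be normalized to the same 0.7/0.3 split.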

core/src/main/scala/org/apache/spark/util/FileLogger.scala

Lines changed: 12 additions & 4 deletions

@@ -61,6 +61,14 @@ private[spark] class FileLogger(
   // Only defined if the file system scheme is not local
   private var hadoopDataStream: Option[FSDataOutputStream] = None
 
+  // The Hadoop APIs have changed over time, so we use reflection to figure out
+  // the correct method to use to flush a hadoop data stream. See SPARK-1518
+  // for details.
+  private val hadoopFlushMethod = {
+    val cls = classOf[FSDataOutputStream]
+    scala.util.Try(cls.getMethod("hflush")).getOrElse(cls.getMethod("sync"))
+  }
+
   private var writer: Option[PrintWriter] = None
 
   /**
@@ -149,13 +157,13 @@ private[spark] class FileLogger(
   /**
    * Flush the writer to disk manually.
    *
-   * If the Hadoop FileSystem is used, the underlying FSDataOutputStream (r1.0.4) must be
-   * sync()'ed manually as it does not support flush(), which is invoked by when higher
-   * level streams are flushed.
+   * When using a Hadoop filesystem, we need to invoke the hflush or sync
+   * method. In HDFS, hflush guarantees that the data gets to all the
+   * DataNodes.
    */
   def flush() {
     writer.foreach(_.flush())
-    hadoopDataStream.foreach(_.sync())
+    hadoopDataStream.foreach(hadoopFlushMethod.invoke(_))
   }
 
   /**
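The Try(...).getOrElse(...) lookup is the heart of this fix: older Hadoop releases flush an FSDataOutputStream via sync(), while newer ones expose hflush(), so resolving the method once by reflection lets a single Spark build run against both. Here is a self-contained sketch of the same pattern; FakeStream is a hypothetical stand-in for Hadoop's FSDataOutputStream, not anything in the commit:

import java.lang.reflect.Method

import scala.util.Try

// Hypothetical stand-in for FSDataOutputStream. Like newer Hadoop releases
// it exposes hflush(); older releases offered only sync().
class FakeStream {
  def hflush(): Unit = println("hflush() called")
  def sync(): Unit = println("sync() called")
}

object FlushViaReflection {
  def main(args: Array[String]): Unit = {
    val cls = classOf[FakeStream]

    // Prefer hflush if the class on the runtime classpath has it; otherwise
    // fall back to sync. getMethod throws NoSuchMethodException, which Try
    // turns into a Failure, so the lookup resolves once into a plain val.
    val flushMethod: Method =
      Try(cls.getMethod("hflush")).getOrElse(cls.getMethod("sync"))

    flushMethod.invoke(new FakeStream)  // prints "hflush() called"
  }
}

Caching the Method in a val, as the commit does with hadoopFlushMethod, keeps the reflective lookup off the per-flush path; only invoke() is paid on each call.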
