Skip to content

Commit 327b718

Merge branch 'master' into expand-nest-join
2 parents: 73cbbb1 + d815654


43 files changed: +652 −104 lines

R/pkg/inst/tests/test_sparkSQL.R

Lines changed: 1 addition & 1 deletion
```diff
@@ -1147,7 +1147,7 @@ test_that("describe() and summarize() on a DataFrame", {
   stats <- describe(df, "age")
   expect_equal(collect(stats)[1, "summary"], "count")
   expect_equal(collect(stats)[2, "age"], "24.5")
-  expect_equal(collect(stats)[3, "age"], "5.5")
+  expect_equal(collect(stats)[3, "age"], "7.7781745930520225")
   stats <- describe(df)
   expect_equal(collect(stats)[4, "name"], "Andy")
   expect_equal(collect(stats)[5, "age"], "30")
```
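The larger expected value is what `describe()` reports once it switches from the population to the sample standard deviation. Assuming the test's two non-null ages are 19 and 30 (consistent with the asserted mean of 24.5 above), a quick sketch of the arithmetic:

```scala
// Illustrative check of the two stddev conventions for ages 19 and 30.
val ages = Seq(19.0, 30.0)
val mean = ages.sum / ages.size                        // 24.5
val ssq  = ages.map(a => (a - mean) * (a - mean)).sum  // 30.25 + 30.25 = 60.5
math.sqrt(ssq / ages.size)        // 5.5                (population; old expectation)
math.sqrt(ssq / (ages.size - 1))  // 7.7781745930520225 (sample; new expectation)
```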

bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala

Lines changed: 6 additions & 0 deletions
```diff
@@ -22,6 +22,7 @@ import org.apache.spark.SparkContext._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
 
+@deprecated("Uses of Bagel should migrate to GraphX", "1.6.0")
 object Bagel extends Logging {
   val DEFAULT_STORAGE_LEVEL = StorageLevel.MEMORY_AND_DISK
 
@@ -270,18 +271,21 @@ object Bagel extends Logging {
   }
 }
 
+@deprecated("Uses of Bagel should migrate to GraphX", "1.6.0")
 trait Combiner[M, C] {
   def createCombiner(msg: M): C
   def mergeMsg(combiner: C, msg: M): C
   def mergeCombiners(a: C, b: C): C
 }
 
+@deprecated("Uses of Bagel should migrate to GraphX", "1.6.0")
 trait Aggregator[V, A] {
   def createAggregator(vert: V): A
   def mergeAggregators(a: A, b: A): A
 }
 
 /** Default combiner that simply appends messages together (i.e. performs no aggregation) */
+@deprecated("Uses of Bagel should migrate to GraphX", "1.6.0")
 class DefaultCombiner[M: Manifest] extends Combiner[M, Array[M]] with Serializable {
   def createCombiner(msg: M): Array[M] =
     Array(msg)
@@ -297,6 +301,7 @@ class DefaultCombiner[M: Manifest] extends Combiner[M, Array[M]] with Serializab
  * Subclasses may store state along with each vertex and must
  * inherit from java.io.Serializable or scala.Serializable.
  */
+@deprecated("Uses of Bagel should migrate to GraphX", "1.6.0")
 trait Vertex {
   def active: Boolean
 }
@@ -307,6 +312,7 @@ trait Vertex {
  * Subclasses may contain a payload to deliver to the target vertex
 * and must inherit from java.io.Serializable or scala.Serializable.
 */
+@deprecated("Uses of Bagel should migrate to GraphX", "1.6.0")
 trait Message[K] {
   def targetId: K
 }
```
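For context, Scala's `@deprecated` annotation takes a message and the first version in which the symbol was deprecated; the compiler then warns at every use site while the annotated code keeps working. A minimal self-contained sketch (the names here are illustrative, not part of this commit):

```scala
@deprecated("Uses of Legacy should migrate to Replacement", "1.6.0")
object Legacy {
  def greet(): String = "hello from the old API"
}

object Caller {
  // Compiling this method emits a deprecation warning that quotes
  // the message and version given in the annotation above.
  def run(): String = Legacy.greet()
}
```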

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 3 additions & 3 deletions
```diff
@@ -858,7 +858,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     // Use setInputPaths so that wholeTextFiles aligns with hadoopFile/textFile in taking
     // comma separated files as input. (see SPARK-7155)
     NewFileInputFormat.setInputPaths(job, path)
-    val updateConf = job.getConfiguration
+    val updateConf = SparkHadoopUtil.get.getConfigurationFromJobContext(job)
     new WholeTextFileRDD(
       this,
       classOf[WholeTextFileInputFormat],
@@ -910,7 +910,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     // Use setInputPaths so that binaryFiles aligns with hadoopFile/textFile in taking
     // comma separated files as input. (see SPARK-7155)
     NewFileInputFormat.setInputPaths(job, path)
-    val updateConf = job.getConfiguration
+    val updateConf = SparkHadoopUtil.get.getConfigurationFromJobContext(job)
     new BinaryFileRDD(
       this,
       classOf[StreamInputFormat],
@@ -1092,7 +1092,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     // Use setInputPaths so that newAPIHadoopFile aligns with hadoopFile/textFile in taking
     // comma separated files as input. (see SPARK-7155)
     NewFileInputFormat.setInputPaths(job, path)
-    val updatedConf = job.getConfiguration
+    val updatedConf = SparkHadoopUtil.get.getConfigurationFromJobContext(job)
     new NewHadoopRDD(this, fClass, kClass, vClass, updatedConf).setName(path)
   }
```

core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala

Lines changed: 4 additions & 0 deletions
```diff
@@ -192,7 +192,9 @@ class SparkHadoopUtil extends Logging {
    * while it's interface in Hadoop 2.+.
    */
   def getConfigurationFromJobContext(context: JobContext): Configuration = {
+    // scalastyle:off jobconfig
     val method = context.getClass.getMethod("getConfiguration")
+    // scalastyle:on jobconfig
     method.invoke(context).asInstanceOf[Configuration]
   }
 
@@ -204,7 +206,9 @@ class SparkHadoopUtil extends Logging {
    */
   def getTaskAttemptIDFromTaskAttemptContext(
       context: MapReduceTaskAttemptContext): MapReduceTaskAttemptID = {
+    // scalastyle:off jobconfig
     val method = context.getClass.getMethod("getTaskAttemptID")
+    // scalastyle:on jobconfig
     method.invoke(context).asInstanceOf[MapReduceTaskAttemptID]
   }
```
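The reflective lookup here is the compatibility trick: as the doc comment notes, `JobContext.getConfiguration` is declared on a class in Hadoop 1.x but on an interface in Hadoop 2.x, so a direct call compiled against one line of versions can fail with linkage errors against the other, while a runtime `getMethod`/`invoke` resolves against whatever is on the classpath. A stripped-down sketch of the same pattern (generic names, not Spark's API):

```scala
// Resolve a zero-argument getter at runtime instead of link time, so the
// same bytecode works whether the target declares it on a class or interface.
def reflectiveGet[T](target: AnyRef, getterName: String): T = {
  val method = target.getClass.getMethod(getterName)
  method.invoke(target).asInstanceOf[T]
}

// Usage, mirroring the methods above:
//   val conf = reflectiveGet[Configuration](job, "getConfiguration")
```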

core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala

Lines changed: 5 additions & 3 deletions
```diff
@@ -996,8 +996,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     job.setOutputKeyClass(keyClass)
     job.setOutputValueClass(valueClass)
     job.setOutputFormatClass(outputFormatClass)
-    job.getConfiguration.set("mapred.output.dir", path)
-    saveAsNewAPIHadoopDataset(job.getConfiguration)
+    val jobConfiguration = SparkHadoopUtil.get.getConfigurationFromJobContext(job)
+    jobConfiguration.set("mapred.output.dir", path)
+    saveAsNewAPIHadoopDataset(jobConfiguration)
   }
 
   /**
@@ -1064,7 +1065,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     val formatter = new SimpleDateFormat("yyyyMMddHHmm")
     val jobtrackerID = formatter.format(new Date())
     val stageId = self.id
-    val wrappedConf = new SerializableConfiguration(job.getConfiguration)
+    val jobConfiguration = SparkHadoopUtil.get.getConfigurationFromJobContext(job)
+    val wrappedConf = new SerializableConfiguration(jobConfiguration)
     val outfmt = job.getOutputFormatClass
     val jobFormat = outfmt.newInstance
```
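Wrapping the configuration in `SerializableConfiguration` matters because Hadoop's `Configuration` is not `java.io.Serializable`, yet it has to travel with the task closure. A rough sketch of the wrapper idea, assuming it round-trips the config through Hadoop's own `Writable` methods (the class name and details are illustrative, not Spark's exact implementation):

```scala
import java.io.{ObjectInputStream, ObjectOutputStream}
import org.apache.hadoop.conf.Configuration

// Illustrative wrapper: serialize a Configuration via its Writable methods.
class SerializableConf(@transient var value: Configuration) extends Serializable {
  private def writeObject(out: ObjectOutputStream): Unit = {
    out.defaultWriteObject()
    value.write(out) // Configuration implements Writable
  }
  private def readObject(in: ObjectInputStream): Unit = {
    in.defaultReadObject()
    value = new Configuration(false)
    value.readFields(in)
  }
}
```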

core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDD.scala

Lines changed: 1 addition & 1 deletion
```diff
@@ -86,7 +86,7 @@ private[spark] class SqlNewHadoopRDD[V: ClassTag](
     if (isDriverSide) {
       initDriverSideJobFuncOpt.map(f => f(job))
     }
-    job.getConfiguration
+    SparkHadoopUtil.get.getConfigurationFromJobContext(job)
   }
 
   private val jobTrackerId: String = {
```

core/src/test/scala/org/apache/spark/FileSuite.scala

Lines changed: 4 additions & 2 deletions
```diff
@@ -19,6 +19,7 @@ package org.apache.spark
 
 import java.io.{File, FileWriter}
 
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.input.PortableDataStream
 import org.apache.spark.storage.StorageLevel
 
@@ -506,8 +507,9 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
     job.setOutputKeyClass(classOf[String])
     job.setOutputValueClass(classOf[String])
     job.setOutputFormatClass(classOf[NewTextOutputFormat[String, String]])
-    job.getConfiguration.set("mapred.output.dir", tempDir.getPath + "/outputDataset_new")
-    randomRDD.saveAsNewAPIHadoopDataset(job.getConfiguration)
+    val jobConfig = SparkHadoopUtil.get.getConfigurationFromJobContext(job)
+    jobConfig.set("mapred.output.dir", tempDir.getPath + "/outputDataset_new")
+    randomRDD.saveAsNewAPIHadoopDataset(jobConfig)
     assert(new File(tempDir.getPath + "/outputDataset_new/part-r-00000").exists() === true)
   }
```

docs/bagel-programming-guide.md

Lines changed: 1 addition & 9 deletions
```diff
@@ -4,7 +4,7 @@ displayTitle: Bagel Programming Guide
 title: Bagel
 ---
 
-**Bagel will soon be superseded by [GraphX](graphx-programming-guide.html); we recommend that new users try GraphX instead.**
+**Bagel is deprecated, and superseded by [GraphX](graphx-programming-guide.html).**
 
 Bagel is a Spark implementation of Google's [Pregel](http://portal.acm.org/citation.cfm?id=1807184) graph processing framework. Bagel currently supports basic graph computation, combiners, and aggregators.
 
@@ -157,11 +157,3 @@
   def targetId: K
 }
 {% endhighlight %}
-
-# Where to Go from Here
-
-Two example jobs, PageRank and shortest path, are included in `examples/src/main/scala/org/apache/spark/examples/bagel`. You can run them by passing the class name to the `bin/run-example` script included in Spark; e.g.:
-
-    ./bin/run-example org.apache.spark.examples.bagel.WikipediaPageRank
-
-Each example program prints usage help when run without any arguments.
```

docs/index.md

Lines changed: 0 additions & 1 deletion
```diff
@@ -90,7 +90,6 @@ options for deployment:
 * [Spark SQL and DataFrames](sql-programming-guide.html): support for structured data and relational queries
 * [MLlib](mllib-guide.html): built-in machine learning library
 * [GraphX](graphx-programming-guide.html): Spark's new API for graph processing
-* [Bagel (Pregel on Spark)](bagel-programming-guide.html): older, simple graph processing model
 
 **API Docs:**
```

examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala

Lines changed: 3 additions & 0 deletions
```diff
@@ -16,6 +16,7 @@
  */
 
 // scalastyle:off println
+// scalastyle:off jobcontext
 package org.apache.spark.examples
 
 import java.nio.ByteBuffer
@@ -81,6 +82,7 @@ object CassandraCQLTest {
 
     val job = new Job()
     job.setInputFormatClass(classOf[CqlPagingInputFormat])
+    val configuration = job.getConfiguration
     ConfigHelper.setInputInitialAddress(job.getConfiguration(), cHost)
     ConfigHelper.setInputRpcPort(job.getConfiguration(), cPort)
     ConfigHelper.setInputColumnFamily(job.getConfiguration(), KeySpace, InputColumnFamily)
@@ -135,3 +137,4 @@ object CassandraCQLTest {
   }
 }
 // scalastyle:on println
+// scalastyle:on jobcontext
```
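The `// scalastyle:off <id>` / `// scalastyle:on <id>` comment pair disables one named checker for just the enclosed region, which is how this Hadoop-facing example can keep calling `job.getConfiguration` directly while the rule still polices the rest of the codebase. A minimal sketch of the scoping (checker id as used above; the surrounding lines are illustrative):

```scala
// scalastyle:off jobcontext
val allowedHere = job.getConfiguration   // suppressed: inside the off/on window
// scalastyle:on jobcontext
val flaggedHere = job.getConfiguration   // reported by the jobcontext checker
```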
